You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/02/01 09:29:41 UTC

[3/3] incubator-asterixdb-hyracks git commit: Add Support for Upsert Operation

Add Support for Upsert Operation

This change adds support for upsert operations. It includes
creating primary and secondary upsert operators, in addition
to adding a new function "before" to the index operation
callback to correctly perform locking for the upsert operation.

Change-Id: I2705f43b6e6d187ee29b9ba5a7946d422990022a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/476
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/f1fdb156
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/f1fdb156
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/f1fdb156

Branch: refs/heads/master
Commit: f1fdb156a094eaee9f5d261e0cd7c6f5fe2d063c
Parents: 637e955
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Sat Jan 30 11:13:47 2016 +0300
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Mon Feb 1 00:24:31 2016 -0800

----------------------------------------------------------------------
 .../core/algebra/base/LogicalOperatorTag.java   |   4 +-
 .../algebra/metadata/IMetadataProvider.java     |  18 +-
 .../logical/IndexInsertDeleteOperator.java      | 146 -------------
 .../IndexInsertDeleteUpsertOperator.java        | 184 +++++++++++++++++
 .../operators/logical/InsertDeleteOperator.java | 133 ------------
 .../logical/InsertDeleteUpsertOperator.java     | 203 +++++++++++++++++++
 .../operators/logical/ReplicateOperator.java    |   6 +-
 .../operators/logical/TokenizeOperator.java     |   2 +-
 .../visitors/FDsAndEquivClassesVisitor.java     |   8 +-
 .../visitors/IsomorphismOperatorVisitor.java    |  16 +-
 .../IsomorphismVariableMappingVisitor.java      |   8 +-
 .../visitors/LogicalPropertiesVisitor.java      |   8 +-
 .../visitors/OperatorDeepCopyVisitor.java       |  12 +-
 .../visitors/ProducedVariableVisitor.java       |  11 +-
 .../logical/visitors/SchemaVariableVisitor.java |   8 +-
 .../visitors/SubstituteVariableVisitor.java     |   8 +-
 .../logical/visitors/UsedVariableVisitor.java   |  20 +-
 .../logical/visitors/VariableUtilities.java     |  25 +++
 .../operators/physical/BulkloadPOperator.java   |   6 +-
 .../physical/IndexBulkloadPOperator.java        |   6 +-
 .../physical/IndexInsertDeletePOperator.java    | 135 ------------
 .../IndexInsertDeleteUpsertPOperator.java       | 150 ++++++++++++++
 .../physical/InsertDeletePOperator.java         | 125 ------------
 .../physical/InsertDeleteUpsertPOperator.java   | 135 ++++++++++++
 .../operators/physical/TokenizePOperator.java   |   2 +-
 .../LogicalOperatorPrettyPrintVisitor.java      |  39 +++-
 .../visitors/ILogicalOperatorVisitor.java       |   8 +-
 .../algebra/visitors/IQueryOperatorVisitor.java |   8 +-
 .../piglet/metadata/PigletMetadataProvider.java |  50 +++--
 .../SetAlgebricksPhysicalOperatorsRule.java     |  74 ++++---
 .../common/comm/io/FrameTupleAccessor.java      |  57 +++++-
 .../data/accessors/FrameTupleReference.java     |   6 +-
 .../storage/am/btree/impls/RangePredicate.java  |   3 +
 .../api/IModificationOperationCallback.java     |  37 ++--
 .../am/common/api/ISearchOperationCallback.java |  48 +++--
 .../storage/am/common/api/ISearchPredicate.java |  14 +-
 .../am/common/impls/NoOpOperationCallback.java  |  11 +-
 .../tuples/PermutingFrameTupleReference.java    |  83 ++++----
 .../storage/am/lsm/btree/impls/LSMBTree.java    |   2 +-
 .../am/lsm/btree/impls/LSMBTreeOpContext.java   |   4 +-
 .../storage/am/lsm/common/api/ILSMHarness.java  |  13 +-
 .../common/api/ILSMIndexOperationContext.java   |   2 +-
 ...dexInsertUpdateDeleteOperatorDescriptor.java |  16 +-
 .../storage/am/lsm/common/impls/LSMHarness.java |  24 ++-
 .../invertedindex/impls/LSMInvertedIndex.java   |  93 ++++-----
 .../impls/LSMInvertedIndexOpContext.java        |  12 +-
 ...SMInvertedIndexSearchCursorInitialState.java |  13 +-
 .../search/InvertedIndexSearchPredicate.java    |   5 +
 .../am/lsm/rtree/impls/AbstractLSMRTree.java    |  13 +-
 .../storage/am/lsm/rtree/impls/LSMRTree.java    |   5 +-
 .../am/lsm/rtree/impls/LSMRTreeOpContext.java   |   4 +-
 .../am/rtree/impls/RTreeSearchCursor.java       |   2 +-
 .../storage/am/rtree/impls/SearchPredicate.java |   5 +-
 ...stractModificationOperationCallbackTest.java |  15 +-
 .../AbstractSearchOperationCallbackTest.java    |  14 +-
 .../am/common/TestOperationCallback.java        |  11 +-
 .../LSMBTreeSearchOperationCallbackTest.java    |   8 +-
 57 files changed, 1198 insertions(+), 880 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 5388092..7e9da44 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -29,9 +29,9 @@ public enum LogicalOperatorTag {
     EXTENSION_OPERATOR,
     EXTERNAL_LOOKUP,
     GROUP,
-    INDEX_INSERT_DELETE,
+    INDEX_INSERT_DELETE_UPSERT,
     INNERJOIN,
-    INSERT_DELETE,
+    INSERT_DELETE_UPSERT,
     LEFTOUTERJOIN,
     LIMIT,
     MATERIALIZE,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 53857d2..823ebae 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -48,13 +48,13 @@ public interface IMetadataProvider<S, I> {
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
             List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
-            throws AlgebricksException;
+                    throws AlgebricksException;
 
     public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource);
 
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
-            throws AlgebricksException;
+                    throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
@@ -63,7 +63,7 @@ public interface IMetadataProvider<S, I> {
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
             List<LogicalVariable> additionalNonKeyFields, JobGenContext context, JobSpecification jobSpec)
-            throws AlgebricksException;
+                    throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
@@ -193,4 +193,16 @@ public interface IMetadataProvider<S, I> {
 
     public IFunctionInfo lookupFunction(FunctionIdentifier fid);
 
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<S> dataSource,
+            IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+            LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, LogicalVariable prevPayload,
+            RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
+
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+            IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+            IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+            List<LogicalVariable> additionalFilteringKeys, ILogicalExpression filterExpr,
+            List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKeys,
+            RecordDescriptor inputDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java
deleted file mode 100644
index e9e3b01..0000000
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
-import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-
-public class IndexInsertDeleteOperator extends AbstractLogicalOperator {
-
-    private final IDataSourceIndex<?, ?> dataSourceIndex;
-    private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
-    // In the bulk-load case on ngram or keyword index,
-    // it contains [token, number of token] or [token].
-    // Otherwise, it contains secondary key information.
-    private final List<Mutable<ILogicalExpression>> secondaryKeyExprs;
-    private final Mutable<ILogicalExpression> filterExpr;
-    private final Kind operation;
-    private final boolean bulkload;
-    private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
-
-    public IndexInsertDeleteOperator(IDataSourceIndex<?, ?> dataSourceIndex,
-            List<Mutable<ILogicalExpression>> primaryKeyExprs, List<Mutable<ILogicalExpression>> secondaryKeyExprs,
-            Mutable<ILogicalExpression> filterExpr, Kind operation, boolean bulkload) {
-        this.dataSourceIndex = dataSourceIndex;
-        this.primaryKeyExprs = primaryKeyExprs;
-        this.secondaryKeyExprs = secondaryKeyExprs;
-        this.filterExpr = filterExpr;
-        this.operation = operation;
-        this.bulkload = bulkload;
-    }
-
-    @Override
-    public void recomputeSchema() throws AlgebricksException {
-        schema = new ArrayList<LogicalVariable>();
-        schema.addAll(inputs.get(0).getValue().getSchema());
-    }
-
-    @Override
-    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
-        boolean b = false;
-        for (int i = 0; i < primaryKeyExprs.size(); i++) {
-            if (visitor.transform(primaryKeyExprs.get(i))) {
-                b = true;
-            }
-        }
-        for (int i = 0; i < secondaryKeyExprs.size(); i++) {
-            if (visitor.transform(secondaryKeyExprs.get(i))) {
-                b = true;
-            }
-        }
-        return b;
-    }
-
-    @Override
-    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
-        return visitor.visitIndexInsertDeleteOperator(this, arg);
-    }
-
-    @Override
-    public boolean isMap() {
-        return false;
-    }
-
-    @Override
-    public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return VariablePropagationPolicy.ALL;
-    }
-
-    @Override
-    public LogicalOperatorTag getOperatorTag() {
-        return LogicalOperatorTag.INDEX_INSERT_DELETE;
-    }
-
-    @Override
-    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
-        return createPropagatingAllInputsTypeEnvironment(ctx);
-    }
-
-    public List<Mutable<ILogicalExpression>> getPrimaryKeyExpressions() {
-        return primaryKeyExprs;
-    }
-
-    public IDataSourceIndex<?, ?> getDataSourceIndex() {
-        return dataSourceIndex;
-    }
-
-    public String getIndexName() {
-        return dataSourceIndex.getId().toString();
-    }
-
-    public List<Mutable<ILogicalExpression>> getSecondaryKeyExpressions() {
-        return secondaryKeyExprs;
-    }
-
-    public Mutable<ILogicalExpression> getFilterExpression() {
-        return filterExpr;
-    }
-
-    public Kind getOperation() {
-        return operation;
-    }
-
-    public boolean isBulkload() {
-        return bulkload;
-    }
-
-    public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
-        this.additionalFilteringExpressions = additionalFilteringExpressions;
-    }
-
-    public List<Mutable<ILogicalExpression>> getAdditionalFilteringExpressions() {
-        return additionalFilteringExpressions;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
new file mode 100644
index 0000000..c06cc18
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class IndexInsertDeleteUpsertOperator extends AbstractLogicalOperator {
+
+    private final IDataSourceIndex<?, ?> dataSourceIndex;
+    private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
+    // In the bulk-load case on ngram or keyword index,
+    // it contains [token, number of token] or [token].
+    // Otherwise, it contains secondary key information.
+    private final List<Mutable<ILogicalExpression>> secondaryKeyExprs;
+    private final Mutable<ILogicalExpression> filterExpr;
+    private final Kind operation;
+    private final boolean bulkload;
+    private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
+    // used for upsert operations
+    private List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs;
+    private Mutable<ILogicalExpression> prevAdditionalFilteringExpression;
+
+    public IndexInsertDeleteUpsertOperator(IDataSourceIndex<?, ?> dataSourceIndex,
+            List<Mutable<ILogicalExpression>> primaryKeyExprs, List<Mutable<ILogicalExpression>> secondaryKeyExprs,
+            Mutable<ILogicalExpression> filterExpr, Kind operation, boolean bulkload) {
+        this.dataSourceIndex = dataSourceIndex;
+        this.primaryKeyExprs = primaryKeyExprs;
+        this.secondaryKeyExprs = secondaryKeyExprs;
+        this.filterExpr = filterExpr;
+        this.operation = operation;
+        this.bulkload = bulkload;
+    }
+
+    @Override
+    public void recomputeSchema() throws AlgebricksException {
+        schema = new ArrayList<LogicalVariable>();
+        schema.addAll(inputs.get(0).getValue().getSchema());
+    }
+
+    /**
+     * Applies {@code visitor} to the primary-key, secondary-key, additional-filtering,
+     * previous-secondary-key and previous-filtering expression references of this operator.
+     * Every reference is visited even after one has changed; {@code filterExpr} is not visited.
+     *
+     * @param visitor the transform to apply to each expression reference
+     * @return true if the transform reported a change for at least one expression
+     */
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+        boolean b = false;
+        // Primary
+        for (int i = 0; i < primaryKeyExprs.size(); i++) {
+            b |= visitor.transform(primaryKeyExprs.get(i));
+        }
+        // Secondary
+        for (int i = 0; i < secondaryKeyExprs.size(); i++) {
+            b |= visitor.transform(secondaryKeyExprs.get(i));
+        }
+        // Additional filtering
+        if (additionalFilteringExpressions != null) {
+            for (int i = 0; i < additionalFilteringExpressions.size(); i++) {
+                b |= visitor.transform(additionalFilteringExpressions.get(i));
+            }
+        }
+        // Old secondary <For upsert>
+        if (prevSecondaryKeyExprs != null) {
+            for (int i = 0; i < prevSecondaryKeyExprs.size(); i++) {
+                b |= visitor.transform(prevSecondaryKeyExprs.get(i));
+            }
+        }
+        // Old Filtering <For upsert>; its result is recorded like every other transform
+        if (prevAdditionalFilteringExpression != null) {
+            b |= visitor.transform(prevAdditionalFilteringExpression);
+        }
+        return b;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitIndexInsertDeleteUpsertOperator(this, arg);
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return VariablePropagationPolicy.ALL;
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT;
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        return createPropagatingAllInputsTypeEnvironment(ctx);
+    }
+
+    public List<Mutable<ILogicalExpression>> getPrimaryKeyExpressions() {
+        return primaryKeyExprs;
+    }
+
+    public IDataSourceIndex<?, ?> getDataSourceIndex() {
+        return dataSourceIndex;
+    }
+
+    public String getIndexName() {
+        return dataSourceIndex.getId().toString();
+    }
+
+    public List<Mutable<ILogicalExpression>> getSecondaryKeyExpressions() {
+        return secondaryKeyExprs;
+    }
+
+    public Mutable<ILogicalExpression> getFilterExpression() {
+        return filterExpr;
+    }
+
+    public Kind getOperation() {
+        return operation;
+    }
+
+    public boolean isBulkload() {
+        return bulkload;
+    }
+
+    public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
+        this.additionalFilteringExpressions = additionalFilteringExpressions;
+    }
+
+    public List<Mutable<ILogicalExpression>> getAdditionalFilteringExpressions() {
+        return additionalFilteringExpressions;
+    }
+
+    public List<Mutable<ILogicalExpression>> getPrevSecondaryKeyExprs() {
+        return prevSecondaryKeyExprs;
+    }
+
+    public void setPrevSecondaryKeyExprs(List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs) {
+        this.prevSecondaryKeyExprs = prevSecondaryKeyExprs;
+    }
+
+    public Mutable<ILogicalExpression> getPrevAdditionalFilteringExpression() {
+        return prevAdditionalFilteringExpression;
+    }
+
+    public void setPrevAdditionalFilteringExpression(Mutable<ILogicalExpression> prevAdditionalFilteringExpression) {
+        this.prevAdditionalFilteringExpression = prevAdditionalFilteringExpression;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
deleted file mode 100644
index b95d279..0000000
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
-import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-
-public class InsertDeleteOperator extends AbstractLogicalOperator {
-
-    public enum Kind {
-        INSERT,
-        DELETE
-    }
-
-    private final IDataSource<?> dataSource;
-
-    private final Mutable<ILogicalExpression> payloadExpr;
-
-    private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
-
-    private final Kind operation;
-    private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
-
-    private final boolean bulkload;
-
-    public InsertDeleteOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
-            List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation, boolean bulkload) {
-        this.dataSource = dataSource;
-        this.payloadExpr = payloadExpr;
-        this.primaryKeyExprs = primaryKeyExprs;
-        this.operation = operation;
-        this.bulkload = bulkload;
-    }
-
-    @Override
-    public void recomputeSchema() throws AlgebricksException {
-        schema = new ArrayList<LogicalVariable>();
-        schema.addAll(inputs.get(0).getValue().getSchema());
-    }
-
-    @Override
-    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
-        boolean changed = false;
-        changed = transform.transform(payloadExpr);
-        for (Mutable<ILogicalExpression> e : primaryKeyExprs) {
-            changed |= transform.transform(e);
-        }
-        return changed;
-    }
-
-    @Override
-    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
-        return visitor.visitInsertDeleteOperator(this, arg);
-    }
-
-    @Override
-    public boolean isMap() {
-        return false;
-    }
-
-    @Override
-    public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return VariablePropagationPolicy.ALL;
-    }
-
-    @Override
-    public LogicalOperatorTag getOperatorTag() {
-        return LogicalOperatorTag.INSERT_DELETE;
-    }
-
-    @Override
-    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
-        return createPropagatingAllInputsTypeEnvironment(ctx);
-    }
-
-    public List<Mutable<ILogicalExpression>> getPrimaryKeyExpressions() {
-        return primaryKeyExprs;
-    }
-
-    public IDataSource<?> getDataSource() {
-        return dataSource;
-    }
-
-    public Mutable<ILogicalExpression> getPayloadExpression() {
-        return payloadExpr;
-    }
-
-    public Kind getOperation() {
-        return operation;
-    }
-
-    public boolean isBulkload() {
-        return bulkload;
-	}
-
-    public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
-        this.additionalFilteringExpressions = additionalFilteringExpressions;
-    }
-
-    public List<Mutable<ILogicalExpression>> getAdditionalFilteringExpressions() {
-        return additionalFilteringExpressions;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
new file mode 100644
index 0000000..607db69
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * Logical operator describing an insert, delete or upsert (see {@link Kind}) against a data
+ * source. It carries the payload expression, the primary key expressions and, optionally, a list
+ * of additional filtering expressions.
+ * <p>
+ * For {@link Kind#UPSERT} the operator contributes extra variables to its output: the
+ * previous-record variable and, when one has been set, the previous-filter variable.
+ */
+public class InsertDeleteUpsertOperator extends AbstractLogicalOperator {
+
+    /** The modification this operator performs. */
+    public enum Kind {
+        INSERT,
+        DELETE,
+        UPSERT
+    }
+
+    private final IDataSource<?> dataSource;
+    private final Mutable<ILogicalExpression> payloadExpr;
+    private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
+    private final Kind operation;
+    private final boolean bulkload;
+    // Optional; stays null until setAdditionalFilteringExpressions is called.
+    private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
+    // Variable added to the output for upserts (the previous record), and the type registered for it.
+    private LogicalVariable prevRecordVar;
+    private Object prevRecordType;
+    // Optional second upsert variable and its type; only used when prevFilterVar is non-null.
+    private LogicalVariable prevFilterVar;
+    private Object prevFilterType;
+
+    /**
+     * @param dataSource
+     *            the data source this operator refers to
+     * @param payloadExpr
+     *            the payload expression
+     * @param primaryKeyExprs
+     *            the primary key expressions
+     * @param operation
+     *            the kind of modification
+     * @param bulkload
+     *            the value later reported by {@link #isBulkload()}
+     */
+    public InsertDeleteUpsertOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
+            List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation, boolean bulkload) {
+        this.dataSource = dataSource;
+        this.payloadExpr = payloadExpr;
+        this.primaryKeyExprs = primaryKeyExprs;
+        this.operation = operation;
+        this.bulkload = bulkload;
+    }
+
+    /**
+     * Rebuilds the schema as the schema of the first input, followed (for upserts only) by the
+     * previous-record variable and, if set, the previous-filter variable.
+     */
+    @Override
+    public void recomputeSchema() throws AlgebricksException {
+        schema = new ArrayList<LogicalVariable>();
+        schema.addAll(inputs.get(0).getValue().getSchema());
+        if (operation == Kind.UPSERT) {
+            // The upsert case also produces the previous record
+            // NOTE(review): prevRecordVar is appended without a null check; presumably it is
+            // always set before the schema of an upsert is computed -- confirm.
+            schema.add(prevRecordVar);
+            if (prevFilterVar != null) {
+                schema.add(prevFilterVar);
+            }
+        }
+    }
+
+    /**
+     * Adds the previous-record and previous-filter variables to {@code producedVariables}, each
+     * only if it is non-null. The operation kind is not consulted here.
+     */
+    public void getProducedVariables(Collection<LogicalVariable> producedVariables) {
+        if (prevRecordVar != null) {
+            producedVariables.add(prevRecordVar);
+        }
+        if (prevFilterVar != null) {
+            producedVariables.add(prevFilterVar);
+        }
+    }
+
+    /**
+     * Applies {@code transform} to the payload expression, to every primary key expression and,
+     * when present, to every additional filtering expression.
+     *
+     * @return {@code true} if at least one of the transform calls returned {@code true}
+     */
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
+        boolean changed = transform.transform(payloadExpr);
+        // '|=' (not '||') so that every expression is transformed even after a change was seen.
+        for (Mutable<ILogicalExpression> e : primaryKeyExprs) {
+            changed |= transform.transform(e);
+        }
+        if (additionalFilteringExpressions != null) {
+            for (Mutable<ILogicalExpression> e : additionalFilteringExpressions) {
+                changed |= transform.transform(e);
+            }
+        }
+        return changed;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitInsertDeleteUpsertOperator(this, arg);
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    /**
+     * Returns a policy that copies all variables of the first source and, for upserts, appends
+     * the previous-record variable and (if set) the previous-filter variable.
+     */
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return new VariablePropagationPolicy() {
+            @Override
+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+                    throws AlgebricksException {
+                target.addAllVariables(sources[0]);
+                if (operation == Kind.UPSERT) {
+                    target.addVariable(prevRecordVar);
+                    if (prevFilterVar != null) {
+                        target.addVariable(prevFilterVar);
+                    }
+                }
+            }
+        };
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.INSERT_DELETE_UPSERT;
+    }
+
+    /**
+     * Builds the type environment that propagates all inputs and, for upserts, registers the
+     * types of the previous-record variable and (if set) the previous-filter variable.
+     */
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        PropagatingTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
+        if (operation == Kind.UPSERT) {
+            env.setVarType(prevRecordVar, prevRecordType);
+            if (prevFilterVar != null) {
+                env.setVarType(prevFilterVar, prevFilterType);
+            }
+        }
+        return env;
+    }
+
+    public List<Mutable<ILogicalExpression>> getPrimaryKeyExpressions() {
+        return primaryKeyExprs;
+    }
+
+    public IDataSource<?> getDataSource() {
+        return dataSource;
+    }
+
+    public Mutable<ILogicalExpression> getPayloadExpression() {
+        return payloadExpr;
+    }
+
+    public Kind getOperation() {
+        return operation;
+    }
+
+    public boolean isBulkload() {
+        return bulkload;
+    }
+
+    public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
+        this.additionalFilteringExpressions = additionalFilteringExpressions;
+    }
+
+    /** @return the additional filtering expressions, or {@code null} if none were set */
+    public List<Mutable<ILogicalExpression>> getAdditionalFilteringExpressions() {
+        return additionalFilteringExpressions;
+    }
+
+    public LogicalVariable getPrevRecordVar() {
+        return prevRecordVar;
+    }
+
+    public void setPrevRecordVar(LogicalVariable prevRecordVar) {
+        this.prevRecordVar = prevRecordVar;
+    }
+
+    /** @return the type registered for the previous-record variable, or {@code null} if unset */
+    public Object getPrevRecordType() {
+        return prevRecordType;
+    }
+
+    public void setPrevRecordType(Object recordType) {
+        prevRecordType = recordType;
+    }
+
+    public LogicalVariable getPrevFilterVar() {
+        return prevFilterVar;
+    }
+
+    public void setPrevFilterVar(LogicalVariable prevFilterVar) {
+        this.prevFilterVar = prevFilterVar;
+    }
+
+    public Object getPrevFilterType() {
+        return prevFilterType;
+    }
+
+    public void setPrevFilterType(Object prevFilterType) {
+        this.prevFilterType = prevFilterType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
index 2ba4969..343ace8 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -62,7 +61,8 @@ public class ReplicateOperator extends AbstractLogicalOperator {
     }
 
     @Override
-    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
         return false;
     }
 
@@ -73,7 +73,7 @@ public class ReplicateOperator extends AbstractLogicalOperator {
 
     @Override
     public boolean isMap() {
-        return false;
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java
index e1a793c..c69ead7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java
@@ -29,7 +29,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 25993bb..b2c97c3 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -56,9 +56,9 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -509,14 +509,14 @@ public class FDsAndEquivClassesVisitor implements ILogicalOperatorVisitor<Void,
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, IOptimizationContext ctx)
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, IOptimizationContext ctx)
             throws AlgebricksException {
         propagateFDsAndEquivClasses(op, ctx);
         return null;
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, IOptimizationContext ctx)
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, IOptimizationContext ctx)
             throws AlgebricksException {
         propagateFDsAndEquivClasses(op, ctx);
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index dc535ea..7a4e7e1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -45,9 +45,9 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -454,11 +454,11 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole
     }
 
     @Override
-    public Boolean visitInsertDeleteOperator(InsertDeleteOperator op, ILogicalOperator arg) throws AlgebricksException {
+    public Boolean visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
-        if (aop.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE)
+        if (aop.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT)
             return Boolean.FALSE;
-        InsertDeleteOperator insertOpArg = (InsertDeleteOperator) copyAndSubstituteVar(op, arg);
+        InsertDeleteUpsertOperator insertOpArg = (InsertDeleteUpsertOperator) copyAndSubstituteVar(op, arg);
         boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), insertOpArg.getSchema());
         if (!op.getDataSource().equals(insertOpArg.getDataSource()))
             isomorphic = false;
@@ -468,12 +468,12 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole
     }
 
     @Override
-    public Boolean visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, ILogicalOperator arg)
+    public Boolean visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, ILogicalOperator arg)
             throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
-        if (aop.getOperatorTag() != LogicalOperatorTag.INDEX_INSERT_DELETE)
+        if (aop.getOperatorTag() != LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT)
             return Boolean.FALSE;
-        IndexInsertDeleteOperator insertOpArg = (IndexInsertDeleteOperator) copyAndSubstituteVar(op, arg);
+        IndexInsertDeleteUpsertOperator insertOpArg = (IndexInsertDeleteUpsertOperator) copyAndSubstituteVar(op, arg);
         boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), insertOpArg.getSchema());
         if (!op.getDataSourceIndex().equals(insertOpArg.getDataSourceIndex()))
             isomorphic = false;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 376c2bc..c46ffde 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -46,9 +46,9 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -251,13 +251,13 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, ILogicalOperator arg) throws AlgebricksException {
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, ILogicalOperator arg)
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, ILogicalOperator arg)
             throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 8e378bc..4e5b13e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -37,9 +37,9 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -259,14 +259,14 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, IOptimizationContext arg)
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, IOptimizationContext arg)
             throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, IOptimizationContext arg)
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, IOptimizationContext arg)
             throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 5cf30c7..2e402fc 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -42,9 +42,9 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -266,12 +266,12 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
     }
 
     @Override
-    public ILogicalOperator visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
+    public ILogicalOperator visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
         List<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
         deepCopyExpressionRefs(newKeyExpressions, op.getPrimaryKeyExpressions());
         List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
         deepCopyExpressionRefs(newKeyExpressions, op.getAdditionalFilteringExpressions());
-        InsertDeleteOperator insertDeleteOp = new InsertDeleteOperator(op.getDataSource(),
+        InsertDeleteUpsertOperator insertDeleteOp = new InsertDeleteUpsertOperator(op.getDataSource(),
                 deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions, op.getOperation(),
                 op.isBulkload());
         insertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
@@ -279,7 +279,7 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
     }
 
     @Override
-    public ILogicalOperator visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg)
+    public ILogicalOperator visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
             throws AlgebricksException {
         List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
         deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
@@ -289,7 +289,7 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
                 ((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
         List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
         deepCopyExpressionRefs(newLSMComponentFilterExpressions, op.getAdditionalFilteringExpressions());
-        IndexInsertDeleteOperator indexInsertDeleteOp = new IndexInsertDeleteOperator(op.getDataSourceIndex(),
+        IndexInsertDeleteUpsertOperator indexInsertDeleteOp = new IndexInsertDeleteUpsertOperator(op.getDataSourceIndex(),
                 newPrimaryKeyExpressions, newSecondaryKeyExpressions, newFilterExpression, op.getOperation(),
                 op.isBulkload());
         indexInsertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index da5d2b2..8df772b 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -42,9 +41,9 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -241,12 +240,14 @@ public class ProducedVariableVisitor implements ILogicalOperatorVisitor<Void, Vo
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
+        op.getProducedVariables(producedVariables);
         return null;
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg) throws AlgebricksException {
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
+            throws AlgebricksException {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index dc89c12..b488df1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -42,9 +42,9 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -283,13 +283,13 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
         standardLayout(op);
         return null;
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg) throws AlgebricksException {
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
         standardLayout(op);
         return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 4b791c1..91ff073 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -40,9 +40,9 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -388,7 +388,7 @@ public class SubstituteVariableVisitor
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
         op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);
         for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
@@ -399,7 +399,7 @@ public class SubstituteVariableVisitor
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op,
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op,
             Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
         for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
             e.getValue().substituteVar(pair.first, pair.second);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index e82b4f7..cfc57c2 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -40,9 +40,9 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -344,11 +344,15 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
     }
 
     @Override
-    public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) {
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) {
+        //1. The record variable
         op.getPayloadExpression().getValue().getUsedVariables(usedVariables);
+
+        //2. The primary key variables
         for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
             e.getValue().getUsedVariables(usedVariables);
         }
+        //3. The filter variables
         if (op.getAdditionalFilteringExpressions() != null) {
             for (Mutable<ILogicalExpression> e : op.getAdditionalFilteringExpressions()) {
                 e.getValue().getUsedVariables(usedVariables);
@@ -358,7 +362,7 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
     }
 
     @Override
-    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg) {
+    public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg) {
         for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
             e.getValue().getUsedVariables(usedVariables);
         }
@@ -370,6 +374,14 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
                 e.getValue().getUsedVariables(usedVariables);
             }
         }
+        if (op.getPrevAdditionalFilteringExpression() != null) {
+            op.getPrevAdditionalFilteringExpression().getValue().getUsedVariables(usedVariables);
+        }
+        if (op.getPrevSecondaryKeyExprs() != null) {
+            for (Mutable<ILogicalExpression> e : op.getPrevSecondaryKeyExprs()) {
+                e.getValue().getUsedVariables(usedVariables);
+            }
+        }
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 0352f83..28f783d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -34,18 +34,43 @@ import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisit
 
 public class VariableUtilities {
 
+    /**
+     * Adds the used variables in the logical operator to the list of used variables
+     *
+     * @param op
+     *          The target operator
+     * @param usedVariables
+     *          A list to be filled with variables used in the logical operator op.
+     * @throws AlgebricksException
+     */
     public static void getUsedVariables(ILogicalOperator op, Collection<LogicalVariable> usedVariables)
             throws AlgebricksException {
         ILogicalOperatorVisitor<Void, Void> visitor = new UsedVariableVisitor(usedVariables);
         op.accept(visitor, null);
     }
 
+    /**
+     * Adds the variables produced by the logical operator to the list of produced variables
+     * @param op
+     *          The target operator
+     * @param producedVariables
+     *          The variables produced in the logical operator
+     * @throws AlgebricksException
+     */
     public static void getProducedVariables(ILogicalOperator op, Collection<LogicalVariable> producedVariables)
             throws AlgebricksException {
         ILogicalOperatorVisitor<Void, Void> visitor = new ProducedVariableVisitor(producedVariables);
         op.accept(visitor, null);
     }
 
+    /**
+     * Adds the variables that are live after the execution of this operator to the list of schema variables.
+     * @param op
+     *          The target logical operator
+     * @param schemaVariables
+     *          The list of live variables. The output of the operator and the propagated outputs of its children
+     * @throws AlgebricksException
+     */
     public static void getLiveVariables(ILogicalOperator op, Collection<LogicalVariable> schemaVariables)
             throws AlgebricksException {
         ILogicalOperatorVisitor<Void, Void> visitor = new SchemaVariableVisitor(schemaVariables);

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
index 449c04f..036ac05 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
@@ -35,8 +35,8 @@ import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -92,7 +92,7 @@ public class BulkloadPOperator extends AbstractPhysicalOperator {
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
-        InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op;
+        InsertDeleteUpsertOperator insertDeleteOp = (InsertDeleteUpsertOperator) op;
         assert insertDeleteOp.getOperation() == Kind.INSERT;
         assert insertDeleteOp.isBulkload();
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
index 15144ab..b837bfa 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -38,8 +38,8 @@ import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
@@ -117,7 +117,7 @@ public class IndexBulkloadPOperator extends AbstractPhysicalOperator {
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
-        IndexInsertDeleteOperator indexInsertDeleteOp = (IndexInsertDeleteOperator) op;
+        IndexInsertDeleteUpsertOperator indexInsertDeleteOp = (IndexInsertDeleteUpsertOperator) op;
         assert indexInsertDeleteOp.getOperation() == Kind.INSERT;
         assert indexInsertDeleteOp.isBulkload();
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/f1fdb156/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
deleted file mode 100644
index 29673b4..0000000
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
-
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class IndexInsertDeletePOperator extends AbstractPhysicalOperator {
-
-    private final List<LogicalVariable> primaryKeys;
-    private final List<LogicalVariable> secondaryKeys;
-    private final ILogicalExpression filterExpr;
-    private final IDataSourceIndex<?, ?> dataSourceIndex;
-    private final List<LogicalVariable> additionalFilteringKeys;
-
-    public IndexInsertDeletePOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
-            List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
-            IDataSourceIndex<?, ?> dataSourceIndex) {
-        this.primaryKeys = primaryKeys;
-        this.secondaryKeys = secondaryKeys;
-        if (filterExpr != null) {
-            this.filterExpr = filterExpr.getValue();
-        } else {
-            this.filterExpr = null;
-        }
-        this.dataSourceIndex = dataSourceIndex;
-        this.additionalFilteringKeys = additionalFilteringKeys;
-    }
-
-    @Override
-    public PhysicalOperatorTag getOperatorTag() {
-        return PhysicalOperatorTag.INDEX_INSERT_DELETE;
-    }
-
-    @Override
-    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
-    }
-
-    @Override
-    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent) {
-        List<LogicalVariable> scanVariables = new ArrayList<LogicalVariable>();
-        scanVariables.addAll(primaryKeys);
-        scanVariables.add(new LogicalVariable(-1));
-        IPhysicalPropertiesVector r = dataSourceIndex.getDataSource().getPropertiesProvider()
-                .computePropertiesVector(scanVariables);
-        r.getLocalProperties().clear();
-        IPhysicalPropertiesVector[] requirements = new IPhysicalPropertiesVector[1];
-        requirements[0] = r;
-        return new PhysicalRequirements(requirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
-    }
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Override
-    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
-            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
-        IndexInsertDeleteOperator insertDeleteOp = (IndexInsertDeleteOperator) op;
-        IMetadataProvider mp = context.getMetadataProvider();
-
-        JobSpecification spec = builder.getJobSpec();
-        RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
-                context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
-
-        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
-        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(insertDeleteOp);
-        if (insertDeleteOp.getOperation() == Kind.INSERT) {
-            runtimeAndConstraints = mp.getIndexInsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
-                    primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec, false);
-        } else {
-            runtimeAndConstraints = mp.getIndexDeleteRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
-                    primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec);
-        }
-        builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
-        builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
-        ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue();
-        builder.contributeGraphEdge(src, 0, insertDeleteOp, 0);
-    }
-
-    @Override
-    public boolean isMicroOperator() {
-        return false;
-    }
-
-    @Override
-    public boolean expensiveThanMaterialization() {
-        return false;
-    }
-}