You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2016/10/16 18:08:29 UTC

[4/4] asterixdb git commit: Enhanced Insert AQL

Enhanced Insert AQL

The optional "as Variable" provides a variable binding for the inserted records
The optional "returning Query" allows users to run simple
queries/functions on the records returned by the insert, and can refer
to the variable bound in "as Variable"

Allow commits to be non-sink operators (contnue job pipeline after commit)

Additionally, this change makes small modifications to
the extension code to prepare for the BAD extension

Also made the OptimizerTests able to work for Extensions

Change-Id: I65789d2a861d15232dd29156a6987d0635ec6c94
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1150
Reviewed-by: abdullah alamoudi <ba...@gmail.com>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/afa909a5
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/afa909a5
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/afa909a5

Branch: refs/heads/master
Commit: afa909a5794e1f24da4d619119ca2bc58f486959
Parents: e9b2adf
Author: Steven Glenn Jacobs <sj...@ucr.edu>
Authored: Sun Oct 16 10:35:30 2016 -0700
Committer: Steven Jacobs <sj...@ucr.edu>
Committed: Sun Oct 16 11:07:33 2016 -0700

----------------------------------------------------------------------
 asterixdb/asterix-active/pom.xml                |  18 ++
 .../active/IActiveEntityEventsListener.java     |   2 +-
 .../algebra/extension/IExtensionStatement.java  |   9 +-
 .../algebra/operators/CommitOperator.java       |  39 +++-
 .../operators/physical/CommitPOperator.java     |   6 +-
 .../operators/physical/CommitRuntime.java       |  74 +++---
 .../physical/CommitRuntimeFactory.java          |   8 +-
 .../operators/physical/UpsertCommitRuntime.java |  16 +-
 .../asterix/optimizer/base/RuleCollections.java |   4 +-
 .../rules/IntroduceAutogenerateIDRule.java      |  37 ++-
 .../rules/IntroduceDynamicTypeCastRule.java     |  42 ++--
 ...troduceRapidFrameFlushProjectAssignRule.java |   6 +-
 ...IntroduceSecondaryIndexInsertDeleteRule.java | 109 +++++----
 .../IntroduceStaticTypeCastForInsertRule.java   |  11 +-
 .../rules/ReplaceSinkOpWithCommitOpRule.java    | 166 -------------
 .../rules/SetupCommitExtensionOpRule.java       | 168 +++++++++++++
 .../SweepIllegalNonfunctionalFunctions.java     |   4 +-
 .../optimizer/rules/UnnestToDataScanRule.java   |   2 +-
 .../am/AbstractIntroduceAccessMethodRule.java   | 132 +++++------
 .../subplan/InlineAllNtsInSubplanVisitor.java   |   4 +-
 ...neLeftNtsInSubplanJoinFlatteningVisitor.java |   4 +-
 .../SubplanSpecialFlatteningCheckVisitor.java   |   4 +-
 .../asterix/translator/CompiledStatements.java  |  20 +-
 .../asterix/translator/IStatementExecutor.java  |   4 +-
 .../LangExpressionToPlanTranslator.java         | 198 +++++++++++-----
 .../asterix/api/http/servlet/FeedServlet.java   |  52 -----
 .../asterix/app/translator/QueryTranslator.java | 234 ++++++++++---------
 .../app/bootstrap/TestNodeController.java       |   2 +-
 .../asterix/test/optimizer/OptimizerTest.java   |  15 +-
 .../asterix/test/runtime/ExecutionTestUtil.java |  11 +-
 .../queries/insert-return-custom-result.aql     |  48 ++++
 .../results/insert-return-custom-result.plan    |  17 ++
 .../insert-return-records.1.ddl.aql             |  36 +++
 .../insert-return-records.3.query.aql           |  34 +++
 .../insert-returning-fieldname.1.ddl.aql        |  37 +++
 .../insert-returning-fieldname.3.query.aql      |  30 +++
 .../insert-with-bad-return.1.ddl.aql            |  36 +++
 .../insert-with-bad-return.3.query.aql          |  37 +++
 .../upsert-return-custom-result.1.ddl.aql       |  37 +++
 .../upsert-return-custom-result.3.query.aql     |  43 ++++
 .../insert-return-records.1.adm                 |   5 +
 .../insert-returning-fieldname.1.adm            |   1 +
 .../upsert-return-custom-result.1.adm           |   5 +
 .../src/test/resources/runtimets/testsuite.xml  |  21 ++
 .../asterix-doc/src/site/markdown/aql/manual.md |  10 +-
 .../api/IActiveLifecycleEventSubscriber.java    |  40 ++++
 .../feed/api/IFeedLifecycleEventSubscriber.java |  37 ---
 .../ActiveLifecycleEventSubscriber.java         |  67 ++++++
 .../feed/management/FeedEventsListener.java     |  32 +--
 .../FeedLifecycleEventSubscriber.java           |  66 ------
 .../external/feed/watch/FeedActivity.java       | 116 ---------
 .../feed/watch/FeedActivityDetails.java         |  27 +++
 .../aql/statement/SubscribeFeedStatement.java   |  12 +-
 .../asterix-lang-aql/src/main/javacc/AQL.jj     |  16 +-
 .../lang/common/statement/InsertStatement.java  |  37 ++-
 .../lang/common/statement/UpsertStatement.java  |   6 +-
 .../asterix-lang-sqlpp/src/main/javacc/SQLPP.jj |   2 +-
 .../org/apache/asterix/om/base/ADateTime.java   |  19 +-
 .../java/org/apache/asterix/om/base/AUUID.java  |  18 +-
 .../core/algebra/base/LogicalOperatorTag.java   |   2 +-
 .../AbstractDelegatedLogicalOperator.java       |  60 +++++
 .../AbstractExtensibleLogicalOperator.java      |  60 -----
 .../operators/logical/DelegateOperator.java     | 125 ++++++++++
 .../operators/logical/ExtensionOperator.java    | 124 ----------
 .../operators/logical/IOperatorDelegate.java    |  54 +++++
 .../operators/logical/IOperatorExtension.java   |  54 -----
 .../visitors/CardinalityInferenceVisitor.java   |   4 +-
 .../visitors/FDsAndEquivClassesVisitor.java     |   4 +-
 .../visitors/IsomorphismOperatorVisitor.java    |   8 +-
 .../IsomorphismVariableMappingVisitor.java      |   4 +-
 ...OperatorDeepCopyWithNewVariablesVisitor.java |   4 +-
 .../visitors/LogicalPropertiesVisitor.java      |   4 +-
 .../visitors/OperatorDeepCopyVisitor.java       |   6 +-
 .../visitors/PrimaryKeyVariablesVisitor.java    |   4 +-
 .../visitors/ProducedVariableVisitor.java       |   4 +-
 .../logical/visitors/SchemaVariableVisitor.java |   4 +-
 .../visitors/SubstituteVariableVisitor.java     |   4 +-
 .../logical/visitors/UsedVariableVisitor.java   |   4 +-
 .../LogicalOperatorPrettyPrintVisitor.java      |   4 +-
 .../visitors/ILogicalOperatorVisitor.java       |   4 +-
 ...placeNtsWithSubplanInputOperatorVisitor.java |   4 +-
 81 files changed, 1680 insertions(+), 1158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-active/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 9f4e650..efba47e 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -1,3 +1,21 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.    See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 2dd9fe7..d0fb5e8 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -31,6 +31,6 @@ public interface IActiveEntityEventsListener {
 
     public EntityId getEntityId();
 
-    public boolean isEntityConnectedToDataset(String dataverseName, String datasetName);
+    public boolean isEntityUsingDataset(String dataverseName, String datasetName);
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
index e88962a..d15ae6f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
@@ -21,8 +21,11 @@ package org.apache.asterix.algebra.extension;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
@@ -36,15 +39,17 @@ public interface IExtensionStatement extends Statement {
     }
 
     /**
-     * Called when the {@code IQueryTranslator} encounters an extension statement.
+     * Called when the {@code IStatementExecutor} encounters an extension statement.
      * An implementation class should implement the actual processing of the statement in this method.
      *
      * @param queryTranslator
      * @param metadataProvider
      * @param statementExecutor
      * @param hcc
+     * @param resultSetIdCounter
      * @throws Exception
      */
     void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
-            IHyracksClientConnection hcc) throws HyracksDataException, AlgebricksException;
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
index 8dd6d33..1ee130c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
@@ -19,23 +19,32 @@
 
 package org.apache.asterix.algebra.operators;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractExtensibleLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorExtension;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDelegate;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 
-public class CommitOperator extends AbstractExtensibleLogicalOperator {
+public class CommitOperator extends AbstractDelegatedLogicalOperator {
 
-    private final List<LogicalVariable> primaryKeyLogicalVars;
-    private final LogicalVariable upsertVar;
+    private List<LogicalVariable> primaryKeyLogicalVars;
+    private LogicalVariable upsertVar;
+    private boolean isSink;
 
-    public CommitOperator(List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar) {
+    public CommitOperator(boolean isSink) {
+        this.isSink = isSink;
+        this.upsertVar = null;
+        primaryKeyLogicalVars = new ArrayList<>();
+    }
+
+    public CommitOperator(List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar, boolean isSink) {
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.upsertVar = upsertVar;
+        this.isSink = isSink;
     }
 
     @Override
@@ -44,9 +53,23 @@ public class CommitOperator extends AbstractExtensibleLogicalOperator {
         return false;
     }
 
+    public boolean isSink() {
+        return isSink;
+    }
+
+    public void setSink(boolean isSink) {
+        this.isSink = isSink;
+    }
+
+    //Provided for Extensions but not used by core
+    public void setVars(List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar) {
+        this.primaryKeyLogicalVars = primaryKeyLogicalVars;
+        this.upsertVar = upsertVar;
+    }
+
     @Override
-    public IOperatorExtension newInstance() {
-        return new CommitOperator(primaryKeyLogicalVars, upsertVar);
+    public IOperatorDelegate newInstance() {
+        return new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index 1a26021..5a1c929 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -47,15 +47,17 @@ public class CommitPOperator extends AbstractPhysicalOperator {
     private final String dataverse;
     private final String dataset;
     private final LogicalVariable upsertVar;
+    private final boolean isSink;
 
     public CommitPOperator(JobId jobId, String dataverse, String dataset, int datasetId,
-            List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar) {
+            List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar, boolean isSink) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.upsertVar = upsertVar;
         this.dataverse = dataverse;
         this.dataset = dataset;
+        this.isSink = isSink;
     }
 
     @Override
@@ -105,7 +107,7 @@ public class CommitPOperator extends AbstractPhysicalOperator {
         }
         runtime = new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
                 metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
-                datasetPartitions);
+                datasetPartitions, isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index c6c71f6..a1fa788 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -31,8 +31,7 @@ import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.utils.TransactionUtil;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -45,7 +44,7 @@ import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
 
-public class CommitRuntime implements IPushRuntime {
+public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
 
     private final static long SEED = 0L;
 
@@ -57,27 +56,27 @@ public class CommitRuntime implements IPushRuntime {
     protected final boolean isTemporaryDatasetWriteJob;
     protected final boolean isWriteTransaction;
     protected final long[] longHashes;
-    protected final FrameTupleReference frameTupleReference;
     protected final IHyracksTaskContext ctx;
     protected final int resourcePartition;
     protected ITransactionContext transactionContext;
     protected LogRecord logRecord;
-    protected FrameTupleAccessor frameTupleAccessor;
+    protected final boolean isSink;
 
     public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
-            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition) {
+            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, boolean isSink) {
         this.ctx = ctx;
-        IAsterixAppRuntimeContext runtimeCtx =
-                (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
         this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
         this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
-        this.frameTupleReference = new FrameTupleReference();
+        this.tRef = new FrameTupleReference();
         this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
         this.isWriteTransaction = isWriteTransaction;
         this.resourcePartition = resourcePartition;
+        this.isSink = isSink;
         longHashes = new long[2];
     }
 
@@ -86,9 +85,14 @@ public class CommitRuntime implements IPushRuntime {
         try {
             transactionContext = transactionManager.getTransactionContext(jobId, false);
             transactionContext.setWriteTxn(isWriteTransaction);
-            ILogMarkerCallback callback =
-                    TaskUtils.<ILogMarkerCallback> get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+            ILogMarkerCallback callback = TaskUtils.<ILogMarkerCallback> get(ILogMarkerCallback.KEY_MARKER_CALLBACK,
+                    ctx);
             logRecord = new LogRecord(callback);
+            if (isSink) {
+                return;
+            }
+            initAccessAppend(ctx);
+            writer.open();
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
@@ -96,26 +100,27 @@ public class CommitRuntime implements IPushRuntime {
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        frameTupleAccessor.reset(buffer);
-        int nTuple = frameTupleAccessor.getTupleCount();
+        tAccess.reset(buffer);
+        int nTuple = tAccess.getTupleCount();
         for (int t = 0; t < nTuple; t++) {
             if (isTemporaryDatasetWriteJob) {
                 /**
-                 * This "if branch" is for writes over temporary datasets.
-                 * A temporary dataset does not require any lock and does not generate any write-ahead
-                 * update and commit log but generates flush log and job commit log.
-                 * However, a temporary dataset still MUST guarantee no-steal policy so that this
-                 * notification call should be delivered to PrimaryIndexOptracker and used correctly in order
-                 * to decrement number of active operation count of PrimaryIndexOptracker.
-                 * By maintaining the count correctly and only allowing flushing when the count is 0, it can
-                 * guarantee the no-steal policy for temporary datasets, too.
+                 * This "if branch" is for writes over temporary datasets. A temporary dataset does not require any lock
+                 * and does not generate any write-ahead update and commit log but generates flush log and job commit
+                 * log. However, a temporary dataset still MUST guarantee no-steal policy so that this notification call
+                 * should be delivered to PrimaryIndexOptracker and used correctly in order to decrement number of
+                 * active operation count of PrimaryIndexOptracker. By maintaining the count correctly and only allowing
+                 * flushing when the count is 0, it can guarantee the no-steal policy for temporary datasets, too.
                  */
                 transactionContext.notifyOptracker(false);
             } else {
-                frameTupleReference.reset(frameTupleAccessor, t);
+                tRef.reset(tAccess, t);
                 try {
                     formLogRecord(buffer, t);
                     logMgr.log(logRecord);
+                    if (!isSink) {
+                        appendTupleToFrame(t);
+                    }
                 } catch (ACIDException e) {
                     throw new HyracksDataException(e);
                 }
@@ -141,8 +146,8 @@ public class CommitRuntime implements IPushRuntime {
     }
 
     protected void formLogRecord(ByteBuffer buffer, int t) {
-        int pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
-        TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, frameTupleReference,
+        int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
+        TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef,
                 primaryKeyFields, resourcePartition, LogType.ENTITY_COMMIT);
     }
 
@@ -153,22 +158,27 @@ public class CommitRuntime implements IPushRuntime {
 
     @Override
     public void fail() throws HyracksDataException {
-
+        failed = true;
+        if (isSink) {
+            return;
+        }
+        writer.fail();
     }
 
     @Override
     public void close() throws HyracksDataException {
-
-    }
-
-    @Override
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-        throw new IllegalStateException();
+        if (isSink) {
+            return;
+        }
+        flushIfNotFailed();
+        writer.close();
+        appender.reset(frame, true);
     }
 
     @Override
     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
-        this.frameTupleAccessor = new FrameTupleAccessor(recordDescriptor);
+        this.inputRecordDesc = recordDescriptor;
+        this.tAccess = new FrameTupleAccessor(inputRecordDesc);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
index 4f28b9d..9486a19 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
@@ -36,9 +36,10 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
     private final boolean isWriteTransaction;
     private final int upsertVarIdx;
     private int[] datasetPartitions;
+    private final boolean isSink;
 
     public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob,
-            boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions) {
+            boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
@@ -46,6 +47,7 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
         this.isWriteTransaction = isWriteTransaction;
         this.upsertVarIdx = upsertVarIdx;
         this.datasetPartitions = datasetPartitions;
+        this.isSink = isSink;
     }
 
     @Override
@@ -58,10 +60,10 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
         if (upsertVarIdx >= 0) {
             return new UpsertCommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
                     isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()],
-                    upsertVarIdx);
+                    upsertVarIdx, isSink);
         } else {
             return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
-                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()]);
+                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
index 7358700..53e0f62 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
@@ -30,25 +30,25 @@ public class UpsertCommitRuntime extends CommitRuntime {
     private final int upsertIdx;
 
     public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
-            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx) {
+            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx,
+            boolean isSink) {
         super(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction,
-                resourcePartition);
+                resourcePartition, isSink);
         this.upsertIdx = upsertIdx;
     }
 
     @Override
     protected void formLogRecord(ByteBuffer buffer, int t) {
-        boolean isNull = ABooleanSerializerDeserializer.getBoolean(buffer.array(),
-                frameTupleAccessor.getFieldSlotsLength() + frameTupleAccessor.getTupleStartOffset(t)
-                        + frameTupleAccessor.getFieldStartOffset(t, upsertIdx) + 1);
+        boolean isNull = ABooleanSerializerDeserializer.getBoolean(buffer.array(), tAccess.getFieldSlotsLength()
+                + tAccess.getTupleStartOffset(t) + tAccess.getFieldStartOffset(t, upsertIdx) + 1);
         if (isNull) {
             // Previous record not found (insert)
             super.formLogRecord(buffer, t);
         } else {
             // Previous record found (delete + insert)
-            int pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
-            TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash,
-                    frameTupleReference, primaryKeyFields, resourcePartition, LogType.UPSERT_ENTITY_COMMIT);
+            int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
+            TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef,
+                    primaryKeyFields, resourcePartition, LogType.UPSERT_ENTITY_COMMIT);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index cd8d747..56a3bd0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -66,7 +66,7 @@ import org.apache.asterix.optimizer.rules.RemoveRedundantListifyRule;
 import org.apache.asterix.optimizer.rules.RemoveRedundantSelectRule;
 import org.apache.asterix.optimizer.rules.RemoveSortInFeedIngestionRule;
 import org.apache.asterix.optimizer.rules.RemoveUnusedOneToOneEquiJoinRule;
-import org.apache.asterix.optimizer.rules.ReplaceSinkOpWithCommitOpRule;
+import org.apache.asterix.optimizer.rules.SetupCommitExtensionOpRule;
 import org.apache.asterix.optimizer.rules.ResolveVariableRule;
 import org.apache.asterix.optimizer.rules.SetAsterixPhysicalOperatorsRule;
 import org.apache.asterix.optimizer.rules.SetClosedRecordConstructorsRule;
@@ -317,7 +317,7 @@ public final class RuleCollections {
         List<IAlgebraicRewriteRule> physicalRewritesAllLevels = new LinkedList<>();
         physicalRewritesAllLevels.add(new PullSelectOutOfEqJoin());
         //Turned off the following rule for now not to change OptimizerTest results.
-        physicalRewritesAllLevels.add(new ReplaceSinkOpWithCommitOpRule());
+        physicalRewritesAllLevels.add(new SetupCommitExtensionOpRule());
         physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
         physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
         physicalRewritesAllLevels.add(new AddEquivalenceClassForRecordConstructorRule());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
index fee1c96..c8d2760 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
@@ -18,9 +18,11 @@
  */
 package org.apache.asterix.optimizer.rules;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
@@ -44,6 +46,7 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceE
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
@@ -62,13 +65,35 @@ public class IntroduceAutogenerateIDRule implements IAlgebraicRewriteRule {
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
 
-        // match: [insert to internal dataset with autogenerated id] - assign - project
+        // match: commit OR distribute-result OR SINK - ... followed by:
+        // [insert to internal dataset with autogenerated id] - assign - project
         // produce: insert - assign - assign* - project
         // **
         // OR [insert to internal dataset with autogenerated id] - assign - [datasource scan]
         // produce insert - assign - assign* - datasource scan
 
         AbstractLogicalOperator currentOp = (AbstractLogicalOperator) opRef.getValue();
+        if (currentOp.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+            DelegateOperator dOp = (DelegateOperator) currentOp;
+            if (!(dOp.getDelegate() instanceof CommitOperator)) {
+                return false;
+            } else if (!((CommitOperator) dOp.getDelegate()).isSink()) {
+                return false;
+            }
+
+        } else if (currentOp.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT
+                && currentOp.getOperatorTag() != LogicalOperatorTag.SINK) {
+            return false;
+        }
+        ArrayDeque<AbstractLogicalOperator> opStack = new ArrayDeque<>();
+        opStack.push(currentOp);
+        while (currentOp.getInputs().size() == 1) {
+            currentOp = (AbstractLogicalOperator) currentOp.getInputs().get(0).getValue();
+            if (currentOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+                break;
+            }
+            opStack.push(currentOp);
+        }
         if (currentOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
             return false;
         }
@@ -95,7 +120,8 @@ public class IntroduceAutogenerateIDRule implements IAlgebraicRewriteRule {
         AssignOperator assignOp = (AssignOperator) parentOp;
         LogicalVariable inputRecord;
 
-        //bug here. will not work for internal datasets with filters since the pattern becomes [project-assign-assign-insert] <-this should be fixed->
+        //TODO: bug here. will not work for internal datasets with filters since the pattern becomes 
+        //[project-assign-assign-insert]
         AbstractLogicalOperator grandparentOp = (AbstractLogicalOperator) parentOp.getInputs().get(0).getValue();
         if (grandparentOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
             ProjectOperator projectOp = (ProjectOperator) grandparentOp;
@@ -122,7 +148,12 @@ public class IntroduceAutogenerateIDRule implements IAlgebraicRewriteRule {
         VariableUtilities.substituteVariables(insertOp, inputRecord, v, context);
         context.computeAndSetTypeEnvironmentForOperator(newAssign);
         context.computeAndSetTypeEnvironmentForOperator(assignOp);
-        context.computeAndSetTypeEnvironmentForOperator(insertOp);
+        context.computeAndSetTypeEnvironmentForOperator(insertOp);;
+        for (AbstractLogicalOperator op : opStack) {
+            VariableUtilities.substituteVariables(op, inputRecord, v, context);
+            context.computeAndSetTypeEnvironmentForOperator(op);
+        }
+
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
index f6cd015..28bcc7f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
@@ -22,6 +22,7 @@ package org.apache.asterix.optimizer.rules;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
@@ -46,6 +47,8 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceE
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -90,11 +93,19 @@ public class IntroduceDynamicTypeCastRule implements IAlgebraicRewriteRule {
         // We identify INSERT and DISTRIBUTE_RESULT operators.
         AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
         switch (op1.getOperatorTag()) {
-            case SINK: {
+            case SINK:
+            case DELEGATE_OPERATOR: {
                 /**
-                 * pattern match: sink insert assign
-                 * resulting plan: sink-insert-project-assign
+                 * pattern match: commit insert assign
+                 * resulting plan: commit-insert-project-assign
                  */
+                if (op1.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+                    DelegateOperator eOp = (DelegateOperator) op1;
+                    if (!(eOp.getDelegate() instanceof CommitOperator)) {
+                        return false;
+                    }
+                }
+
                 AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
                 if (op2.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
                     InsertDeleteUpsertOperator insertDeleteOp = (InsertDeleteUpsertOperator) op2;
@@ -131,21 +142,8 @@ public class IntroduceDynamicTypeCastRule implements IAlgebraicRewriteRule {
                 // Remember this is the operator we need to modify
                 op = op1;
 
-                // The Variable we want is the (hopefully singular, hopefully record-typed) live variable
-                // of the singular input operator of the DISTRIBUTE_RESULT
-                if (op.getInputs().size() > 1) {
-                    // Hopefully not possible?
-                    throw new AlgebricksException(
-                            "output-record-type defined for expression with multiple input operators");
-                }
-                AbstractLogicalOperator input = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-                List<LogicalVariable> liveVars = new ArrayList<>();
-                VariableUtilities.getLiveVariables(input, liveVars);
-                if (liveVars.size() > 1) {
-                    throw new AlgebricksException(
-                            "Expression with multiple fields cannot be cast to output-record-type!");
-                }
-                recordVar = liveVars.get(0);
+                recordVar = ((VariableReferenceExpression) ((DistributeResultOperator) op).getExpressions().get(0)
+                        .getValue()).getVariableReference();
                 break;
             }
             default: {
@@ -210,15 +208,15 @@ public class IntroduceDynamicTypeCastRule implements IAlgebraicRewriteRule {
                 if (var.equals(recordVar)) {
                     /** insert an assign operator to call the function on-top-of the variable */
                     IAType actualType = (IAType) env.getVarType(var);
-                    AbstractFunctionCallExpression cast =
-                            new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(fd));
+                    AbstractFunctionCallExpression cast = new ScalarFunctionCallExpression(
+                            FunctionUtil.getFunctionInfo(fd));
                     cast.getArguments()
                             .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(var)));
                     /** enforce the required record type */
                     TypeCastUtils.setRequiredAndInputTypes(cast, requiredRecordType, actualType);
                     LogicalVariable newAssignVar = context.newVar();
-                    AssignOperator newAssignOperator =
-                            new AssignOperator(newAssignVar, new MutableObject<ILogicalExpression>(cast));
+                    AssignOperator newAssignOperator = new AssignOperator(newAssignVar,
+                            new MutableObject<ILogicalExpression>(cast));
                     newAssignOperator.getInputs().add(new MutableObject<ILogicalOperator>(op));
                     opRef.setValue(newAssignOperator);
                     context.computeAndSetTypeEnvironmentForOperator(newAssignOperator);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
index a8368a7..f655b24 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
@@ -27,7 +27,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
@@ -50,10 +50,10 @@ public class IntroduceRapidFrameFlushProjectAssignRule implements IAlgebraicRewr
     }
 
     private boolean checkIfRuleIsApplicable(AbstractLogicalOperator op) {
-        if (op.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR) {
+        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
             return false;
         }
-        ExtensionOperator extensionOp = (ExtensionOperator) op;
+        DelegateOperator extensionOp = (DelegateOperator) op;
         if (!(extensionOp.getDelegate() instanceof CommitOperator)) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index c487a96..8362dd7 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -66,6 +67,7 @@ import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
@@ -90,20 +92,28 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
-        AbstractLogicalOperator sinkOp = (AbstractLogicalOperator) opRef.getValue();
-        if (sinkOp.getOperatorTag() != LogicalOperatorTag.SINK) {
+        AbstractLogicalOperator op0 = (AbstractLogicalOperator) opRef.getValue();
+        if (op0.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR
+                && op0.getOperatorTag() != LogicalOperatorTag.SINK) {
             return false;
         }
-        if (sinkOp.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+        if (op0.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+            DelegateOperator eOp = (DelegateOperator) op0;
+            if (!(eOp.getDelegate() instanceof CommitOperator)) {
+                return false;
+            }
+        }
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) op0.getInputs().get(0).getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
             return false;
         }
         /** find the record variable */
-        InsertDeleteUpsertOperator primaryIndexModificationOp =
-                (InsertDeleteUpsertOperator) sinkOp.getInputs().get(0).getValue();
+        InsertDeleteUpsertOperator primaryIndexModificationOp = (InsertDeleteUpsertOperator) op0.getInputs().get(0)
+                .getValue();
         boolean isBulkload = primaryIndexModificationOp.isBulkload();
         ILogicalExpression newRecordExpr = primaryIndexModificationOp.getPayloadExpression().getValue();
-        List<Mutable<ILogicalExpression>> newMetaExprs =
-                primaryIndexModificationOp.getAdditionalNonFilteringExpressions();
+        List<Mutable<ILogicalExpression>> newMetaExprs = primaryIndexModificationOp
+                .getAdditionalNonFilteringExpressions();
         LogicalVariable newRecordVar;
         LogicalVariable newMetaVar = null;
 
@@ -111,9 +121,8 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
          * inputOp is the assign operator which extracts primary keys from the input
          * variables (record or meta)
          */
-        AbstractLogicalOperator inputOp =
-                (AbstractLogicalOperator) primaryIndexModificationOp.getInputs().get(0).getValue();
-
+        AbstractLogicalOperator inputOp = (AbstractLogicalOperator) primaryIndexModificationOp.getInputs().get(0)
+                .getValue();
         newRecordVar = getRecordVar(context, inputOp, newRecordExpr, 0);
         if (newMetaExprs != null && !newMetaExprs.isEmpty()) {
             if (newMetaExprs.size() > 1) {
@@ -168,7 +177,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
         // At this point, we have the data type info, and the indexes info as well
         int secondaryIndexTotalCnt = indexes.size() - 1;
         if (secondaryIndexTotalCnt > 0) {
-            sinkOp.getInputs().clear();
+            op0.getInputs().clear();
         } else {
             return false;
         }
@@ -226,8 +235,8 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
                      * is solved
                      */
                     || primaryIndexModificationOp.getOperation() == Kind.DELETE) {
-                injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForNewRecord, recType,
-                        metaType, newRecordVar, newMetaVar, primaryIndexModificationOp, false);
+                injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForNewRecord, recType, metaType,
+                        newRecordVar, newMetaVar, primaryIndexModificationOp, false);
                 if (replicateOp != null) {
                     context.computeAndSetTypeEnvironmentForOperator(replicateOp);
                 }
@@ -237,13 +246,12 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
              * https://issues.apache.org/jira/browse/ASTERIXDB-1507
              * is solved
              */) {
-                List<LogicalVariable> beforeOpMetaVars =
-                        primaryIndexModificationOp.getBeforeOpAdditionalNonFilteringVars();
+                List<LogicalVariable> beforeOpMetaVars = primaryIndexModificationOp
+                        .getBeforeOpAdditionalNonFilteringVars();
                 LogicalVariable beforeOpMetaVar = beforeOpMetaVars == null ? null : beforeOpMetaVars.get(0);
-                currentTop =
-                        injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForBeforeOperation, recType,
-                                metaType, primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar,
-                                currentTop, true);
+                currentTop = injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForBeforeOperation,
+                        recType, metaType, primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar,
+                        currentTop, true);
             }
         } catch (AsterixException e) {
             throw new AlgebricksException(e);
@@ -264,12 +272,11 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
             ILogicalOperator replicateOutput;
 
             for (int i = 0; i < secondaryKeyFields.size(); i++) {
-                IndexFieldId indexFieldId =
-                        new IndexFieldId(index.getKeyFieldSourceIndicators().get(i), secondaryKeyFields.get(i));
+                IndexFieldId indexFieldId = new IndexFieldId(index.getKeyFieldSourceIndicators().get(i),
+                        secondaryKeyFields.get(i));
                 LogicalVariable skVar = fieldVarsForNewRecord.get(indexFieldId);
                 secondaryKeyVars.add(skVar);
-                secondaryExpressions.add(new MutableObject<ILogicalExpression>(
-                        new VariableReferenceExpression(skVar)));
+                secondaryExpressions.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(skVar)));
                 if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                     beforeOpSecondaryExpressions.add(new MutableObject<ILogicalExpression>(
                             new VariableReferenceExpression(fieldVarsForBeforeOperation.get(indexFieldId))));
@@ -279,10 +286,10 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
             IndexInsertDeleteUpsertOperator indexUpdate;
             if (index.getIndexType() != IndexType.RTREE) {
                 // Create an expression per key
-                Mutable<ILogicalExpression> filterExpression =
-                        (primaryIndexModificationOp.getOperation() == Kind.UPSERT) ? null
-                                : createFilterExpression(secondaryKeyVars,
-                                        context.getOutputTypeEnvironment(currentTop), index.isEnforcingKeyFileds());
+                Mutable<ILogicalExpression> filterExpression = (primaryIndexModificationOp
+                        .getOperation() == Kind.UPSERT) ? null
+                                : createFilterExpression(secondaryKeyVars, context.getOutputTypeEnvironment(currentTop),
+                                        index.isEnforcingKeyFileds());
                 AqlIndex dataSourceIndex = new AqlIndex(index, dataverseName, datasetName, mp);
 
                 // Introduce the TokenizeOperator only when doing bulk-load,
@@ -311,8 +318,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
                     // Check the field type of the secondary key.
                     IAType secondaryKeyType;
                     Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(
-                            index.getKeyFieldTypes().get(0), secondaryKeyFields.get(0),
-                            recType);
+                            index.getKeyFieldTypes().get(0), secondaryKeyFields.get(0), recType);
                     secondaryKeyType = keyPairType.first;
 
                     List<Object> varTypes = new ArrayList<>();
@@ -333,8 +339,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
                     // TokenizeOperator to tokenize [SK, PK] pairs
                     TokenizeOperator tokenUpdate = new TokenizeOperator(dataSourceIndex,
                             primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions,
-                            tokenizeKeyVars,
-                            filterExpression, primaryIndexModificationOp.getOperation(),
+                            tokenizeKeyVars, filterExpression, primaryIndexModificationOp.getOperation(),
                             primaryIndexModificationOp.isBulkload(), isPartitioned, varTypes);
                     tokenUpdate.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
                     context.computeAndSetTypeEnvironmentForOperator(tokenUpdate);
@@ -350,8 +355,8 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
                     // When TokenizeOperator is not needed
                     indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
                             primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions,
-                            filterExpression,
-                            primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(),
+                            filterExpression, primaryIndexModificationOp.getOperation(),
+                            primaryIndexModificationOp.isBulkload(),
                             primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
                                     : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
                     indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
@@ -360,8 +365,8 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
                     if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                         indexUpdate.setBeforeOpSecondaryKeyExprs(beforeOpSecondaryExpressions);
                         if (filteringFields != null) {
-                            indexUpdate.setBeforeOpAdditionalFilteringExpression(new MutableObject<ILogicalExpression>(
-                                    new VariableReferenceExpression(
+                            indexUpdate.setBeforeOpAdditionalFilteringExpression(
+                                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
                                             primaryIndexModificationOp.getBeforeOpFilterVar())));
                         }
                     }
@@ -472,7 +477,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
             }
             if (primaryIndexModificationOp.isBulkload()) {
                 // For bulk load, we connect all fanned out insert operator to a single SINK operator
-                sinkOp.getInputs().add(new MutableObject<ILogicalOperator>(indexUpdate));
+                op0.getInputs().add(new MutableObject<ILogicalOperator>(indexUpdate));
             }
 
         }
@@ -483,16 +488,15 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
         if (!primaryIndexModificationOp.isBulkload()) {
             // If this is an upsert, we need to
             // Remove the current input to the SINK operator (It is actually already removed above)
-            sinkOp.getInputs().clear();
+            op0.getInputs().clear();
             // Connect the last index update to the SINK
-            sinkOp.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
+            op0.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
         }
         return true;
     }
 
     private LogicalVariable getRecordVar(IOptimizationContext context, AbstractLogicalOperator inputOp,
-            ILogicalExpression recordExpr,
-            int expectedRecordIndex) throws AlgebricksException {
+            ILogicalExpression recordExpr, int expectedRecordIndex) throws AlgebricksException {
         if (exprIsRecord(context.getOutputTypeEnvironment(inputOp), recordExpr)) {
             return ((VariableReferenceExpression) recordExpr).getVariableReference();
         } else {
@@ -554,12 +558,10 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
                 ARecordType sourceType = dataset.hasMetaPart()
                         ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recType : metaType : recType;
                 LogicalVariable sourceVar = dataset.hasMetaPart()
-                        ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recordVar : metaVar
-                        : recordVar;
+                        ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recordVar : metaVar : recordVar;
                 LogicalVariable fieldVar = context.newVar();
                 // create record variable ref
-                Mutable<ILogicalExpression> varRef =
-                        new MutableObject<>(new VariableReferenceExpression(sourceVar));
+                Mutable<ILogicalExpression> varRef = new MutableObject<>(new VariableReferenceExpression(sourceVar));
                 IAType fieldType = sourceType.getSubFieldType(indexFieldId.fieldName);
                 AbstractFunctionCallExpression theFieldAccessFunc;
                 if (fieldType == null) {
@@ -567,24 +569,22 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
                     // make handling of records with incorrect value type for this field easier and cleaner
                     context.addNotToBeInlinedVar(fieldVar);
                     // create field access
-                    AbstractFunctionCallExpression fieldAccessFunc =
-                            getOpenOrNestedFieldAccessFunction(varRef, indexFieldId.fieldName);
+                    AbstractFunctionCallExpression fieldAccessFunc = getOpenOrNestedFieldAccessFunction(varRef,
+                            indexFieldId.fieldName);
                     // create cast
                     theFieldAccessFunc = new ScalarFunctionCallExpression(
                             FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CAST_TYPE));
                     // The first argument is the field
-                    theFieldAccessFunc.getArguments()
-                            .add(new MutableObject<ILogicalExpression>(fieldAccessFunc));
-                    TypeCastUtils.setRequiredAndInputTypes(theFieldAccessFunc, skTypes.get(i),
-                            BuiltinType.ANY);
+                    theFieldAccessFunc.getArguments().add(new MutableObject<ILogicalExpression>(fieldAccessFunc));
+                    TypeCastUtils.setRequiredAndInputTypes(theFieldAccessFunc, skTypes.get(i), BuiltinType.ANY);
                 } else {
                     // Get the desired field position
                     int pos = indexFieldId.fieldName.size() > 1 ? -1
                             : sourceType.getFieldIndex(indexFieldId.fieldName.get(0));
                     // Field not found --> This is either an open field or a nested field. it can't be accessed by index
-                    theFieldAccessFunc =
-                            (pos == -1) ? getOpenOrNestedFieldAccessFunction(varRef, indexFieldId.fieldName)
-                                    : getClosedFieldAccessFunction(varRef, pos);
+                    theFieldAccessFunc = (pos == -1)
+                            ? getOpenOrNestedFieldAccessFunction(varRef, indexFieldId.fieldName)
+                            : getClosedFieldAccessFunction(varRef, pos);
                 }
                 vars.add(fieldVar);
                 exprs.add(new MutableObject<ILogicalExpression>(theFieldAccessFunc));
@@ -648,8 +648,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
     }
 
     private static Mutable<ILogicalExpression> constantToMutableLogicalExpression(IAObject constantObject) {
-        return new MutableObject<>(
-                new ConstantExpression(new AsterixConstantValue(constantObject)));
+        return new MutableObject<>(new ConstantExpression(new AsterixConstantValue(constantObject)));
     }
 
     private Mutable<ILogicalExpression> createFilterExpression(List<LogicalVariable> secondaryKeyVars,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
index 89280be..2eaad98 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
@@ -22,6 +22,7 @@ package org.apache.asterix.optimizer.rules;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.om.typecomputer.base.TypeCastUtils;
 import org.apache.asterix.om.types.IAType;
@@ -38,6 +39,7 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCa
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -87,9 +89,16 @@ public class IntroduceStaticTypeCastForInsertRule implements IAlgebraicRewriteRu
         List<LogicalVariable> producedVariables = new ArrayList<LogicalVariable>();
         LogicalVariable oldRecordVariable;
 
-        if (op1.getOperatorTag() != LogicalOperatorTag.SINK) {
+        if (op1.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR
+                && op1.getOperatorTag() != LogicalOperatorTag.SINK) {
             return false;
         }
+        if (op1.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+            DelegateOperator eOp = (DelegateOperator) op1;
+            if (!(eOp.getDelegate() instanceof CommitOperator)) {
+                return false;
+            }
+        }
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
         if (op2.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
             return false;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
deleted file mode 100644
index 0c36d0b..0000000
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.optimizer.rules;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.algebra.operators.CommitOperator;
-import org.apache.asterix.algebra.operators.physical.CommitPOperator;
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.lang.common.util.FunctionUtil;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.declared.DatasetDataSource;
-import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class ReplaceSinkOpWithCommitOpRule implements IAlgebraicRewriteRule {
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-        return false;
-    }
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-
-        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.SINK) {
-            return false;
-        }
-        SinkOperator sinkOperator = (SinkOperator) op;
-
-        List<Mutable<ILogicalExpression>> primaryKeyExprs = null;
-        int datasetId = 0;
-        String dataverse = null;
-        String datasetName = null;
-        AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) sinkOperator.getInputs().get(0).getValue();
-        LogicalVariable upsertVar = null;
-        AssignOperator upsertFlagAssign = null;
-        while (descendantOp != null) {
-            if (descendantOp.getOperatorTag() == LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT) {
-                IndexInsertDeleteUpsertOperator indexInsertDeleteUpsertOperator = (IndexInsertDeleteUpsertOperator) descendantOp;
-                if (!indexInsertDeleteUpsertOperator.isBulkload()
-                        && indexInsertDeleteUpsertOperator.getPrevSecondaryKeyExprs() == null) {
-                    primaryKeyExprs = indexInsertDeleteUpsertOperator.getPrimaryKeyExpressions();
-                    datasetId = ((DatasetDataSource) indexInsertDeleteUpsertOperator.getDataSourceIndex()
-                            .getDataSource()).getDataset().getDatasetId();
-                    dataverse = ((DatasetDataSource) indexInsertDeleteUpsertOperator.getDataSourceIndex()
-                            .getDataSource()).getDataset().getDataverseName();
-                    datasetName = ((DatasetDataSource) indexInsertDeleteUpsertOperator.getDataSourceIndex()
-                            .getDataSource()).getDataset().getDatasetName();
-                    break;
-                }
-            } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
-                InsertDeleteUpsertOperator insertDeleteUpsertOperator = (InsertDeleteUpsertOperator) descendantOp;
-                if (!insertDeleteUpsertOperator.isBulkload()) {
-                    primaryKeyExprs = insertDeleteUpsertOperator.getPrimaryKeyExpressions();
-                    datasetId = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDatasetId();
-                    dataverse = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDataverseName();
-                    datasetName = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDatasetName();
-                    if (insertDeleteUpsertOperator.getOperation() == Kind.UPSERT) {
-                        //we need to add a function that checks if previous record was found
-                        upsertVar = context.newVar();
-                        AbstractFunctionCallExpression orFunc = new ScalarFunctionCallExpression(
-                                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OR));
-                        // is new value missing? -> this means that the expected operation is delete
-                        AbstractFunctionCallExpression isNewMissingFunc = new ScalarFunctionCallExpression(
-                                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_MISSING));
-                        isNewMissingFunc.getArguments().add(insertDeleteUpsertOperator.getPayloadExpression());
-                        AbstractFunctionCallExpression isPrevMissingFunc = new ScalarFunctionCallExpression(
-                                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_MISSING));
-                        // argument is the previous record
-                        isPrevMissingFunc.getArguments().add(new MutableObject<ILogicalExpression>(
-                                new VariableReferenceExpression(insertDeleteUpsertOperator.getBeforeOpRecordVar())));
-                        orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isPrevMissingFunc));
-                        orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isNewMissingFunc));
-
-                        // AssignOperator puts in the cast var the casted record
-                        upsertFlagAssign = new AssignOperator(upsertVar, new MutableObject<ILogicalExpression>(orFunc));
-                        // Connect the current top of the plan to the cast operator
-                        upsertFlagAssign.getInputs()
-                                .add(new MutableObject<ILogicalOperator>(sinkOperator.getInputs().get(0).getValue()));
-                        sinkOperator.getInputs().clear();
-                        sinkOperator.getInputs().add(new MutableObject<ILogicalOperator>(upsertFlagAssign));
-                        context.computeAndSetTypeEnvironmentForOperator(upsertFlagAssign);
-                    }
-                    break;
-                }
-            }
-            if (descendantOp.getInputs().size() < 1) {
-                break;
-            }
-            descendantOp = (AbstractLogicalOperator) descendantOp.getInputs().get(0).getValue();
-        }
-
-        if (primaryKeyExprs == null) {
-            return false;
-        }
-
-        //copy primaryKeyExprs
-        List<LogicalVariable> primaryKeyLogicalVars = new ArrayList<LogicalVariable>();
-        for (Mutable<ILogicalExpression> expr : primaryKeyExprs) {
-            VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr.getValue();
-            primaryKeyLogicalVars.add(new LogicalVariable(varRefExpr.getVariableReference().getId()));
-        }
-
-        //get JobId(TransactorId)
-        AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
-        JobId jobId = mp.getJobId();
-
-        //create the logical and physical operator
-        CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, upsertVar);
-        CommitPOperator commitPOperator = new CommitPOperator(jobId, dataverse, datasetName, datasetId,
-                primaryKeyLogicalVars, upsertVar);
-        commitOperator.setPhysicalOperator(commitPOperator);
-
-        //create ExtensionOperator and put the commitOperator in it.
-        ExtensionOperator extensionOperator = new ExtensionOperator(commitOperator);
-        extensionOperator.setPhysicalOperator(commitPOperator);
-
-        //update plan link
-        extensionOperator.getInputs().add(sinkOperator.getInputs().get(0));
-        context.computeAndSetTypeEnvironmentForOperator(extensionOperator);
-        opRef.setValue(extensionOperator);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
new file mode 100644
index 0000000..4bbfce0
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.algebra.operators.CommitOperator;
+import org.apache.asterix.algebra.operators.physical.CommitPOperator;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
+            return false;
+        }
+        DelegateOperator eOp = (DelegateOperator) op;
+        if (!(eOp.getDelegate() instanceof CommitOperator)) {
+            return false;
+        }
+        boolean isSink = ((CommitOperator) eOp.getDelegate()).isSink();
+
+        List<Mutable<ILogicalExpression>> primaryKeyExprs = null;
+        int datasetId = 0;
+        String dataverse = null;
+        String datasetName = null;
+        AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+        LogicalVariable upsertVar = null;
+        while (descendantOp != null) {
+            if (descendantOp.getOperatorTag() == LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT) {
+                IndexInsertDeleteUpsertOperator operator = (IndexInsertDeleteUpsertOperator) descendantOp;
+                if (!operator.isBulkload() && operator.getPrevSecondaryKeyExprs() == null) {
+                    primaryKeyExprs = operator.getPrimaryKeyExpressions();
+                    datasetId = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
+                            .getDatasetId();
+                    dataverse = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
+                            .getDataverseName();
+                    datasetName = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
+                            .getDatasetName();
+                    break;
+                }
+            } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+                InsertDeleteUpsertOperator insertDeleteUpsertOperator = (InsertDeleteUpsertOperator) descendantOp;
+                if (!insertDeleteUpsertOperator.isBulkload()) {
+                    primaryKeyExprs = insertDeleteUpsertOperator.getPrimaryKeyExpressions();
+                    datasetId = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
+                            .getDatasetId();
+                    dataverse = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
+                            .getDataverseName();
+                    datasetName = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
+                            .getDatasetName();
+                    if (insertDeleteUpsertOperator.getOperation() == Kind.UPSERT) {
+                        //we need to add a function that checks if previous record was found
+                        upsertVar = context.newVar();
+                        AbstractFunctionCallExpression orFunc = new ScalarFunctionCallExpression(
+                                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OR));
+                        // is new value missing? -> this means that the expected operation is delete
+                        AbstractFunctionCallExpression isNewMissingFunc = new ScalarFunctionCallExpression(
+                                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_MISSING));
+                        isNewMissingFunc.getArguments().add(insertDeleteUpsertOperator.getPayloadExpression());
+                        AbstractFunctionCallExpression isPrevMissingFunc = new ScalarFunctionCallExpression(
+                                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_MISSING));
+                        // argument is the previous record
+                        isPrevMissingFunc.getArguments().add(new MutableObject<ILogicalExpression>(
+                                new VariableReferenceExpression(insertDeleteUpsertOperator.getBeforeOpRecordVar())));
+                        orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isPrevMissingFunc));
+                        orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isNewMissingFunc));
+
+                        // AssignOperator puts in the cast var the casted record
+                        AssignOperator upsertFlagAssign = new AssignOperator(upsertVar,
+                                new MutableObject<ILogicalExpression>(orFunc));
+                        // Connect the current top of the plan to the cast operator
+                        upsertFlagAssign.getInputs()
+                                .add(new MutableObject<ILogicalOperator>(eOp.getInputs().get(0).getValue()));
+                        eOp.getInputs().clear();
+                        eOp.getInputs().add(new MutableObject<ILogicalOperator>(upsertFlagAssign));
+                        context.computeAndSetTypeEnvironmentForOperator(upsertFlagAssign);
+                    }
+                    break;
+                }
+            }
+            if (descendantOp.getInputs().isEmpty()) {
+                break;
+            }
+            descendantOp = (AbstractLogicalOperator) descendantOp.getInputs().get(0).getValue();
+        }
+
+        if (primaryKeyExprs == null) {
+            return false;
+        }
+
+        //copy primaryKeyExprs
+        List<LogicalVariable> primaryKeyLogicalVars = new ArrayList<>();
+        for (Mutable<ILogicalExpression> expr : primaryKeyExprs) {
+            VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr.getValue();
+            primaryKeyLogicalVars.add(new LogicalVariable(varRefExpr.getVariableReference().getId()));
+        }
+
+        //get JobId(TransactorId)
+        AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
+        JobId jobId = mp.getJobId();
+
+        //create the logical and physical operator
+        CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
+        CommitPOperator commitPOperator = new CommitPOperator(jobId, dataverse, datasetName, datasetId,
+                primaryKeyLogicalVars, upsertVar, isSink);
+        commitOperator.setPhysicalOperator(commitPOperator);
+
+        //create ExtensionOperator and put the commitOperator in it.
+        DelegateOperator extensionOperator = new DelegateOperator(commitOperator);
+        extensionOperator.setPhysicalOperator(commitPOperator);
+
+        //update plan link
+        extensionOperator.getInputs().add(eOp.getInputs().get(0));
+        context.computeAndSetTypeEnvironmentForOperator(extensionOperator);
+        opRef.setValue(extensionOperator);
+        return true;
+    }
+}