You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by sj...@apache.org on 2016/10/16 18:08:26 UTC

[1/4] asterixdb git commit: Enhanced Insert AQL

Repository: asterixdb
Updated Branches:
  refs/heads/master e9b2adf94 -> afa909a57


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivityDetails.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivityDetails.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivityDetails.java
new file mode 100644
index 0000000..496cc53
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivityDetails.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.feed.watch;
+
+public class FeedActivityDetails {
+    public static final String INTAKE_LOCATIONS = "intake-locations";
+    public static final String COMPUTE_LOCATIONS = "compute-locations";
+    public static final String STORAGE_LOCATIONS = "storage-locations";
+    public static final String COLLECT_LOCATIONS = "collect-locations";
+    public static final String FEED_POLICY_NAME = "feed-policy-name";
+    public static final String FEED_CONNECT_TIMESTAMP = "feed-connect-timestamp";
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index d4bf6bc..f2f04d8 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -30,7 +30,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.watch.FeedActivity;
+import org.apache.asterix.external.feed.watch.FeedActivityDetails;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.lang.aql.parser.AQLParserFactory;
 import org.apache.asterix.lang.common.base.IParser;
@@ -72,9 +72,9 @@ public class SubscribeFeedStatement implements Statement {
     public void initialize(MetadataTransactionContext mdTxnCtx) throws MetadataException {
         this.query = new Query(false);
         EntityId sourceFeedId = connectionRequest.getFeedJointKey().getFeedId();
-        Feed subscriberFeed =
-                MetadataManager.INSTANCE.getFeed(mdTxnCtx, connectionRequest.getReceivingFeedId().getDataverse(),
-                        connectionRequest.getReceivingFeedId().getEntityName());
+        Feed subscriberFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx,
+                connectionRequest.getReceivingFeedId().getDataverse(),
+                connectionRequest.getReceivingFeedId().getEntityName());
         if (subscriberFeed == null) {
             throw new IllegalStateException(" Subscriber feed " + subscriberFeed + " not found.");
         }
@@ -95,8 +95,8 @@ public class SubscribeFeedStatement implements Statement {
         StringBuilder builder = new StringBuilder();
         builder.append("use dataverse " + sourceFeedId.getDataverse() + ";\n");
         builder.append("set" + " " + FunctionUtil.IMPORT_PRIVATE_FUNCTIONS + " " + "'" + Boolean.TRUE + "'" + ";\n");
-        builder.append("set" + " " + FeedActivity.FeedActivityDetails.FEED_POLICY_NAME + " " + "'"
-                + connectionRequest.getPolicy() + "'" + ";\n");
+        builder.append("set" + " " + FeedActivityDetails.FEED_POLICY_NAME + " " + "'" + connectionRequest.getPolicy()
+                + "'" + ";\n");
 
         builder.append("insert into dataset " + connectionRequest.getTargetDataset() + " ");
         builder.append(" (" + " for $x in feed-collect ('" + sourceFeedId.getDataverse() + "'" + "," + "'"

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
index d17861e..463805b 100644
--- a/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
+++ b/asterixdb/asterix-lang-aql/src/main/javacc/AQL.jj
@@ -926,18 +926,27 @@ boolean IfExists() throws ParseException :
 
 InsertStatement InsertStatement() throws ParseException:
 {
+  VariableExpr var = null;
   Pair<Identifier,Identifier> nameComponents = null;
   Query query;
+  Query returnQuery = null;
   boolean upsert = false;
 }
 {
-  (<INSERT>|<UPSERT>{ upsert = true; }) <INTO> <DATASET> nameComponents = QualifiedName() query = Query()
+  (<INSERT>|<UPSERT>{ upsert = true; }) <INTO> <DATASET> nameComponents = QualifiedName()
+  (<AS> var = Variable())?
+  {
+    if(var != null){
+      getCurrentScope().addNewVarSymbolToScope(var.getVar());
+    }
+  }
+  query = Query() ( <RETURNING> returnQuery = Query())?
     {
       query.setTopLevel(true);
       if(upsert){
-        return new UpsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter());
+        return new UpsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter(), var, returnQuery);
       } else{
-        return new InsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter());
+        return new InsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter(), var, returnQuery);
       }
     }
 }
@@ -2646,6 +2655,7 @@ TOKEN :
   | <PRIMARY : "primary">
   | <REFRESH : "refresh">
   | <RETURN : "return">
+  | <RETURNING : "returning">
   | <RTREE : "rtree">
   | <RUN : "run">
   | <SATISFIES : "satisfies">

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InsertStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InsertStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InsertStatement.java
index b2dc72e..987c3d9 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InsertStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/InsertStatement.java
@@ -20,6 +20,7 @@ package org.apache.asterix.lang.common.statement;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.expression.VariableExpr;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.commons.lang3.ObjectUtils;
@@ -29,13 +30,18 @@ public class InsertStatement implements Statement {
     private final Identifier dataverseName;
     private final Identifier datasetName;
     private final Query query;
-    private final int varCounter;
+    private int varCounter;
+    private final VariableExpr var;
+    private Query returnQuery;
 
-    public InsertStatement(Identifier dataverseName, Identifier datasetName, Query query, int varCounter) {
+    public InsertStatement(Identifier dataverseName, Identifier datasetName, Query query, int varCounter,
+            VariableExpr var, Query returnQuery) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
         this.query = query;
         this.varCounter = varCounter;
+        this.var = var;
+        this.returnQuery = returnQuery;
     }
 
     @Override
@@ -55,10 +61,26 @@ public class InsertStatement implements Statement {
         return query;
     }
 
+    public void addToVarCounter(int addition) {
+        varCounter += addition;
+    }
+
     public int getVarCounter() {
         return varCounter;
     }
 
+    public VariableExpr getVar() {
+        return var;
+    }
+
+    public Query getReturnQuery() {
+        return returnQuery;
+    }
+
+    public void setRewrittenReturnQuery(Query rewrittenReturnQuery) {
+        this.returnQuery = rewrittenReturnQuery;
+    }
+
     @Override
     public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException {
         return visitor.visit(this, arg);
@@ -66,7 +88,7 @@ public class InsertStatement implements Statement {
 
     @Override
     public int hashCode() {
-        return ObjectUtils.hashCodeMulti(datasetName, dataverseName, query);
+        return ObjectUtils.hashCodeMulti(datasetName, dataverseName, query, varCounter, var, returnQuery);
     }
 
     @Override
@@ -79,12 +101,17 @@ public class InsertStatement implements Statement {
         }
         InsertStatement target = (InsertStatement) object;
         return ObjectUtils.equals(datasetName, target.datasetName)
-                && ObjectUtils.equals(dataverseName, target.dataverseName) && ObjectUtils.equals(query, target.query);
+                && ObjectUtils.equals(dataverseName, target.dataverseName) && ObjectUtils.equals(query, target.query)
+                && ObjectUtils.equals(varCounter, target.varCounter) && ObjectUtils.equals(var, target.var)
+                && ObjectUtils.equals(returnQuery, target.returnQuery);
     }
 
     @Override
     public byte getCategory() {
-        return Category.UPDATE;
+        if (var == null) {
+            return Category.UPDATE;
+        }
+        return Category.QUERY;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java
index 1fb1de2..a82d948 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/UpsertStatement.java
@@ -19,12 +19,14 @@
 package org.apache.asterix.lang.common.statement;
 
 import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.expression.VariableExpr;
 import org.apache.asterix.lang.common.struct.Identifier;
 
 public class UpsertStatement extends InsertStatement {
 
-    public UpsertStatement(Identifier dataverseName, Identifier datasetName, Query query, int varCounter) {
-        super(dataverseName, datasetName, query, varCounter);
+    public UpsertStatement(Identifier dataverseName, Identifier datasetName, Query query, int varCounter,
+            VariableExpr var, Query returnQuery) {
+        super(dataverseName, datasetName, query, varCounter, var, returnQuery);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
index bcbdd26..50682c9 100644
--- a/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
+++ b/asterixdb/asterix-lang-sqlpp/src/main/javacc/SQLPP.jj
@@ -969,7 +969,7 @@ InsertStatement InsertStatement() throws ParseException:
   <INSERT> <INTO> nameComponents = QualifiedName() query = Query(false)
     {
       query.setTopLevel(true);
-      return new InsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter());
+      return new InsertStatement(nameComponents.first, nameComponents.second, query, getVarCounter(), null, null);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
index 3b35486..f3592e3 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/ADateTime.java
@@ -20,14 +20,13 @@ package org.apache.asterix.om.base;
 
 import java.io.IOException;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.om.base.temporal.GregorianCalendarSystem;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.visitors.IOMVisitor;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 /**
  * ADateTime type represents the timestamp values.
@@ -40,12 +39,15 @@ import org.apache.asterix.om.visitors.IOMVisitor;
  * - minute; <br/>
  * - second; <br/>
  * - millisecond. <br/>
- * By default, an ADateTime value is a UTC time value, i.e., there is no timezone information maintained. However user can use the timezone based AQL function to convert a UTC time to a timezone-embedded time.
+ * By default, an ADateTime value is a UTC time value, i.e., there is no timezone information maintained. However user
+ * can use the timezone based AQL function to convert a UTC time to a timezone-embedded time.
  * <p/>
  * And the string representation of an ADateTime value follows the ISO8601 standard, in the following format:<br/>
  * [+|-]YYYY-MM-DDThh:mm:ss.xxxZ
  * <p/>
- * Internally, an ADateTime value is stored as the number of milliseconds elapsed since 1970-01-01T00:00:00.000Z (also called chronon time). Functions to convert between a string representation of an ADateTime and its chronon time are implemented in {@link GregorianCalendarSystem}.
+ * Internally, an ADateTime value is stored as the number of milliseconds elapsed since 1970-01-01T00:00:00.000Z (also
+ * called chronon time). Functions to convert between a string representation of an ADateTime and its chronon time are
+ * implemented in {@link GregorianCalendarSystem}.
  * <p/>
  */
 public class ADateTime implements IAObject {
@@ -122,6 +124,13 @@ public class ADateTime implements IAObject {
         return sbder.toString();
     }
 
+    public String toSimpleString() throws IOException {
+        StringBuilder sbder = new StringBuilder();
+        GregorianCalendarSystem.getInstance().getExtendStringRepUntilField(chrononTime, 0, sbder,
+                GregorianCalendarSystem.Fields.YEAR, GregorianCalendarSystem.Fields.MILLISECOND, true);
+        return sbder.toString();
+    }
+
     public long getChrononTime() {
         return chrononTime;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AUUID.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AUUID.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AUUID.java
index 329ae53..025866c 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AUUID.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AUUID.java
@@ -37,13 +37,12 @@ public class AUUID implements IAObject {
     public static final int UUID_CHARS = 36;
     public static final int UUID_BYTES = 16;
 
-    protected final byte [] uuidBytes;
-
-    protected static final char [] CHARS;
+    protected final byte[] uuidBytes;
 
+    private static final char[] CHARS;
 
     static {
-        CHARS = new char [16];
+        CHARS = new char[16];
         for (int i = 0; i < 16; i++) {
             CHARS[i] = Character.forDigit(i, 16);
         }
@@ -53,7 +52,7 @@ public class AUUID implements IAObject {
         this(new byte[UUID_BYTES]);
     }
 
-    public AUUID(byte [] bytes) {
+    public AUUID(byte[] bytes) {
         this.uuidBytes = bytes;
     }
 
@@ -95,11 +94,16 @@ public class AUUID implements IAObject {
         return appendLiteralOnly(buf).append('}').toString();
     }
 
+    public String toSimpleString() {
+        StringBuilder buf = new StringBuilder(UUID_CHARS + 9);
+        return appendLiteralOnly(buf).toString();
+    }
+
     public StringBuilder appendLiteralOnly(StringBuilder buf) {
         return appendLiteralOnly(uuidBytes, 0, buf);
     }
 
-    private static StringBuilder digits(byte b [], int offset, int count, StringBuilder result) {
+    private static StringBuilder digits(byte b[], int offset, int count, StringBuilder result) {
         for (int i = 0; i < count; i++) {
             result.append(CHARS[(b[offset + i] >> 4) & 0xf]);
             result.append(CHARS[b[offset + i] & 0xf]);
@@ -107,7 +111,7 @@ public class AUUID implements IAObject {
         return result;
     }
 
-    public static StringBuilder appendLiteralOnly(byte [] bytes, int offset, StringBuilder result) {
+    public static StringBuilder appendLiteralOnly(byte[] bytes, int offset, StringBuilder result) {
         digits(bytes, offset, 4, result).append('-');
         digits(bytes, offset + 4, 2, result).append('-');
         digits(bytes, offset + 6, 2, result).append('-');

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index cc7a75f..2d13baf 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -26,7 +26,7 @@ public enum LogicalOperatorTag {
     DISTRIBUTE_RESULT,
     EMPTYTUPLESOURCE,
     EXCHANGE,
-    EXTENSION_OPERATOR,
+    DELEGATE_OPERATOR,
     EXTERNAL_LOOKUP,
     GROUP,
     INDEX_INSERT_DELETE_UPSERT,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractDelegatedLogicalOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractDelegatedLogicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractDelegatedLogicalOperator.java
new file mode 100644
index 0000000..9a66e72
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractDelegatedLogicalOperator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+
+/**
+ * @author rico
+ */
+public abstract class AbstractDelegatedLogicalOperator implements IOperatorDelegate {
+
+    private AbstractLogicalOperator.ExecutionMode mode = AbstractLogicalOperator.ExecutionMode.UNPARTITIONED;
+    protected List<LogicalVariable> schema;
+    protected IPhysicalOperator physicalOperator;
+
+    @Override
+    public ExecutionMode getExecutionMode() {
+        return mode;
+    }
+
+    @Override
+    public void setExecutionMode(ExecutionMode mode) {
+        this.mode = mode;
+    }
+
+    @Override
+    public void setSchema(List<LogicalVariable> schema) {
+        this.schema = schema;
+    }
+
+    @Override
+    public IPhysicalOperator getPhysicalOperator() {
+        return physicalOperator;
+    }
+
+    @Override
+    public void setPhysicalOperator(IPhysicalOperator physicalOperator) {
+        this.physicalOperator = physicalOperator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractExtensibleLogicalOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractExtensibleLogicalOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractExtensibleLogicalOperator.java
deleted file mode 100644
index dd555e2..0000000
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractExtensibleLogicalOperator.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical;
-
-import java.util.List;
-
-import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
-
-/**
- * @author rico
- */
-public abstract class AbstractExtensibleLogicalOperator implements IOperatorExtension {
-
-    private AbstractLogicalOperator.ExecutionMode mode = AbstractLogicalOperator.ExecutionMode.UNPARTITIONED;
-    protected List<LogicalVariable> schema;
-    protected IPhysicalOperator physicalOperator;
-
-    @Override
-    public ExecutionMode getExecutionMode() {
-        return mode;
-    }
-
-    @Override
-    public void setExecutionMode(ExecutionMode mode) {
-        this.mode = mode;
-    }
-
-    @Override
-    public void setSchema(List<LogicalVariable> schema) {
-        this.schema = schema;
-    }
-
-    @Override
-    public IPhysicalOperator getPhysicalOperator() {
-        return physicalOperator;
-    }
-
-    @Override
-    public void setPhysicalOperator(IPhysicalOperator physicalOperator) {
-        this.physicalOperator = physicalOperator;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DelegateOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DelegateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DelegateOperator.java
new file mode 100644
index 0000000..3667e6b
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/DelegateOperator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * @author rico
+ */
+public class DelegateOperator extends AbstractLogicalOperator {
+
+    private IOperatorDelegate delegate;
+
+    public DelegateOperator(IOperatorDelegate delegate) {
+        super();
+        if (delegate == null) {
+            throw new IllegalArgumentException("delegate cannot be null!");
+        }
+        this.delegate = delegate;
+        setExecutionMode(delegate.getExecutionMode());
+    }
+
+    @Override
+    public void recomputeSchema() throws AlgebricksException {
+        schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
+        delegate.setSchema(schema);
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
+        return delegate.acceptExpressionTransform(transform);
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitDelegateOperator(this, arg);
+    }
+
+    @Override
+    public boolean isMap() {
+        return this.delegate.isMap();
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return VariablePropagationPolicy.ALL;
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        return this.createPropagatingAllInputsTypeEnvironment(ctx);
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.DELEGATE_OPERATOR;
+    }
+
+    public IOperatorDelegate getNewInstanceOfDelegateOperator() {
+        return delegate.newInstance();
+    }
+
+    @Override
+    public List<LogicalVariable> getSchema() {
+        return this.schema;
+    }
+
+    @Override
+    public ExecutionMode getExecutionMode() {
+        return delegate.getExecutionMode();
+    }
+
+    @Override
+    public void setExecutionMode(ExecutionMode mode) {
+        delegate.setExecutionMode(mode);
+    }
+
+    @Override
+    public IPhysicalOperator getPhysicalOperator() {
+        return delegate.getPhysicalOperator();
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeInputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        return this.createPropagatingAllInputsTypeEnvironment(ctx);
+    }
+
+    @Override
+    public String toString() {
+        return delegate.toString();
+    }
+
+    public IOperatorDelegate getDelegate() {
+        return delegate;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
deleted file mode 100644
index d2f7715..0000000
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ExtensionOperator.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-
-/**
- * @author rico
- */
-public class ExtensionOperator extends AbstractLogicalOperator {
-
-    private IOperatorExtension delegate;
-
-    public ExtensionOperator(IOperatorExtension delegate) {
-        super();
-        if (delegate == null) {
-            throw new IllegalArgumentException("delegate cannot be null!");
-        }
-        this.delegate = delegate;
-        setExecutionMode(delegate.getExecutionMode());
-    }
-
-    @Override
-    public void recomputeSchema() throws AlgebricksException {
-        schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
-        delegate.setSchema(schema);
-    }
-
-    @Override
-    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
-        return delegate.acceptExpressionTransform(transform);
-    }
-
-    @Override
-    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
-        return visitor.visitExtensionOperator(this, arg);
-    }
-
-    @Override
-    public boolean isMap() {
-        return this.delegate.isMap();
-    }
-
-    @Override
-    public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return VariablePropagationPolicy.ALL;
-    }
-
-    @Override
-    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
-        return this.createPropagatingAllInputsTypeEnvironment(ctx);
-    }
-
-    @Override
-    public LogicalOperatorTag getOperatorTag() {
-        return LogicalOperatorTag.EXTENSION_OPERATOR;
-    }
-
-    public IOperatorExtension getNewInstanceOfDelegateOperator() {
-        return delegate.newInstance();
-    }
-
-    @Override
-    public List<LogicalVariable> getSchema() {
-        return this.schema;
-    }
-
-    @Override
-    public ExecutionMode getExecutionMode() {
-        return delegate.getExecutionMode();
-    }
-
-    @Override
-    public void setExecutionMode(ExecutionMode mode) {
-        delegate.setExecutionMode(mode);
-    }
-
-    @Override
-    public IPhysicalOperator getPhysicalOperator() {
-        return delegate.getPhysicalOperator();
-    }
-
-    @Override
-    public IVariableTypeEnvironment computeInputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
-        return this.createPropagatingAllInputsTypeEnvironment(ctx);
-    }
-
-    @Override
-    public String toString() {
-        return delegate.toString();
-    }
-
-    public IOperatorExtension getDelegate() {
-        return delegate;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IOperatorDelegate.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IOperatorDelegate.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IOperatorDelegate.java
new file mode 100644
index 0000000..a052c3f
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IOperatorDelegate.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * @author rico
+ */
+public interface IOperatorDelegate {
+
+    void setExecutionMode(ExecutionMode mode);
+
+    boolean isMap();
+
+    public IOperatorDelegate newInstance();
+
+    boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException;
+
+    void setSchema(List<LogicalVariable> schema);
+
+    IPhysicalOperator getPhysicalOperator();
+
+    void setPhysicalOperator(IPhysicalOperator physicalOperator);
+
+    ExecutionMode getExecutionMode();
+
+    public void getUsedVariables(Collection<LogicalVariable> usedVars);
+
+    public void getProducedVariables(Collection<LogicalVariable> producedVars);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
deleted file mode 100644
index e61d9a2..0000000
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IOperatorExtension.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-
-/**
- * @author rico
- */
-public interface IOperatorExtension {
-
-    void setExecutionMode(ExecutionMode mode);
-
-    boolean isMap();
-
-    public IOperatorExtension newInstance();
-
-    boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException;
-
-    void setSchema(List<LogicalVariable> schema);
-
-    IPhysicalOperator getPhysicalOperator();
-
-    void setPhysicalOperator(IPhysicalOperator physicalOperator);
-
-    ExecutionMode getExecutionMode();
-
-    public void getUsedVariables(Collection<LogicalVariable> usedVars);
-
-    public void getProducedVariables(Collection<LogicalVariable> producedVars);
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index d278078..d0aea60 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -35,7 +35,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -164,7 +164,7 @@ public class CardinalityInferenceVisitor implements ILogicalOperatorVisitor<Long
     }
 
     @Override
-    public Long visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public Long visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         return UNKNOWN;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index b259869..4843f81 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -53,7 +53,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -764,7 +764,7 @@ public class FDsAndEquivClassesVisitor implements ILogicalOperatorVisitor<Void,
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, IOptimizationContext ctx) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, IOptimizationContext ctx) throws AlgebricksException {
         propagateFDsAndEquivClasses(op, ctx);
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 7f34e8b..b3b9da1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -41,7 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -119,9 +119,9 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole
     }
 
     @Override
-    public Boolean visitExtensionOperator(ExtensionOperator op, ILogicalOperator arg) throws AlgebricksException {
-        ExtensionOperator aop = (ExtensionOperator) copyAndSubstituteVar(op, arg);
-        if (aop.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR) {
+    public Boolean visitDelegateOperator(DelegateOperator op, ILogicalOperator arg) throws AlgebricksException {
+        DelegateOperator aop = (DelegateOperator) copyAndSubstituteVar(op, arg);
+        if (aop.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
             return Boolean.FALSE;
         }
         return Boolean.TRUE;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 58b31f8..52d8e64 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -44,7 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -499,7 +499,7 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, ILogicalOperator arg) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index f4b3195..0da9110 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -41,7 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceSc
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -548,7 +548,7 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
     }
 
     @Override
-    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, ILogicalOperator arg)
+    public ILogicalOperator visitDelegateOperator(DelegateOperator op, ILogicalOperator arg)
             throws AlgebricksException {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 7e92869..bdabbca 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -34,7 +34,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -350,7 +350,7 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, IOptimizationContext arg) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 442899f..7543e5f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -39,7 +39,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -379,8 +379,8 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
     }
 
     @Override
-    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
-        return new ExtensionOperator(op.getNewInstanceOfDelegateOperator());
+    public ILogicalOperator visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
+        return new DelegateOperator(op.getNewInstanceOfDelegateOperator());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
index 9f1acea..c96276f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
@@ -36,7 +36,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -164,7 +164,7 @@ public class PrimaryKeyVariablesVisitor implements ILogicalOperatorVisitor<Void,
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, IOptimizationContext ctx) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, IOptimizationContext ctx) throws AlgebricksException {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 3645aff..ec96d48 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -40,7 +40,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -283,7 +283,7 @@ public class ProducedVariableVisitor implements ILogicalOperatorVisitor<Void, Vo
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         op.getDelegate().getProducedVariables(producedVariables);
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index a746cf2..28f4e5e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -38,7 +38,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -319,7 +319,7 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         standardLayout(op);
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 7345928..cf24ee7 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -38,7 +38,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -471,7 +471,7 @@ public class SubstituteVariableVisitor
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+    public Void visitDelegateOperator(DelegateOperator op, Pair<LogicalVariable, LogicalVariable> arg)
             throws AlgebricksException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 3daa00f..b8cb4ff 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -38,7 +38,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -443,7 +443,7 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         op.getDelegate().getUsedVariables(usedVariables);
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index d3dd166..71ac8f3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -36,7 +36,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -438,7 +438,7 @@ public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisito
     }
 
     @Override
-    public Void visitExtensionOperator(ExtensionOperator op, Integer indent) throws AlgebricksException {
+    public Void visitDelegateOperator(DelegateOperator op, Integer indent) throws AlgebricksException {
         addIndent(indent).append(op.toString());
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index f5ff8b4..8da41e2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -26,7 +26,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -78,7 +78,7 @@ public interface ILogicalOperatorVisitor<R, T> {
 
     public R visitSelectOperator(SelectOperator op, T arg) throws AlgebricksException;
 
-    public R visitExtensionOperator(ExtensionOperator op, T arg) throws AlgebricksException;
+    public R visitDelegateOperator(DelegateOperator op, T arg) throws AlgebricksException;
 
     public R visitProjectOperator(ProjectOperator op, T arg) throws AlgebricksException;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
index c5d7291..080828d 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
@@ -37,7 +37,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceSc
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -164,7 +164,7 @@ class ReplaceNtsWithSubplanInputOperatorVisitor implements IQueryOperatorVisitor
     }
 
     @Override
-    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public ILogicalOperator visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         return visit(op);
     }
 


[4/4] asterixdb git commit: Enhanced Insert AQL

Posted by sj...@apache.org.
Enhanced Insert AQL

The optional "as Variable" provides a variable binding for the inserted records
The optional "returning Query" allows users to run simple
queries/functions on the records returned by the insert, and can refer
to the variable bound in "as Variable"

Allow commits to be non-sink operators (contnue job pipeline after commit)

Additionally, this change makes small modifications to
the extension code to prepare for the BAD extension

Also made the OptimizerTests able to work for Extensions

Change-Id: I65789d2a861d15232dd29156a6987d0635ec6c94
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1150
Reviewed-by: abdullah alamoudi <ba...@gmail.com>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/afa909a5
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/afa909a5
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/afa909a5

Branch: refs/heads/master
Commit: afa909a5794e1f24da4d619119ca2bc58f486959
Parents: e9b2adf
Author: Steven Glenn Jacobs <sj...@ucr.edu>
Authored: Sun Oct 16 10:35:30 2016 -0700
Committer: Steven Jacobs <sj...@ucr.edu>
Committed: Sun Oct 16 11:07:33 2016 -0700

----------------------------------------------------------------------
 asterixdb/asterix-active/pom.xml                |  18 ++
 .../active/IActiveEntityEventsListener.java     |   2 +-
 .../algebra/extension/IExtensionStatement.java  |   9 +-
 .../algebra/operators/CommitOperator.java       |  39 +++-
 .../operators/physical/CommitPOperator.java     |   6 +-
 .../operators/physical/CommitRuntime.java       |  74 +++---
 .../physical/CommitRuntimeFactory.java          |   8 +-
 .../operators/physical/UpsertCommitRuntime.java |  16 +-
 .../asterix/optimizer/base/RuleCollections.java |   4 +-
 .../rules/IntroduceAutogenerateIDRule.java      |  37 ++-
 .../rules/IntroduceDynamicTypeCastRule.java     |  42 ++--
 ...troduceRapidFrameFlushProjectAssignRule.java |   6 +-
 ...IntroduceSecondaryIndexInsertDeleteRule.java | 109 +++++----
 .../IntroduceStaticTypeCastForInsertRule.java   |  11 +-
 .../rules/ReplaceSinkOpWithCommitOpRule.java    | 166 -------------
 .../rules/SetupCommitExtensionOpRule.java       | 168 +++++++++++++
 .../SweepIllegalNonfunctionalFunctions.java     |   4 +-
 .../optimizer/rules/UnnestToDataScanRule.java   |   2 +-
 .../am/AbstractIntroduceAccessMethodRule.java   | 132 +++++------
 .../subplan/InlineAllNtsInSubplanVisitor.java   |   4 +-
 ...neLeftNtsInSubplanJoinFlatteningVisitor.java |   4 +-
 .../SubplanSpecialFlatteningCheckVisitor.java   |   4 +-
 .../asterix/translator/CompiledStatements.java  |  20 +-
 .../asterix/translator/IStatementExecutor.java  |   4 +-
 .../LangExpressionToPlanTranslator.java         | 198 +++++++++++-----
 .../asterix/api/http/servlet/FeedServlet.java   |  52 -----
 .../asterix/app/translator/QueryTranslator.java | 234 ++++++++++---------
 .../app/bootstrap/TestNodeController.java       |   2 +-
 .../asterix/test/optimizer/OptimizerTest.java   |  15 +-
 .../asterix/test/runtime/ExecutionTestUtil.java |  11 +-
 .../queries/insert-return-custom-result.aql     |  48 ++++
 .../results/insert-return-custom-result.plan    |  17 ++
 .../insert-return-records.1.ddl.aql             |  36 +++
 .../insert-return-records.3.query.aql           |  34 +++
 .../insert-returning-fieldname.1.ddl.aql        |  37 +++
 .../insert-returning-fieldname.3.query.aql      |  30 +++
 .../insert-with-bad-return.1.ddl.aql            |  36 +++
 .../insert-with-bad-return.3.query.aql          |  37 +++
 .../upsert-return-custom-result.1.ddl.aql       |  37 +++
 .../upsert-return-custom-result.3.query.aql     |  43 ++++
 .../insert-return-records.1.adm                 |   5 +
 .../insert-returning-fieldname.1.adm            |   1 +
 .../upsert-return-custom-result.1.adm           |   5 +
 .../src/test/resources/runtimets/testsuite.xml  |  21 ++
 .../asterix-doc/src/site/markdown/aql/manual.md |  10 +-
 .../api/IActiveLifecycleEventSubscriber.java    |  40 ++++
 .../feed/api/IFeedLifecycleEventSubscriber.java |  37 ---
 .../ActiveLifecycleEventSubscriber.java         |  67 ++++++
 .../feed/management/FeedEventsListener.java     |  32 +--
 .../FeedLifecycleEventSubscriber.java           |  66 ------
 .../external/feed/watch/FeedActivity.java       | 116 ---------
 .../feed/watch/FeedActivityDetails.java         |  27 +++
 .../aql/statement/SubscribeFeedStatement.java   |  12 +-
 .../asterix-lang-aql/src/main/javacc/AQL.jj     |  16 +-
 .../lang/common/statement/InsertStatement.java  |  37 ++-
 .../lang/common/statement/UpsertStatement.java  |   6 +-
 .../asterix-lang-sqlpp/src/main/javacc/SQLPP.jj |   2 +-
 .../org/apache/asterix/om/base/ADateTime.java   |  19 +-
 .../java/org/apache/asterix/om/base/AUUID.java  |  18 +-
 .../core/algebra/base/LogicalOperatorTag.java   |   2 +-
 .../AbstractDelegatedLogicalOperator.java       |  60 +++++
 .../AbstractExtensibleLogicalOperator.java      |  60 -----
 .../operators/logical/DelegateOperator.java     | 125 ++++++++++
 .../operators/logical/ExtensionOperator.java    | 124 ----------
 .../operators/logical/IOperatorDelegate.java    |  54 +++++
 .../operators/logical/IOperatorExtension.java   |  54 -----
 .../visitors/CardinalityInferenceVisitor.java   |   4 +-
 .../visitors/FDsAndEquivClassesVisitor.java     |   4 +-
 .../visitors/IsomorphismOperatorVisitor.java    |   8 +-
 .../IsomorphismVariableMappingVisitor.java      |   4 +-
 ...OperatorDeepCopyWithNewVariablesVisitor.java |   4 +-
 .../visitors/LogicalPropertiesVisitor.java      |   4 +-
 .../visitors/OperatorDeepCopyVisitor.java       |   6 +-
 .../visitors/PrimaryKeyVariablesVisitor.java    |   4 +-
 .../visitors/ProducedVariableVisitor.java       |   4 +-
 .../logical/visitors/SchemaVariableVisitor.java |   4 +-
 .../visitors/SubstituteVariableVisitor.java     |   4 +-
 .../logical/visitors/UsedVariableVisitor.java   |   4 +-
 .../LogicalOperatorPrettyPrintVisitor.java      |   4 +-
 .../visitors/ILogicalOperatorVisitor.java       |   4 +-
 ...placeNtsWithSubplanInputOperatorVisitor.java |   4 +-
 81 files changed, 1680 insertions(+), 1158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-active/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 9f4e650..efba47e 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -1,3 +1,21 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.    See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 2dd9fe7..d0fb5e8 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -31,6 +31,6 @@ public interface IActiveEntityEventsListener {
 
     public EntityId getEntityId();
 
-    public boolean isEntityConnectedToDataset(String dataverseName, String datasetName);
+    public boolean isEntityUsingDataset(String dataverseName, String datasetName);
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
index e88962a..d15ae6f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/IExtensionStatement.java
@@ -21,8 +21,11 @@ package org.apache.asterix.algebra.extension;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
@@ -36,15 +39,17 @@ public interface IExtensionStatement extends Statement {
     }
 
     /**
-     * Called when the {@code IQueryTranslator} encounters an extension statement.
+     * Called when the {@code IStatementExecutor} encounters an extension statement.
      * An implementation class should implement the actual processing of the statement in this method.
      *
      * @param queryTranslator
      * @param metadataProvider
      * @param statementExecutor
      * @param hcc
+     * @param resultSetIdCounter
      * @throws Exception
      */
     void handle(IStatementExecutor statementExecutor, AqlMetadataProvider metadataProvider,
-            IHyracksClientConnection hcc) throws HyracksDataException, AlgebricksException;
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
+            int resultSetIdCounter) throws HyracksDataException, AlgebricksException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
index 8dd6d33..1ee130c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/CommitOperator.java
@@ -19,23 +19,32 @@
 
 package org.apache.asterix.algebra.operators;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractExtensibleLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorExtension;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDelegate;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 
-public class CommitOperator extends AbstractExtensibleLogicalOperator {
+public class CommitOperator extends AbstractDelegatedLogicalOperator {
 
-    private final List<LogicalVariable> primaryKeyLogicalVars;
-    private final LogicalVariable upsertVar;
+    private List<LogicalVariable> primaryKeyLogicalVars;
+    private LogicalVariable upsertVar;
+    private boolean isSink;
 
-    public CommitOperator(List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar) {
+    public CommitOperator(boolean isSink) {
+        this.isSink = isSink;
+        this.upsertVar = null;
+        primaryKeyLogicalVars = new ArrayList<>();
+    }
+
+    public CommitOperator(List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar, boolean isSink) {
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.upsertVar = upsertVar;
+        this.isSink = isSink;
     }
 
     @Override
@@ -44,9 +53,23 @@ public class CommitOperator extends AbstractExtensibleLogicalOperator {
         return false;
     }
 
+    public boolean isSink() {
+        return isSink;
+    }
+
+    public void setSink(boolean isSink) {
+        this.isSink = isSink;
+    }
+
+    //Provided for Extensions but not used by core
+    public void setVars(List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar) {
+        this.primaryKeyLogicalVars = primaryKeyLogicalVars;
+        this.upsertVar = upsertVar;
+    }
+
     @Override
-    public IOperatorExtension newInstance() {
-        return new CommitOperator(primaryKeyLogicalVars, upsertVar);
+    public IOperatorDelegate newInstance() {
+        return new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index 1a26021..5a1c929 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -47,15 +47,17 @@ public class CommitPOperator extends AbstractPhysicalOperator {
     private final String dataverse;
     private final String dataset;
     private final LogicalVariable upsertVar;
+    private final boolean isSink;
 
     public CommitPOperator(JobId jobId, String dataverse, String dataset, int datasetId,
-            List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar) {
+            List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar, boolean isSink) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.upsertVar = upsertVar;
         this.dataverse = dataverse;
         this.dataset = dataset;
+        this.isSink = isSink;
     }
 
     @Override
@@ -105,7 +107,7 @@ public class CommitPOperator extends AbstractPhysicalOperator {
         }
         runtime = new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
                 metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
-                datasetPartitions);
+                datasetPartitions, isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index c6c71f6..a1fa788 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -31,8 +31,7 @@ import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.utils.TransactionUtil;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
-import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -45,7 +44,7 @@ import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
 
-public class CommitRuntime implements IPushRuntime {
+public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
 
     private final static long SEED = 0L;
 
@@ -57,27 +56,27 @@ public class CommitRuntime implements IPushRuntime {
     protected final boolean isTemporaryDatasetWriteJob;
     protected final boolean isWriteTransaction;
     protected final long[] longHashes;
-    protected final FrameTupleReference frameTupleReference;
     protected final IHyracksTaskContext ctx;
     protected final int resourcePartition;
     protected ITransactionContext transactionContext;
     protected LogRecord logRecord;
-    protected FrameTupleAccessor frameTupleAccessor;
+    protected final boolean isSink;
 
     public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
-            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition) {
+            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, boolean isSink) {
         this.ctx = ctx;
-        IAsterixAppRuntimeContext runtimeCtx =
-                (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
+        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
+                .getApplicationContext().getApplicationObject();
         this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
         this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
-        this.frameTupleReference = new FrameTupleReference();
+        this.tRef = new FrameTupleReference();
         this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
         this.isWriteTransaction = isWriteTransaction;
         this.resourcePartition = resourcePartition;
+        this.isSink = isSink;
         longHashes = new long[2];
     }
 
@@ -86,9 +85,14 @@ public class CommitRuntime implements IPushRuntime {
         try {
             transactionContext = transactionManager.getTransactionContext(jobId, false);
             transactionContext.setWriteTxn(isWriteTransaction);
-            ILogMarkerCallback callback =
-                    TaskUtils.<ILogMarkerCallback> get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+            ILogMarkerCallback callback = TaskUtils.<ILogMarkerCallback> get(ILogMarkerCallback.KEY_MARKER_CALLBACK,
+                    ctx);
             logRecord = new LogRecord(callback);
+            if (isSink) {
+                return;
+            }
+            initAccessAppend(ctx);
+            writer.open();
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
@@ -96,26 +100,27 @@ public class CommitRuntime implements IPushRuntime {
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        frameTupleAccessor.reset(buffer);
-        int nTuple = frameTupleAccessor.getTupleCount();
+        tAccess.reset(buffer);
+        int nTuple = tAccess.getTupleCount();
         for (int t = 0; t < nTuple; t++) {
             if (isTemporaryDatasetWriteJob) {
                 /**
-                 * This "if branch" is for writes over temporary datasets.
-                 * A temporary dataset does not require any lock and does not generate any write-ahead
-                 * update and commit log but generates flush log and job commit log.
-                 * However, a temporary dataset still MUST guarantee no-steal policy so that this
-                 * notification call should be delivered to PrimaryIndexOptracker and used correctly in order
-                 * to decrement number of active operation count of PrimaryIndexOptracker.
-                 * By maintaining the count correctly and only allowing flushing when the count is 0, it can
-                 * guarantee the no-steal policy for temporary datasets, too.
+                 * This "if branch" is for writes over temporary datasets. A temporary dataset does not require any lock
+                 * and does not generate any write-ahead update and commit log but generates flush log and job commit
+                 * log. However, a temporary dataset still MUST guarantee no-steal policy so that this notification call
+                 * should be delivered to PrimaryIndexOptracker and used correctly in order to decrement number of
+                 * active operation count of PrimaryIndexOptracker. By maintaining the count correctly and only allowing
+                 * flushing when the count is 0, it can guarantee the no-steal policy for temporary datasets, too.
                  */
                 transactionContext.notifyOptracker(false);
             } else {
-                frameTupleReference.reset(frameTupleAccessor, t);
+                tRef.reset(tAccess, t);
                 try {
                     formLogRecord(buffer, t);
                     logMgr.log(logRecord);
+                    if (!isSink) {
+                        appendTupleToFrame(t);
+                    }
                 } catch (ACIDException e) {
                     throw new HyracksDataException(e);
                 }
@@ -141,8 +146,8 @@ public class CommitRuntime implements IPushRuntime {
     }
 
     protected void formLogRecord(ByteBuffer buffer, int t) {
-        int pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
-        TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, frameTupleReference,
+        int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
+        TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef,
                 primaryKeyFields, resourcePartition, LogType.ENTITY_COMMIT);
     }
 
@@ -153,22 +158,27 @@ public class CommitRuntime implements IPushRuntime {
 
     @Override
     public void fail() throws HyracksDataException {
-
+        failed = true;
+        if (isSink) {
+            return;
+        }
+        writer.fail();
     }
 
     @Override
     public void close() throws HyracksDataException {
-
-    }
-
-    @Override
-    public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-        throw new IllegalStateException();
+        if (isSink) {
+            return;
+        }
+        flushIfNotFailed();
+        writer.close();
+        appender.reset(frame, true);
     }
 
     @Override
     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
-        this.frameTupleAccessor = new FrameTupleAccessor(recordDescriptor);
+        this.inputRecordDesc = recordDescriptor;
+        this.tAccess = new FrameTupleAccessor(inputRecordDesc);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
index 4f28b9d..9486a19 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
@@ -36,9 +36,10 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
     private final boolean isWriteTransaction;
     private final int upsertVarIdx;
     private int[] datasetPartitions;
+    private final boolean isSink;
 
     public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob,
-            boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions) {
+            boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
         this.jobId = jobId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
@@ -46,6 +47,7 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
         this.isWriteTransaction = isWriteTransaction;
         this.upsertVarIdx = upsertVarIdx;
         this.datasetPartitions = datasetPartitions;
+        this.isSink = isSink;
     }
 
     @Override
@@ -58,10 +60,10 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
         if (upsertVarIdx >= 0) {
             return new UpsertCommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
                     isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()],
-                    upsertVarIdx);
+                    upsertVarIdx, isSink);
         } else {
             return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
-                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()]);
+                    isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
index 7358700..53e0f62 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
@@ -30,25 +30,25 @@ public class UpsertCommitRuntime extends CommitRuntime {
     private final int upsertIdx;
 
     public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
-            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx) {
+            boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx,
+            boolean isSink) {
         super(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction,
-                resourcePartition);
+                resourcePartition, isSink);
         this.upsertIdx = upsertIdx;
     }
 
     @Override
     protected void formLogRecord(ByteBuffer buffer, int t) {
-        boolean isNull = ABooleanSerializerDeserializer.getBoolean(buffer.array(),
-                frameTupleAccessor.getFieldSlotsLength() + frameTupleAccessor.getTupleStartOffset(t)
-                        + frameTupleAccessor.getFieldStartOffset(t, upsertIdx) + 1);
+        boolean isNull = ABooleanSerializerDeserializer.getBoolean(buffer.array(), tAccess.getFieldSlotsLength()
+                + tAccess.getTupleStartOffset(t) + tAccess.getFieldStartOffset(t, upsertIdx) + 1);
         if (isNull) {
             // Previous record not found (insert)
             super.formLogRecord(buffer, t);
         } else {
             // Previous record found (delete + insert)
-            int pkHash = computePrimaryKeyHashValue(frameTupleReference, primaryKeyFields);
-            TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash,
-                    frameTupleReference, primaryKeyFields, resourcePartition, LogType.UPSERT_ENTITY_COMMIT);
+            int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
+            TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef,
+                    primaryKeyFields, resourcePartition, LogType.UPSERT_ENTITY_COMMIT);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index cd8d747..56a3bd0 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -66,7 +66,7 @@ import org.apache.asterix.optimizer.rules.RemoveRedundantListifyRule;
 import org.apache.asterix.optimizer.rules.RemoveRedundantSelectRule;
 import org.apache.asterix.optimizer.rules.RemoveSortInFeedIngestionRule;
 import org.apache.asterix.optimizer.rules.RemoveUnusedOneToOneEquiJoinRule;
-import org.apache.asterix.optimizer.rules.ReplaceSinkOpWithCommitOpRule;
+import org.apache.asterix.optimizer.rules.SetupCommitExtensionOpRule;
 import org.apache.asterix.optimizer.rules.ResolveVariableRule;
 import org.apache.asterix.optimizer.rules.SetAsterixPhysicalOperatorsRule;
 import org.apache.asterix.optimizer.rules.SetClosedRecordConstructorsRule;
@@ -317,7 +317,7 @@ public final class RuleCollections {
         List<IAlgebraicRewriteRule> physicalRewritesAllLevels = new LinkedList<>();
         physicalRewritesAllLevels.add(new PullSelectOutOfEqJoin());
         //Turned off the following rule for now not to change OptimizerTest results.
-        physicalRewritesAllLevels.add(new ReplaceSinkOpWithCommitOpRule());
+        physicalRewritesAllLevels.add(new SetupCommitExtensionOpRule());
         physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
         physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
         physicalRewritesAllLevels.add(new AddEquivalenceClassForRecordConstructorRule());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
index fee1c96..c8d2760 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
@@ -18,9 +18,11 @@
  */
 package org.apache.asterix.optimizer.rules;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
@@ -44,6 +46,7 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceE
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
@@ -62,13 +65,35 @@ public class IntroduceAutogenerateIDRule implements IAlgebraicRewriteRule {
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
 
-        // match: [insert to internal dataset with autogenerated id] - assign - project
+        // match: commit OR distribute-result OR SINK - ... followed by:
+        // [insert to internal dataset with autogenerated id] - assign - project
         // produce: insert - assign - assign* - project
         // **
         // OR [insert to internal dataset with autogenerated id] - assign - [datasource scan]
         // produce insert - assign - assign* - datasource scan
 
         AbstractLogicalOperator currentOp = (AbstractLogicalOperator) opRef.getValue();
+        if (currentOp.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+            DelegateOperator dOp = (DelegateOperator) currentOp;
+            if (!(dOp.getDelegate() instanceof CommitOperator)) {
+                return false;
+            } else if (!((CommitOperator) dOp.getDelegate()).isSink()) {
+                return false;
+            }
+
+        } else if (currentOp.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT
+                && currentOp.getOperatorTag() != LogicalOperatorTag.SINK) {
+            return false;
+        }
+        ArrayDeque<AbstractLogicalOperator> opStack = new ArrayDeque<>();
+        opStack.push(currentOp);
+        while (currentOp.getInputs().size() == 1) {
+            currentOp = (AbstractLogicalOperator) currentOp.getInputs().get(0).getValue();
+            if (currentOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+                break;
+            }
+            opStack.push(currentOp);
+        }
         if (currentOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
             return false;
         }
@@ -95,7 +120,8 @@ public class IntroduceAutogenerateIDRule implements IAlgebraicRewriteRule {
         AssignOperator assignOp = (AssignOperator) parentOp;
         LogicalVariable inputRecord;
 
-        //bug here. will not work for internal datasets with filters since the pattern becomes [project-assign-assign-insert] <-this should be fixed->
+        //TODO: bug here. will not work for internal datasets with filters since the pattern becomes 
+        //[project-assign-assign-insert]
         AbstractLogicalOperator grandparentOp = (AbstractLogicalOperator) parentOp.getInputs().get(0).getValue();
         if (grandparentOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
             ProjectOperator projectOp = (ProjectOperator) grandparentOp;
@@ -122,7 +148,12 @@ public class IntroduceAutogenerateIDRule implements IAlgebraicRewriteRule {
         VariableUtilities.substituteVariables(insertOp, inputRecord, v, context);
         context.computeAndSetTypeEnvironmentForOperator(newAssign);
         context.computeAndSetTypeEnvironmentForOperator(assignOp);
-        context.computeAndSetTypeEnvironmentForOperator(insertOp);
+        context.computeAndSetTypeEnvironmentForOperator(insertOp);;
+        for (AbstractLogicalOperator op : opStack) {
+            VariableUtilities.substituteVariables(op, inputRecord, v, context);
+            context.computeAndSetTypeEnvironmentForOperator(op);
+        }
+
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
index f6cd015..28bcc7f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceDynamicTypeCastRule.java
@@ -22,6 +22,7 @@ package org.apache.asterix.optimizer.rules;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
@@ -46,6 +47,8 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceE
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -90,11 +93,19 @@ public class IntroduceDynamicTypeCastRule implements IAlgebraicRewriteRule {
         // We identify INSERT and DISTRIBUTE_RESULT operators.
         AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
         switch (op1.getOperatorTag()) {
-            case SINK: {
+            case SINK:
+            case DELEGATE_OPERATOR: {
                 /**
-                 * pattern match: sink insert assign
-                 * resulting plan: sink-insert-project-assign
+                 * pattern match: commit insert assign
+                 * resulting plan: commit-insert-project-assign
                  */
+                if (op1.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+                    DelegateOperator eOp = (DelegateOperator) op1;
+                    if (!(eOp.getDelegate() instanceof CommitOperator)) {
+                        return false;
+                    }
+                }
+
                 AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
                 if (op2.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
                     InsertDeleteUpsertOperator insertDeleteOp = (InsertDeleteUpsertOperator) op2;
@@ -131,21 +142,8 @@ public class IntroduceDynamicTypeCastRule implements IAlgebraicRewriteRule {
                 // Remember this is the operator we need to modify
                 op = op1;
 
-                // The Variable we want is the (hopefully singular, hopefully record-typed) live variable
-                // of the singular input operator of the DISTRIBUTE_RESULT
-                if (op.getInputs().size() > 1) {
-                    // Hopefully not possible?
-                    throw new AlgebricksException(
-                            "output-record-type defined for expression with multiple input operators");
-                }
-                AbstractLogicalOperator input = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-                List<LogicalVariable> liveVars = new ArrayList<>();
-                VariableUtilities.getLiveVariables(input, liveVars);
-                if (liveVars.size() > 1) {
-                    throw new AlgebricksException(
-                            "Expression with multiple fields cannot be cast to output-record-type!");
-                }
-                recordVar = liveVars.get(0);
+                recordVar = ((VariableReferenceExpression) ((DistributeResultOperator) op).getExpressions().get(0)
+                        .getValue()).getVariableReference();
                 break;
             }
             default: {
@@ -210,15 +208,15 @@ public class IntroduceDynamicTypeCastRule implements IAlgebraicRewriteRule {
                 if (var.equals(recordVar)) {
                     /** insert an assign operator to call the function on-top-of the variable */
                     IAType actualType = (IAType) env.getVarType(var);
-                    AbstractFunctionCallExpression cast =
-                            new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(fd));
+                    AbstractFunctionCallExpression cast = new ScalarFunctionCallExpression(
+                            FunctionUtil.getFunctionInfo(fd));
                     cast.getArguments()
                             .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(var)));
                     /** enforce the required record type */
                     TypeCastUtils.setRequiredAndInputTypes(cast, requiredRecordType, actualType);
                     LogicalVariable newAssignVar = context.newVar();
-                    AssignOperator newAssignOperator =
-                            new AssignOperator(newAssignVar, new MutableObject<ILogicalExpression>(cast));
+                    AssignOperator newAssignOperator = new AssignOperator(newAssignVar,
+                            new MutableObject<ILogicalExpression>(cast));
                     newAssignOperator.getInputs().add(new MutableObject<ILogicalOperator>(op));
                     opRef.setValue(newAssignOperator);
                     context.computeAndSetTypeEnvironmentForOperator(newAssignOperator);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
index a8368a7..f655b24 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRapidFrameFlushProjectAssignRule.java
@@ -27,7 +27,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
@@ -50,10 +50,10 @@ public class IntroduceRapidFrameFlushProjectAssignRule implements IAlgebraicRewr
     }
 
     private boolean checkIfRuleIsApplicable(AbstractLogicalOperator op) {
-        if (op.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR) {
+        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
             return false;
         }
-        ExtensionOperator extensionOp = (ExtensionOperator) op;
+        DelegateOperator extensionOp = (DelegateOperator) op;
         if (!(extensionOp.getDelegate() instanceof CommitOperator)) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index c487a96..8362dd7 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -66,6 +67,7 @@ import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
@@ -90,20 +92,28 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
             throws AlgebricksException {
-        AbstractLogicalOperator sinkOp = (AbstractLogicalOperator) opRef.getValue();
-        if (sinkOp.getOperatorTag() != LogicalOperatorTag.SINK) {
+        AbstractLogicalOperator op0 = (AbstractLogicalOperator) opRef.getValue();
+        if (op0.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR
+                && op0.getOperatorTag() != LogicalOperatorTag.SINK) {
             return false;
         }
-        if (sinkOp.getInputs().get(0).getValue().getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+        if (op0.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+            DelegateOperator eOp = (DelegateOperator) op0;
+            if (!(eOp.getDelegate() instanceof CommitOperator)) {
+                return false;
+            }
+        }
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) op0.getInputs().get(0).getValue();
+        if (op1.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
             return false;
         }
         /** find the record variable */
-        InsertDeleteUpsertOperator primaryIndexModificationOp =
-                (InsertDeleteUpsertOperator) sinkOp.getInputs().get(0).getValue();
+        InsertDeleteUpsertOperator primaryIndexModificationOp = (InsertDeleteUpsertOperator) op0.getInputs().get(0)
+                .getValue();
         boolean isBulkload = primaryIndexModificationOp.isBulkload();
         ILogicalExpression newRecordExpr = primaryIndexModificationOp.getPayloadExpression().getValue();
-        List<Mutable<ILogicalExpression>> newMetaExprs =
-                primaryIndexModificationOp.getAdditionalNonFilteringExpressions();
+        List<Mutable<ILogicalExpression>> newMetaExprs = primaryIndexModificationOp
+                .getAdditionalNonFilteringExpressions();
         LogicalVariable newRecordVar;
         LogicalVariable newMetaVar = null;
 
@@ -111,9 +121,8 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
          * inputOp is the assign operator which extracts primary keys from the input
          * variables (record or meta)
          */
-        AbstractLogicalOperator inputOp =
-                (AbstractLogicalOperator) primaryIndexModificationOp.getInputs().get(0).getValue();
-
+        AbstractLogicalOperator inputOp = (AbstractLogicalOperator) primaryIndexModificationOp.getInputs().get(0)
+                .getValue();
         newRecordVar = getRecordVar(context, inputOp, newRecordExpr, 0);
         if (newMetaExprs != null && !newMetaExprs.isEmpty()) {
             if (newMetaExprs.size() > 1) {
@@ -168,7 +177,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
         // At this point, we have the data type info, and the indexes info as well
         int secondaryIndexTotalCnt = indexes.size() - 1;
         if (secondaryIndexTotalCnt > 0) {
-            sinkOp.getInputs().clear();
+            op0.getInputs().clear();
         } else {
             return false;
         }
@@ -226,8 +235,8 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
                      * is solved
                      */
                     || primaryIndexModificationOp.getOperation() == Kind.DELETE) {
-                injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForNewRecord, recType,
-                        metaType, newRecordVar, newMetaVar, primaryIndexModificationOp, false);
+                injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForNewRecord, recType, metaType,
+                        newRecordVar, newMetaVar, primaryIndexModificationOp, false);
                 if (replicateOp != null) {
                     context.computeAndSetTypeEnvironmentForOperator(replicateOp);
                 }
@@ -237,13 +246,12 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
              * https://issues.apache.org/jira/browse/ASTERIXDB-1507
              * is solved
              */) {
-                List<LogicalVariable> beforeOpMetaVars =
-                        primaryIndexModificationOp.getBeforeOpAdditionalNonFilteringVars();
+                List<LogicalVariable> beforeOpMetaVars = primaryIndexModificationOp
+                        .getBeforeOpAdditionalNonFilteringVars();
                 LogicalVariable beforeOpMetaVar = beforeOpMetaVars == null ? null : beforeOpMetaVars.get(0);
-                currentTop =
-                        injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForBeforeOperation, recType,
-                                metaType, primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar,
-                                currentTop, true);
+                currentTop = injectFieldAccessesForIndexes(context, dataset, indexes, fieldVarsForBeforeOperation,
+                        recType, metaType, primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar,
+                        currentTop, true);
             }
         } catch (AsterixException e) {
             throw new AlgebricksException(e);
@@ -264,12 +272,11 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
             ILogicalOperator replicateOutput;
 
             for (int i = 0; i < secondaryKeyFields.size(); i++) {
-                IndexFieldId indexFieldId =
-                        new IndexFieldId(index.getKeyFieldSourceIndicators().get(i), secondaryKeyFields.get(i));
+                IndexFieldId indexFieldId = new IndexFieldId(index.getKeyFieldSourceIndicators().get(i),
+                        secondaryKeyFields.get(i));
                 LogicalVariable skVar = fieldVarsForNewRecord.get(indexFieldId);
                 secondaryKeyVars.add(skVar);
-                secondaryExpressions.add(new MutableObject<ILogicalExpression>(
-                        new VariableReferenceExpression(skVar)));
+                secondaryExpressions.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(skVar)));
                 if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                     beforeOpSecondaryExpressions.add(new MutableObject<ILogicalExpression>(
                             new VariableReferenceExpression(fieldVarsForBeforeOperation.get(indexFieldId))));
@@ -279,10 +286,10 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
             IndexInsertDeleteUpsertOperator indexUpdate;
             if (index.getIndexType() != IndexType.RTREE) {
                 // Create an expression per key
-                Mutable<ILogicalExpression> filterExpression =
-                        (primaryIndexModificationOp.getOperation() == Kind.UPSERT) ? null
-                                : createFilterExpression(secondaryKeyVars,
-                                        context.getOutputTypeEnvironment(currentTop), index.isEnforcingKeyFileds());
+                Mutable<ILogicalExpression> filterExpression = (primaryIndexModificationOp
+                        .getOperation() == Kind.UPSERT) ? null
+                                : createFilterExpression(secondaryKeyVars, context.getOutputTypeEnvironment(currentTop),
+                                        index.isEnforcingKeyFileds());
                 AqlIndex dataSourceIndex = new AqlIndex(index, dataverseName, datasetName, mp);
 
                 // Introduce the TokenizeOperator only when doing bulk-load,
@@ -311,8 +318,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
                     // Check the field type of the secondary key.
                     IAType secondaryKeyType;
                     Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(
-                            index.getKeyFieldTypes().get(0), secondaryKeyFields.get(0),
-                            recType);
+                            index.getKeyFieldTypes().get(0), secondaryKeyFields.get(0), recType);
                     secondaryKeyType = keyPairType.first;
 
                     List<Object> varTypes = new ArrayList<>();
@@ -333,8 +339,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
                     // TokenizeOperator to tokenize [SK, PK] pairs
                     TokenizeOperator tokenUpdate = new TokenizeOperator(dataSourceIndex,
                             primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions,
-                            tokenizeKeyVars,
-                            filterExpression, primaryIndexModificationOp.getOperation(),
+                            tokenizeKeyVars, filterExpression, primaryIndexModificationOp.getOperation(),
                             primaryIndexModificationOp.isBulkload(), isPartitioned, varTypes);
                     tokenUpdate.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
                     context.computeAndSetTypeEnvironmentForOperator(tokenUpdate);
@@ -350,8 +355,8 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
                     // When TokenizeOperator is not needed
                     indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
                             primaryIndexModificationOp.getPrimaryKeyExpressions(), secondaryExpressions,
-                            filterExpression,
-                            primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(),
+                            filterExpression, primaryIndexModificationOp.getOperation(),
+                            primaryIndexModificationOp.isBulkload(),
                             primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
                                     : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
                     indexUpdate.setAdditionalFilteringExpressions(filteringExpressions);
@@ -360,8 +365,8 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
                     if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                         indexUpdate.setBeforeOpSecondaryKeyExprs(beforeOpSecondaryExpressions);
                         if (filteringFields != null) {
-                            indexUpdate.setBeforeOpAdditionalFilteringExpression(new MutableObject<ILogicalExpression>(
-                                    new VariableReferenceExpression(
+                            indexUpdate.setBeforeOpAdditionalFilteringExpression(
+                                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
                                             primaryIndexModificationOp.getBeforeOpFilterVar())));
                         }
                     }
@@ -472,7 +477,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
             }
             if (primaryIndexModificationOp.isBulkload()) {
                 // For bulk load, we connect all fanned out insert operator to a single SINK operator
-                sinkOp.getInputs().add(new MutableObject<ILogicalOperator>(indexUpdate));
+                op0.getInputs().add(new MutableObject<ILogicalOperator>(indexUpdate));
             }
 
         }
@@ -483,16 +488,15 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
         if (!primaryIndexModificationOp.isBulkload()) {
             // If this is an upsert, we need to
             // Remove the current input to the SINK operator (It is actually already removed above)
-            sinkOp.getInputs().clear();
+            op0.getInputs().clear();
             // Connect the last index update to the SINK
-            sinkOp.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
+            op0.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
         }
         return true;
     }
 
     private LogicalVariable getRecordVar(IOptimizationContext context, AbstractLogicalOperator inputOp,
-            ILogicalExpression recordExpr,
-            int expectedRecordIndex) throws AlgebricksException {
+            ILogicalExpression recordExpr, int expectedRecordIndex) throws AlgebricksException {
         if (exprIsRecord(context.getOutputTypeEnvironment(inputOp), recordExpr)) {
             return ((VariableReferenceExpression) recordExpr).getVariableReference();
         } else {
@@ -554,12 +558,10 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
                 ARecordType sourceType = dataset.hasMetaPart()
                         ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recType : metaType : recType;
                 LogicalVariable sourceVar = dataset.hasMetaPart()
-                        ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recordVar : metaVar
-                        : recordVar;
+                        ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recordVar : metaVar : recordVar;
                 LogicalVariable fieldVar = context.newVar();
                 // create record variable ref
-                Mutable<ILogicalExpression> varRef =
-                        new MutableObject<>(new VariableReferenceExpression(sourceVar));
+                Mutable<ILogicalExpression> varRef = new MutableObject<>(new VariableReferenceExpression(sourceVar));
                 IAType fieldType = sourceType.getSubFieldType(indexFieldId.fieldName);
                 AbstractFunctionCallExpression theFieldAccessFunc;
                 if (fieldType == null) {
@@ -567,24 +569,22 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
                     // make handling of records with incorrect value type for this field easier and cleaner
                     context.addNotToBeInlinedVar(fieldVar);
                     // create field access
-                    AbstractFunctionCallExpression fieldAccessFunc =
-                            getOpenOrNestedFieldAccessFunction(varRef, indexFieldId.fieldName);
+                    AbstractFunctionCallExpression fieldAccessFunc = getOpenOrNestedFieldAccessFunction(varRef,
+                            indexFieldId.fieldName);
                     // create cast
                     theFieldAccessFunc = new ScalarFunctionCallExpression(
                             FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CAST_TYPE));
                     // The first argument is the field
-                    theFieldAccessFunc.getArguments()
-                            .add(new MutableObject<ILogicalExpression>(fieldAccessFunc));
-                    TypeCastUtils.setRequiredAndInputTypes(theFieldAccessFunc, skTypes.get(i),
-                            BuiltinType.ANY);
+                    theFieldAccessFunc.getArguments().add(new MutableObject<ILogicalExpression>(fieldAccessFunc));
+                    TypeCastUtils.setRequiredAndInputTypes(theFieldAccessFunc, skTypes.get(i), BuiltinType.ANY);
                 } else {
                     // Get the desired field position
                     int pos = indexFieldId.fieldName.size() > 1 ? -1
                             : sourceType.getFieldIndex(indexFieldId.fieldName.get(0));
                     // Field not found --> This is either an open field or a nested field. it can't be accessed by index
-                    theFieldAccessFunc =
-                            (pos == -1) ? getOpenOrNestedFieldAccessFunction(varRef, indexFieldId.fieldName)
-                                    : getClosedFieldAccessFunction(varRef, pos);
+                    theFieldAccessFunc = (pos == -1)
+                            ? getOpenOrNestedFieldAccessFunction(varRef, indexFieldId.fieldName)
+                            : getClosedFieldAccessFunction(varRef, pos);
                 }
                 vars.add(fieldVar);
                 exprs.add(new MutableObject<ILogicalExpression>(theFieldAccessFunc));
@@ -648,8 +648,7 @@ public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewrit
     }
 
     private static Mutable<ILogicalExpression> constantToMutableLogicalExpression(IAObject constantObject) {
-        return new MutableObject<>(
-                new ConstantExpression(new AsterixConstantValue(constantObject)));
+        return new MutableObject<>(new ConstantExpression(new AsterixConstantValue(constantObject)));
     }
 
     private Mutable<ILogicalExpression> createFilterExpression(List<LogicalVariable> secondaryKeyVars,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
index 89280be..2eaad98 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceStaticTypeCastForInsertRule.java
@@ -22,6 +22,7 @@ package org.apache.asterix.optimizer.rules;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.om.typecomputer.base.TypeCastUtils;
 import org.apache.asterix.om.types.IAType;
@@ -38,6 +39,7 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCa
 import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -87,9 +89,16 @@ public class IntroduceStaticTypeCastForInsertRule implements IAlgebraicRewriteRu
         List<LogicalVariable> producedVariables = new ArrayList<LogicalVariable>();
         LogicalVariable oldRecordVariable;
 
-        if (op1.getOperatorTag() != LogicalOperatorTag.SINK) {
+        if (op1.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR
+                && op1.getOperatorTag() != LogicalOperatorTag.SINK) {
             return false;
         }
+        if (op1.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
+            DelegateOperator eOp = (DelegateOperator) op1;
+            if (!(eOp.getDelegate() instanceof CommitOperator)) {
+                return false;
+            }
+        }
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
         if (op2.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
             return false;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
deleted file mode 100644
index 0c36d0b..0000000
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.optimizer.rules;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.algebra.operators.CommitOperator;
-import org.apache.asterix.algebra.operators.physical.CommitPOperator;
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.lang.common.util.FunctionUtil;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.declared.DatasetDataSource;
-import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class ReplaceSinkOpWithCommitOpRule implements IAlgebraicRewriteRule {
-
-    @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-        return false;
-    }
-
-    @Override
-    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
-            throws AlgebricksException {
-
-        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-        if (op.getOperatorTag() != LogicalOperatorTag.SINK) {
-            return false;
-        }
-        SinkOperator sinkOperator = (SinkOperator) op;
-
-        List<Mutable<ILogicalExpression>> primaryKeyExprs = null;
-        int datasetId = 0;
-        String dataverse = null;
-        String datasetName = null;
-        AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) sinkOperator.getInputs().get(0).getValue();
-        LogicalVariable upsertVar = null;
-        AssignOperator upsertFlagAssign = null;
-        while (descendantOp != null) {
-            if (descendantOp.getOperatorTag() == LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT) {
-                IndexInsertDeleteUpsertOperator indexInsertDeleteUpsertOperator = (IndexInsertDeleteUpsertOperator) descendantOp;
-                if (!indexInsertDeleteUpsertOperator.isBulkload()
-                        && indexInsertDeleteUpsertOperator.getPrevSecondaryKeyExprs() == null) {
-                    primaryKeyExprs = indexInsertDeleteUpsertOperator.getPrimaryKeyExpressions();
-                    datasetId = ((DatasetDataSource) indexInsertDeleteUpsertOperator.getDataSourceIndex()
-                            .getDataSource()).getDataset().getDatasetId();
-                    dataverse = ((DatasetDataSource) indexInsertDeleteUpsertOperator.getDataSourceIndex()
-                            .getDataSource()).getDataset().getDataverseName();
-                    datasetName = ((DatasetDataSource) indexInsertDeleteUpsertOperator.getDataSourceIndex()
-                            .getDataSource()).getDataset().getDatasetName();
-                    break;
-                }
-            } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
-                InsertDeleteUpsertOperator insertDeleteUpsertOperator = (InsertDeleteUpsertOperator) descendantOp;
-                if (!insertDeleteUpsertOperator.isBulkload()) {
-                    primaryKeyExprs = insertDeleteUpsertOperator.getPrimaryKeyExpressions();
-                    datasetId = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDatasetId();
-                    dataverse = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDataverseName();
-                    datasetName = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDatasetName();
-                    if (insertDeleteUpsertOperator.getOperation() == Kind.UPSERT) {
-                        //we need to add a function that checks if previous record was found
-                        upsertVar = context.newVar();
-                        AbstractFunctionCallExpression orFunc = new ScalarFunctionCallExpression(
-                                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OR));
-                        // is new value missing? -> this means that the expected operation is delete
-                        AbstractFunctionCallExpression isNewMissingFunc = new ScalarFunctionCallExpression(
-                                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_MISSING));
-                        isNewMissingFunc.getArguments().add(insertDeleteUpsertOperator.getPayloadExpression());
-                        AbstractFunctionCallExpression isPrevMissingFunc = new ScalarFunctionCallExpression(
-                                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_MISSING));
-                        // argument is the previous record
-                        isPrevMissingFunc.getArguments().add(new MutableObject<ILogicalExpression>(
-                                new VariableReferenceExpression(insertDeleteUpsertOperator.getBeforeOpRecordVar())));
-                        orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isPrevMissingFunc));
-                        orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isNewMissingFunc));
-
-                        // AssignOperator puts in the cast var the casted record
-                        upsertFlagAssign = new AssignOperator(upsertVar, new MutableObject<ILogicalExpression>(orFunc));
-                        // Connect the current top of the plan to the cast operator
-                        upsertFlagAssign.getInputs()
-                                .add(new MutableObject<ILogicalOperator>(sinkOperator.getInputs().get(0).getValue()));
-                        sinkOperator.getInputs().clear();
-                        sinkOperator.getInputs().add(new MutableObject<ILogicalOperator>(upsertFlagAssign));
-                        context.computeAndSetTypeEnvironmentForOperator(upsertFlagAssign);
-                    }
-                    break;
-                }
-            }
-            if (descendantOp.getInputs().size() < 1) {
-                break;
-            }
-            descendantOp = (AbstractLogicalOperator) descendantOp.getInputs().get(0).getValue();
-        }
-
-        if (primaryKeyExprs == null) {
-            return false;
-        }
-
-        //copy primaryKeyExprs
-        List<LogicalVariable> primaryKeyLogicalVars = new ArrayList<LogicalVariable>();
-        for (Mutable<ILogicalExpression> expr : primaryKeyExprs) {
-            VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr.getValue();
-            primaryKeyLogicalVars.add(new LogicalVariable(varRefExpr.getVariableReference().getId()));
-        }
-
-        //get JobId(TransactorId)
-        AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
-        JobId jobId = mp.getJobId();
-
-        //create the logical and physical operator
-        CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, upsertVar);
-        CommitPOperator commitPOperator = new CommitPOperator(jobId, dataverse, datasetName, datasetId,
-                primaryKeyLogicalVars, upsertVar);
-        commitOperator.setPhysicalOperator(commitPOperator);
-
-        //create ExtensionOperator and put the commitOperator in it.
-        ExtensionOperator extensionOperator = new ExtensionOperator(commitOperator);
-        extensionOperator.setPhysicalOperator(commitPOperator);
-
-        //update plan link
-        extensionOperator.getInputs().add(sinkOperator.getInputs().get(0));
-        context.computeAndSetTypeEnvironmentForOperator(extensionOperator);
-        opRef.setValue(extensionOperator);
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
new file mode 100644
index 0000000..4bbfce0
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.algebra.operators.CommitOperator;
+import org.apache.asterix.algebra.operators.physical.CommitPOperator;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) {
+            return false;
+        }
+        DelegateOperator eOp = (DelegateOperator) op;
+        if (!(eOp.getDelegate() instanceof CommitOperator)) {
+            return false;
+        }
+        boolean isSink = ((CommitOperator) eOp.getDelegate()).isSink();
+
+        List<Mutable<ILogicalExpression>> primaryKeyExprs = null;
+        int datasetId = 0;
+        String dataverse = null;
+        String datasetName = null;
+        AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
+        LogicalVariable upsertVar = null;
+        while (descendantOp != null) {
+            if (descendantOp.getOperatorTag() == LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT) {
+                IndexInsertDeleteUpsertOperator operator = (IndexInsertDeleteUpsertOperator) descendantOp;
+                if (!operator.isBulkload() && operator.getPrevSecondaryKeyExprs() == null) {
+                    primaryKeyExprs = operator.getPrimaryKeyExpressions();
+                    datasetId = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
+                            .getDatasetId();
+                    dataverse = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
+                            .getDataverseName();
+                    datasetName = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
+                            .getDatasetName();
+                    break;
+                }
+            } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
+                InsertDeleteUpsertOperator insertDeleteUpsertOperator = (InsertDeleteUpsertOperator) descendantOp;
+                if (!insertDeleteUpsertOperator.isBulkload()) {
+                    primaryKeyExprs = insertDeleteUpsertOperator.getPrimaryKeyExpressions();
+                    datasetId = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
+                            .getDatasetId();
+                    dataverse = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
+                            .getDataverseName();
+                    datasetName = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
+                            .getDatasetName();
+                    if (insertDeleteUpsertOperator.getOperation() == Kind.UPSERT) {
+                        //we need to add a function that checks if previous record was found
+                        upsertVar = context.newVar();
+                        AbstractFunctionCallExpression orFunc = new ScalarFunctionCallExpression(
+                                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OR));
+                        // is new value missing? -> this means that the expected operation is delete
+                        AbstractFunctionCallExpression isNewMissingFunc = new ScalarFunctionCallExpression(
+                                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_MISSING));
+                        isNewMissingFunc.getArguments().add(insertDeleteUpsertOperator.getPayloadExpression());
+                        AbstractFunctionCallExpression isPrevMissingFunc = new ScalarFunctionCallExpression(
+                                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_MISSING));
+                        // argument is the previous record
+                        isPrevMissingFunc.getArguments().add(new MutableObject<ILogicalExpression>(
+                                new VariableReferenceExpression(insertDeleteUpsertOperator.getBeforeOpRecordVar())));
+                        orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isPrevMissingFunc));
+                        orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isNewMissingFunc));
+
+                        // AssignOperator puts in the cast var the casted record
+                        AssignOperator upsertFlagAssign = new AssignOperator(upsertVar,
+                                new MutableObject<ILogicalExpression>(orFunc));
+                        // Connect the current top of the plan to the cast operator
+                        upsertFlagAssign.getInputs()
+                                .add(new MutableObject<ILogicalOperator>(eOp.getInputs().get(0).getValue()));
+                        eOp.getInputs().clear();
+                        eOp.getInputs().add(new MutableObject<ILogicalOperator>(upsertFlagAssign));
+                        context.computeAndSetTypeEnvironmentForOperator(upsertFlagAssign);
+                    }
+                    break;
+                }
+            }
+            if (descendantOp.getInputs().isEmpty()) {
+                break;
+            }
+            descendantOp = (AbstractLogicalOperator) descendantOp.getInputs().get(0).getValue();
+        }
+
+        if (primaryKeyExprs == null) {
+            return false;
+        }
+
+        //copy primaryKeyExprs
+        List<LogicalVariable> primaryKeyLogicalVars = new ArrayList<>();
+        for (Mutable<ILogicalExpression> expr : primaryKeyExprs) {
+            VariableReferenceExpression varRefExpr = (VariableReferenceExpression) expr.getValue();
+            primaryKeyLogicalVars.add(new LogicalVariable(varRefExpr.getVariableReference().getId()));
+        }
+
+        //get JobId(TransactorId)
+        AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
+        JobId jobId = mp.getJobId();
+
+        //create the logical and physical operator
+        CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
+        CommitPOperator commitPOperator = new CommitPOperator(jobId, dataverse, datasetName, datasetId,
+                primaryKeyLogicalVars, upsertVar, isSink);
+        commitOperator.setPhysicalOperator(commitPOperator);
+
+        //create ExtensionOperator and put the commitOperator in it.
+        DelegateOperator extensionOperator = new DelegateOperator(commitOperator);
+        extensionOperator.setPhysicalOperator(commitPOperator);
+
+        //update plan link
+        extensionOperator.getInputs().add(eOp.getInputs().get(0));
+        context.computeAndSetTypeEnvironmentForOperator(extensionOperator);
+        opRef.setValue(extensionOperator);
+        return true;
+    }
+}


[2/4] asterixdb git commit: Enhanced Insert AQL

Posted by sj...@apache.org.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index f4ef1a1..8a25888 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -65,19 +65,19 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.external.api.IAdapterFactory;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
 import org.apache.asterix.external.feed.api.IFeed;
 import org.apache.asterix.external.feed.api.IFeed.FeedType;
 import org.apache.asterix.external.feed.api.IFeedJoint;
 import org.apache.asterix.external.feed.api.IFeedJoint.FeedJointType;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.management.FeedEventsListener;
 import org.apache.asterix.external.feed.management.FeedJointKey;
-import org.apache.asterix.external.feed.management.FeedLifecycleEventSubscriber;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
-import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
+import org.apache.asterix.external.feed.watch.FeedActivityDetails;
 import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
 import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
 import org.apache.asterix.external.indexing.ExternalFile;
@@ -342,7 +342,12 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                         break;
                     case Statement.Kind.INSERT:
                     case Statement.Kind.UPSERT:
-                        handleInsertUpsertStatement(metadataProvider, stmt, hcc);
+                        if (((InsertStatement) stmt).getReturnQuery() != null) {
+                            metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++));
+                            metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC
+                                    || resultDelivery == ResultDelivery.ASYNC_DEFERRED);
+                        }
+                        handleInsertUpsertStatement(metadataProvider, stmt, hcc, hdc, resultDelivery, stats, false);
                         break;
                     case Statement.Kind.DELETE:
                         handleDeleteStatement(metadataProvider, stmt, hcc);
@@ -393,7 +398,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                         // No op
                         break;
                     case Statement.Kind.EXTENSION:
-                        ((IExtensionStatement) stmt).handle(this, metadataProvider, hcc);
+                        ((IExtensionStatement) stmt).handle(this, metadataProvider, hcc, hdc, resultDelivery, stats,
+                                resultSetIdCounter);
                         break;
                     default:
                         throw new AsterixException("Unknown function");
@@ -506,7 +512,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+    public void handleCreateDatasetStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws AsterixException, Exception {
         MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS);
         DatasetDecl dd = (DatasetDecl) stmt;
@@ -602,8 +608,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     break;
                 case EXTERNAL:
                     String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter();
-                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl())
-                            .getProperties();
+                    Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties();
 
                     datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(),
                             ExternalDatasetTransactionState.COMMIT);
@@ -705,7 +710,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         StringBuilder builder = null;
         IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
         for (IActiveEntityEventsListener listener : listeners) {
-            if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
+            if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
                 if (builder == null) {
                     builder = new StringBuilder();
                 }
@@ -730,8 +735,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse,
-            MetadataTransactionContext mdTxnCtx)
+    protected String configureNodegroupForDataset(DatasetDecl dd, String dataverse, MetadataTransactionContext mdTxnCtx)
             throws AsterixException {
         int nodegroupCardinality;
         String nodegroupName;
@@ -984,8 +988,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
 
             ARecordType enforcedType = null;
             if (stmtCreateIndex.isEnforced()) {
-                enforcedType = createEnforcedType(aRecordType,
-                        Lists.newArrayList(index));
+                enforcedType = createEnforcedType(aRecordType, Lists.newArrayList(index));
             }
 
             // #. prepare to create the index artifact in NC.
@@ -1350,7 +1353,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
     }
 
-    protected void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+    public void handleDatasetDropStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
         String dataverseName = getActiveDataverse(stmtDelete.getDataverseName());
@@ -1369,8 +1372,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
                     return;
                 } else {
-                    throw new AlgebricksException("There is no dataset with this name " + datasetName
-                            + " in dataverse " + dataverseName + ".");
+                    throw new AlgebricksException("There is no dataset with this name " + datasetName + " in dataverse "
+                            + dataverseName + ".");
                 }
             }
 
@@ -1424,7 +1427,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             // prepare job spec(s) that would disconnect any active feeds involving the dataset.
             IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             for (IActiveEntityEventsListener listener : activeListeners) {
-                if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
+                if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
                     throw new AsterixException(
                             "Can't drop dataset since it is connected to active entity: " + listener.getEntityId());
                 }
@@ -1482,8 +1485,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 } else {
                     CompiledIndexDropStatement cds = new CompiledIndexDropStatement(dataverseName, datasetName,
                             indexes.get(j).getIndexName());
-                    jobsToExecute.add(
-                            ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
+                    jobsToExecute.add(ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, ds));
                 }
             }
 
@@ -1547,7 +1549,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             StringBuilder builder = null;
             for (IActiveEntityEventsListener listener : listeners) {
-                if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
+                if (listener.isEntityUsingDataset(dataverseName, datasetName)) {
                     if (builder == null) {
                         builder = new StringBuilder();
                     }
@@ -1555,9 +1557,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 }
             }
             if (builder != null) {
-                throw new AsterixException(
-                        "Dataset" + datasetName + " is currently being fed into by the following active entities: "
-                                + builder.toString());
+                throw new AsterixException("Dataset" + datasetName
+                        + " is currently being fed into by the following active entities: " + builder.toString());
             }
 
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
@@ -1578,11 +1579,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(),
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
                                 index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
-                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
-                                IMetadataEntity.PENDING_DROP_OP));
+                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
 
                 // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1644,11 +1643,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
-                        new Index(dataverseName, datasetName, indexName, index.getIndexType(),
-                                index.getKeyFieldNames(),
+                        new Index(dataverseName, datasetName, indexName, index.getIndexType(), index.getKeyFieldNames(),
                                 index.getKeyFieldSourceIndicators(), index.getKeyFieldTypes(),
-                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(),
-                                IMetadataEntity.PENDING_DROP_OP));
+                                index.isEnforcingKeyFileds(), index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
 
                 // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1707,9 +1704,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 } catch (Exception e2) {
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
-                    throw new IllegalStateException("System is inconsistent state: pending index("
-                            + dataverseName + "." + datasetName + "."
-                            + indexName + ") couldn't be removed from the metadata", e);
+                    throw new IllegalStateException("System is inconsistent state: pending index(" + dataverseName + "."
+                            + datasetName + "." + indexName + ") couldn't be removed from the metadata", e);
                 }
             }
 
@@ -1748,8 +1744,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt)
-            throws Exception {
+    protected void handleNodegroupDropStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception {
         NodeGroupDropStatement stmtDelete = (NodeGroupDropStatement) stmt;
         String nodegroupName = stmtDelete.getNodeGroupName().getValue();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -1859,40 +1854,61 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         }
     }
 
-    protected void handleInsertUpsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
-            IHyracksClientConnection hcc) throws Exception {
+    public JobSpecification handleInsertUpsertStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery,
+            IStatementExecutor.Stats stats, boolean compileOnly) throws Exception {
 
         InsertStatement stmtInsertUpsert = (InsertStatement) stmt;
+
         String dataverseName = getActiveDataverse(stmtInsertUpsert.getDataverseName());
         Query query = stmtInsertUpsert.getQuery();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        if (stmtInsertUpsert.getReturnQuery() != null) {
+            if (!stmtInsertUpsert.getReturnQuery().getDatasets().isEmpty()) {
+                throw new AsterixException("Cannot use datasets in an insert returning query");
+            }
+            // returnQuery Rewriting (happens under the same ongoing metadata transaction)
+            Pair<Query, Integer> rewrittenReturnQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider,
+                    stmtInsertUpsert.getReturnQuery(), sessionConfig);
+
+            stmtInsertUpsert.getQuery().setVarCounter(rewrittenReturnQuery.first.getVarCounter());
+            stmtInsertUpsert.setRewrittenReturnQuery(rewrittenReturnQuery.first);
+            stmtInsertUpsert.addToVarCounter(rewrittenReturnQuery.second);
+        }
+
         MetadataLockManager.INSTANCE.insertDeleteUpsertBegin(dataverseName,
                 dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(), query.getDatasets());
-
+        JobSpecification compiled = null;
         try {
             metadataProvider.setWriteTransaction(true);
             CompiledInsertStatement clfrqs = null;
             switch (stmtInsertUpsert.getKind()) {
                 case Statement.Kind.INSERT:
                     clfrqs = new CompiledInsertStatement(dataverseName, stmtInsertUpsert.getDatasetName().getValue(),
-                            query, stmtInsertUpsert.getVarCounter());
+                            query, stmtInsertUpsert.getVarCounter(), stmtInsertUpsert.getVar(),
+                            stmtInsertUpsert.getReturnQuery());
                     break;
                 case Statement.Kind.UPSERT:
                     clfrqs = new CompiledUpsertStatement(dataverseName, stmtInsertUpsert.getDatasetName().getValue(),
-                            query, stmtInsertUpsert.getVarCounter());
+                            query, stmtInsertUpsert.getVarCounter(), stmtInsertUpsert.getVar(),
+                            stmtInsertUpsert.getReturnQuery());
                     break;
                 default:
                     throw new AlgebricksException("Unsupported statement type " + stmtInsertUpsert.getKind());
             }
-            JobSpecification compiled = rewriteCompileQuery(metadataProvider, query, clfrqs);
+            compiled = rewriteCompileQuery(metadataProvider, query, clfrqs);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            if (compiled != null) {
-                JobUtils.runJob(hcc, compiled, true);
+            if (compiled != null && !compileOnly) {
+                if (stmtInsertUpsert.getReturnQuery() != null) {
+                    handleQueryResult(metadataProvider, hcc, hdc, compiled, resultDelivery, stats);
+                } else {
+                    JobUtils.runJob(hcc, compiled, true);
+                }
             }
 
         } catch (Exception e) {
@@ -1905,9 +1921,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     dataverseName + "." + stmtInsertUpsert.getDatasetName(), query.getDataverses(),
                     query.getDatasets());
         }
+        return compiled;
     }
 
-    protected void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
+    public void handleDeleteStatement(AqlMetadataProvider metadataProvider, Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
 
         DeleteStatement stmtDelete = (DeleteStatement) stmt;
@@ -1948,7 +1965,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
     @Override
     public JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
             ICompiledDmlStatement stmt)
-            throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
+                    throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException {
 
         // Query Rewriting (happens under the same ongoing metadata transaction)
         Pair<Query, Integer> reWrittenQuery = apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
@@ -2034,12 +2051,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             String description = cfps.getDescription() == null ? "" : cfps.getDescription();
             if (extendingExisting) {
                 FeedPolicyEntity sourceFeedPolicy = MetadataManager.INSTANCE
-                        .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse,
-                                cfps.getSourcePolicyName());
+                        .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName());
                 if (sourceFeedPolicy == null) {
-                    sourceFeedPolicy =
-                            MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
-                                    MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
+                    sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
+                            MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName());
                     if (sourceFeedPolicy == null) {
                         throw new AlgebricksException("Unknown policy " + cfps.getSourcePolicyName());
                     }
@@ -2158,7 +2173,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         boolean subscriberRegistered = false;
-        IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
+        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
         FeedConnectionId feedConnId = null;
         EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, cfs.getFeedName());
         FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
@@ -2211,7 +2226,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                         triple.third.get(0), pair.first);
                 pair.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob);
                 JobUtils.runJob(hcc, pair.first, false);
-                eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED);
+                eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_INTAKE_STARTED);
             } else {
                 for (IFeedJoint fj : triple.third) {
                     listener.registerFeedJoint(fj, 0);
@@ -2219,9 +2234,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-            eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED);
+            eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_STARTED);
             if (Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION))) {
-                eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED); // blocking call
+                eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_ENDED); // blocking call
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -2251,7 +2266,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
      */
     protected Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> getFeedConnectionRequest(String dataverse,
             Feed feed, String dataset, FeedPolicyEntity feedPolicy, MetadataTransactionContext mdTxnCtx)
-            throws AsterixException {
+                    throws AsterixException {
         IFeedJoint sourceFeedJoint;
         FeedConnectionRequest request;
         List<String> functionsToApply = new ArrayList<>();
@@ -2349,10 +2364,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, cfs.getFeedName().getValue(), mdTxnCtx);
         EntityId entityId = new EntityId(Feed.EXTENSION_NAME, feed.getDataverseName(), feed.getFeedName());
         FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), cfs.getDatasetName().getValue());
-        IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber();
+        IActiveLifecycleEventSubscriber eventSubscriber = new ActiveLifecycleEventSubscriber();
         FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
                 .getActiveEntityListener(entityId);
-        if (listener == null || !listener.isEntityConnectedToDataset(dataverseName, datasetName)) {
+        if (listener == null || !listener.isEntityUsingDataset(dataverseName, datasetName)) {
             throw new AsterixException("Feed " + feed.getFeedId().getEntityName() + " is currently not connected to "
                     + cfs.getDatasetName().getValue() + ". Invalid operation!");
         }
@@ -2372,7 +2387,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             JobUtils.runJob(hcc, jobSpec, true);
-            eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED);
+            eventSubscriber.assertEvent(ActiveLifecycleEvent.FEED_COLLECT_ENDED);
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
@@ -2428,8 +2443,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             if (compiled != null) {
                 FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE
                         .getActiveEntityListener(bfs.getSubscriptionRequest().getReceivingFeedId());
-                FeedConnectJobInfo activeJob = new FeedConnectJobInfo(
-                        bfs.getSubscriptionRequest().getReceivingFeedId(), null, ActivityState.ACTIVE,
+                FeedConnectJobInfo activeJob = new FeedConnectJobInfo(bfs.getSubscriptionRequest().getReceivingFeedId(),
+                        null, ActivityState.ACTIVE,
                         new FeedConnectionId(bfs.getSubscriptionRequest().getReceivingFeedId(), dataset),
                         listener.getSourceFeedJoint(), null, alteredJobSpec, policy.getProperties());
                 alteredJobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, activeJob);
@@ -2484,8 +2499,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                     dataverseName);
             jobsToExecute.add(DatasetOperations.compactDatasetJobSpec(dataverse, datasetName, metadataProvider));
             ARecordType aRecordType = (ARecordType) dt.getDatatype();
-            ARecordType enforcedType = createEnforcedType(
-                    aRecordType, indexes);
+            ARecordType enforcedType = createEnforcedType(aRecordType, indexes);
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 for (int j = 0; j < indexes.size(); j++) {
                     if (indexes.get(j).isSecondaryIndex()) {
@@ -2541,59 +2555,25 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
         jobsToExecute.add(ExternalIndexingOperations.compactFilesIndexJobSpec(ds, metadataProvider));
     }
 
-    protected void handleQuery(AqlMetadataProvider metadataProvider, Query query, IHyracksClientConnection hcc,
-            IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats) throws Exception {
-
+    protected JobSpecification handleQuery(AqlMetadataProvider metadataProvider, Query query,
+            IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats)
+                    throws Exception {
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         MetadataLockManager.INSTANCE.queryBegin(activeDefaultDataverse, query.getDataverses(), query.getDatasets());
+        JobSpecification compiled;
         try {
-            JobSpecification compiled = rewriteCompileQuery(metadataProvider, query, null);
+            compiled = rewriteCompileQuery(metadataProvider, query, null);
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
             if (query.isExplain()) {
                 sessionConfig.out().flush();
-                return;
+                return compiled;
             } else if (sessionConfig.isExecuteQuery() && compiled != null) {
-                if (GlobalConfig.ASTERIX_LOGGER.isLoggable(Level.FINE)) {
-                    GlobalConfig.ASTERIX_LOGGER.fine(compiled.toJSON().toString(1));
-                }
-                JobId jobId = JobUtils.runJob(hcc, compiled, false);
-
-                JSONObject response = new JSONObject();
-                switch (resultDelivery) {
-                    case ASYNC:
-                        JSONArray handle = new JSONArray();
-                        handle.put(jobId.getId());
-                        handle.put(metadataProvider.getResultSetId().getId());
-                        response.put("handle", handle);
-                        sessionConfig.out().print(response);
-                        sessionConfig.out().flush();
-                        hcc.waitForCompletion(jobId);
-                        break;
-                    case SYNC:
-                        hcc.waitForCompletion(jobId);
-                        ResultReader resultReader = new ResultReader(hdc);
-                        resultReader.open(jobId, metadataProvider.getResultSetId());
-                        ResultUtil.displayResults(resultReader, sessionConfig, stats,
-                                metadataProvider.findOutputRecordType());
-                        break;
-                    case ASYNC_DEFERRED:
-                        handle = new JSONArray();
-                        handle.put(jobId.getId());
-                        handle.put(metadataProvider.getResultSetId().getId());
-                        response.put("handle", handle);
-                        hcc.waitForCompletion(jobId);
-                        sessionConfig.out().print(response);
-                        sessionConfig.out().flush();
-                        break;
-                    default:
-                        break;
-
-                }
+                handleQueryResult(metadataProvider, hcc, hdc, compiled, resultDelivery, stats);
             }
         } catch (Exception e) {
             LOGGER.log(Level.INFO, e.getMessage(), e);
@@ -2606,6 +2586,47 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
             // release external datasets' locks acquired during compilation of the query
             ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
+        return compiled;
+    }
+
+    private void handleQueryResult(AqlMetadataProvider metadataProvider, IHyracksClientConnection hcc,
+            IHyracksDataset hdc, JobSpecification compiled, ResultDelivery resultDelivery, Stats stats)
+                    throws Exception {
+        if (GlobalConfig.ASTERIX_LOGGER.isLoggable(Level.FINE)) {
+            GlobalConfig.ASTERIX_LOGGER.fine(compiled.toJSON().toString(1));
+        }
+        JobId jobId = JobUtils.runJob(hcc, compiled, false);
+
+        JSONObject response = new JSONObject();
+        switch (resultDelivery) {
+            case ASYNC:
+                JSONArray handle = new JSONArray();
+                handle.put(jobId.getId());
+                handle.put(metadataProvider.getResultSetId().getId());
+                response.put("handle", handle);
+                sessionConfig.out().print(response);
+                sessionConfig.out().flush();
+                hcc.waitForCompletion(jobId);
+                break;
+            case SYNC:
+                hcc.waitForCompletion(jobId);
+                ResultReader resultReader = new ResultReader(hdc);
+                resultReader.open(jobId, metadataProvider.getResultSetId());
+                ResultUtil.displayResults(resultReader, sessionConfig, stats, metadataProvider.findOutputRecordType());
+                break;
+            case ASYNC_DEFERRED:
+                handle = new JSONArray();
+                handle.put(jobId.getId());
+                handle.put(metadataProvider.getResultSetId().getId());
+                response.put("handle", handle);
+                hcc.waitForCompletion(jobId);
+                sessionConfig.out().print(response);
+                sessionConfig.out().flush();
+                break;
+            default:
+                break;
+
+        }
     }
 
     protected void handleCreateNodeGroupStatement(AqlMetadataProvider metadataProvider, Statement stmt)
@@ -2876,8 +2897,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen
                 break;
             default:
                 throw new AlgebricksException(
-                        "The system \"" + runStmt.getSystem() +
-                                "\" specified in your run statement is not supported.");
+                        "The system \"" + runStmt.getSystem() + "\" specified in your run statement is not supported.");
         }
 
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index a548b3a..a806532 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -170,7 +170,7 @@ public class TestNodeController {
                 indexOpDesc, ctx, PARTITION, primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider,
                 op, true);
         CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), dataset.getDatasetId(),
-                primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION);
+                primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION, true);
         insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
         commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
         return insertOp;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
index 6e3f94c..2716859 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/optimizer/OptimizerTest.java
@@ -30,6 +30,7 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.api.java.AsterixJavaClient;
+import org.apache.asterix.app.cc.CompilerExtensionManager;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -38,6 +39,7 @@ import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.IdentitiyResolverFactory;
+import org.apache.asterix.runtime.util.AsterixAppContextInfo;
 import org.apache.asterix.test.base.AsterixTestHelper;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.test.runtime.HDFSCluster;
@@ -66,15 +68,16 @@ public class OptimizerTest {
             + "optimizerts" + SEPARATOR;
     private static final String PATH_QUERIES = PATH_BASE + "queries" + SEPARATOR;
     private static final String PATH_EXPECTED = PATH_BASE + "results" + SEPARATOR;
-    private static final String PATH_ACTUAL = "target" + File.separator + "opttest" + SEPARATOR;
+    protected static final String PATH_ACTUAL = "target" + File.separator + "opttest" + SEPARATOR;
 
     private static final ArrayList<String> ignore = AsterixTestHelper.readFile(FILENAME_IGNORE, PATH_BASE);
     private static final ArrayList<String> only = AsterixTestHelper.readFile(FILENAME_ONLY, PATH_BASE);
-    private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    protected static String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
     private static final ILangCompilationProvider aqlCompilationProvider = new AqlCompilationProvider();
     private static final ILangCompilationProvider sqlppCompilationProvider = new SqlppCompilationProvider();
+    protected static ILangCompilationProvider extensionLangCompilationProvider = null;
 
-    private static AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+    protected static AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -171,9 +174,13 @@ public class OptimizerTest {
             PrintWriter plan = new PrintWriter(actualFile);
             ILangCompilationProvider provider = queryFile.getName().endsWith("aql") ? aqlCompilationProvider
                     : sqlppCompilationProvider;
+            if (extensionLangCompilationProvider != null) {
+                provider = extensionLangCompilationProvider;
+            }
             IHyracksClientConnection hcc = integrationUtil.getHyracksClientConnection();
             AsterixJavaClient asterix = new AsterixJavaClient(hcc, query, plan, provider,
-                    new DefaultStatementExecutorFactory(null));
+                    new DefaultStatementExecutorFactory(
+                            (CompilerExtensionManager) AsterixAppContextInfo.INSTANCE.getExtensionManager()));
             try {
                 asterix.compile(true, false, false, true, true, false, false);
             } catch (AsterixException e) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 39a4d3b..3929113 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -52,8 +52,12 @@ public class ExecutionTestUtil {
         return setUp(cleanup, TEST_CONFIG_FILE_NAME, integrationUtil, true);
     }
 
+    public static List<ILibraryManager> setUp(boolean cleanup, String configFile) throws Exception {
+        return setUp(cleanup, configFile, integrationUtil, true);
+    }
+
     public static List<ILibraryManager> setUp(boolean cleanup, String configFile,
-            AsterixHyracksIntegrationUtil integrationUtil, boolean startHdfs) throws Exception {
+            AsterixHyracksIntegrationUtil alternateIntegrationUtil, boolean startHdfs) throws Exception {
         System.out.println("Starting setup");
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting setup");
@@ -63,6 +67,7 @@ public class ExecutionTestUtil {
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("initializing pseudo cluster");
         }
+        integrationUtil = alternateIntegrationUtil;
         integrationUtil.init(cleanup);
 
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -87,8 +92,8 @@ public class ExecutionTestUtil {
         libraryManagers.add(AsterixAppContextInfo.INSTANCE.getLibraryManager());
         // Adds library managers for NCs, one-per-NC.
         for (NodeControllerService nc : integrationUtil.ncs) {
-            IAsterixAppRuntimeContext runtimeCtx =
-                    (IAsterixAppRuntimeContext) nc.getApplicationContext().getApplicationObject();
+            IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) nc.getApplicationContext()
+                    .getApplicationObject();
             libraryManagers.add(runtimeCtx.getLibraryManager());
         }
         return libraryManagers;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql
new file mode 100644
index 0000000..205edc2
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/insert-return-custom-result.aql
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: int,
+  message-text: string,
+  location:point
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid;
+
+
+insert into dataset TweetMessageuuids as $a(
+let $x :=
+[{ "tweetid":1,"message-text":"hello","location":create-point(6.0,6.0)},
+{"tweetid":2,"message-text":"goodbye","location":create-point(1.0,1.0)},
+{"tweetid":3,"message-text":"the end","location":create-point(6.0,3.0)},
+{"tweetid":4,"message-text":"what","location":create-point(3.0,6.0)},
+{"tweetid":5,"message-text":"good","location":create-point(5.0,6.0)}]
+for $y in $x
+return $y
+) returning
+let $x := create-circle($a.location,5.0)
+order by $a.tweetid
+return {
+  "x":$x,
+  "tweetid":$a.tweetid
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan
new file mode 100644
index 0000000..391b248
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/insert-return-custom-result.plan
@@ -0,0 +1,17 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- STABLE_SORT [$$16(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- COMMIT  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- INSERT_DELETE  |PARTITIONED|
+                      -- HASH_PARTITION_EXCHANGE [$$16]  |PARTITIONED|
+                        -- ASSIGN  |UNPARTITIONED|
+                          -- STREAM_PROJECT  |UNPARTITIONED|
+                            -- UNNEST  |UNPARTITIONED|
+                              -- ASSIGN  |UNPARTITIONED|
+                                -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql
new file mode 100644
index 0000000..a75f9da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.1.ddl.aql
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-return-records
+ * Description     : Check records returned on insert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: int,
+  message-text: string
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql
new file mode 100644
index 0000000..6fc7112
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-return-records/insert-return-records.3.query.aql
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-return-records
+ * Description     : Check records returned on insert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+ use dataverse test;
+
+insert into dataset TweetMessageuuids as $message (
+[{ "tweetid":1,"message-text":"hello"},
+{"tweetid":2,"message-text":"goodbye"},
+{"tweetid":3,"message-text":"the end"},
+{"tweetid":4,"message-text":"what"},
+{"tweetid":5,"message-text":"good"}]
+) returning $message;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql
new file mode 100644
index 0000000..9e77afb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.1.ddl.aql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-returning-fieldname
+ * Description     : Check fields returned on insert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: uuid,
+  message-text: string
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid autogenerated;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql
new file mode 100644
index 0000000..6a7be26
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-returning-fieldname/insert-returning-fieldname.3.query.aql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-returning-fieldname
+ * Description     : Check fields returned on insert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+use dataverse test;
+
+insert into dataset TweetMessageuuids as $message (
+{ "message-text":"hello"}
+) returning $message.message-text;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql
new file mode 100644
index 0000000..a75f9da
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.1.ddl.aql
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-return-records
+ * Description     : Check records returned on insert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: int,
+  message-text: string
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql
new file mode 100644
index 0000000..417860d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/insert-with-bad-return/insert-with-bad-return.3.query.aql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : insert-with-bad-return
+ * Description     : Throw an error
+ * Expected Result : Error
+ * Date            : Oct 2016
+ */
+
+ use dataverse test;
+
+insert into dataset TweetMessageuuids as $message (
+[{ "tweetid":1,"message-text":"hello"},
+{"tweetid":2,"message-text":"goodbye"},
+{"tweetid":3,"message-text":"the end"},
+{"tweetid":4,"message-text":"what"},
+{"tweetid":5,"message-text":"good"}]
+) returning
+for $result in dataset TweetMessageuuids
+where $result.message-text = $message
+return $result;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql
new file mode 100644
index 0000000..50ff8ae
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.1.ddl.aql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : upsert-return-custom-result
+ * Description     : Check records returned on upsert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TweetMessageTypeuuid as closed {
+  tweetid: int,
+  message-text: string,
+  location:point
+}
+
+create dataset TweetMessageuuids(TweetMessageTypeuuid)
+primary key tweetid;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql
new file mode 100644
index 0000000..9abf667
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/dml/upsert-return-custom-result/upsert-return-custom-result.3.query.aql
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : upsert-return-custom-result
+ * Description     : Check records returned on upsert
+ * Expected Result : Success
+ * Date            : Mar 2015
+ */
+
+ use dataverse test;
+
+upsert into dataset TweetMessageuuids as $a (
+let $x :=
+[{ "tweetid":1,"message-text":"hello","location":create-point(6.0,6.0)},
+{"tweetid":2,"message-text":"goodbye","location":create-point(1.0,1.0)},
+{"tweetid":3,"message-text":"the end","location":create-point(6.0,3.0)},
+{"tweetid":4,"message-text":"what","location":create-point(3.0,6.0)},
+{"tweetid":5,"message-text":"good","location":create-point(5.0,6.0)}]
+for $y in $x
+return $y
+) returning
+let $x := create-circle($a.location,5.0)
+order by $a.tweetid
+return {
+  "x":$x,
+  "tweetid":$a.tweetid
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm
new file mode 100644
index 0000000..2ee9b1c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-return-records/insert-return-records.1.adm
@@ -0,0 +1,5 @@
+{ "tweetid": 1, "message-text": "hello" }
+{ "tweetid": 2, "message-text": "goodbye" }
+{ "tweetid": 4, "message-text": "what" }
+{ "tweetid": 3, "message-text": "the end" }
+{ "tweetid": 5, "message-text": "good" }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm
new file mode 100644
index 0000000..84ed78b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/insert-returning-fieldname/insert-returning-fieldname.1.adm
@@ -0,0 +1 @@
+"hello"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm
new file mode 100644
index 0000000..8706beb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/dml/upsert-return-custom-result/upsert-return-custom-result.1.adm
@@ -0,0 +1,5 @@
+{ "x": circle("6.0,6.0 5.0"), "tweetid": 1 }
+{ "x": circle("1.0,1.0 5.0"), "tweetid": 2 }
+{ "x": circle("3.0,6.0 5.0"), "tweetid": 4 }
+{ "x": circle("6.0,3.0 5.0"), "tweetid": 3 }
+{ "x": circle("5.0,6.0 5.0"), "tweetid": 5 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index 85884f8..d0e342c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -1699,6 +1699,27 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="dml">
+      <compilation-unit name="insert-return-records">
+        <output-dir compare="Text">insert-return-records</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="dml">
+      <compilation-unit name="insert-returning-fieldname">
+        <output-dir compare="Text">insert-returning-fieldname</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="dml">
+      <compilation-unit name="insert-with-bad-return">
+        <output-dir compare="Text">insert-with-bad-return</output-dir>
+        <expected-error>Error: Cannot use datasets in an insert returning query</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="dml">
+      <compilation-unit name="upsert-return-custom-result">
+        <output-dir compare="Text">upsert-return-custom-result</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="dml">
       <compilation-unit name="insert-src-dst-01">
         <output-dir compare="Text">insert-src-dst-01</output-dir>
       </compilation-unit>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-doc/src/site/markdown/aql/manual.md
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-doc/src/site/markdown/aql/manual.md b/asterixdb/asterix-doc/src/site/markdown/aql/manual.md
index 510bc83..393beec 100644
--- a/asterixdb/asterix-doc/src/site/markdown/aql/manual.md
+++ b/asterixdb/asterix-doc/src/site/markdown/aql/manual.md
@@ -824,7 +824,7 @@ data that has been prepared in ADM format.
 
 #### Insert
 
-    InsertStatement ::= "insert" "into" "dataset" QualifiedName Query
+    InsertStatement ::= "insert" "into" "dataset" QualifiedName ( "as" Variable )? Query ( "returning" Query )?
 
 The AQL insert statement is used to insert data into a dataset.
 The data to be inserted comes from an AQL query expression.
@@ -834,13 +834,17 @@ being the insertion of a single object plus its affiliated secondary index entri
 If the query part of an insert returns a single object, then the insert statement itself will
 be a single, atomic transaction.
 If the query part returns multiple objects, then each object inserted will be handled independently
-as a tranaction. If a dataset has an auto-generated primary key field, an insert statement should not include a value for that field in it. (The system will automatically extend the provided record with this additional field and a corresponding value.)
+as a tranaction. If a dataset has an auto-generated primary key field, an insert statement should not include a value for that field in it. (The system will automatically extend the provided record with this additional field and a corresponding value.).
+The optional "as Variable" provides a variable binding for the inserted records, which can be used in the "returning" clause.
+The optional "returning Query" allows users to run simple queries/functions on the records returned by the insert.
+This query cannot refer to any datasets.
+
 
 The following example illustrates a query-based insertion.
 
 ##### Example
 
-    insert into dataset UsersCopy (for $user in dataset FacebookUsers return $user)
+    insert into dataset UsersCopy as $inserted (for $user in dataset FacebookUsers return $user ) returning $inserted.screen-name
 
 #### Delete
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java
new file mode 100644
index 0000000..be9a245
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IActiveLifecycleEventSubscriber.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.api;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+
+public interface IActiveLifecycleEventSubscriber {
+
+    public enum ActiveLifecycleEvent {
+        FEED_INTAKE_STARTED,
+        FEED_COLLECT_STARTED,
+        FEED_INTAKE_FAILURE,
+        FEED_COLLECT_FAILURE,
+        FEED_INTAKE_ENDED,
+        FEED_COLLECT_ENDED,
+        ACTIVE_JOB_STARTED,
+        ACTIVE_JOB_ENDED,
+        ACTIVE_JOB_FAILED
+    }
+
+    public void assertEvent(ActiveLifecycleEvent event) throws AsterixException, InterruptedException;
+
+    public void handleEvent(ActiveLifecycleEvent event);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
deleted file mode 100644
index ad3c1c9..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.api;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-
-public interface IFeedLifecycleEventSubscriber {
-
-    public enum FeedLifecycleEvent {
-        FEED_INTAKE_STARTED,
-        FEED_COLLECT_STARTED,
-        FEED_INTAKE_FAILURE,
-        FEED_COLLECT_FAILURE,
-        FEED_INTAKE_ENDED,
-        FEED_COLLECT_ENDED
-    }
-
-    public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException;
-
-    public void handleFeedEvent(FeedLifecycleEvent event);
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java
new file mode 100644
index 0000000..7f8efc5
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveLifecycleEventSubscriber.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+
+public class ActiveLifecycleEventSubscriber implements IActiveLifecycleEventSubscriber {
+
+    private LinkedBlockingQueue<ActiveLifecycleEvent> inbox;
+
+    public ActiveLifecycleEventSubscriber() {
+        this.inbox = new LinkedBlockingQueue<>();
+    }
+
+    @Override
+    public void handleEvent(ActiveLifecycleEvent event) {
+        inbox.add(event);
+    }
+
+    @Override
+    public void assertEvent(ActiveLifecycleEvent event) throws AsterixException, InterruptedException {
+        boolean eventOccurred = false;
+        ActiveLifecycleEvent e;
+        Iterator<ActiveLifecycleEvent> eventsSoFar = inbox.iterator();
+        while (eventsSoFar.hasNext()) {
+            e = eventsSoFar.next();
+            assertNoFailure(e);
+            eventOccurred = e.equals(event);
+        }
+
+        while (!eventOccurred) {
+            e = inbox.take();
+            eventOccurred = e.equals(event);
+            if (!eventOccurred) {
+                assertNoFailure(e);
+            }
+        }
+    }
+
+    private void assertNoFailure(ActiveLifecycleEvent e) throws AsterixException {
+        if (e.equals(ActiveLifecycleEvent.FEED_INTAKE_FAILURE) || e.equals(ActiveLifecycleEvent.FEED_COLLECT_FAILURE)
+                || e.equals(ActiveLifecycleEvent.ACTIVE_JOB_FAILED)) {
+            throw new AsterixException("Failure in active job.");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index 0c49c92..02d2c8b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -34,8 +34,8 @@ import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.external.feed.api.FeedOperationCounter;
 import org.apache.asterix.external.feed.api.IFeedJoint;
 import org.apache.asterix.external.feed.api.IFeedJoint.State;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.watch.FeedConnectJobInfo;
 import org.apache.asterix.external.feed.watch.FeedIntakeInfo;
@@ -63,7 +63,7 @@ import org.apache.log4j.Logger;
 public class FeedEventsListener implements IActiveEntityEventsListener {
     private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class);
     private final Map<EntityId, Pair<FeedOperationCounter, List<IFeedJoint>>> feedPipeline;
-    private final List<IFeedLifecycleEventSubscriber> subscribers;
+    private final List<IActiveLifecycleEventSubscriber> subscribers;
     private final Map<Long, ActiveJob> jobs;
     private final Map<Long, ActiveJob> intakeJobs;
     private final Map<EntityId, FeedIntakeInfo> entity2Intake;
@@ -145,7 +145,7 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
             case FEED_CONNECT:
                 ((FeedConnectJobInfo) jobInfo).partitionStart();
                 if (((FeedConnectJobInfo) jobInfo).collectionStarted()) {
-                    notifyFeedEventSubscribers(FeedLifecycleEvent.FEED_COLLECT_STARTED);
+                    notifyFeedEventSubscribers(ActiveLifecycleEvent.FEED_COLLECT_STARTED);
                 }
                 break;
             case INTAKE:
@@ -161,7 +161,7 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
         if (feedPipeline.get(message.getEntityId()).first.decrementAndGet() == 0) {
             ((FeedIntakeInfo) jobInfo).getIntakeFeedJoint().setState(State.ACTIVE);
             jobInfo.setState(ActivityState.ACTIVE);
-            notifyFeedEventSubscribers(FeedLifecycleEvent.FEED_INTAKE_STARTED);
+            notifyFeedEventSubscribers(ActiveLifecycleEvent.FEED_INTAKE_STARTED);
         }
     }
 
@@ -339,10 +339,10 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
         return locations;
     }
 
-    private synchronized void notifyFeedEventSubscribers(FeedLifecycleEvent event) {
+    private synchronized void notifyFeedEventSubscribers(ActiveLifecycleEvent event) {
         if (subscribers != null && !subscribers.isEmpty()) {
-            for (IFeedLifecycleEventSubscriber subscriber : subscribers) {
-                subscriber.handleFeedEvent(event);
+            for (IActiveLifecycleEventSubscriber subscriber : subscribers) {
+                subscriber.handleEvent(event);
             }
         }
     }
@@ -362,8 +362,8 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
         // notify event listeners
         feedPipeline.remove(feedId);
         entity2Intake.remove(feedId);
-        notifyFeedEventSubscribers(pair.first.isFailedIngestion() ? FeedLifecycleEvent.FEED_INTAKE_FAILURE
-                : FeedLifecycleEvent.FEED_INTAKE_ENDED);
+        notifyFeedEventSubscribers(pair.first.isFailedIngestion() ? ActiveLifecycleEvent.FEED_INTAKE_FAILURE
+                : ActiveLifecycleEvent.FEED_INTAKE_ENDED);
     }
 
     private synchronized void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
@@ -389,8 +389,8 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
         connectJobInfos.remove(connectionId);
         jobs.remove(cInfo.getJobId().getId());
         // notify event listeners
-        FeedLifecycleEvent event =
-                failure ? FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_COLLECT_ENDED;
+        ActiveLifecycleEvent event =
+                failure ? ActiveLifecycleEvent.FEED_COLLECT_FAILURE : ActiveLifecycleEvent.FEED_COLLECT_ENDED;
         notifyFeedEventSubscribers(event);
     }
 
@@ -569,16 +569,16 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
 
     }
 
-    public synchronized void registerFeedEventSubscriber(IFeedLifecycleEventSubscriber subscriber) {
+    public synchronized void registerFeedEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
         subscribers.add(subscriber);
     }
 
-    public void deregisterFeedEventSubscriber(IFeedLifecycleEventSubscriber subscriber) {
+    public void deregisterFeedEventSubscriber(IActiveLifecycleEventSubscriber subscriber) {
         subscribers.remove(subscriber);
     }
 
     public synchronized boolean isFeedConnectionActive(FeedConnectionId connectionId,
-            IFeedLifecycleEventSubscriber eventSubscriber) {
+            IActiveLifecycleEventSubscriber eventSubscriber) {
         boolean active = false;
         FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
         if (cInfo != null) {
@@ -643,7 +643,7 @@ public class FeedEventsListener implements IActiveEntityEventsListener {
     }
 
     @Override
-    public boolean isEntityConnectedToDataset(String dataverseName, String datasetName) {
+    public boolean isEntityUsingDataset(String dataverseName, String datasetName) {
         return isConnectedToDataset(datasetName);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java
deleted file mode 100644
index 6e3cebce..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedLifecycleEventSubscriber.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
-
-public class FeedLifecycleEventSubscriber implements IFeedLifecycleEventSubscriber {
-
-    private LinkedBlockingQueue<FeedLifecycleEvent> inbox;
-
-    public FeedLifecycleEventSubscriber() {
-        this.inbox = new LinkedBlockingQueue<FeedLifecycleEvent>();
-    }
-
-    @Override
-    public void handleFeedEvent(FeedLifecycleEvent event) {
-        inbox.add(event);
-    }
-
-    @Override
-    public void assertEvent(FeedLifecycleEvent event) throws AsterixException, InterruptedException {
-        boolean eventOccurred = false;
-        FeedLifecycleEvent e = null;
-        Iterator<FeedLifecycleEvent> eventsSoFar = inbox.iterator();
-        while (eventsSoFar.hasNext()) {
-            e = eventsSoFar.next();
-            assertNoFailure(e);
-            eventOccurred = e.equals(event);
-        }
-
-        while (!eventOccurred) {
-            e = inbox.take();
-            eventOccurred = e.equals(event);
-            if (!eventOccurred) {
-                assertNoFailure(e);
-            }
-        }
-    }
-
-    private void assertNoFailure(FeedLifecycleEvent e) throws AsterixException {
-        if (e.equals(FeedLifecycleEvent.FEED_INTAKE_FAILURE) || e.equals(FeedLifecycleEvent.FEED_COLLECT_FAILURE)) {
-            throw new AsterixException("Failure in feed");
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java
deleted file mode 100644
index aac6676..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedActivity.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.external.feed.watch;
-
-import java.util.Map;
-
-public class FeedActivity implements Comparable<FeedActivity> {
-
-    private int activityId;
-
-    private final String dataverseName;
-    private final String datasetName;
-    private final String feedName;
-    private final Map<String, String> feedActivityDetails;
-
-    public static class FeedActivityDetails {
-        public static final String INTAKE_LOCATIONS = "intake-locations";
-        public static final String COMPUTE_LOCATIONS = "compute-locations";
-        public static final String STORAGE_LOCATIONS = "storage-locations";
-        public static final String COLLECT_LOCATIONS = "collect-locations";
-        public static final String FEED_POLICY_NAME = "feed-policy-name";
-        public static final String FEED_CONNECT_TIMESTAMP = "feed-connect-timestamp";
-    }
-
-    public FeedActivity(String dataverseName, String feedName, String datasetName,
-            Map<String, String> feedActivityDetails) {
-        this.dataverseName = dataverseName;
-        this.feedName = feedName;
-        this.datasetName = datasetName;
-        this.feedActivityDetails = feedActivityDetails;
-    }
-
-    public String getDataverseName() {
-        return dataverseName;
-    }
-
-    public String getDatasetName() {
-        return datasetName;
-    }
-
-    public String getFeedName() {
-        return feedName;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        if (this == other) {
-            return true;
-        }
-        if (!(other instanceof FeedActivity)) {
-            return false;
-        }
-
-        if (!((FeedActivity) other).dataverseName.equals(dataverseName)) {
-            return false;
-        }
-        if (!((FeedActivity) other).datasetName.equals(datasetName)) {
-            return false;
-        }
-        if (!((FeedActivity) other).getFeedName().equals(feedName)) {
-            return false;
-        }
-        if (((FeedActivity) other).getActivityId() != (activityId)) {
-            return false;
-        }
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        return toString().hashCode();
-    }
-
-    @Override
-    public String toString() {
-        return dataverseName + "." + feedName + " --> " + datasetName + " " + activityId;
-    }
-
-    public String getConnectTimestamp() {
-        return feedActivityDetails.get(FeedActivityDetails.FEED_CONNECT_TIMESTAMP);
-    }
-
-    public int getActivityId() {
-        return activityId;
-    }
-
-    public void setActivityId(int activityId) {
-        this.activityId = activityId;
-    }
-
-    public Map<String, String> getFeedActivityDetails() {
-        return feedActivityDetails;
-    }
-
-    @Override
-    public int compareTo(FeedActivity o) {
-        return o.getActivityId() - this.activityId;
-    }
-
-}


[3/4] asterixdb git commit: Enhanced Insert AQL

Posted by sj...@apache.org.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
index 4a79387..b1f646a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
@@ -36,7 +36,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
@@ -187,7 +187,7 @@ public class SweepIllegalNonfunctionalFunctions extends AbstractExtractExprRule
         }
 
         @Override
-        public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+        public Void visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 17dec7c..c033214 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -23,7 +23,7 @@ import java.util.List;
 
 import org.apache.asterix.algebra.extension.IAlgebraExtensionManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
+import org.apache.asterix.external.feed.watch.FeedActivityDetails;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
index 16ac80d..ec29b53 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/AbstractIntroduceAccessMethodRule.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.asterix.om.util.ConstantExpressionUtil;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.dataflow.data.common.AqlExpressionTypeComputer;
 import org.apache.asterix.metadata.api.IMetadataEntity;
@@ -39,6 +38,7 @@ import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.asterix.om.util.ConstantExpressionUtil;
 import org.apache.asterix.optimizer.rules.am.OptimizableOperatorSubTree.DataSourceType;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -72,15 +72,15 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
     private AqlMetadataProvider metadataProvider;
 
     // Function Identifier sets that retain the original field variable through each function's arguments
-    private final ImmutableSet<FunctionIdentifier> funcIDSetThatRetainFieldName =
-            ImmutableSet.of(AsterixBuiltinFunctions.WORD_TOKENS, AsterixBuiltinFunctions.GRAM_TOKENS,
-                    AsterixBuiltinFunctions.SUBSTRING, AsterixBuiltinFunctions.SUBSTRING_BEFORE,
-                    AsterixBuiltinFunctions.SUBSTRING_AFTER, AsterixBuiltinFunctions.CREATE_POLYGON,
-                    AsterixBuiltinFunctions.CREATE_MBR, AsterixBuiltinFunctions.CREATE_RECTANGLE,
-                    AsterixBuiltinFunctions.CREATE_CIRCLE, AsterixBuiltinFunctions.CREATE_LINE,
-                    AsterixBuiltinFunctions.CREATE_POINT, AsterixBuiltinFunctions.NUMERIC_ADD,
-                    AsterixBuiltinFunctions.NUMERIC_SUBTRACT, AsterixBuiltinFunctions.NUMERIC_MULTIPLY,
-                    AsterixBuiltinFunctions.NUMERIC_DIVIDE, AsterixBuiltinFunctions.NUMERIC_MOD);
+    private final ImmutableSet<FunctionIdentifier> funcIDSetThatRetainFieldName = ImmutableSet.of(
+            AsterixBuiltinFunctions.WORD_TOKENS, AsterixBuiltinFunctions.GRAM_TOKENS, AsterixBuiltinFunctions.SUBSTRING,
+            AsterixBuiltinFunctions.SUBSTRING_BEFORE, AsterixBuiltinFunctions.SUBSTRING_AFTER,
+            AsterixBuiltinFunctions.CREATE_POLYGON, AsterixBuiltinFunctions.CREATE_MBR,
+            AsterixBuiltinFunctions.CREATE_RECTANGLE, AsterixBuiltinFunctions.CREATE_CIRCLE,
+            AsterixBuiltinFunctions.CREATE_LINE, AsterixBuiltinFunctions.CREATE_POINT,
+            AsterixBuiltinFunctions.NUMERIC_ADD, AsterixBuiltinFunctions.NUMERIC_SUBTRACT,
+            AsterixBuiltinFunctions.NUMERIC_MULTIPLY, AsterixBuiltinFunctions.NUMERIC_DIVIDE,
+            AsterixBuiltinFunctions.NUMERIC_MOD);
 
     public abstract Map<FunctionIdentifier, List<IAccessMethod>> getAccessMethods();
 
@@ -108,7 +108,7 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
 
     protected void fillSubTreeIndexExprs(OptimizableOperatorSubTree subTree,
             Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs, IOptimizationContext context)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         Iterator<Map.Entry<IAccessMethod, AccessMethodAnalysisContext>> amIt = analyzedAMs.entrySet().iterator();
         // Check applicability of indexes by access method type.
         while (amIt.hasNext()) {
@@ -145,15 +145,15 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
         return list.isEmpty() ? null : list.get(0);
     }
 
-    protected List<Pair<IAccessMethod, Index>>
-            chooseAllIndex(Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) {
+    protected List<Pair<IAccessMethod, Index>> chooseAllIndex(
+            Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs) {
         List<Pair<IAccessMethod, Index>> result = new ArrayList<Pair<IAccessMethod, Index>>();
         Iterator<Map.Entry<IAccessMethod, AccessMethodAnalysisContext>> amIt = analyzedAMs.entrySet().iterator();
         while (amIt.hasNext()) {
             Map.Entry<IAccessMethod, AccessMethodAnalysisContext> amEntry = amIt.next();
             AccessMethodAnalysisContext analysisCtx = amEntry.getValue();
-            Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexIt =
-                    analysisCtx.indexExprsAndVars.entrySet().iterator();
+            Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexIt = analysisCtx.indexExprsAndVars.entrySet()
+                    .iterator();
             while (indexIt.hasNext()) {
                 Map.Entry<Index, List<Pair<Integer, Integer>>> indexEntry = indexIt.next();
                 // To avoid a case where the chosen access method and a chosen
@@ -167,15 +167,14 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
                 //                           LENGTH_PARTITIONED_NGRAM_INVIX]
                 IAccessMethod chosenAccessMethod = amEntry.getKey();
                 Index chosenIndex = indexEntry.getKey();
-                boolean isKeywordOrNgramIndexChosen =
-                        chosenIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
-                                || chosenIndex.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX
-                                || chosenIndex.getIndexType() == IndexType.SINGLE_PARTITION_WORD_INVIX
-                                || chosenIndex.getIndexType() == IndexType.SINGLE_PARTITION_NGRAM_INVIX;
-
-                if ((chosenAccessMethod == BTreeAccessMethod.INSTANCE && chosenIndex.getIndexType() == IndexType.BTREE)
-                        || (chosenAccessMethod == RTreeAccessMethod.INSTANCE
-                                && chosenIndex.getIndexType() == IndexType.RTREE)
+                IndexType indexType = chosenIndex.getIndexType();
+                boolean isKeywordOrNgramIndexChosen = indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
+                        || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX
+                        || indexType == IndexType.SINGLE_PARTITION_WORD_INVIX
+                        || indexType == IndexType.SINGLE_PARTITION_NGRAM_INVIX;
+
+                if ((chosenAccessMethod == BTreeAccessMethod.INSTANCE && indexType == IndexType.BTREE)
+                        || (chosenAccessMethod == RTreeAccessMethod.INSTANCE && indexType == IndexType.RTREE)
                         || (chosenAccessMethod == InvertedIndexAccessMethod.INSTANCE && isKeywordOrNgramIndexChosen)) {
                     result.add(new Pair<IAccessMethod, Index>(chosenAccessMethod, chosenIndex));
                 }
@@ -196,8 +195,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
      */
     public void pruneIndexCandidates(IAccessMethod accessMethod, AccessMethodAnalysisContext analysisCtx,
             IOptimizationContext context, IVariableTypeEnvironment typeEnvironment) throws AlgebricksException {
-        Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexExprAndVarIt =
-                analysisCtx.indexExprsAndVars.entrySet().iterator();
+        Iterator<Map.Entry<Index, List<Pair<Integer, Integer>>>> indexExprAndVarIt = analysisCtx.indexExprsAndVars
+                .entrySet().iterator();
         // Used to keep track of matched expressions (added for prefix search)
         int numMatchedKeys = 0;
         ArrayList<Integer> matchedExpressions = new ArrayList<Integer>();
@@ -226,24 +225,22 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
                     }
                     boolean typeMatch = true;
                     //Prune indexes based on field types
-                    List<IAType> indexedTypes = new ArrayList<IAType>();
+                    List<IAType> matchedTypes = new ArrayList<>();
                     //retrieve types of expressions joined/selected with an indexed field
                     for (int j = 0; j < optFuncExpr.getNumLogicalVars(); j++) {
                         if (j != exprAndVarIdx.second) {
-                            indexedTypes.add(optFuncExpr.getFieldType(j));
+                            matchedTypes.add(optFuncExpr.getFieldType(j));
                         }
                     }
 
-                    //add constants in case of select
-                    if (indexedTypes.size() < 2 && optFuncExpr.getNumLogicalVars() == 1
-                            && optFuncExpr.getNumConstantAtRuntimeExpr() > 0) {
-                        indexedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType(
+                    if (matchedTypes.size() < 2 && optFuncExpr.getNumLogicalVars() == 1) {
+                        matchedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType(
                                 optFuncExpr.getConstantAtRuntimeExpr(0), context.getMetadataProvider(),
                                 typeEnvironment));
                     }
 
                     //infer type of logicalExpr based on index keyType
-                    indexedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType(
+                    matchedTypes.add((IAType) AqlExpressionTypeComputer.INSTANCE.getType(
                             optFuncExpr.getLogicalExpr(exprAndVarIdx.second), null, new IVariableTypeEnvironment() {
 
                                 @Override
@@ -257,7 +254,7 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
                                 @Override
                                 public Object getVarType(LogicalVariable var, List<LogicalVariable> nonNullVariables,
                                         List<List<LogicalVariable>> correlatedNullableVariableLists)
-                                        throws AlgebricksException {
+                                                throws AlgebricksException {
                                     if (var.equals(optFuncExpr.getSourceVar(exprAndVarIdx.second))) {
                                         return keyType;
                                     }
@@ -285,16 +282,16 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
                     boolean jaccardSimilarity = optFuncExpr.getFuncExpr().getFunctionIdentifier().getName()
                             .startsWith("similarity-jaccard-check");
 
-                    for (int j = 0; j < indexedTypes.size(); j++) {
-                        for (int k = j + 1; k < indexedTypes.size(); k++) {
-                            typeMatch &= isMatched(indexedTypes.get(j), indexedTypes.get(k), jaccardSimilarity);
+                    for (int j = 0; j < matchedTypes.size(); j++) {
+                        for (int k = j + 1; k < matchedTypes.size(); k++) {
+                            typeMatch &= isMatched(matchedTypes.get(j), matchedTypes.get(k), jaccardSimilarity);
                         }
                     }
 
                     // Check if any field name in the optFuncExpr matches.
                     if (optFuncExpr.findFieldName(keyField) != -1) {
-                        foundKeyField =
-                                typeMatch && optFuncExpr.getOperatorSubTree(exprAndVarIdx.second).hasDataSourceScan();
+                        foundKeyField = typeMatch
+                                && optFuncExpr.getOperatorSubTree(exprAndVarIdx.second).hasDataSourceScan();
                         if (foundKeyField) {
                             matchedExpressions.add(exprAndVarIdx.first);
                             numMatchedKeys++;
@@ -369,8 +366,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
                 continue;
             }
             AbstractFunctionCallExpression argFuncExpr = (AbstractFunctionCallExpression) argExpr;
-            boolean matchFound =
-                    analyzeFunctionExpr(argFuncExpr, assignsAndUnnests, analyzedAMs, context, typeEnvironment);
+            boolean matchFound = analyzeFunctionExpr(argFuncExpr, assignsAndUnnests, analyzedAMs, context,
+                    typeEnvironment);
             found = found || matchFound;
         }
         return found;
@@ -435,14 +432,13 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
     protected boolean fillIndexExprs(List<Index> datasetIndexes, List<String> fieldName, IAType fieldType,
             IOptimizableFuncExpr optFuncExpr, int matchedFuncExprIndex, int varIdx,
             OptimizableOperatorSubTree matchedSubTree, AccessMethodAnalysisContext analysisCtx)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         List<Index> indexCandidates = new ArrayList<Index>();
         // Add an index to the candidates if one of the indexed fields is
         // fieldName
         for (Index index : datasetIndexes) {
             // Need to also verify the index is pending no op
-            if (index.getKeyFieldNames().contains(fieldName)
-                    && index.getPendingOp() == IMetadataEntity.PENDING_NO_OP) {
+            if (index.getKeyFieldNames().contains(fieldName) && index.getPendingOp() == IMetadataEntity.PENDING_NO_OP) {
                 indexCandidates.add(index);
                 if (optFuncExpr.getFieldType(varIdx) == BuiltinType.AMISSING
                         || optFuncExpr.getFieldType(varIdx) == BuiltinType.ANY) {
@@ -540,8 +536,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
                 return;
             }
         }
-        IAType fieldType =
-                (IAType) context.getOutputTypeEnvironment(unnestOp).getType(optFuncExpr.getLogicalExpr(funcVarIndex));
+        IAType fieldType = (IAType) context.getOutputTypeEnvironment(unnestOp)
+                .getType(optFuncExpr.getLogicalExpr(funcVarIndex));
         // Set the fieldName in the corresponding matched function
         // expression.
         optFuncExpr.setFieldName(funcVarIndex, fieldName);
@@ -571,16 +567,14 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
             // Remember matching subtree.
             optFuncExpr.setOptimizableSubTree(optVarIndex, subTree);
 
-            List<String> fieldName =
-                    getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, varIndex,
-                            subTree.getRecordType(), optVarIndex,
-                            optFuncExpr.getFuncExpr().getArguments().get(optVarIndex).getValue(),
-                            datasetRecordVar, subTree.getMetaRecordType(), datasetMetaVar);
+            List<String> fieldName = getFieldNameFromSubTree(optFuncExpr, subTree, assignOrUnnestIndex, varIndex,
+                    subTree.getRecordType(), optVarIndex,
+                    optFuncExpr.getFuncExpr().getArguments().get(optVarIndex).getValue(), datasetRecordVar,
+                    subTree.getMetaRecordType(), datasetMetaVar);
             if (fieldName == null) {
                 continue;
             }
-            IAType fieldType = (IAType) context.getOutputTypeEnvironment(assignOp)
-                    .getType(optFuncExpr.getLogicalExpr(optVarIndex));
+            IAType fieldType = (IAType) context.getOutputTypeEnvironment(assignOp).getVarType(var);
             // Set the fieldName in the corresponding matched
             // function expression.
             optFuncExpr.setFieldName(optVarIndex, fieldName);
@@ -597,7 +591,7 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
     private void matchVarsFromOptFuncExprToDataSourceScan(IOptimizableFuncExpr optFuncExpr, int optFuncExprIndex,
             List<Index> datasetIndexes, List<LogicalVariable> dsVarList, OptimizableOperatorSubTree subTree,
             AccessMethodAnalysisContext analysisCtx, IOptimizationContext context, boolean fromAdditionalDataSource)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         for (int varIndex = 0; varIndex < dsVarList.size(); varIndex++) {
             LogicalVariable var = dsVarList.get(varIndex);
             int funcVarIndex = optFuncExpr.findLogicalVar(var);
@@ -615,16 +609,15 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
                 // Check whether this variable is PK, not a record variable.
                 if (varIndex <= subTreePKs.size() - 1) {
                     fieldName = subTreePKs.get(varIndex);
-                    fieldType =
-                            (IAType) context.getOutputTypeEnvironment(
-                                    subTree.getDataSourceRef().getValue()).getVarType(var);
+                    fieldType = (IAType) context.getOutputTypeEnvironment(subTree.getDataSourceRef().getValue())
+                            .getVarType(var);
                 }
             } else {
                 // Need to check additional dataset one by one
                 for (int i = 0; i < subTree.getIxJoinOuterAdditionalDatasets().size(); i++) {
                     if (subTree.getIxJoinOuterAdditionalDatasets().get(i) != null) {
-                        subTreePKs = DatasetUtils.getPartitioningKeys(
-                                subTree.getIxJoinOuterAdditionalDatasets().get(i));
+                        subTreePKs = DatasetUtils
+                                .getPartitioningKeys(subTree.getIxJoinOuterAdditionalDatasets().get(i));
 
                         // Check whether this variable is PK, not a record variable.
                         if (subTreePKs.contains(var) && varIndex <= subTreePKs.size() - 1) {
@@ -667,11 +660,10 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
      *
      * @throws AlgebricksException
      */
-    protected List<String> getFieldNameFromSubTree(IOptimizableFuncExpr optFuncExpr,
-            OptimizableOperatorSubTree subTree, int opIndex, int assignVarIndex, ARecordType recordType,
-            int funcVarIndex, ILogicalExpression parentFuncExpr, LogicalVariable recordVar,
-            ARecordType metaType, LogicalVariable metaVar)
-            throws AlgebricksException {
+    protected List<String> getFieldNameFromSubTree(IOptimizableFuncExpr optFuncExpr, OptimizableOperatorSubTree subTree,
+            int opIndex, int assignVarIndex, ARecordType recordType, int funcVarIndex,
+            ILogicalExpression parentFuncExpr, LogicalVariable recordVar, ARecordType metaType, LogicalVariable metaVar)
+                    throws AlgebricksException {
         // Get expression corresponding to opVar at varIndex.
         AbstractLogicalExpression expr = null;
         AbstractFunctionCallExpression childFuncExpr = null;
@@ -679,6 +671,10 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
         if (op.getOperatorTag() == LogicalOperatorTag.ASSIGN) {
             AssignOperator assignOp = (AssignOperator) op;
             expr = (AbstractLogicalExpression) assignOp.getExpressions().get(assignVarIndex).getValue();
+            if (expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
+                //Otherwise the cast for childFuncExpr would fail
+                return null;
+            }
             childFuncExpr = (AbstractFunctionCallExpression) expr;
         } else {
             UnnestOperator unnestOp = (UnnestOperator) op;
@@ -723,8 +719,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
                 return null;
             }
             ConstantExpression constExpr = (ConstantExpression) nameArg;
-            AOrderedList orderedNestedFieldName =
-                    (AOrderedList) ((AsterixConstantValue) constExpr.getValue()).getObject();
+            AOrderedList orderedNestedFieldName = (AOrderedList) ((AsterixConstantValue) constExpr.getValue())
+                    .getObject();
             nestedAccessFieldName = new ArrayList<String>();
             for (int i = 0; i < orderedNestedFieldName.size(); i++) {
                 nestedAccessFieldName.add(((AString) orderedNestedFieldName.getItem(i)).getStringValue());
@@ -733,8 +729,8 @@ public abstract class AbstractIntroduceAccessMethodRule implements IAlgebraicRew
             isByName = true;
         }
         if (isFieldAccess) {
-            LogicalVariable sourceVar =
-                    ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue()).getVariableReference();
+            LogicalVariable sourceVar = ((VariableReferenceExpression) funcExpr.getArguments().get(0).getValue())
+                    .getVariableReference();
             optFuncExpr.setLogicalExpr(funcVarIndex, parentFuncExpr);
             int[] assignAndExpressionIndexes = null;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
index 874cc7c..23e45c4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
@@ -56,7 +56,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceSc
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -490,7 +490,7 @@ class InlineAllNtsInSubplanVisitor implements IQueryOperatorVisitor<ILogicalOper
     }
 
     @Override
-    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public ILogicalOperator visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         return visitSingleInputOperator(op);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
index eeb2c2a..d3a0c0f 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
@@ -41,7 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceSc
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -255,7 +255,7 @@ class InlineLeftNtsInSubplanJoinFlatteningVisitor implements IQueryOperatorVisit
     }
 
     @Override
-    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public ILogicalOperator visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         return visitSingleInputOperator(op);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
index ccf0aeb..44bfbe4 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
@@ -27,7 +27,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceSc
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
@@ -143,7 +143,7 @@ class SubplanSpecialFlatteningCheckVisitor implements IQueryOperatorVisitor<Bool
     }
 
     @Override
-    public Boolean visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+    public Boolean visitDelegateOperator(DelegateOperator op, Void arg) throws AlgebricksException {
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index b184774..98c717c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -295,12 +295,17 @@ public class CompiledStatements {
         private final String datasetName;
         private final Query query;
         private final int varCounter;
+        VariableExpr var;
+        Query returnQuery;
 
-        public CompiledInsertStatement(String dataverseName, String datasetName, Query query, int varCounter) {
+        public CompiledInsertStatement(String dataverseName, String datasetName, Query query, int varCounter,
+                VariableExpr var, Query returnQuery) {
             this.dataverseName = dataverseName;
             this.datasetName = datasetName;
             this.query = query;
             this.varCounter = varCounter;
+            this.var = var;
+            this.returnQuery = returnQuery;
         }
 
         @Override
@@ -321,6 +326,14 @@ public class CompiledStatements {
             return query;
         }
 
+        public VariableExpr getVar() {
+            return var;
+        }
+
+        public Query getReturnQuery() {
+            return returnQuery;
+        }
+
         @Override
         public byte getKind() {
             return Statement.Kind.INSERT;
@@ -329,8 +342,9 @@ public class CompiledStatements {
 
     public static class CompiledUpsertStatement extends CompiledInsertStatement {
 
-        public CompiledUpsertStatement(String dataverseName, String datasetName, Query query, int varCounter) {
-            super(dataverseName, datasetName, query, varCounter);
+        public CompiledUpsertStatement(String dataverseName, String datasetName, Query query, int varCounter,
+                VariableExpr var, Query returnQuery) {
+            super(dataverseName, datasetName, query, varCounter, var, returnQuery);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 1b528b9..149656a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -116,6 +116,8 @@ public interface IStatementExecutor {
      * @param dmlStatement
      *            The data modification statement when the query results in a modification to a dataset
      * @return the compiled {@code JobSpecification}
+     * @param returnQuery
+     *            In the case of dml, the user may run a query on affected data
      * @throws AsterixException
      * @throws RemoteException
      * @throws AlgebricksException
@@ -124,7 +126,7 @@ public interface IStatementExecutor {
      */
     JobSpecification rewriteCompileQuery(AqlMetadataProvider metadataProvider, Query query,
             ICompiledDmlStatement dmlStatement)
-            throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException;
+                    throws AsterixException, RemoteException, AlgebricksException, JSONException, ACIDException;
 
     /**
      * returns the active dataverse for an entity or a statement

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 09a0476..9879da8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -27,11 +27,12 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.asterix.algebra.base.ILangExpressionToPlanTranslator;
+import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -40,15 +41,15 @@ import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.lang.aql.util.RangeMapBuilder;
 import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.base.Expression.Kind;
 import org.apache.asterix.lang.common.base.ILangExpression;
 import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.base.Expression.Kind;
 import org.apache.asterix.lang.common.clause.GroupbyClause;
 import org.apache.asterix.lang.common.clause.LetClause;
 import org.apache.asterix.lang.common.clause.LimitClause;
 import org.apache.asterix.lang.common.clause.OrderbyClause;
-import org.apache.asterix.lang.common.clause.WhereClause;
 import org.apache.asterix.lang.common.clause.OrderbyClause.OrderModifier;
+import org.apache.asterix.lang.common.clause.WhereClause;
 import org.apache.asterix.lang.common.expression.CallExpr;
 import org.apache.asterix.lang.common.expression.FieldAccessor;
 import org.apache.asterix.lang.common.expression.FieldBinding;
@@ -56,14 +57,14 @@ import org.apache.asterix.lang.common.expression.GbyVariableExpressionPair;
 import org.apache.asterix.lang.common.expression.IfExpr;
 import org.apache.asterix.lang.common.expression.IndexAccessor;
 import org.apache.asterix.lang.common.expression.ListConstructor;
+import org.apache.asterix.lang.common.expression.ListConstructor.Type;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.expression.OperatorExpr;
 import org.apache.asterix.lang.common.expression.QuantifiedExpression;
+import org.apache.asterix.lang.common.expression.QuantifiedExpression.Quantifier;
 import org.apache.asterix.lang.common.expression.RecordConstructor;
 import org.apache.asterix.lang.common.expression.UnaryExpr;
 import org.apache.asterix.lang.common.expression.VariableExpr;
-import org.apache.asterix.lang.common.expression.ListConstructor.Type;
-import org.apache.asterix.lang.common.expression.QuantifiedExpression.Quantifier;
 import org.apache.asterix.lang.common.literal.StringLiteral;
 import org.apache.asterix.lang.common.statement.FunctionDecl;
 import org.apache.asterix.lang.common.statement.Query;
@@ -74,13 +75,13 @@ import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.lang.common.visitor.base.AbstractQueryExpressionVisitor;
 import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
 import org.apache.asterix.metadata.declared.AqlMetadataProvider;
 import org.apache.asterix.metadata.declared.AqlSourceId;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.metadata.declared.LoadableDataSource;
 import org.apache.asterix.metadata.declared.ResultSetDataSink;
 import org.apache.asterix.metadata.declared.ResultSetSinkId;
-import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.Function;
@@ -96,8 +97,10 @@ import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.util.AsterixAppContextInfo;
+import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
 import org.apache.asterix.translator.CompiledStatements.CompiledSubscribeFeedStatement;
+import org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
 import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.util.FunctionCollection;
 import org.apache.asterix.translator.util.PlanTranslationUtil;
@@ -116,15 +119,15 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
 import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation;
+import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation.BroadcastSide;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
-import org.apache.hyracks.algebricks.core.algebra.expressions.BroadcastExpressionAnnotation.BroadcastSide;
 import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
@@ -133,6 +136,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOper
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
@@ -140,13 +144,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDelete
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -286,15 +290,30 @@ class LangExpressionToPlanTranslator
     @Override
     public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt)
             throws AlgebricksException {
-        Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this,
-                new MutableObject<>(new EmptyTupleSourceOperator()));
+        return translate(expr, outputDatasetName, stmt, null);
+    }
+
+    public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt,
+            ILogicalOperator baseOp) throws AlgebricksException {
+        MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
+        if (baseOp != null) {
+            base = new MutableObject<>(baseOp);
+        }
+        Pair<ILogicalOperator, LogicalVariable> p = expr.accept(this, base);
         ArrayList<Mutable<ILogicalOperator>> globalPlanRoots = new ArrayList<>();
         ILogicalOperator topOp = p.first;
-        ProjectOperator project = (ProjectOperator) topOp;
-        LogicalVariable unnestVar = project.getVariables().get(0);
-        LogicalVariable resVar = project.getVariables().get(0);
 
         if (outputDatasetName == null) {
+            LogicalVariable resVar;
+            if (topOp instanceof ProjectOperator) {
+                resVar = ((ProjectOperator) topOp).getVariables().get(0);
+            } else if (topOp instanceof AssignOperator) {
+                resVar = ((AssignOperator) topOp).getVariables().get(0);
+            } else if (topOp instanceof AggregateOperator) {
+                resVar = ((AggregateOperator) topOp).getVariables().get(0);
+            } else {
+                throw new AlgebricksException("Invalid returning query");
+            }
             FileSplit outputFileSplit = metadataProvider.getOutputFile();
             if (outputFileSplit == null) {
                 outputFileSplit = getDefaultOutputFileLocation();
@@ -305,8 +324,9 @@ class LangExpressionToPlanTranslator
             writeExprList.add(new MutableObject<>(new VariableReferenceExpression(resVar)));
             ResultSetSinkId rssId = new ResultSetSinkId(metadataProvider.getResultSetId());
             ResultSetDataSink sink = new ResultSetDataSink(rssId, null);
-            topOp = new DistributeResultOperator(writeExprList, sink);
-            topOp.getInputs().add(new MutableObject<>(project));
+            DistributeResultOperator newTop = new DistributeResultOperator(writeExprList, sink);
+            newTop.getInputs().add(new MutableObject<>(topOp));
+            topOp = newTop;
 
             // Retrieve the Output RecordType (if any) and store it on
             // the DistributeResultOperator
@@ -315,6 +335,10 @@ class LangExpressionToPlanTranslator
                 topOp.getAnnotations().put("output-record-type", outputRecordType);
             }
         } else {
+            ProjectOperator project = (ProjectOperator) topOp;
+            LogicalVariable unnestVar = project.getVariables().get(0);
+            LogicalVariable resVar = project.getVariables().get(0);
+
             /**
              * add the collection-to-sequence right before the project,
              * because dataset only accept non-collection records
@@ -380,12 +404,12 @@ class LangExpressionToPlanTranslator
             switch (stmt.getKind()) {
                 case Statement.Kind.INSERT:
                     leafOperator = translateInsert(targetDatasource, varRef, varRefsForLoading,
-                            additionalFilteringExpressions, assign);
+                            additionalFilteringExpressions, assign, stmt);
                     break;
                 case Statement.Kind.UPSERT:
                     leafOperator = translateUpsert(targetDatasource, varRef, varRefsForLoading,
                             additionalFilteringExpressions, assign, additionalFilteringField, unnestVar, project, exprs,
-                            resVar, additionalFilteringAssign);
+                            resVar, additionalFilteringAssign, stmt);
                     break;
                 case Statement.Kind.DELETE:
                     leafOperator = translateDelete(targetDatasource, varRef, varRefsForLoading,
@@ -418,7 +442,7 @@ class LangExpressionToPlanTranslator
                 varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
         insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
         insertOp.getInputs().add(new MutableObject<>(assign));
-        SinkOperator leafOperator = new SinkOperator();
+        ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true));
         leafOperator.getInputs().add(new MutableObject<>(insertOp));
         return leafOperator;
     }
@@ -426,7 +450,7 @@ class LangExpressionToPlanTranslator
     private ILogicalOperator translateDelete(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         if (targetDatasource.getDataset().hasMetaPart()) {
             throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
                     + ": delete from dataset is not supported on Datasets with Meta records");
@@ -435,7 +459,7 @@ class LangExpressionToPlanTranslator
                 varRefsForLoading, InsertDeleteUpsertOperator.Kind.DELETE, false);
         deleteOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
         deleteOp.getInputs().add(new MutableObject<>(assign));
-        SinkOperator leafOperator = new SinkOperator();
+        ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true));
         leafOperator.getInputs().add(new MutableObject<>(deleteOp));
         return leafOperator;
     }
@@ -528,7 +552,7 @@ class LangExpressionToPlanTranslator
             project.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
         }
         feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-        SinkOperator leafOperator = new SinkOperator();
+        ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(true));
         leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
         return leafOperator;
     }
@@ -537,14 +561,20 @@ class LangExpressionToPlanTranslator
             List<Mutable<ILogicalExpression>> varRefsForLoading,
             List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
             List<String> additionalFilteringField, LogicalVariable unnestVar, ProjectOperator project,
-            List<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar, AssignOperator additionalFilteringAssign)
-            throws AlgebricksException {
+            List<Mutable<ILogicalExpression>> exprs, LogicalVariable resVar, AssignOperator additionalFilteringAssign,
+            ICompiledDmlStatement stmt) throws AlgebricksException {
         if (!targetDatasource.getDataset().allow(project, Dataset.OP_UPSERT)) {
             throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
                     + ": upsert into dataset is not supported on Datasets with Meta records");
         }
+        CompiledUpsertStatement compiledUpsert = (CompiledUpsertStatement) stmt;
+        InsertDeleteUpsertOperator upsertOp;
+        ILogicalOperator leafOperator;
         if (targetDatasource.getDataset().hasMetaPart()) {
-            InsertDeleteUpsertOperator feedModificationOp;
+            if (compiledUpsert.getReturnQuery() != null) {
+                throw new AlgebricksException("Returning not allowed on datasets with Meta records");
+
+            }
             AssignOperator metaAndKeysAssign;
             List<LogicalVariable> metaAndKeysVars;
             List<Mutable<ILogicalExpression>> metaAndKeysExprs;
@@ -575,71 +605,113 @@ class LangExpressionToPlanTranslator
                 }
             }
             // A change feed, we don't need the assign to access PKs
-            feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
-                    metaExpSingletonList, InsertDeleteUpsertOperator.Kind.UPSERT, false);
+            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading, metaExpSingletonList,
+                    InsertDeleteUpsertOperator.Kind.UPSERT, false);
             // Create and add a new variable used for representing the original record
-            feedModificationOp.setPrevRecordVar(context.newVar());
-            feedModificationOp.setPrevRecordType(targetDatasource.getItemType());
+            upsertOp.setPrevRecordVar(context.newVar());
+            upsertOp.setPrevRecordType(targetDatasource.getItemType());
             if (targetDatasource.getDataset().hasMetaPart()) {
                 List<LogicalVariable> metaVars = new ArrayList<>();
                 metaVars.add(context.newVar());
-                feedModificationOp.setPrevAdditionalNonFilteringVars(metaVars);
+                upsertOp.setPrevAdditionalNonFilteringVars(metaVars);
                 List<Object> metaTypes = new ArrayList<>();
                 metaTypes.add(targetDatasource.getMetaItemType());
-                feedModificationOp.setPrevAdditionalNonFilteringTypes(metaTypes);
+                upsertOp.setPrevAdditionalNonFilteringTypes(metaTypes);
             }
 
             if (additionalFilteringField != null) {
-                feedModificationOp.setPrevFilterVar(context.newVar());
-                feedModificationOp.setPrevFilterType(
+                upsertOp.setPrevFilterVar(context.newVar());
+                upsertOp.setPrevFilterType(
                         ((ARecordType) targetDatasource.getItemType()).getFieldType(additionalFilteringField.get(0)));
                 additionalFilteringAssign.getInputs().clear();
                 additionalFilteringAssign.getInputs().add(assign.getInputs().get(0));
-                feedModificationOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
+                upsertOp.getInputs().add(new MutableObject<>(additionalFilteringAssign));
             } else {
-                feedModificationOp.getInputs().add(assign.getInputs().get(0));
+                upsertOp.getInputs().add(assign.getInputs().get(0));
             }
             metaAndKeysAssign = new AssignOperator(metaAndKeysVars, metaAndKeysExprs);
             metaAndKeysAssign.getInputs().add(project.getInputs().get(0));
             project.getInputs().set(0, new MutableObject<>(metaAndKeysAssign));
-            feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-            SinkOperator leafOperator = new SinkOperator();
-            leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
-            return leafOperator;
+            upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+            leafOperator = new DelegateOperator(new CommitOperator(true));
+            leafOperator.getInputs().add(new MutableObject<>(upsertOp));
+
         } else {
-            InsertDeleteUpsertOperator feedModificationOp;
-            feedModificationOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
+            upsertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef, varRefsForLoading,
                     InsertDeleteUpsertOperator.Kind.UPSERT, false);
-            feedModificationOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-            feedModificationOp.getInputs().add(new MutableObject<>(assign));
+            upsertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
+            upsertOp.getInputs().add(new MutableObject<>(assign));
             // Create and add a new variable used for representing the original record
             ARecordType recordType = (ARecordType) targetDatasource.getItemType();
-            feedModificationOp.setPrevRecordVar(context.newVar());
-            feedModificationOp.setPrevRecordType(recordType);
+            upsertOp.setPrevRecordVar(context.newVar());
+            upsertOp.setPrevRecordType(recordType);
             if (additionalFilteringField != null) {
-                feedModificationOp.setPrevFilterVar(context.newVar());
-                feedModificationOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
+                upsertOp.setPrevFilterVar(context.newVar());
+                upsertOp.setPrevFilterType(recordType.getFieldType(additionalFilteringField.get(0)));
+            }
+
+            if (compiledUpsert.getReturnQuery() != null) {
+                leafOperator = createReturningQuery(compiledUpsert, upsertOp);
+
+            } else {
+                leafOperator = new DelegateOperator(new CommitOperator(true));
+                leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(upsertOp));
             }
-            SinkOperator leafOperator = new SinkOperator();
-            leafOperator.getInputs().add(new MutableObject<>(feedModificationOp));
-            return leafOperator;
         }
+        return leafOperator;
+
+    }
+
+    private ILogicalOperator createReturningQuery(CompiledInsertStatement compiledInsert,
+            InsertDeleteUpsertOperator insertOp) throws AlgebricksException {
+        //Make the id of the insert var point to the record variable
+        context.newVar(compiledInsert.getVar());
+        context.setVar(compiledInsert.getVar(),
+                ((VariableReferenceExpression) insertOp.getPayloadExpression().getValue()).getVariableReference());
+        // context
+
+        ILogicalPlan planAfterInsert = translate(compiledInsert.getReturnQuery(), null, null, insertOp);
+
+        ILogicalOperator finalRoot = planAfterInsert.getRoots().get(0).getValue();
+        ILogicalOperator op;
+        for (op = finalRoot;; op = op.getInputs().get(0).getValue()) {
+            if (op.getInputs().size() != 1) {
+                throw new AlgebricksException("Cannot have a multi-branch returning query");
+            }
+            if (op.getInputs().get(0).getValue() instanceof InsertDeleteUpsertOperator) {
+                break;
+            }
+        }
+
+        op.getInputs().clear();
+        ILogicalOperator leafOperator = new DelegateOperator(new CommitOperator(false));
+        leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp));
+        op.getInputs().add(new MutableObject<>(leafOperator));
+        leafOperator = finalRoot;
+        return leafOperator;
     }
 
     private ILogicalOperator translateInsert(DatasetDataSource targetDatasource, Mutable<ILogicalExpression> varRef,
             List<Mutable<ILogicalExpression>> varRefsForLoading,
-            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign)
-            throws AlgebricksException {
+            List<Mutable<ILogicalExpression>> additionalFilteringExpressions, ILogicalOperator assign,
+            ICompiledDmlStatement stmt) throws AlgebricksException {
         if (targetDatasource.getDataset().hasMetaPart()) {
             throw new AlgebricksException(targetDatasource.getDataset().getDatasetName()
                     + ": insert into dataset is not supported on Datasets with Meta records");
         }
+        ILogicalOperator leafOperator;
         InsertDeleteUpsertOperator insertOp = new InsertDeleteUpsertOperator(targetDatasource, varRef,
                 varRefsForLoading, InsertDeleteUpsertOperator.Kind.INSERT, false);
         insertOp.setAdditionalFilteringExpressions(additionalFilteringExpressions);
-        insertOp.getInputs().add(new MutableObject<>(assign));
-        SinkOperator leafOperator = new SinkOperator();
-        leafOperator.getInputs().add(new MutableObject<>(insertOp));
+        insertOp.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+        CompiledInsertStatement compiledInsert = (CompiledInsertStatement) stmt;
+        if (compiledInsert.getReturnQuery() != null) {
+            leafOperator = createReturningQuery(compiledInsert, insertOp);
+
+        } else {
+            leafOperator = new DelegateOperator(new CommitOperator(true));
+            leafOperator.getInputs().add(new MutableObject<ILogicalOperator>(insertOp));
+        }
         return leafOperator;
     }
 
@@ -880,15 +952,15 @@ class LangExpressionToPlanTranslator
 
         gOp.getInputs().add(topOp);
         for (Entry<Expression, VariableExpr> entry : gc.getWithVarMap().entrySet()) {
-            Pair<ILogicalExpression, Mutable<ILogicalOperator>> listifyInput = langExprToAlgExpression(
-                        entry.getKey(), new MutableObject<>(new NestedTupleSourceOperator(new MutableObject<>(gOp))));
-                List<Mutable<ILogicalExpression>> flArgs = new ArrayList<>(1);
+            Pair<ILogicalExpression, Mutable<ILogicalOperator>> listifyInput = langExprToAlgExpression(entry.getKey(),
+                    new MutableObject<>(new NestedTupleSourceOperator(new MutableObject<>(gOp))));
+            List<Mutable<ILogicalExpression>> flArgs = new ArrayList<>(1);
             flArgs.add(new MutableObject<>(listifyInput.first));
             AggregateFunctionCallExpression fListify = AsterixBuiltinFunctions
-                        .makeAggregateFunctionExpression(AsterixBuiltinFunctions.LISTIFY, flArgs);
+                    .makeAggregateFunctionExpression(AsterixBuiltinFunctions.LISTIFY, flArgs);
             LogicalVariable aggVar = context.newVar();
             AggregateOperator agg = new AggregateOperator(mkSingletonArrayList(aggVar),
-                        mkSingletonArrayList(new MutableObject<>(fListify)));
+                    mkSingletonArrayList(new MutableObject<>(fListify)));
 
             agg.getInputs().add(listifyInput.second);
 
@@ -945,8 +1017,8 @@ class LangExpressionToPlanTranslator
         LogicalVariable unnestVar = context.newVar();
         UnnestOperator unnestOp = new UnnestOperator(unnestVar,
                 new MutableObject<>(new UnnestingFunctionCallExpression(
-                        FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), Collections
-                                .singletonList(new MutableObject<>(new VariableReferenceExpression(selectVar))))));
+                        FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION),
+                        Collections.singletonList(new MutableObject<>(new VariableReferenceExpression(selectVar))))));
         unnestOp.getInputs().add(new MutableObject<>(assignOp));
 
         // Produces the final result.
@@ -1514,7 +1586,7 @@ class LangExpressionToPlanTranslator
                     // There is a shared operator reference in the query plan.
                     // Deep copies the child plan.
                     LogicalOperatorDeepCopyWithNewVariablesVisitor visitor =
-                            new LogicalOperatorDeepCopyWithNewVariablesVisitor(context, null);
+                        new LogicalOperatorDeepCopyWithNewVariablesVisitor(context, null);
                     ILogicalOperator newChild = childRef.getValue().accept(visitor, null);
                     LinkedHashMap<LogicalVariable, LogicalVariable> cloneVarMap = visitor
                             .getInputToOutputVariableMapping();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/afa909a5/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
index 78c68e1..6c8019d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/FeedServlet.java
@@ -25,21 +25,14 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.PrintWriter;
-import java.util.Collection;
 
 import javax.imageio.ImageIO;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.watch.FeedActivity;
-import org.apache.asterix.external.feed.watch.FeedActivity.FeedActivityDetails;
-
 public class FeedServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
-    private static final String FEED_EXTENSION_NAME = "Feed";
 
     @Override
     public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {
@@ -89,49 +82,4 @@ public class FeedServlet extends HttpServlet {
         PrintWriter out = response.getWriter();
         out.println(outStr);
     }
-
-    @SuppressWarnings("unused")
-    private void insertTable(StringBuilder html, Collection<FeedActivity> list) {
-    }
-
-    @SuppressWarnings("null")
-    private void insertRow(StringBuilder html, FeedActivity activity) {
-        String intake = activity.getFeedActivityDetails().get(FeedActivityDetails.INTAKE_LOCATIONS);
-        String compute = activity.getFeedActivityDetails().get(FeedActivityDetails.COMPUTE_LOCATIONS);
-        String store = activity.getFeedActivityDetails().get(FeedActivityDetails.STORAGE_LOCATIONS);
-
-        FeedConnectionId connectionId = new FeedConnectionId(
-                new EntityId(FEED_EXTENSION_NAME, activity.getDataverseName(), activity.getFeedName()),
-                activity.getDatasetName());
-        int intakeRate = 0;
-        int storeRate = 0;
-
-        html.append("<tr>");
-        html.append("<td>" + activity.getFeedName() + "</td>");
-        html.append("<td>" + activity.getDatasetName() + "</td>");
-        html.append("<td>" + activity.getConnectTimestamp() + "</td>");
-        //html.append("<td>" + insertLink(html, FeedDashboardServlet.getParameterizedURL(activity), "Details") + "</td>");
-        html.append("<td>" + intake + "</td>");
-        html.append("<td>" + compute + "</td>");
-        html.append("<td>" + store + "</td>");
-        String color = "black";
-        if (intakeRate > storeRate) {
-            color = "red";
-        }
-        if (intakeRate < 0) {
-            html.append("<td>" + "UNKNOWN" + "</td>");
-        } else {
-            html.append("<td>" + insertColoredText("" + intakeRate, color) + " rec/sec" + "</td>");
-        }
-        if (storeRate < 0) {
-            html.append("<td>" + "UNKNOWN" + "</td>");
-        } else {
-            html.append("<td>" + insertColoredText("" + storeRate, color) + " rec/sec" + "</td>");
-        }
-        html.append("</tr>");
-    }
-
-    private String insertColoredText(String s, String color) {
-        return "<font color=\"" + color + "\">" + s + "</font>";
-    }
 }