Posted to commits@asterixdb.apache.org by am...@apache.org on 2017/03/08 20:53:16 UTC

[1/3] asterixdb git commit: Cleanup and bug fixes in Feeds pipeline

Repository: asterixdb
Updated Branches:
  refs/heads/master 31d8102aa -> 8c427cd4b


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index 8f005d8..77020f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -22,8 +22,8 @@ import java.io.PrintStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.HyracksConstants;
@@ -36,24 +36,24 @@ import org.apache.hyracks.util.IntSerDeUtils;
  * This appender must only be used on network boundary
  */
 public class MessagingFrameTupleAppender extends FrameTupleAppender {
-
-    private final IHyracksTaskContext ctx;
-    private static final int NULL_MESSAGE_SIZE = 1;
+    public static final int NULL_MESSAGE_SIZE = 1;
     public static final byte NULL_FEED_MESSAGE = 0x01;
     public static final byte ACK_REQ_FEED_MESSAGE = 0x02;
     public static final byte MARKER_MESSAGE = 0x03;
+
+    private final IHyracksTaskContext ctx;
     private boolean initialized = false;
-    private VSizeFrame message;
+    private IFrame message;
 
     public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
         this.ctx = ctx;
     }
 
-    public static void printMessage(VSizeFrame message, PrintStream out) throws HyracksDataException {
+    public static void printMessage(IFrame message, PrintStream out) throws HyracksDataException {
         out.println(getMessageString(message));
     }
 
-    public static String getMessageString(VSizeFrame message) throws HyracksDataException {
+    public static String getMessageString(IFrame message) throws HyracksDataException {
         StringBuilder aString = new StringBuilder();
         aString.append("Message Type: ");
         switch (getMessageType(message)) {
@@ -76,7 +76,7 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
         return aString.toString();
     }
 
-    public static byte getMessageType(VSizeFrame message) throws HyracksDataException {
+    public static byte getMessageType(IFrame message) throws HyracksDataException {
         switch (message.getBuffer().array()[0]) {
             case NULL_FEED_MESSAGE:
                 return NULL_FEED_MESSAGE;
@@ -105,15 +105,13 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
 
     @Override
     public int getTupleCount() {
-        // if message is set, there is always a message. that message could be a null message (TODO: optimize)
-        return tupleCount + ((message == null) ? 0 : 1);
+        return tupleCount + 1;
     }
 
     @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
         if (!initialized) {
-            message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
-            initialized = true;
+            init();
         }
         // If message fits, we append it, otherwise, we append a null message, then send a message only
         // frame with the message
@@ -125,7 +123,7 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
         } else {
             ByteBuffer buffer = message.getBuffer();
             int messageSize = buffer.limit() - buffer.position();
-            if (hasEnoughSpace(1, messageSize)) {
+            if (hasEnoughSpace(0, messageSize)) {
                 appendMessage(buffer);
                 forward(outWriter);
             } else {
@@ -133,7 +131,7 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
                     appendNullMessage();
                     forward(outWriter);
                 }
-                if (!hasEnoughSpace(1, messageSize)) {
+                if (!hasEnoughSpace(0, messageSize)) {
                     frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize()));
                     reset(frame.getBuffer(), true);
                 }
@@ -143,6 +141,11 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
         }
     }
 
+    private void init() {
+        message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx);
+        initialized = true;
+    }
+
     private void forward(IFrameWriter outWriter) throws HyracksDataException {
         getBuffer().clear();
         outWriter.nextFrame(getBuffer());
@@ -168,4 +171,13 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
         ++tupleCount;
         IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
     }
+
+    /*
+     * Always write and then flush to send out the message if exists
+     */
+    @Override
+    public void flush(IFrameWriter writer) throws HyracksDataException {
+        write(writer, true);
+        writer.flush();
+    }
 }
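
Not part of this change: a rough sketch of how a receiver might branch on the
message-type byte that this appender tags frames with. Only the public byte
constants above are assumed; the helper class and method names are hypothetical.

import java.io.PrintStream;

import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;

public final class FeedMessagePrinter {
    private FeedMessagePrinter() {
    }

    // Mirrors getMessageString() above, using only the public constants.
    public static void print(byte messageType, PrintStream out) {
        switch (messageType) {
            case MessagingFrameTupleAppender.NULL_FEED_MESSAGE:
                out.println("Null feed message (no control payload in this frame)");
                break;
            case MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE:
                out.println("Ack-requested feed message");
                break;
            case MessagingFrameTupleAppender.MARKER_MESSAGE:
                out.println("Progress marker message");
                break;
            default:
                out.println("Unknown message type: " + messageType);
        }
    }
}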

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 6d87d89..dbd3afa 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -41,25 +41,30 @@ public class PartitionDataWriter implements IFrameWriter {
     private final ITuplePartitionComputer tpc;
     private final IHyracksTaskContext ctx;
     private boolean[] allocatedFrames;
+    private boolean failed = false;
 
     public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory pwFactory,
             RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
+        this.ctx = ctx;
+        this.tpc = tpc;
         this.consumerPartitionCount = consumerPartitionCount;
         pWriters = new IFrameWriter[consumerPartitionCount];
         isOpen = new boolean[consumerPartitionCount];
         allocatedFrames = new boolean[consumerPartitionCount];
         appenders = new FrameTupleAppender[consumerPartitionCount];
+        tupleAccessor = new FrameTupleAccessor(recordDescriptor);
+        initializeAppenders(pwFactory);
+    }
+
+    protected void initializeAppenders(IPartitionWriterFactory pwFactory) throws HyracksDataException {
         for (int i = 0; i < consumerPartitionCount; ++i) {
             try {
                 pWriters[i] = pwFactory.createFrameWriter(i);
                 appenders[i] = createTupleAppender(ctx);
             } catch (IOException e) {
-                throw new HyracksDataException(e);
+                throw HyracksDataException.create(e);
             }
         }
-        tupleAccessor = new FrameTupleAccessor(recordDescriptor);
-        this.tpc = tpc;
-        this.ctx = ctx;
     }
 
     protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
@@ -71,25 +76,17 @@ public class PartitionDataWriter implements IFrameWriter {
         HyracksDataException closeException = null;
         for (int i = 0; i < pWriters.length; ++i) {
             if (isOpen[i]) {
-                if (allocatedFrames[i] && appenders[i].getTupleCount() > 0) {
+                if (allocatedFrames[i] && appenders[i].getTupleCount() > 0 && !failed) {
                     try {
                         appenders[i].write(pWriters[i], true);
                     } catch (Throwable th) {
-                        if (closeException == null) {
-                            closeException = new HyracksDataException(th);
-                        } else {
-                            closeException.addSuppressed(th);
-                        }
+                        closeException = HyracksDataException.suppress(closeException, th);
                     }
                 }
                 try {
                     pWriters[i].close();
                 } catch (Throwable th) {
-                    if (closeException == null) {
-                        closeException = new HyracksDataException(th);
-                    } else {
-                        closeException.addSuppressed(th);
-                    }
+                    closeException = HyracksDataException.suppress(closeException, th);
                 }
             }
         }
@@ -126,17 +123,14 @@ public class PartitionDataWriter implements IFrameWriter {
 
     @Override
     public void fail() throws HyracksDataException {
+        failed = true;
         HyracksDataException failException = null;
         for (int i = 0; i < appenders.length; ++i) {
             if (isOpen[i]) {
                 try {
                     pWriters[i].fail();
                 } catch (Throwable th) {
-                    if (failException == null) {
-                        failException = new HyracksDataException(th);
-                    } else {
-                        failException.addSuppressed(th);
-                    }
+                    failException = HyracksDataException.suppress(failException, th);
                 }
             }
         }
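
Not part of this change: the close() and fail() paths above now delegate the
repeated null-check/addSuppressed bookkeeping to HyracksDataException.suppress.
A stand-alone sketch of the aggregation pattern that call is assumed to wrap,
using plain Java exceptions so it compiles on its own; the class and method
names are hypothetical.

public final class ExceptionAggregationExample {
    private ExceptionAggregationExample() {
    }

    // First failure becomes the root; later failures ride along as suppressed.
    public static Exception suppressInto(Exception root, Throwable th) {
        if (root == null) {
            return new Exception(th);
        }
        root.addSuppressed(th);
        return root;
    }
}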

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
index a985b4d..b89922e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
@@ -27,7 +27,7 @@ import org.apache.hyracks.storage.common.file.LocalResource;
 
 @FunctionalInterface
 public interface IModificationOperationCallbackFactory extends Serializable {
-    public IModificationOperationCallback createModificationOperationCallback(LocalResource resource,
+    IModificationOperationCallback createModificationOperationCallback(LocalResource resource,
             IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
             throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
index 627994c..a19e69a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
@@ -23,10 +23,6 @@ import org.apache.hyracks.data.std.api.IValueReference;
 public class MutableArrayValueReference implements IValueReference {
     private byte[] array;
 
-    public MutableArrayValueReference() {
-        //mutable array. user doesn't need to specify the array in advance
-    }
-
     public MutableArrayValueReference(byte[] array) {
         this.array = array;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
new file mode 100644
index 0000000..de72690
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An interface used to enable frame-level operations on indexes
+ */
+@FunctionalInterface
+public interface IFrameOperationCallback {
+    /**
+     * Called once processing the frame is done, before nextFrame is called on the next IFrameWriter
+     * in the pipeline
+     *
+     * @param modified
+     *            true if the index was modified during the processing of the frame, false otherwise
+     * @throws HyracksDataException
+     */
+    void frameCompleted(boolean modified) throws HyracksDataException;
+}
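
Not part of this change: because IFrameOperationCallback is a functional
interface, callers can supply it as a lambda. A sketch that records a piece of
component metadata only when the finished frame modified the index; the class
name is hypothetical and the key/value references are caller-supplied.

import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;

public final class FrameCallbackExample {
    private FrameCallbackExample() {
    }

    public static IFrameOperationCallback metadataUpdatingCallback(final ILSMIndexAccessor accessor,
            final IValueReference key, final IValueReference value) {
        // frameCompleted(boolean) declares HyracksDataException, so updateMeta may be called directly.
        return modified -> {
            if (modified) {
                accessor.updateMeta(key, value);
            }
        };
    }
}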

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java
new file mode 100644
index 0000000..8031d32
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A factory for {@link IFrameOperationCallback}
+ */
+@FunctionalInterface
+public interface IFrameOperationCallbackFactory extends Serializable {
+    /**
+     * Create an {@link IFrameOperationCallback} for an index operator
+     *
+     * @param ctx
+     *            the task context
+     * @param indexAccessor
+     *            the accessor for the index
+     * @return an instance of {@link IFrameOperationCallback}
+     */
+    IFrameOperationCallback createFrameOperationCallback(IHyracksTaskContext ctx, ILSMIndexAccessor indexAccessor);
+}
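
Not part of this change: the factory is also a functional interface, so a no-op
implementation can be written as nested lambdas; the class name here is
hypothetical.

import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;

public final class NoOpFrameCallbackFactoryExample {
    // Every created callback ignores frame completion.
    public static final IFrameOperationCallbackFactory NO_OP = (ctx, indexAccessor) -> modified -> {
        // nothing to do when a frame completes
    };

    private NoOpFrameCallbackFactoryExample() {
    }
}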

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 7a2bc7c..f21c8a3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.common.api;
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
@@ -28,34 +29,174 @@ import org.apache.hyracks.storage.am.common.api.IndexException;
 
 public interface ILSMHarness {
 
+    /**
+     * Force a modification even if the memory component is full
+     *
+     * @param ctx
+     *            the operation context
+     * @param tuple
+     *            the operation tuple
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException, IndexException;
 
+    /**
+     * Modify the index if the memory component is not full; wait for a new memory component if the current one is full
+     *
+     * @param ctx
+     *            the operation context
+     * @param tryOperation
+     *            true if the operation should not wait for a writable memory component (try semantics)
+     * @param tuple
+     *            the operation tuple
+     * @return true if the modification was performed; false if it could not proceed without waiting
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
             throws HyracksDataException, IndexException;
 
+    /**
+     * Search the index
+     *
+     * @param ctx
+     *            the search operation context
+     * @param cursor
+     *            the index cursor
+     * @param pred
+     *            the search predicate
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred)
             throws HyracksDataException, IndexException;
 
+    /**
+     * End the search
+     *
+     * @param ctx
+     * @throws HyracksDataException
+     */
     void endSearch(ILSMIndexOperationContext ctx) throws HyracksDataException;
 
+    /**
+     * Schedule a merge
+     *
+     * @param ctx
+     * @param callback
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException, IndexException;
 
+    /**
+     * Schedule full merge
+     *
+     * @param ctx
+     * @param callback
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException, IndexException;
 
+    /**
+     * Perform a merge operation
+     *
+     * @param ctx
+     * @param operation
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException, IndexException;
 
+    /**
+     * Schedule a flush
+     *
+     * @param ctx
+     * @param callback
+     * @throws HyracksDataException
+     */
     void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws HyracksDataException;
 
+    /**
+     * Perform a flush
+     *
+     * @param ctx
+     * @param operation
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException, IndexException;
 
+    /**
+     * Add bulk loaded component
+     *
+     * @param index
+     *            the new component
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void addBulkLoadedComponent(ILSMDiskComponent index) throws HyracksDataException, IndexException;
 
+    /**
+     * Get index operation tracker
+     */
     ILSMOperationTracker getOperationTracker();
 
+    /**
+     * Schedule replication
+     *
+     * @param ctx
+     *            the operation context
+     * @param diskComponents
+     *            the disk component to be replicated
+     * @param bulkload
+     *            true if the components were bulk loaded, false otherwise
+     * @param opType
+     *            The operation type
+     * @throws HyracksDataException
+     */
     void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent> diskComponents, boolean bulkload,
             LSMOperationType opType) throws HyracksDataException;
 
+    /**
+     * End a replication operation
+     *
+     * @param ctx
+     *            the operation context
+     * @throws HyracksDataException
+     */
     void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException;
+
+    /**
+     * Update the metadata of the memory component of the index, waiting for a new memory component if
+     * the current memory component is full
+     *
+     * @param ctx
+     *            the operation context
+     * @param key
+     *            the meta key
+     * @param value
+     *            the meta value
+     * @throws HyracksDataException
+     */
+    void updateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference value)
+            throws HyracksDataException;
+
+    /**
+     * Force updating the metadata of the memory component of the index even if the memory component is full
+     *
+     * @param ctx
+     *            the operation context
+     * @param key
+     *            the meta key
+     * @param value
+     *            the meta value
+     * @throws HyracksDataException
+     */
+    void forceUpdateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference value)
+            throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index fecc674..90c70aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.common.api;
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexAccessor;
 import org.apache.hyracks.storage.am.common.api.IndexException;
@@ -34,16 +35,43 @@ import org.apache.hyracks.storage.am.common.api.TreeIndexException;
  * concurrent operations).
  */
 public interface ILSMIndexAccessor extends IIndexAccessor {
+    /**
+     * Schedule a flush operation
+     *
+     * @param callback
+     *            the IO operation callback
+     * @throws HyracksDataException
+     */
     void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException;
 
+    /**
+     * Schedule a merge operation
+     *
+     * @param callback
+     *            the merge operation callback
+     * @param components
+     *            the components to be merged
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components)
             throws HyracksDataException, IndexException;
 
+    /**
+     * Schedule a full merge
+     *
+     * @param callback
+     *            the merge operation callback
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException, IndexException;
 
     /**
-     * Deletes the tuple from the memory component only.
+     * Delete the tuple from the memory component only; don't replace it with an antimatter tuple
      *
+     * @param tuple
+     *            the tuple to be deleted
      * @throws HyracksDataException
      * @throws IndexException
      */
@@ -113,12 +141,49 @@ public interface ILSMIndexAccessor extends IIndexAccessor {
      */
     boolean tryUpsert(ITupleReference tuple) throws HyracksDataException, IndexException;
 
+    /**
+     * Delete the tuple from the memory component only; don't replace it with an antimatter tuple.
+     * Perform the operation even if the memory component is full
+     *
+     * @param tuple
+     *            the tuple to delete
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void forcePhysicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
 
+    /**
+     * Insert a new tuple even if the memory component is full (failing if a duplicate key entry is found)
+     *
+     * @param tuple
+     *            the tuple to insert
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void forceInsert(ITupleReference tuple) throws HyracksDataException, IndexException;
 
+    /**
+     * Force delete an index entry even if the memory component is full.
+     * Replace the entry, if found, with an antimatter tuple; otherwise, simply insert the antimatter tuple
+     *
+     * @param tuple
+     *            tuple to delete
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
 
+    /**
+     * Schedule a replication for disk components
+     *
+     * @param diskComponents
+     *            the components to be replicated
+     * @param bulkload
+     *            true if the components were bulkloaded, false otherwise
+     * @param opType
+     *            the operation type
+     * @throws HyracksDataException
+     */
     void scheduleReplication(List<ILSMDiskComponent> diskComponents, boolean bulkload, LSMOperationType opType)
             throws HyracksDataException;
 
@@ -137,4 +202,24 @@ public interface ILSMIndexAccessor extends IIndexAccessor {
      * @throws TreeIndexException
      */
     void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException;
+
+    /**
+     * Update the metadata of the memory component, waiting for a new component if the current one is UNWRITABLE
+     *
+     * @param key
+     *            the key
+     * @param value
+     *            the value
+     * @throws HyracksDataException
+     */
+    void updateMeta(IValueReference key, IValueReference value) throws HyracksDataException;
+
+    /**
+     * Force update the metadata of the current memory component even if it is UNWRITABLE
+     *
+     * @param key
+     * @param value
+     * @throws HyracksDataException
+     */
+    void forceUpdateMeta(IValueReference key, IValueReference value) throws HyracksDataException;
 }
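
Not part of this change: a rough usage sketch for the new updateMeta /
forceUpdateMeta accessor methods, storing a long value under a named key with
MutableArrayValueReference. The key name and the helper class are hypothetical.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;

public final class ComponentMetadataExample {
    private ComponentMetadataExample() {
    }

    // Record a long value (e.g. an LSN) in the current memory component's metadata.
    public static void recordLsn(ILSMIndexAccessor accessor, long lsn, boolean force) throws HyracksDataException {
        IValueReference key = new MutableArrayValueReference("marker-lsn".getBytes(StandardCharsets.UTF_8));
        IValueReference value = new MutableArrayValueReference(ByteBuffer.allocate(Long.BYTES).putLong(lsn).array());
        if (force) {
            accessor.forceUpdateMeta(key, value); // proceed even if the memory component is UNWRITABLE
        } else {
            accessor.updateMeta(key, value);      // wait for a writable memory component if needed
        }
    }
}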

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 6bf9312..01e85d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -28,16 +28,17 @@ import java.util.logging.Logger;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
 import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -360,6 +361,44 @@ public class LSMHarness implements ILSMHarness {
         return modify(ctx, tryOperation, tuple, opType);
     }
 
+    @Override
+    public void updateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference value)
+            throws HyracksDataException {
+        if (!lsmIndex.isMemoryComponentsAllocated()) {
+            lsmIndex.allocateMemoryComponents();
+        }
+        getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false);
+        try {
+            lsmIndex.getCurrentMemoryComponent().getMetadata().put(key, value);
+        } finally {
+            exitAndComplete(ctx, LSMOperationType.MODIFICATION);
+        }
+    }
+
+    private void exitAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException {
+        try {
+            exitComponents(ctx, op, null, false);
+        } catch (IndexException e) {
+            throw new HyracksDataException(e);
+        } finally {
+            opTracker.completeOperation(null, op, null, ctx.getModificationCallback());
+        }
+    }
+
+    @Override
+    public void forceUpdateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference value)
+            throws HyracksDataException {
+        if (!lsmIndex.isMemoryComponentsAllocated()) {
+            lsmIndex.allocateMemoryComponents();
+        }
+        getAndEnterComponents(ctx, LSMOperationType.FORCE_MODIFICATION, false);
+        try {
+            lsmIndex.getCurrentMemoryComponent().getMetadata().put(key, value);
+        } finally {
+            exitAndComplete(ctx, LSMOperationType.FORCE_MODIFICATION);
+        }
+    }
+
     private boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple,
             LSMOperationType opType) throws HyracksDataException, IndexException {
         if (!lsmIndex.isMemoryComponentsAllocated()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 4199cfb..0fa69ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -22,6 +22,7 @@ package org.apache.hyracks.storage.am.lsm.common.impls;
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
@@ -164,4 +165,18 @@ public abstract class LSMTreeIndexAccessor implements ILSMIndexAccessor {
         ctx.setOperation(IndexOperation.DELETE);
         lsmHarness.forceModify(ctx, tuple);
     }
+
+    @Override
+    public void updateMeta(IValueReference key, IValueReference value) throws HyracksDataException {
+        // a hack because delete only gets the memory component
+        ctx.setOperation(IndexOperation.DELETE);
+        lsmHarness.updateMeta(ctx, key, value);
+    }
+
+    @Override
+    public void forceUpdateMeta(IValueReference key, IValueReference value) throws HyracksDataException {
+        // a hack because delete only gets the memory component
+        ctx.setOperation(IndexOperation.DELETE);
+        lsmHarness.forceUpdateMeta(ctx, key, value);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index cef4257..0a6ffd7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
@@ -185,4 +186,18 @@ public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedInd
         throw new UnsupportedOperationException("Cannot open inverted list cursor on lsm inverted index.");
     }
 
+    @Override
+    public void updateMeta(IValueReference key, IValueReference value) throws HyracksDataException {
+        // a hack because delete only gets the memory component
+        ctx.setOperation(IndexOperation.DELETE);
+        lsmHarness.updateMeta(ctx, key, value);
+    }
+
+    @Override
+    public void forceUpdateMeta(IValueReference key, IValueReference value) throws HyracksDataException {
+        // a hack because delete only gets the memory component
+        ctx.setOperation(IndexOperation.DELETE);
+        lsmHarness.forceUpdateMeta(ctx, key, value);
+    }
+
 }


[2/3] asterixdb git commit: Cleanup and bug fixes in Feeds pipeline

Posted by am...@apache.org.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a89d13e..3b6e7ff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -41,8 +41,6 @@ public class ExternalDataConstants {
     public static final String KEY_FILESYSTEM = "fs";
     // specifies the address of the HDFS name node
     public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
-    // specifies whether a feed sends progress markers or not
-    public static final String KEY_SEND_MARKER = "send-marker";
     // specifies the class implementation of the accessed instance of HDFS
     public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
     public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index d009960..a09ff9b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.external.util;
 
-import java.util.HashMap;
+import java.util.EnumMap;
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -162,7 +162,7 @@ public class ExternalDataUtils {
     private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
 
     private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
-        Map<ATypeTag, IValueParserFactory> m = new HashMap<ATypeTag, IValueParserFactory>();
+        Map<ATypeTag, IValueParserFactory> m = new EnumMap<>(ATypeTag.class);
         m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
         m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
         m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
@@ -339,8 +339,4 @@ public class ExternalDataUtils {
         }
         return intIndicators;
     }
-
-    public static boolean isSendMarker(Map<String, String> configuration) {
-        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_SEND_MARKER));
-    }
 }
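
Not part of this change: the parser-factory registry above moves from HashMap to
EnumMap, which stores values in an array indexed by the enum ordinal instead of
hashing keys. A stand-alone sketch of the same registry shape, using a
hypothetical enum so it compiles on its own:

import java.util.EnumMap;
import java.util.Map;

public final class EnumRegistryExample {
    enum Tag {
        INT32, FLOAT, DOUBLE, STRING
    }

    private static final Map<Tag, String> PARSERS = new EnumMap<>(Tag.class);
    static {
        PARSERS.put(Tag.INT32, "IntegerParserFactory");
        PARSERS.put(Tag.FLOAT, "FloatParserFactory");
    }

    private EnumRegistryExample() {
    }

    public static String parserFor(Tag tag) {
        return PARSERS.get(tag); // array lookup by ordinal, no hashing
    }
}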

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index d286ff9..3863920 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -79,12 +79,10 @@ public class TestTypedAdapterFactory implements IAdapterFactory {
                 ITupleForwarder forwarder;
                 ArrayTupleBuilder tb;
                 IPropertiesProvider propertiesProvider =
-                        (IPropertiesProvider) ((NodeControllerService) ctx
-                                .getJobletContext().getApplicationContext().getControllerService())
-                                        .getApplicationContext()
-                                        .getApplicationObject();
-                ClusterPartition nodePartition = propertiesProvider.getMetadataProperties().getNodePartitions()
-                        .get(nodeId)[0];
+                        (IPropertiesProvider) ((NodeControllerService) ctx.getJobletContext().getApplicationContext()
+                                .getControllerService()).getApplicationContext().getApplicationObject();
+                ClusterPartition nodePartition =
+                        propertiesProvider.getMetadataProperties().getNodePartitions().get(nodeId)[0];
                 parser = new ADMDataParser(outputType, true);
                 forwarder = DataflowUtils.getTupleForwarder(configuration,
                         FeedUtils.getFeedLogManager(ctx,
@@ -144,5 +142,4 @@ public class TestTypedAdapterFactory implements IAdapterFactory {
     public ARecordType getMetaType() {
         return null;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f5c6d9a..6901e1d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -310,8 +310,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     }
 
     public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
-        String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName())
-                : dataverse;
+        String dv =
+                dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName()) : dataverse;
         if (dv == null) {
             return null;
         }
@@ -353,25 +353,15 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             throws AlgebricksException {
         DataSource source = findDataSource(dataSourceId);
         Dataset dataset = ((DatasetDataSource) source).getDataset();
-        try {
-            String indexName = indexId;
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-            if (secondaryIndex != null) {
-                return new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
-            } else {
-                Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                        dataset.getDatasetName(), dataset.getDatasetName());
-                if (primaryIndex.getIndexName().equals(indexId)) {
-                    return new DataSourceIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(),
-                            this);
-                } else {
-                    return null;
-                }
-            }
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
+        String indexName = indexId;
+        Index secondaryIndex = getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+        return (secondaryIndex != null)
+                ? new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this)
+                : null;
+    }
+
+    public Index getIndex(String dataverseName, String datasetName, String indexName) throws AlgebricksException {
+        return MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
     }
 
     public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
@@ -405,8 +395,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
             JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc)
             throws AlgebricksException {
-        ExternalScanOperatorDescriptor dataScanner =
-                new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
+        ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
         AlgebricksPartitionConstraint constraint;
         try {
             constraint = adapterFactory.getPartitionConstraint();
@@ -462,8 +451,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
             IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput,
             boolean retainMissing, Dataset dataset, String indexName, int[] lowKeyFields, int[] highKeyFields,
-            boolean lowKeyInclusive, boolean highKeyInclusive, int[] minFilterFieldIndexes,
-            int[] maxFilterFieldIndexes) throws AlgebricksException {
+            boolean lowKeyInclusive, boolean highKeyInclusive, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes)
+            throws AlgebricksException {
         boolean isSecondary = true;
         int numSecondaryKeys = 0;
         try {
@@ -530,8 +519,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
             IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-            spPc = getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(), indexName,
-                    temp);
+            spPc = getSplitProviderAndConstraints(dataset, theIndex.getIndexName());
 
             ISearchOperationCallbackFactory searchCallbackFactory;
             if (isSecondary) {
@@ -586,8 +574,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
             int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
         try {
-            ARecordType recType =
-                    (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+            ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
             int numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
 
             boolean temp = dataset.getDatasetDetails().isTemp();
@@ -630,8 +617,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
                     numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
             IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = getSplitProviderAndConstraints(
-                    dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
+                    getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
             ARecordType metaType = null;
             if (dataset.hasMetaPart()) {
                 metaType =
@@ -751,7 +738,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
             Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), dataset.getDatasetName());
-            String indexName = primaryIndex.getIndexName();
             ARecordType metaType = dataset.hasMetaPart()
                     ? (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName())
                     : null;
@@ -763,8 +749,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     itemType, metaType, context.getBinaryComparatorFactoryProvider());
 
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName,
-                            temp);
+                    getSplitProviderAndConstraints(dataset);
             IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
 
@@ -779,9 +764,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     new TreeIndexBulkLoadOperatorDescriptor(spec, null, appContext.getStorageManager(),
                             appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                             comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                            GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false,
-                            numElementsHint, true, dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
-                                    metaType, compactionInfo.first, compactionInfo.second),
+                            GlobalConfig.DEFAULT_TREE_FILL_FACTOR,
+                            false, numElementsHint, true, dataset.getIndexDataflowHelperFactory(this, primaryIndex,
+                                    itemType, metaType, compactionInfo.first, compactionInfo.second),
                             metadataPageManagerFactory);
             return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
         } catch (MetadataException me) {
@@ -951,12 +936,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 numKeyFields / 2);
     }
 
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(String dataverseName,
-            String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
-        FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
-        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
-    }
-
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitAndConstraints(String dataverse) {
         return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse);
     }
@@ -970,8 +949,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             throws MetadataException {
         DatasourceAdapter adapter;
         // search in default namespace (built-in adapter)
-        adapter =
-                MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+        adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
 
         // search in dataverse (user-defined adapter)
         if (adapter == null) {
@@ -985,8 +963,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     }
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
-            String dataverseName, String datasetName, String targetIdxName, boolean create)
-            throws AlgebricksException {
+            String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
         return SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(mdTxnCtx, dataverseName, datasetName,
                 targetIdxName, create);
     }
@@ -1104,8 +1081,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
                     itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName,
-                            temp);
+                    getSplitProviderAndConstraints(dataset);
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
             int datasetId = dataset.getDatasetId();
@@ -1171,7 +1147,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                     comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, null, true, indexName,
                     context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory, null,
-                    metadataPageManagerFactory);
+                    metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
             op.setType(itemType);
             op.setFilterIndex(fieldIdx);
             return new Pair<>(op, splitsAndConstraint.second);
@@ -1222,8 +1198,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), sidxKeyFieldNames.get(i),
                             (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
             IAType keyType = keyPairType.first;
-            comparatorFactories[i] =
-                    BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+            comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
             typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
         }
 
@@ -1245,8 +1220,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             } catch (AsterixException e) {
                 throw new AlgebricksException(e);
             }
-            comparatorFactories[i] =
-                    BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+            comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
             typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
         }
 
@@ -1295,8 +1269,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     dataset.getDatasetName(), dataset.getDatasetName());
             String indexName = primaryIndex.getIndexName();
             ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName())
-                    .getDatatype();
+                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype();
             ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
             ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
 
@@ -1304,8 +1277,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
                     itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName,
-                            temp);
+                    getSplitProviderAndConstraints(dataset);
 
             // prepare callback
             int datasetId = dataset.getDatasetId();
@@ -1462,10 +1434,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             itemType = (ARecordType) MetadataManager.INSTANCE
                     .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
             validateRecordType(itemType);
-            ARecordType metaType = dataset.hasMetaPart()
-                    ? (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
-                            dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()).getDatatype()
-                    : null;
+            ARecordType metaType =
+                    dataset.hasMetaPart()
+                            ? (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
+                                    dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()).getDatatype()
+                            : null;
 
             // Index parameters.
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -1488,8 +1461,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
             IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
             for (i = 0; i < secondaryKeys.size(); ++i) {
-                Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
-                        secondaryKeyNames.get(i), itemType);
+                Pair<IAType, Boolean> keyPairType =
+                        Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i), secondaryKeyNames.get(i), itemType);
                 IAType keyType = keyPairType.first;
                 comparatorFactories[i] =
                         BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
@@ -1506,18 +1479,17 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
             IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
+                    getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
             int datasetId = dataset.getDatasetId();
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_BTREE)
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_BTREE, dataset.hasMetaPart());
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
+                            dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1536,7 +1508,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                         appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                         comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, filterFactory, false,
                         indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
-                        prevFieldPermutation, metadataPageManagerFactory);
+                        prevFieldPermutation, metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
@@ -1648,7 +1620,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
             IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
+                    getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
             int[] btreeFields = new int[primaryComparatorFactories.length];
             for (int k = 0; k < btreeFields.length; k++) {
                 btreeFields[k] = k + numSecondaryKeys;
@@ -1671,11 +1643,10 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             int datasetId = dataset.getDatasetId();
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_RTREE)
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_RTREE, dataset.hasMetaPart());
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE,
+                            dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1694,13 +1665,13 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                         appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                         comparatorFactories, null, fieldPermutation, indexDataflowHelperFactory, filterFactory, false,
                         indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
-                        prevFieldPermutation, metadataPageManagerFactory);
+                        prevFieldPermutation, metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory,
-                        filterFactory, false, indexName, null, modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE, metadataPageManagerFactory);
+                        comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory, filterFactory,
+                        false, indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+                        metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
         } catch (MetadataException e) {
@@ -1874,7 +1845,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
             IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
+                    getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2055,8 +2026,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
 
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataverseName, datasetName, indexName,
-                            dataset.getDatasetDetails().isTemp());
+                    getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
 
             // Generate Output Record format
             ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
@@ -2125,4 +2095,18 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     public IStorageComponentProvider getStorageComponentProvider() {
         return storaegComponentProvider;
     }
+
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds)
+            throws AlgebricksException {
+        FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), ds.getDatasetName(), ds.getDatasetName(),
+                ds.getDatasetDetails().isTemp());
+        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+    }
+
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds,
+            String indexName) throws AlgebricksException {
+        FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), ds.getDatasetName(), indexName,
+                ds.getDatasetDetails().isTemp());
+        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+    }
 }
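
For illustration, a minimal sketch (not part of this patch) of a call site after the
refactoring: the two Dataset-based overloads added at the bottom of MetadataProvider
replace the removed getSplitProviderAndConstraints(dataverseName, datasetName,
targetIdxName, temp), and the temp flag is now derived inside the provider from
ds.getDatasetDetails().isTemp(). The names metadataProvider, dataset and secondaryIndex
below are assumed to be in scope.

    // sketch: obtaining splits/constraints for the primary index and one secondary index
    Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplits =
            metadataProvider.getSplitProviderAndConstraints(dataset);
    Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplits =
            metadataProvider.getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
    // callers no longer repeat the dataverse/dataset names or thread the temp flag through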

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 2e328f9..34faf63 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -29,14 +29,15 @@ import java.util.logging.Logger;
 import org.apache.asterix.active.ActiveJobNotificationHandler;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.utils.JobUtils;
@@ -84,6 +85,7 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -409,8 +411,8 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
             case LENGTH_PARTITIONED_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
             case SINGLE_PARTITION_WORD_INVIX:
-                return invertedIndexDataflowHelperFactoryProvider.getIndexDataflowHelperFactory(mdProvider, this,
-                        index, recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterTypeTraits,
+                return invertedIndexDataflowHelperFactoryProvider.getIndexDataflowHelperFactory(mdProvider, this, index,
+                        recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterTypeTraits,
                         filterCmpFactories);
             default:
                 throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE,
@@ -577,4 +579,30 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
                 metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
                 datasetPartitions, isSink);
     }
+
+    /**
+     * Get the index dataflow helper factory for the dataset's primary index
+     *
+     * @param mdProvider
+     *            an instance of metadata provider that is used to fetch metadata information
+     * @throws AlgebricksException
+     */
+    public IIndexDataflowHelperFactory getIndexDataflowHelperFactory(MetadataProvider mdProvider)
+            throws AlgebricksException {
+        if (getDatasetType() != DatasetType.INTERNAL) {
+            throw new AlgebricksException(ErrorCode.ASTERIX,
+                    ErrorCode.COMPILATION_DATASET_TYPE_DOES_NOT_HAVE_PRIMARY_INDEX, getDatasetType());
+        }
+        Index index = mdProvider.getIndex(getDataverseName(), getDatasetName(), getDatasetName());
+        ARecordType recordType = (ARecordType) mdProvider.findType(getItemTypeDataverseName(), getItemTypeName());
+        ARecordType metaType = (ARecordType) mdProvider.findType(getMetaItemTypeDataverseName(), getMetaItemTypeName());
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(this, mdProvider.getMetadataTxnContext());
+        return getIndexDataflowHelperFactory(mdProvider, index, recordType, metaType, compactionInfo.first,
+                compactionInfo.second);
+    }
+
+    public IFrameOperationCallbackFactory getFrameOpCallbackFactory() {
+        return NoOpFrameOperationCallbackFactory.INSTANCE;
+    }
 }
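
The two methods added here centralize the primary-index plumbing: getIndexDataflowHelperFactory(MetadataProvider)
looks up the primary index, the record and meta types, and the merge policy itself, while
getFrameOpCallbackFactory() defaults to the no-op factory, presumably so specialized datasets
can override it. A minimal sketch (not part of this patch), assuming metadataProvider and
dataset are in scope:

    // sketch: one call now replaces the index / type / merge-policy lookups at call sites
    IIndexDataflowHelperFactory idfh = dataset.getIndexDataflowHelperFactory(metadataProvider);
    // the frame-level callback hook consumed by the upsert operator below; no-op by default
    IFrameOperationCallbackFactory frameCallbacks = dataset.getFrameOpCallbackFactory();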

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 80792b5..572cc75 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -330,14 +330,12 @@ public class DatasetUtil {
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             return RuntimeUtils.createJobSpecification();
         }
-        boolean temp = dataset.getDatasetDetails().isTemp();
         ARecordType itemType =
                 (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
         ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
         JobSpecification specPrimary = RuntimeUtils.createJobSpecification();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(),
-                        dataset.getDatasetName(), temp);
+                metadataProvider.getSplitProviderAndConstraints(dataset);
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
         IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
@@ -386,15 +384,12 @@ public class DatasetUtil {
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             return RuntimeUtils.createJobSpecification();
         }
-
-        boolean temp = dataset.getDatasetDetails().isTemp();
         ARecordType itemType =
                 (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
         ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
         JobSpecification specPrimary = RuntimeUtils.createJobSpecification();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(),
-                        dataset.getDatasetName(), temp);
+                metadataProvider.getSplitProviderAndConstraints(dataset);
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
 
@@ -429,7 +424,6 @@ public class DatasetUtil {
         }
         Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                 datasetName, datasetName);
-        boolean temp = dataset.getDatasetDetails().isTemp();
         ARecordType itemType =
                 (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
         // get meta item type
@@ -451,7 +445,7 @@ public class DatasetUtil {
         int[] btreeFields = DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset);
 
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, datasetName, temp);
+                metadataProvider.getSplitProviderAndConstraints(dataset);
         FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < fs.length; i++) {
@@ -495,7 +489,6 @@ public class DatasetUtil {
         if (dataset == null) {
             throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
         }
-        boolean temp = dataset.getDatasetDetails().isTemp();
         ARecordType itemType =
                 (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
         ARecordType metaItemType = DatasetUtil.getMetaType(metadataProvider, dataset);
@@ -505,7 +498,7 @@ public class DatasetUtil {
         ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
         int[] blooFilterKeyFields = DatasetUtil.createBloomFilterKeyFields(dataset);
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, datasetName, temp);
+                metadataProvider.getSplitProviderAndConstraints(dataset);
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
         Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
index 249f035..edaa73e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
@@ -452,12 +452,11 @@ public class ExternalIndexingOperations {
         JobSpecification spec = RuntimeUtils.createJobSpecification();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
-        boolean temp = ds.getDatasetDetails().isTemp();
         ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
         Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
-                        IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+                metadataProvider.getSplitProviderAndConstraints(ds,
+                        IndexingConstants.getFilesIndexName(ds.getDatasetName()));
         IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
         String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
         Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
@@ -472,8 +471,7 @@ public class ExternalIndexingOperations {
         for (Index index : indexes) {
             if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
                 Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
-                        metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
+                        metadataProvider.getSplitProviderAndConstraints(ds, index.getIndexName());
                 IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
                         metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
                 treeDataflowHelperFactories.add(indexDataflowHelperFactory);
@@ -499,11 +497,9 @@ public class ExternalIndexingOperations {
                 DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
         ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
         Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-
-        boolean temp = ds.getDatasetDetails().isTemp();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
-                        IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+                metadataProvider.getSplitProviderAndConstraints(ds,
+                        IndexingConstants.getFilesIndexName(ds.getDatasetName()));
         IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
         String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
         Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
@@ -518,8 +514,7 @@ public class ExternalIndexingOperations {
         for (Index index : indexes) {
             if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
                 Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
-                        metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
+                        metadataProvider.getSplitProviderAndConstraints(ds, index.getIndexName());
                 IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
                         metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
                 treeDataflowHelperFactories.add(indexDataflowHelperFactory);
@@ -546,11 +541,9 @@ public class ExternalIndexingOperations {
                 DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
         ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
         Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        boolean temp = ds.getDatasetDetails().isTemp();
-
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
-                        IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+                metadataProvider.getSplitProviderAndConstraints(ds,
+                        IndexingConstants.getFilesIndexName(ds.getDatasetName()));
         IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
         String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
         Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
@@ -565,8 +558,7 @@ public class ExternalIndexingOperations {
         for (Index index : indexes) {
             if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
                 Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
-                        metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
+                        metadataProvider.getSplitProviderAndConstraints(ds, index.getIndexName());
                 IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
                         metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
                 treeDataflowHelperFactories.add(indexDataflowHelperFactory);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index c6e0a6b..701d0d6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -101,10 +101,8 @@ public class IndexUtil {
             throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification();
         IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
-        boolean temp = dataset.getDatasetDetails().isTemp();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
-                        index.getIndexName(), temp);
+                metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
         ARecordType recordType =
@@ -153,11 +151,9 @@ public class IndexUtil {
     public static JobSpecification buildDropSecondaryIndexJobSpec(Index index, MetadataProvider metadataProvider,
             Dataset dataset) throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification();
-        boolean temp = dataset.getDatasetDetails().isTemp();
         IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
-                        index.getIndexName(), temp);
+                metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
         ARecordType recordType =

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index f7e569c..d731603 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -190,8 +190,7 @@ public abstract class SecondaryIndexOperationsHelper {
         metaSerde =
                 metaType == null ? null : SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
-                        index.getIndexName(), dataset.getDatasetDetails().isTemp());
+                metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
         secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
         secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
         numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
@@ -203,8 +202,7 @@ public abstract class SecondaryIndexOperationsHelper {
                 numFilterFields = 0;
             }
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
-                    metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(),
-                            dataset.getDatasetName(), dataset.getDatasetName(), dataset.getDatasetDetails().isTemp());
+                    metadataProvider.getSplitProviderAndConstraints(dataset);
             primaryFileSplitProvider = primarySplitsAndConstraint.first;
             primaryPartitionConstraint = primarySplitsAndConstraint.second;
             setPrimaryRecDescAndComparators();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
index 2fae304..190a3b2 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
@@ -71,6 +71,7 @@ import org.apache.hyracks.util.string.UTF8StringWriter;
 public class ARecordPointable extends AbstractPointable {
 
     private final UTF8StringWriter utf8Writer = new UTF8StringWriter();
+    public static final ARecordPointableFactory FACTORY = new ARecordPointableFactory();
 
     public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
         private static final long serialVersionUID = 1L;
@@ -86,11 +87,15 @@ public class ARecordPointable extends AbstractPointable {
         }
     };
 
-    public static final IPointableFactory FACTORY = new IPointableFactory() {
+    public static class ARecordPointableFactory implements IPointableFactory {
+
         private static final long serialVersionUID = 1L;
 
+        private ARecordPointableFactory() {
+        }
+
         @Override
-        public IPointable createPointable() {
+        public ARecordPointable createPointable() {
             return new ARecordPointable();
         }
 
@@ -98,7 +103,8 @@ public class ARecordPointable extends AbstractPointable {
         public ITypeTraits getTypeTraits() {
             return TYPE_TRAITS;
         }
-    };
+
+    }
 
     public static final IObjectFactory<IPointable, ATypeTag> ALLOCATOR = new IObjectFactory<IPointable, ATypeTag>() {
         @Override
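
Replacing the anonymous factory with the named ARecordPointableFactory narrows the return
type of createPointable() to ARecordPointable, which is what lets the cast disappear at the
upsert call site later in this patch. A minimal before/after sketch (not part of this patch):

    // before: the anonymous IPointableFactory returned IPointable, forcing a cast
    ARecordPointable recPointable = (ARecordPointable) ARecordPointable.FACTORY.createPointable();
    // after: the covariant return type of ARecordPointableFactory makes the cast unnecessary
    ARecordPointable recPointable2 = ARecordPointable.FACTORY.createPointable();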

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
index dd7335a..042837b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -22,7 +22,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
@@ -65,7 +64,7 @@ public class ReportMaxResourceIdMessage implements IApplicationMessage {
             ((INCMessageBroker) ncs.getApplicationContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg);
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e);
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 2fedcca..8739948 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -20,7 +20,6 @@ package org.apache.asterix.runtime.message;
 
 import java.util.Set;
 
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.transactions.IResourceIdManager;
@@ -57,7 +56,7 @@ public class ResourceIdRequestMessage implements IApplicationMessage {
             }
             broker.sendApplicationMessageToNC(reponse, src);
         } catch (Exception e) {
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 6869523..037945a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -56,6 +56,9 @@ import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
@@ -83,13 +86,17 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
     private final int filterFieldIndex;
     private final int metaFieldIndex;
     private LockThenSearchOperationCallback searchCallback;
+    private IFrameOperationCallback frameOpCallback;
+    private final IFrameOperationCallbackFactory frameOpCallbackFactory;
 
-    public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
-            int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
-            ARecordType recordType, int filterFieldIndex) throws HyracksDataException {
+    public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
+            int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
+            ARecordType recordType, int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory)
+            throws HyracksDataException {
         super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
         this.key = new PermutingFrameTupleReference();
         this.numOfPrimaryKeys = numOfPrimaryKeys;
+        this.frameOpCallbackFactory = frameOpCallbackFactory;
         missingWriter = opDesc.getMissingWriterFactory().createMissingWriter();
         int[] searchKeyPermutations = new int[numOfPrimaryKeys];
         for (int i = 0; i < searchKeyPermutations.length; i++) {
@@ -104,7 +111,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
             isFiltered = true;
             this.recordType = recordType;
             this.presetFieldIndex = filterFieldIndex;
-            this.recPointable = (ARecordPointable) ARecordPointable.FACTORY.createPointable();
+            this.recPointable = ARecordPointable.FACTORY.createPointable();
             this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
             this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
         }
@@ -140,17 +147,19 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
             tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
             dos = tb.getDataOutput();
             appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
-            modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
-                    indexHelper.getResource(), ctx, this);
+            modCallback = opDesc.getModificationOpCallbackFactory()
+                    .createModificationOperationCallback(indexHelper.getResource(), ctx, this);
             searchCallback = (LockThenSearchOperationCallback) opDesc.getSearchOpCallbackFactory()
                     .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
             indexAccessor = index.createAccessor(modCallback, searchCallback);
             cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
-            IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) ctx.getJobletContext()
-                    .getApplicationContext().getApplicationObject();
+            IAppRuntimeContext runtimeCtx =
+                    (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
             LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
                     runtimeCtx.getTransactionSubsystem().getLogManager());
+            frameOpCallback =
+                    frameOpCallbackFactory.createFrameOperationCallback(ctx, (ILSMIndexAccessor) indexAccessor);
         } catch (Exception e) {
             indexHelper.close();
             throw new HyracksDataException(e);
@@ -188,7 +197,6 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
         tb.addFieldEndOffset();
     }
 
-    //TODO: use tryDelete/tryInsert in order to prevent deadlocks
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
@@ -221,8 +229,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
                     }
                     // if with filters, append the filter
                     if (isFiltered) {
-                        dos.write(prevTuple.getFieldData(filterFieldIndex),
-                                prevTuple.getFieldStart(filterFieldIndex),
+                        dos.write(prevTuple.getFieldData(filterFieldIndex), prevTuple.getFieldStart(filterFieldIndex),
                                 prevTuple.getFieldLength(filterFieldIndex));
                         tb.addFieldEndOffset();
                     }
@@ -258,6 +265,8 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
                 writeOutput(i, recordWasInserted, prevTuple != null);
                 i++;
             }
+            // callback here before calling nextFrame on the next operator
+            frameOpCallback.frameCompleted(!firstModification);
             appender.write(writer, true);
         } catch (IndexException | IOException | AsterixException e) {
             throw new HyracksDataException(e);
@@ -318,6 +327,6 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
 
     @Override
     public void flush() throws HyracksDataException {
-        writer.flush();
+        // No op since nextFrame flushes by default
     }
 }
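
The pushable now builds an IFrameOperationCallback in open() and invokes
frameCompleted(!firstModification) after each frame is processed, just before the frame is
handed to the next operator. Below is a hedged sketch of a custom factory; it assumes the two
interfaces expose only the members visible in this patch (createFrameOperationCallback(ctx,
accessor) and frameCompleted(boolean)), so a real implementation may have to satisfy
additional members.

    // sketch: a frame operation callback that reacts only when a frame actually
    // modified the primary index (assumed interface shape, see note above)
    public class MarkerFrameOpCallbackFactory implements IFrameOperationCallbackFactory {
        private static final long serialVersionUID = 1L;

        @Override
        public IFrameOperationCallback createFrameOperationCallback(IHyracksTaskContext ctx,
                ILSMIndexAccessor indexAccessor) {
            return new IFrameOperationCallback() {
                @Override
                public void frameCompleted(boolean modified) {
                    // called by LSMPrimaryUpsertOperatorNodePushable.nextFrame() after the
                    // frame's tuples are upserted and before the frame is pushed downstream;
                    // 'modified' is !firstModification, i.e. whether this frame changed the index
                    if (modified) {
                        // e.g. schedule bookkeeping through indexAccessor here
                    }
                }
            };
        }
    }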

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
index fe69a04..b37ecae 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
@@ -37,12 +37,14 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
 import org.apache.hyracks.storage.common.IStorageManager;
 
 public class LSMTreeUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
     private final int[] prevValuePermutation;
+    private final IFrameOperationCallbackFactory frameOpCallbackFactory;
     private ARecordType type;
     private int filterIndex = -1;
 
@@ -54,12 +56,13 @@ public class LSMTreeUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperator
             boolean isPrimary, String indexName, IMissingWriterFactory missingWriterFactory,
             IModificationOperationCallbackFactory modificationOpCallbackProvider,
             ISearchOperationCallbackFactory searchOpCallbackProvider, int[] prevValuePermutation,
-            IPageManagerFactory pageManagerFactory) {
+            IPageManagerFactory pageManagerFactory, IFrameOperationCallbackFactory frameOpCallbackFactory) {
         super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, bloomFilterKeyFields, fieldPermutation, IndexOperation.UPSERT,
                 dataflowHelperFactory, tupleFilterFactory, isPrimary, indexName, missingWriterFactory,
                 modificationOpCallbackProvider, searchOpCallbackProvider, pageManagerFactory);
         this.prevValuePermutation = prevValuePermutation;
+        this.frameOpCallbackFactory = frameOpCallbackFactory;
     }
 
     @Override
@@ -67,7 +70,7 @@ public class LSMTreeUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperator
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return isPrimary()
                 ? new LSMPrimaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
-                        recordDescProvider, comparatorFactories.length, type, filterIndex)
+                        recordDescProvider, comparatorFactories.length, type, filterIndex, frameOpCallbackFactory)
                 : new LSMSecondaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
                         recordDescProvider, prevValuePermutation);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 33078ff..90f6bbf 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -32,7 +32,7 @@ import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -46,7 +46,7 @@ import org.apache.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
 
 public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
 
-    private final static long SEED = 0L;
+    protected static final long SEED = 0L;
 
     protected final ITransactionManager transactionManager;
     protected final ILogManager logMgr;
@@ -85,8 +85,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
         try {
             transactionContext = transactionManager.getTransactionContext(jobId, false);
             transactionContext.setWriteTxn(isWriteTransaction);
-            ILogMarkerCallback callback =
-                    TaskUtil.<ILogMarkerCallback>get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+            ILogMarkerCallback callback = TaskUtil.get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
             logRecord = new LogRecord(callback);
             if (isSink) {
                 return;
@@ -112,6 +111,8 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
                  * active operation count of PrimaryIndexOptracker. By maintaining the count correctly and only allowing
                  * flushing when the count is 0, it can guarantee the no-steal policy for temporary datasets, too.
                  */
+                // TODO: Fix this for upserts. An upsert tuple currently expects to notify the opTracker twice (once
+                // for the delete and once for the insert).
                 transactionContext.notifyOptracker(false);
             } else {
                 tRef.reset(tAccess, t);
@@ -126,7 +127,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
                 }
             }
         }
-        VSizeFrame message = TaskUtil.<VSizeFrame>get(HyracksConstants.KEY_MESSAGE, ctx);
+        IFrame message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx);
         if (message != null
                 && MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) {
             try {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 536e657..cfe2a25 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -29,14 +29,14 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
 
-    private final JobId jobId;
-    private final int datasetId;
-    private final int[] primaryKeyFields;
-    private final boolean isTemporaryDatasetWriteJob;
-    private final boolean isWriteTransaction;
-    private final int upsertVarIdx;
-    private int[] datasetPartitions;
-    private final boolean isSink;
+    protected final JobId jobId;
+    protected final int datasetId;
+    protected final int[] primaryKeyFields;
+    protected final boolean isTemporaryDatasetWriteJob;
+    protected final boolean isWriteTransaction;
+    protected final int upsertVarIdx;
+    protected int[] datasetPartitions;
+    protected final boolean isSink;
 
     public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob,
             boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
index 06538af..80dd19b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
@@ -43,6 +43,7 @@ public class VSizeFrame implements IFrame {
         buffer = ctx.allocateFrame(frameSize);
     }
 
+    @Override
     public ByteBuffer getBuffer() {
         return buffer;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 6c581f0..4c0eb1b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -28,8 +28,10 @@ import org.apache.hyracks.api.util.ErrorMessageUtil;
  */
 public class HyracksDataException extends HyracksException {
 
+    private static final long serialVersionUID = 1L;
+
     public static HyracksDataException create(Throwable cause) {
-        if (cause instanceof HyracksDataException) {
+        if (cause instanceof HyracksDataException || cause == null) {
             return (HyracksDataException) cause;
         }
         return new HyracksDataException(cause);
@@ -48,6 +50,14 @@ public class HyracksDataException extends HyracksException {
                 .getParams());
     }
 
+    public static HyracksDataException suppress(HyracksDataException root, Throwable th) {
+        if (root == null) {
+            return HyracksDataException.create(th);
+        }
+        root.addSuppressed(th);
+        return root;
+    }
+
     public HyracksDataException(String component, int errorCode, String message, Throwable cause, String nodeId,
             Serializable... params) {
         super(component, errorCode, message, cause, nodeId, params);
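
The new create/suppress statics replace the helpers removed from org.apache.asterix.common.exceptions.ExceptionUtils later in this change. A minimal usage sketch (the closeAll method and writer array are illustrative, not part of this commit):

    import org.apache.hyracks.api.comm.IFrameWriter;
    import org.apache.hyracks.api.exceptions.HyracksDataException;

    // Illustrative only: close several writers, keeping the first failure as the
    // root exception and attaching later failures as suppressed exceptions.
    public static void closeAll(IFrameWriter[] writers) throws HyracksDataException {
        HyracksDataException root = null;
        for (IFrameWriter writer : writers) {
            try {
                writer.close();
            } catch (Throwable th) {
                root = HyracksDataException.suppress(root, th);
            }
        }
        if (root != null) {
            throw root;
        }
    }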

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 21b9dcf..77404b5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -463,4 +463,8 @@ public class ClusterControllerService implements IControllerService {
             return CCApplicationEntryPoint.INSTANCE;
         }
     }
+
+    public ICCApplicationEntryPoint getApplication() {
+        return aep;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
index 05417a8..77f18ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
@@ -25,6 +25,19 @@ public abstract class AbstractPointable implements IPointable {
 
     protected int length;
 
+    /**
+     * Copies the content of this pointable into the passed byte array.
+     * The passed array must be at least as long as this pointable (i.e., its size must be >= length).
+     *
+     * @param copy
+     *            the array to write into
+     * @throws ArrayIndexOutOfBoundsException
+     *             if the passed array size is smaller than length
+     */
+    public void copyInto(byte[] copy) {
+        System.arraycopy(bytes, start, copy, 0, length);
+    }
+
     @Override
     public void set(byte[] bytes, int start, int length) {
         this.bytes = bytes;
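
A short usage sketch for the new copyInto helper (assumes the standard IValueReference accessors; the buffer, offset, and length variables are placeholders):

    // Illustrative only: materialize the range referenced by a pointable into a fresh array.
    VoidPointable pointable = new VoidPointable();
    pointable.set(buffer, offset, length);          // point at a range inside some byte[] buffer
    byte[] copy = new byte[pointable.getLength()];  // must be at least 'length' bytes long
    pointable.copyInto(copy);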

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
index 74ced4f..2e8071c 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
@@ -18,8 +18,27 @@
  */
 package org.apache.hyracks.data.std.api;
 
+/**
+ * Points to a range within a byte array
+ */
 public interface IPointable extends IValueReference {
-    public void set(byte[] bytes, int start, int length);
+    /**
+     * Points to the range of the byte array bytes that starts at position start and spans length bytes
+     *
+     * @param bytes
+     *            the byte array
+     * @param start
+     *            the start offset
+     * @param length
+     *            the length of the range
+     */
+    void set(byte[] bytes, int start, int length);
 
-    public void set(IValueReference pointer);
+    /**
+     * Points to the same range pointed to by the passed pointer
+     *
+     * @param pointer
+     *            the pointer to the targeted range
+     */
+    void set(IValueReference pointer);
 }
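
To make the documented contract concrete: both set variants only alias the referenced range, no bytes are copied (sketch; the bytes array is a placeholder):

    IPointable first = new VoidPointable();
    first.set(bytes, 0, 8);     // points at the first 8 bytes of 'bytes'
    IPointable second = new VoidPointable();
    second.set(first);          // points at the same 8 bytes; later changes to 'bytes' are visible to both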

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
index ee00163..51c155e 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
@@ -20,10 +20,10 @@ package org.apache.hyracks.data.std.primitive;
 
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.data.std.api.AbstractPointable;
-import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.api.IPointableFactory;
 
 public final class VoidPointable extends AbstractPointable {
+    public static final VoidPointableFactory FACTORY = new VoidPointableFactory();
     public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
         private static final long serialVersionUID = 1L;
 
@@ -38,11 +38,14 @@ public final class VoidPointable extends AbstractPointable {
         }
     };
 
-    public static final IPointableFactory FACTORY = new IPointableFactory() {
+    public static class VoidPointableFactory implements IPointableFactory {
         private static final long serialVersionUID = 1L;
 
+        private VoidPointableFactory() {
+        }
+
         @Override
-        public IPointable createPointable() {
+        public VoidPointable createPointable() {
             return new VoidPointable();
         }
 
@@ -50,5 +53,5 @@ public final class VoidPointable extends AbstractPointable {
         public ITypeTraits getTypeTraits() {
             return TYPE_TRAITS;
         }
-    };
+    }
 }
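
Exposing the factory as a concrete class gives createPointable() a covariant VoidPointable return type, so callers no longer need a cast to reach AbstractPointable helpers such as copyInto. Sketch:

    // Before: IPointable p = VoidPointable.FACTORY.createPointable();
    // After this change the concrete type is returned directly:
    VoidPointable pointable = VoidPointable.FACTORY.createPointable();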

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index 57f8072..efdd963 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -108,4 +108,11 @@ public class AbstractFrameAppender implements IFrameAppender {
         return false;
     }
 
+    @Override
+    public void flush(IFrameWriter writer) throws HyracksDataException {
+        if (tupleCount > 0) {
+            write(writer, true);
+        }
+        writer.flush();
+    }
 }
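
The new flush override first writes any partially filled frame and then forwards the flush to the downstream writer. A typical call site might look like this (sketch; the appender setup and tuple appending are assumed):

    // Illustrative only: push pending tuples and propagate the flush in one call.
    FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
    // ... append(...) the tuples of the current batch ...
    appender.flush(writer);   // writes the partial frame (if any), then calls writer.flush()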


[3/3] asterixdb git commit: Cleanup and bug fixes in Feeds pipeline

Posted by am...@apache.org.
Cleanup and bug fixes in Feeds pipeline

The bug fix is for the MessagingFrameTupleAppender, which used
to consume one extra byte per frame.

The cleanups are in the feeds code and include:
1. Remove the use of the feed dataflow marker. Feeds that need
   to report progress can and should do so without a marker
   thread.
2. Lock the memory component during feed commit so that
   information can be added to the memory component's metadata
   page safely.

In addition, this change introduces a frame-level callback for
index operations.

Change-Id: Ie97b2133ebecb7380cf0ba336e60ed714d06f8ee
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1523
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
BAD: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <bu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/8c427cd4
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/8c427cd4
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/8c427cd4

Branch: refs/heads/master
Commit: 8c427cd4b1074fa015e1d66095309176fa4f42dd
Parents: 31d8102
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Tue Mar 7 19:12:58 2017 -0800
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Wed Mar 8 12:51:17 2017 -0800

----------------------------------------------------------------------
 .../physical/InvertedIndexPOperator.java        |   3 +-
 .../asterix/app/cc/CCExtensionManager.java      |   2 +-
 .../app/nc/task/BindMetadataNodeTask.java       |   3 +-
 .../app/nc/task/ExternalLibrarySetupTask.java   |   3 +-
 .../asterix/app/nc/task/LocalRecoveryTask.java  |   3 +-
 .../app/nc/task/MetadataBootstrapTask.java      |   3 +-
 .../nc/task/StartReplicationServiceTask.java    |   3 +-
 .../replication/AutoFaultToleranceStrategy.java |   3 +-
 .../MetadataNodeFaultToleranceStrategy.java     |   3 +-
 .../replication/NoFaultToleranceStrategy.java   |   3 +-
 .../message/CompleteFailbackRequestMessage.java |   5 +-
 ...PreparePartitionsFailbackRequestMessage.java |   5 +-
 .../ReplayPartitionLogsRequestMessage.java      |   3 +-
 .../message/StartupTaskRequestMessage.java      |   9 +-
 .../message/StartupTaskResponseMessage.java     |   3 +-
 .../TakeoverMetadataNodeRequestMessage.java     |   5 +-
 .../TakeoverPartitionsRequestMessage.java       |   5 +-
 .../asterix/messaging/CCMessageBroker.java      |   3 +-
 .../apache/asterix/utils/FlushDatasetUtil.java  |   3 +-
 .../ConnectorDescriptorWithMessagingTest.java   |  14 +-
 .../common/context/DatasetLifecycleManager.java |   7 +-
 .../asterix/common/context/DatasetResource.java |   2 +-
 .../NoOpFrameOperationCallbackFactory.java      |  47 ++++++
 .../asterix/common/exceptions/ErrorCode.java    |   2 +-
 .../common/exceptions/ExceptionUtils.java       |  27 ----
 .../PrimaryIndexLogMarkerCallback.java          |   4 +
 .../main/resources/asx_errormsg/en.properties   |   1 +
 .../adapter/factory/GenericAdapterFactory.java  |  16 +-
 .../asterix/external/api/IAdapterFactory.java   |  36 +++--
 .../api/IExternalDataSourceFactory.java         |   3 +-
 .../dataflow/ChangeFeedDataFlowController.java  |   8 +-
 .../ChangeFeedWithMetaDataFlowController.java   |  10 +-
 .../dataflow/FeedRecordDataFlowController.java  | 146 ++++--------------
 .../external/dataflow/FeedTupleForwarder.java   |   8 -
 .../FeedWithMetaDataFlowController.java         |   9 +-
 .../stream/StreamRecordReaderFactory.java       |   3 +-
 .../input/stream/SocketServerInputStream.java   |   7 +-
 .../FeedIntakeOperatorNodePushable.java         |  15 +-
 .../provider/DataflowControllerProvider.java    |  10 +-
 .../external/util/ExternalDataConstants.java    |   2 -
 .../external/util/ExternalDataUtils.java        |   8 +-
 .../adapter/TestTypedAdapterFactory.java        |  11 +-
 .../metadata/declared/MetadataProvider.java     | 148 +++++++++----------
 .../asterix/metadata/entities/Dataset.java      |  34 ++++-
 .../asterix/metadata/utils/DatasetUtil.java     |  15 +-
 .../utils/ExternalIndexingOperations.java       |  26 ++--
 .../asterix/metadata/utils/IndexUtil.java       |   8 +-
 .../utils/SecondaryIndexOperationsHelper.java   |   6 +-
 .../pointables/nonvisitor/ARecordPointable.java |  12 +-
 .../message/ReportMaxResourceIdMessage.java     |   3 +-
 .../message/ResourceIdRequestMessage.java       |   3 +-
 .../LSMPrimaryUpsertOperatorNodePushable.java   |  33 +++--
 .../LSMTreeUpsertOperatorDescriptor.java        |   7 +-
 .../management/runtime/CommitRuntime.java       |  11 +-
 .../runtime/CommitRuntimeFactory.java           |  16 +-
 .../org/apache/hyracks/api/comm/VSizeFrame.java |   1 +
 .../api/exceptions/HyracksDataException.java    |  12 +-
 .../control/cc/ClusterControllerService.java    |   4 +
 .../hyracks/data/std/api/AbstractPointable.java |  13 ++
 .../apache/hyracks/data/std/api/IPointable.java |  23 ++-
 .../data/std/primitive/VoidPointable.java       |  11 +-
 .../common/comm/io/AbstractFrameAppender.java   |   7 +
 .../common/io/MessagingFrameTupleAppender.java  |  40 +++--
 .../std/connectors/PartitionDataWriter.java     |  34 ++---
 .../IModificationOperationCallbackFactory.java  |   2 +-
 .../freepage/MutableArrayValueReference.java    |   4 -
 .../lsm/common/api/IFrameOperationCallback.java |  37 +++++
 .../api/IFrameOperationCallbackFactory.java     |  40 +++++
 .../storage/am/lsm/common/api/ILSMHarness.java  | 141 ++++++++++++++++++
 .../am/lsm/common/api/ILSMIndexAccessor.java    |  87 ++++++++++-
 .../storage/am/lsm/common/impls/LSMHarness.java |  41 ++++-
 .../lsm/common/impls/LSMTreeIndexAccessor.java  |  15 ++
 .../impls/LSMInvertedIndexAccessor.java         |  15 ++
 73 files changed, 844 insertions(+), 471 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 86faa6c..c6ea045 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -237,8 +237,7 @@ public class InvertedIndexPOperator extends IndexSearchPOperator {
             }
             IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
-                    metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), datasetName, indexName,
-                            dataset.getDatasetDetails().isTemp());
+                    metadataProvider.getSplitProviderAndConstraints(dataset, indexName);
             // TODO: Here we assume there is only one search key field.
             int queryField = keyFields[0];
             // Get tokenizer and search modifier factories.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
index f7e70a3..6c6f2af 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CCExtensionManager.java
@@ -54,7 +54,7 @@ public class CCExtensionManager implements IAlgebraExtensionManager {
     private transient IStatementExecutorFactory statementExecutorFactory;
 
     /**
-     * Initialize {@code CompilerExtensionManager} from configuration
+     * Initialize {@link org.apache.asterix.app.cc.CCExtensionManager} from configuration
      *
      * @param list
      *            a list of extensions

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
index d1edec0..99f641b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/BindMetadataNodeTask.java
@@ -20,7 +20,6 @@ package org.apache.asterix.app.nc.task;
 
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -45,7 +44,7 @@ public class BindMetadataNodeTask implements INCLifecycleTask {
                 runtimeContext.unexportMetadataNodeStub();
             }
         } catch (Exception e) {
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
index 8e842a9..8604364 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/ExternalLibrarySetupTask.java
@@ -21,7 +21,6 @@ package org.apache.asterix.app.nc.task;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -42,7 +41,7 @@ public class ExternalLibrarySetupTask implements INCLifecycleTask {
         try {
             ExternalLibraryUtils.setUpExternaLibraries(appContext.getLibraryManager(), metadataNode);
         } catch (Exception e) {
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
index 203e453..bca39b0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -45,7 +44,7 @@ public class LocalRecoveryTask implements INCLifecycleTask {
         try {
             runtimeContext.getTransactionSubsystem().getRecoveryManager().startLocalRecovery(partitions);
         } catch (IOException | ACIDException e) {
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
index ab19573..f7c33a4 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
@@ -20,7 +20,6 @@ package org.apache.asterix.app.nc.task;
 
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
@@ -38,7 +37,7 @@ public class MetadataBootstrapTask implements INCLifecycleTask {
             SystemState state = appContext.getTransactionSubsystem().getRecoveryManager().getSystemState();
             appContext.initializeMetadata(state == SystemState.PERMANENT_DATA_LOSS);
         } catch (Exception e) {
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
index d060f61..17fde86 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/StartReplicationServiceTask.java
@@ -20,7 +20,6 @@ package org.apache.asterix.app.nc.task;
 
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
@@ -43,7 +42,7 @@ public class StartReplicationServiceTask implements INCLifecycleTask {
             //Start replication after the state of remote replicas has been initialized.
             replicationManager.startReplicationThreads();
         } catch (Exception e) {
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
index 8d8a0f2..45f96ac 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -56,7 +56,6 @@ import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
@@ -495,7 +494,7 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy {
         try {
             messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
         } catch (Exception e) {
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
index 5a7036a..c40e236 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
@@ -47,7 +47,6 @@ import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
@@ -162,7 +161,7 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate
         try {
             messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
         } catch (Exception e) {
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
index 51defaa..b8b3c49 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NoFaultToleranceStrategy.java
@@ -41,7 +41,6 @@ import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
@@ -107,7 +106,7 @@ public class NoFaultToleranceStrategy implements IFaultToleranceStrategy {
         try {
             messageBroker.sendApplicationMessageToNC(response, msg.getNodeId());
         } catch (Exception e) {
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
index 2d423f9..16a800b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/CompleteFailbackRequestMessage.java
@@ -24,7 +24,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
@@ -75,7 +74,7 @@ public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage
             remoteRecoeryManager.completeFailbackProcess();
         } catch (IOException | InterruptedException e) {
             LOGGER.log(Level.SEVERE, "Failure during completion of failback process", e);
-            hde = ExceptionUtils.convertToHyracksDataException(e);
+            hde = HyracksDataException.create(e);
         } finally {
             CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(planId,
                     requestId, partitions);
@@ -83,7 +82,7 @@ public class CompleteFailbackRequestMessage extends AbstractFailbackPlanMessage
                 broker.sendMessageToCC(reponse);
             } catch (Exception e) {
                 LOGGER.log(Level.SEVERE, "Failure sending message to CC", e);
-                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+                hde = HyracksDataException.suppress(hde, e);
             }
         }
         if (hde != null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
index 2104f9c..8188c44 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/PreparePartitionsFailbackRequestMessage.java
@@ -24,7 +24,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.runtime.message.AbstractFailbackPlanMessage;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
@@ -91,7 +90,7 @@ public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPla
                 appContext.unexportMetadataNodeStub();
             } catch (RemoteException e) {
                 LOGGER.log(Level.SEVERE, "Failed unexporting metadata stub", e);
-                throw ExceptionUtils.convertToHyracksDataException(e);
+                throw HyracksDataException.create(e);
             }
         } else {
             //close all non-metadata datasets
@@ -114,7 +113,7 @@ public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPla
             broker.sendMessageToCC(reponse);
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
index 96ae8be..e0bc49d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/ReplayPartitionLogsRequestMessage.java
@@ -23,7 +23,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -53,7 +52,7 @@ public class ReplayPartitionLogsRequestMessage implements INCLifecycleMessage {
             broker.sendMessageToCC(reponse);
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
index be42a9d..472a89c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskRequestMessage.java
@@ -18,7 +18,9 @@
  */
 package org.apache.asterix.app.replication.message;
 
-import org.apache.asterix.common.exceptions.ExceptionUtils;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -27,9 +29,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
 public class StartupTaskRequestMessage implements INCLifecycleMessage {
 
     private static final Logger LOGGER = Logger.getLogger(StartupTaskRequestMessage.class.getName());
@@ -48,7 +47,7 @@ public class StartupTaskRequestMessage implements INCLifecycleMessage {
             ((INCMessageBroker) cs.getApplicationContext().getMessageBroker()).sendMessageToCC(msg);
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Unable to send StartupTaskRequestMessage to CC", e);
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
index 6a72776..922ac89 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/StartupTaskResponseMessage.java
@@ -23,7 +23,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -62,7 +61,7 @@ public class StartupTaskResponseMessage implements INCLifecycleMessage {
             broker.sendMessageToCC(result);
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Failed sending message to cc", e);
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
index 8ce12dd..3be3eab 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverMetadataNodeRequestMessage.java
@@ -22,7 +22,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -46,7 +45,7 @@ public class TakeoverMetadataNodeRequestMessage implements INCLifecycleMessage {
             appContext.exportMetadataNodeStub();
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
-            hde = new HyracksDataException(e);
+            hde = HyracksDataException.create(e);
         } finally {
             TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
                     appContext.getTransactionSubsystem().getId());
@@ -54,7 +53,7 @@ public class TakeoverMetadataNodeRequestMessage implements INCLifecycleMessage {
                 broker.sendMessageToCC(reponse);
             } catch (Exception e) {
                 LOGGER.log(Level.SEVERE, "Failed taking over metadata", e);
-                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+                hde = HyracksDataException.suppress(hde, e);
             }
         }
         if (hde != null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
index 4e415de..7f9cc2b 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/TakeoverPartitionsRequestMessage.java
@@ -24,7 +24,6 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
@@ -86,7 +85,7 @@ public class TakeoverPartitionsRequestMessage implements INCLifecycleMessage {
                 remoteRecoeryManager.takeoverPartitons(partitions);
             } catch (IOException | ACIDException e) {
                 LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
-                hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+                hde = HyracksDataException.suppress(hde, e);
             } finally {
                 //send response after takeover is completed
                 TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(requestId,
@@ -95,7 +94,7 @@ public class TakeoverPartitionsRequestMessage implements INCLifecycleMessage {
                     broker.sendMessageToCC(reponse);
                 } catch (Exception e) {
                     LOGGER.log(Level.SEVERE, "Failure taking over partitions", e);
-                    hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+                    hde = HyracksDataException.suppress(hde, e);
                 }
             }
             if (hde != null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index d785cce..da93fb8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.messaging;
 
-import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -32,7 +31,7 @@ import org.apache.hyracks.control.cc.cluster.INodeManager;
 
 public class CCMessageBroker implements ICCMessageBroker {
 
-    private final static Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
     private final ClusterControllerService ccs;
 
     public CCMessageBroker(ClusterControllerService ccs) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index bc8a79e..d766827 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -62,8 +62,7 @@ public class FlushDatasetUtil {
         spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, flushOperator, 0);
 
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, indexName,
-                        dataset.getDatasetDetails().isTemp());
+                metadataProvider.getSplitProviderAndConstraints(dataset, indexName);
         AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
 
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, emptySource,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
index d04217c..48ca338 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -159,8 +159,8 @@ public class ConnectorDescriptorWithMessagingTest {
                     BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
             RecordDescriptor rDesc = new RecordDescriptor(serdes);
             TestPartitionWriterFactory partitionWriterFactory = new TestPartitionWriterFactory();
-            IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory,
-                    CURRENT_PRODUCER, NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
+            IFrameWriter partitioner = connector.createPartitioner(ctx, rDesc, partitionWriterFactory, CURRENT_PRODUCER,
+                    NUMBER_OF_CONSUMERS, NUMBER_OF_CONSUMERS);
             partitioner.open();
             FrameTupleAccessor fta = new FrameTupleAccessor(rDesc);
             List<TestFrameWriter> recipients = new ArrayList<>();
@@ -263,11 +263,11 @@ public class ConnectorDescriptorWithMessagingTest {
             }
             partitioner.nextFrame(frame.getBuffer());
             partitioner.flush();
-            Assert.assertEquals(partitionWriterFactory.getWriters().get(0).nextFrameCount(), 1);
-            Assert.assertEquals(partitionWriterFactory.getWriters().get(1).nextFrameCount(), 2);
-            Assert.assertEquals(partitionWriterFactory.getWriters().get(2).nextFrameCount(), 1);
-            Assert.assertEquals(partitionWriterFactory.getWriters().get(3).nextFrameCount(), 2);
-            Assert.assertEquals(partitionWriterFactory.getWriters().get(4).nextFrameCount(), 2);
+            Assert.assertEquals(1, partitionWriterFactory.getWriters().get(0).nextFrameCount());
+            Assert.assertEquals(2, partitionWriterFactory.getWriters().get(1).nextFrameCount());
+            Assert.assertEquals(1, partitionWriterFactory.getWriters().get(2).nextFrameCount());
+            Assert.assertEquals(2, partitionWriterFactory.getWriters().get(3).nextFrameCount());
+            Assert.assertEquals(2, partitionWriterFactory.getWriters().get(4).nextFrameCount());
             for (TestFrameWriter writer : recipients) {
                 fta.reset(writer.getLastFrame());
                 Assert.assertEquals(fta.getTupleCount(), 1);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index cbb4868..1a8ccae 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -72,7 +73,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized IIndex get(String resourcePath) throws HyracksDataException {
+    public synchronized ILSMIndex get(String resourcePath) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
         int datasetID = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -80,7 +81,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized IIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
+    public synchronized ILSMIndex getIndex(int datasetID, long resourceID) throws HyracksDataException {
         validateDatasetLifecycleManagerState();
         DatasetResource datasetResource = datasets.get(datasetID);
         if (datasetResource == null) {
@@ -556,7 +557,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
                 while (used + additionalSize > capacity) {
                     if (!evictCandidateDataset()) {
                         throw new HyracksDataException("Cannot allocate dataset " + dsInfo.getDatasetID()
-                                + " memory since memory budget would be exceeded.");
+                        + " memory since memory budget would be exceeded.");
                     }
                 }
                 used += additionalSize;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 403d3cb..41e587d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -81,7 +81,7 @@ public class DatasetResource implements Comparable<DatasetResource> {
         return datasetVirtualBufferCaches;
     }
 
-    public IIndex getIndex(long resourceID) {
+    public ILSMIndex getIndex(long resourceID) {
         IndexInfo iInfo = getIndexInfo(resourceID);
         return (iInfo == null) ? null : iInfo.getIndex();
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
new file mode 100644
index 0000000..3952b11
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.dataflow;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+
+public class NoOpFrameOperationCallbackFactory implements IFrameOperationCallbackFactory {
+    private static final long serialVersionUID = 1L;
+    private static final NoOpFrameOperationCallback CALLBACK = new NoOpFrameOperationCallback();
+    public static final NoOpFrameOperationCallbackFactory INSTANCE = new NoOpFrameOperationCallbackFactory();
+
+    private NoOpFrameOperationCallbackFactory() {
+    }
+
+    @Override
+    public IFrameOperationCallback createFrameOperationCallback(IHyracksTaskContext ctx,
+            ILSMIndexAccessor indexAccessor) {
+        return CALLBACK;
+    }
+
+    private static class NoOpFrameOperationCallback implements IFrameOperationCallback {
+        @Override
+        public void frameCompleted(boolean modified) throws HyracksDataException {
+            // No Op
+        }
+    }
+}
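
For reference, a minimal usage sketch of the new no-op callback factory (not part of this patch; the sketch class and its parameters are illustrative):

    import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
    import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;

    public class NoOpCallbackSketch {
        // Hypothetical caller: where an IFrameOperationCallback is required but no
        // per-frame work is needed, the shared singleton factory can be used.
        public static void completeFrame(IHyracksTaskContext ctx, ILSMIndexAccessor accessor)
                throws HyracksDataException {
            IFrameOperationCallback callback =
                    NoOpFrameOperationCallbackFactory.INSTANCE.createFrameOperationCallback(ctx, accessor);
            callback.frameCompleted(true); // no-op by design
        }
    }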

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 4898e40..517e243 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -21,7 +21,6 @@ package org.apache.asterix.common.exceptions;
 import java.io.InputStream;
 import java.util.Map;
 
-import org.apache.asterix.event.schema.cluster.FaultTolerance;
 import org.apache.hyracks.api.util.ErrorMessageUtil;
 
 // Error code:
@@ -85,6 +84,7 @@ public class ErrorCode {
     public static final int COMPILATION_AQLPLUS_IDENTIFIER_NOT_FOUND = 1024;
     public static final int COMPILATION_AQLPLUS_NO_SUCH_JOIN_TYPE = 1025;
     public static final int COMPILATION_FUNC_EXPRESSION_CANNOT_UTILIZE_INDEX = 1026;
+    public static final int COMPILATION_DATASET_TYPE_DOES_NOT_HAVE_PRIMARY_INDEX = 1027;
 
     // Feed errors
     public static final int DATAFLOW_ILLEGAL_STATE = 3001;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
index b9d187d..649f1f5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.common.exceptions;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
 public class ExceptionUtils {
     public static final String INCORRECT_PARAMETER = "Incorrect parameter.\n";
     public static final String MISSING_PARAMETER = "Missing parameter.\n";
@@ -34,29 +32,4 @@ public class ExceptionUtils {
         return INCORRECT_PARAMETER + PARAMETER_NAME + parameterName + System.lineSeparator() + EXPECTED_VALUE
                 + expectedValue + System.lineSeparator() + PASSED_VALUE + passedValue;
     }
-
-    public static HyracksDataException suppressIntoHyracksDataException(HyracksDataException hde, Throwable th) {
-        if (hde == null) {
-            return new HyracksDataException(th);
-        } else {
-            hde.addSuppressed(th);
-            return hde;
-        }
-    }
-
-    public static Throwable suppress(Throwable suppressor, Throwable suppressed) {
-        if (suppressor == null) {
-            return suppressed;
-        } else if (suppressed != null) {
-            suppressor.addSuppressed(suppressed);
-        }
-        return suppressor;
-    }
-
-    public static HyracksDataException convertToHyracksDataException(Throwable throwable) {
-        if (throwable == null || throwable instanceof HyracksDataException) {
-            return (HyracksDataException) throwable;
-        }
-        return new HyracksDataException(throwable);
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
index b977c4d..bbe2c4f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
@@ -119,4 +119,8 @@ public class PrimaryIndexLogMarkerCallback implements ILogMarkerCallback {
         pointable.setLong(lsn);
         index.getCurrentMemoryComponent().getMetadata().put(ComponentMetadataUtil.MARKER_LSN_KEY, pointable);
     }
+
+    public ILSMIndex getIndex() {
+        return index;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index aae7050..bf9c6f9 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -70,6 +70,7 @@
 1024 = Identifier %1$s is not found in AQL+ meta-scope
 1025 = There is no such join type in AQL+
 1026 = The given function expression %1$s cannot utilize index
+1027 = Dataset of type %1$s doesn't have a primary index
 
 # Feed Errors
 3001 = Illegal state.
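
For reference, the new 1027 template uses java.util.Formatter-style positional parameters (resolved at runtime, presumably via ErrorMessageUtil), so it expands roughly as in this illustrative snippet; the dataset type value is made up:

    public class ErrorMessageSketch {
        public static void main(String[] args) {
            // The %1$s placeholder is standard java.util.Formatter syntax.
            String template = "Dataset of type %1$s doesn't have a primary index";
            // "EXTERNAL" is just an illustrative dataset type value.
            System.out.println(String.format(template, "EXTERNAL"));
            // prints: Dataset of type EXTERNAL doesn't have a primary index
        }
    }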

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index d03f9df..37262b7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -61,7 +61,7 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
     private boolean isFeed;
     private FileSplit[] feedLogFileSplits;
     private ARecordType metaType;
-    private FeedLogManager feedLogManager = null;
+    private transient FeedLogManager feedLogManager;
 
     @Override
     public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
@@ -75,8 +75,7 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
     }
 
     @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
-            throws HyracksDataException, AlgebricksException {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
         return dataSourceFactory.getPartitionConstraint();
     }
 
@@ -86,12 +85,12 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
     @Override
     public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
-        IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
+        IAppRuntimeContext runtimeCtx =
+                (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
         try {
             restoreExternalObjects(runtimeCtx.getLibraryManager());
         } catch (Exception e) {
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
         if (isFeed) {
             if (feedLogManager == null) {
@@ -184,6 +183,11 @@ public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterF
         this.metaType = metaType;
     }
 
+    /**
+     * Used by extensions to access the shared data source factory for a job.
+     *
+     * @return the data source factory
+     */
     public IExternalDataSourceFactory getDataSourceFactory() {
         return dataSourceFactory;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
index 3ea3bb1..df1b43e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
@@ -40,7 +40,7 @@ public interface IAdapterFactory extends Serializable {
      *
      * @return the display name
      */
-    public String getAlias();
+    String getAlias();
 
     /**
      * Gets a list of partition constraints. A partition constraint can be a
@@ -54,10 +54,8 @@ public interface IAdapterFactory extends Serializable {
      * running on the node with the given IP address.
      *
      * @throws AlgebricksException
-     * @throws HyracksDataException
      */
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
-            throws HyracksDataException, AlgebricksException;
+    AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException;
 
     /**
      * Creates an instance of IDatasourceAdapter.
@@ -67,22 +65,40 @@ public interface IAdapterFactory extends Serializable {
      * @return An instance of IDatasourceAdapter.
      * @throws Exception
      */
-    public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException;
+    IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) throws HyracksDataException;
 
     /**
+     * Configure the adapter
+     *
      * @param libraryManager
      * @param configuration
      * @throws AlgebricksException
      * @throws HyracksDataException
      */
-    public void configure(ILibraryManager libraryManager, Map<String, String> configuration)
+    void configure(ILibraryManager libraryManager, Map<String, String> configuration)
             throws HyracksDataException, AlgebricksException;
 
-    public void setOutputType(ARecordType outputType);
+    /**
+     * Set the expected record output type of the adapter
+     *
+     * @param outputType
+     */
+    void setOutputType(ARecordType outputType);
 
-    public void setMetaType(ARecordType metaType);
+    /**
+     * Set the expected meta output type of the adapter
+     *
+     * @param metaType
+     */
+    void setMetaType(ARecordType metaType);
 
-    public ARecordType getOutputType();
+    /**
+     * @return the adapter record output type
+     */
+    ARecordType getOutputType();
 
-    public ARecordType getMetaType();
+    /**
+     * @return the adapter meta output type
+     */
+    ARecordType getMetaType();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index e2274b9..5538369 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -53,8 +53,7 @@ public interface IExternalDataSourceFactory extends Serializable {
      * @return
      * @throws AsterixException
      */
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
-            throws AlgebricksException, HyracksDataException;
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException;
 
     /**
      * Configure the data parser factory. The passed map contains key value pairs from the

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index d85fe65..57e79c3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.external.dataflow;
 
-import java.io.IOException;
-
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordWithPKDataParser;
@@ -34,9 +32,9 @@ public class ChangeFeedDataFlowController<T> extends FeedRecordDataFlowControlle
 
     public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
             final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader, boolean sendMarker)
-            throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
+            final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader)
+                    throws HyracksDataException {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
         this.dataParser = dataParser;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index 4c88b0f..22fa8be 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.external.dataflow;
 
-import java.io.IOException;
-
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordWithMetadataParser;
@@ -32,14 +30,14 @@ public class ChangeFeedWithMetaDataFlowController<T> extends FeedWithMetaDataFlo
 
     public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
             final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader, boolean sendMarker)
-            throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
+            final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader)
+                    throws HyracksDataException {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
     }
 
     @Override
     protected void addPrimaryKeys(final ArrayTupleBuilder tb, final IRawRecord<? extends T> record)
             throws HyracksDataException {
-        ((IRecordWithMetadataParser<T>) dataParser).appendLastParsedPrimaryKeyToTuple(tb);
+        dataParser.appendLastParsedPrimaryKeyToTuple(tb);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 2e687ba..7ba3ae4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -19,71 +19,52 @@
 package org.apache.asterix.external.dataflow;
 
 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.external.api.IFeedMarker;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
-import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.log4j.Logger;
 
 public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
     private static final Logger LOGGER = Logger.getLogger(FeedRecordDataFlowController.class.getName());
-    protected final IRecordDataParser<T> dataParser;
-    protected final IRecordReader<? extends T> recordReader;
+    private final IRecordDataParser<T> dataParser;
+    private final IRecordReader<T> recordReader;
     protected final AtomicBoolean closed = new AtomicBoolean(false);
     protected static final long INTERVAL = 1000;
-    protected final Object mutex = new Object();
-    protected final boolean sendMarker;
     protected boolean failed = false;
-    private FeedRecordDataFlowController<T>.DataflowMarker dataflowMarker;
-    private Future<?> dataflowMarkerResult;
 
     public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
             FeedLogManager feedLogManager, int numOfOutputFields, IRecordDataParser<T> dataParser,
-            IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
+            IRecordReader<T> recordReader) throws HyracksDataException {
         super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
         this.dataParser = dataParser;
         this.recordReader = recordReader;
-        this.sendMarker = sendMarker;
         recordReader.setFeedLogManager(feedLogManager);
         recordReader.setController(this);
     }
 
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException {
-        startDataflowMarker();
         HyracksDataException hde = null;
         try {
             failed = false;
             tupleForwarder.initialize(ctx, writer);
             while (recordReader.hasNext()) {
-                // synchronized on mutex before we call next() so we don't a marker before its record
-                synchronized (mutex) {
-                    IRawRecord<? extends T> record = recordReader.next();
-                    if (record == null) {
-                        flush();
-                        mutex.wait(INTERVAL);
-                        continue;
-                    }
-                    tb.reset();
-                    parseAndForward(record);
+                IRawRecord<? extends T> record = recordReader.next();
+                if (record == null) {
+                    flush();
+                    Thread.sleep(INTERVAL); // NOSONAR: No one notifies the sleeping thread
+                    continue;
                 }
+                tb.reset();
+                parseAndForward(record);
             }
         } catch (InterruptedException e) {
             //TODO: Find out what could cause an interrupted exception besides termination of a job/feed
@@ -93,24 +74,20 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
             failed = true;
             tupleForwarder.flush();
             LOGGER.warn("Failure while operating a feed source", e);
-            throw new HyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
-        stopDataflowMarker();
         try {
             tupleForwarder.close();
         } catch (Throwable th) {
-            hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
+            hde = HyracksDataException.suppress(hde, th);
         }
         try {
             recordReader.close();
         } catch (Throwable th) {
             LOGGER.warn("Failure during while operating a feed sourcec", th);
-            hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
+            hde = HyracksDataException.suppress(hde, th);
         } finally {
             closeSignal();
-            if (sendMarker && dataflowMarkerResult != null) {
-                dataflowMarkerResult.cancel(true);
-            }
         }
         if (hde != null) {
             throw hde;
@@ -118,20 +95,18 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
     }
 
     private void parseAndForward(IRawRecord<? extends T> record) throws IOException {
-        synchronized (dataParser) {
-            try {
-                dataParser.parse(record, tb.getDataOutput());
-            } catch (Exception e) {
-                LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
-                feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
-                // continue the outer loop
-                return;
-            }
-            tb.addFieldEndOffset();
-            addMetaPart(tb, record);
-            addPrimaryKeys(tb, record);
-            tupleForwarder.addTuple(tb);
+        try {
+            dataParser.parse(record, tb.getDataOutput());
+        } catch (Exception e) {
+            LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
+            feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
+            // continue the outer loop
+            return;
         }
+        tb.addFieldEndOffset();
+        addMetaPart(tb, record);
+        addPrimaryKeys(tb, record);
+        tupleForwarder.addTuple(tb);
     }
 
     protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
@@ -140,21 +115,6 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
     protected void addPrimaryKeys(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws IOException {
     }
 
-    private void startDataflowMarker() {
-        ExecutorService executorService = sendMarker ? Executors.newSingleThreadExecutor() : null;
-        if (sendMarker && dataflowMarker == null) {
-            dataflowMarker = new DataflowMarker(recordReader.getProgressReporter(),
-                    TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx));
-            dataflowMarkerResult = executorService.submit(dataflowMarker);
-        }
-    }
-
-    private void stopDataflowMarker() {
-        if (dataflowMarker != null) {
-            dataflowMarker.stop();
-        }
-    }
-
     private void closeSignal() {
         synchronized (closed) {
             closed.set(true);
@@ -172,7 +132,6 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
 
     @Override
     public boolean stop() throws HyracksDataException {
-        stopDataflowMarker();
         HyracksDataException hde = null;
         if (recordReader.stop()) {
             if (failed) {
@@ -180,12 +139,12 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
                 try {
                     tupleForwarder.close();
                 } catch (Throwable th) {
-                    hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
+                    hde = HyracksDataException.suppress(hde, th);
                 }
                 try {
                     recordReader.close();
                 } catch (Throwable th) {
-                    hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
+                    hde = HyracksDataException.suppress(hde, th);
                 }
                 if (hde != null) {
                     throw hde;
@@ -194,7 +153,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
                 try {
                     waitForSignal();
                 } catch (InterruptedException e) {
-                    throw new HyracksDataException(e);
+                    throw HyracksDataException.create(e);
                 }
             }
             return true;
@@ -208,52 +167,11 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
         return recordReader.handleException(th);
     }
 
-    private class DataflowMarker implements Runnable {
-        private final IFeedMarker marker;
-        private final VSizeFrame mark;
-        private volatile boolean stopped = false;
-
-        public DataflowMarker(IFeedMarker marker, VSizeFrame mark) {
-            this.marker = marker;
-            this.mark = mark;
-        }
-
-        public synchronized void stop() {
-            stopped = true;
-            notify();
-        }
+    public IRecordReader<T> getReader() {
+        return recordReader;
+    }
 
-        @Override
-        public void run() {
-            try {
-                while (true) {
-                    synchronized (this) {
-                        if (!stopped) {
-                            // TODO (amoudi): find a better reactive way to do this
-                            // sleep for two seconds
-                            wait(TimeUnit.SECONDS.toMillis(2));
-                        } else {
-                            break;
-                        }
-                    }
-                    synchronized (mutex) {
-                        if (marker.mark(mark)) {
-                            // broadcast
-                            tupleForwarder.flush();
-                            // clear
-                            mark.getBuffer().clear();
-                            mark.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
-                            mark.getBuffer().flip();
-                        }
-                    }
-                }
-            } catch (InterruptedException e) {
-                LOGGER.warn("Marker stopped", e);
-                Thread.currentThread().interrupt();
-                return;
-            } catch (Exception e) {
-                LOGGER.warn("Marker stopped", e);
-            }
-        }
+    public IRecordDataParser<T> getParser() {
+        return dataParser;
     }
 }
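
For reference, a minimal sketch of the cleanup pattern this change standardizes on: the removed ExceptionUtils helpers are replaced by HyracksDataException.suppress/create, folding failures from multiple close() calls into one exception. The sketch class and its parameters are illustrative:

    import org.apache.hyracks.api.exceptions.HyracksDataException;

    public class SuppressCloseSketch {
        // Close two resources and fold any failures into a single HyracksDataException,
        // mirroring the tupleForwarder/recordReader cleanup above.
        public static void closeBoth(AutoCloseable first, AutoCloseable second) throws HyracksDataException {
            HyracksDataException hde = null;
            try {
                first.close();
            } catch (Throwable th) { // NOSONAR: intentionally broad, as in the patch
                hde = HyracksDataException.suppress(hde, th);
            }
            try {
                second.close();
            } catch (Throwable th) { // NOSONAR
                hde = HyracksDataException.suppress(hde, th);
            }
            if (hde != null) {
                throw hde;
            }
        }
    }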

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index d31e074..4177ea6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -28,12 +28,9 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
-import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 
 public class FeedTupleForwarder implements ITupleForwarder {
 
@@ -58,11 +55,6 @@ public class FeedTupleForwarder implements ITupleForwarder {
             this.frame = new VSizeFrame(ctx);
             this.writer = writer;
             this.appender = new FrameTupleAppender(frame);
-            // Set null feed message
-            VSizeFrame message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
-            // a null message
-            message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
-            message.getBuffer().flip();
             initialized = true;
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index 45ae52b..c7f6d9c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -28,15 +28,18 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 public class FeedWithMetaDataFlowController<T> extends FeedRecordDataFlowController<T> {
 
+    protected final IRecordWithMetadataParser<T> dataParser;
+
     public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
             FeedLogManager feedLogManager, int numOfOutputFields, IRecordWithMetadataParser<T> dataParser,
-            IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
+            IRecordReader<T> recordReader) throws HyracksDataException {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+        this.dataParser = dataParser;
     }
 
     @Override
     protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> record) throws HyracksDataException {
-        ((IRecordWithMetadataParser<T>) dataParser).parseMeta(tb.getDataOutput());
+        dataParser.parseMeta(tb.getDataOutput());
         tb.addFieldEndOffset();
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 4649559..9b23e38 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -52,8 +52,7 @@ public class StreamRecordReaderFactory implements IRecordReaderFactory<char[]> {
     }
 
     @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
-            throws HyracksDataException, AlgebricksException {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
         return streamFactory.getPartitionConstraint();
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
index 7995091..964508f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/SocketServerInputStream.java
@@ -23,7 +23,6 @@ import java.io.InputStream;
 import java.net.ServerSocket;
 import java.net.Socket;
 
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.external.api.AsterixInputStream;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.log4j.Logger;
@@ -108,7 +107,7 @@ public class SocketServerInputStream extends AsterixInputStream {
             }
             connectionStream = null;
         } catch (IOException e) {
-            hde = new HyracksDataException(e);
+            hde = HyracksDataException.create(e);
         }
         try {
             if (socket != null) {
@@ -116,14 +115,14 @@ public class SocketServerInputStream extends AsterixInputStream {
             }
             socket = null;
         } catch (IOException e) {
-            hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+            hde = HyracksDataException.suppress(hde, e);
         }
         try {
             if (server != null) {
                 server.close();
             }
         } catch (IOException e) {
-            hde = ExceptionUtils.suppressIntoHyracksDataException(hde, e);
+            hde = HyracksDataException.suppress(hde, e);
         } finally {
             server = null;
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index fe2d4ec..8a7bda9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -27,11 +27,13 @@ import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
+import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 
 /**
@@ -58,12 +60,21 @@ public class FeedIntakeOperatorNodePushable extends ActiveSourceOperatorNodePush
 
     @Override
     protected void start() throws HyracksDataException, InterruptedException {
-        writer.open();
         try {
+            writer.open();
             Thread.currentThread().setName("Intake Thread");
             FeedAdapter adapter = (FeedAdapter) adapterFactory.createAdapter(ctx, partition);
             adapterRuntimeManager = new AdapterRuntimeManager(ctx, runtimeId.getEntityId(), adapter, writer, partition);
-            TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+            IFrame message = new VSizeFrame(ctx);
+            TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+            /*
+             * Set the null feed message. The feed pipeline carries a message with each frame.
+             * Initially, the message is set to a null message, which feed adapters can later replace.
+             * One use case is adapters that consume data sources supporting restartability; such adapters
+             * can propagate progress information through the ingestion pipeline to the storage nodes.
+             */
+            message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+            message.getBuffer().flip();
             adapterRuntimeManager.start();
             synchronized (adapterRuntimeManager) {
                 while (!adapterRuntimeManager.isDone()) {
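
For reference, an illustrative sketch of how an operator further down the pipeline could fetch the shared message frame and test whether it still carries the initial null feed message. The sketch class is hypothetical; only TaskUtil, HyracksConstants.KEY_MESSAGE, and NULL_FEED_MESSAGE come from this patch:

    import org.apache.hyracks.api.comm.IFrame;
    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.api.util.HyracksConstants;
    import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
    import org.apache.hyracks.dataflow.common.utils.TaskUtil;

    public class FeedMessageSketch {
        // Fetch the per-task shared message frame and check whether it still holds the
        // initial null feed message written by the intake operator above.
        public static boolean isNullMessage(IHyracksTaskContext ctx) {
            IFrame message = TaskUtil.<IFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
            return message != null
                    && message.getBuffer().get(0) == MessagingFrameTupleAppender.NULL_FEED_MESSAGE;
        }
    }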

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index a369fe3..78f24a5 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -69,7 +69,6 @@ public class DataflowControllerProvider {
                     IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(ctx, partition);
                     IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
                     IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
-                    boolean sendMarker = ExternalDataUtils.isSendMarker(configuration);
                     if (indexingOp) {
                         return new IndexingDataFlowController(ctx,
                                 DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
@@ -83,19 +82,18 @@ public class DataflowControllerProvider {
                             if (isChangeFeed) {
                                 int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
                                 return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
-                                        numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader,
-                                        sendMarker);
+                                        numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader);
                             } else {
                                 return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2,
-                                        (IRecordWithMetadataParser) dataParser, recordReader, sendMarker);
+                                        (IRecordWithMetadataParser) dataParser, recordReader);
                             }
                         } else if (isChangeFeed) {
                             int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
                             return new ChangeFeedDataFlowController(ctx, tupleForwarder, feedLogManager, numOfKeys + 1,
-                                    (IRecordWithPKDataParser) dataParser, recordReader, sendMarker);
+                                    (IRecordWithPKDataParser) dataParser, recordReader);
                         } else {
                             return new FeedRecordDataFlowController(ctx, tupleForwarder, feedLogManager, 1, dataParser,
-                                    recordReader, sendMarker);
+                                    recordReader);
                         }
                     } else {
                         return new RecordDataFlowController(ctx,