You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/07/22 19:37:01 UTC

[2/4] asterixdb git commit: Add Test NodeController, Test Data Generator, and Marker Logs

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index 44aac60..790289a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -32,8 +32,8 @@ public class FeedWithMetaDataFlowController<T> extends FeedRecordDataFlowControl
 
     public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
             FeedLogManager feedLogManager, int numOfOutputFields, IRecordWithMetadataParser<T> dataParser,
-            IRecordReader<T> recordReader) throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+            IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 2e1c83f..6159908 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -72,11 +72,12 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
             IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool framePool)
             throws HyracksDataException {
         this.writer = writer;
+
         this.spiller =
-                new FrameSpiller(ctx,
+                fpa.spillToDiskOnCongestion() ? new FrameSpiller(ctx,
                         connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
                                 + runtimeId.getFeedRuntimeType() + "_" + runtimeId.getPartition(),
-                        fpa.getMaxSpillOnDisk());
+                        fpa.getMaxSpillOnDisk()) : null;
         this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
         this.fpa = fpa;
         this.framePool = framePool;
@@ -122,7 +123,9 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
             LOGGER.log(Level.WARNING, th.getMessage(), th);
         }
         try {
-            spiller.close();
+            if (spiller != null) {
+                spiller.close();
+            }
         } catch (Throwable th) {
             LOGGER.log(Level.WARNING, th.getMessage(), th);
         }
@@ -459,34 +462,34 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
                     frame = inbox.poll();
                     if (frame == null) {
                         // Memory queue is empty. Check spill
-                        frame = spiller.next();
-                        while (frame != null) {
-                            if (consume(frame) != null) {
-                                // We don't release the frame since this is a spill frame that we didn't get from memory
-                                // manager
-                                return;
-                            }
+                        if (spiller != null) {
                             frame = spiller.next();
+                            while (frame != null) {
+                                if (consume(frame) != null) {
+                                    // We don't release the frame since this is a spill frame that we didn't get from memory
+                                    // manager
+                                    return;
+                                }
+                                frame = spiller.next();
+                            }
                         }
                         writer.flush();
                         // At this point. We consumed all memory and spilled
                         // We can't assume the next will be in memory. what if there is 0 memory?
                         synchronized (mutex) {
                             frame = inbox.poll();
-                            if (frame == null) {
-                                // Nothing in memory
-                                if (spiller.switchToMemory()) {
-                                    if (poisoned) {
-                                        break;
-                                    }
-                                    if (DEBUG) {
-                                        LOGGER.info("Consumer is going to sleep");
-                                    }
-                                    // Nothing in disk
-                                    mutex.wait();
-                                    if (DEBUG) {
-                                        LOGGER.info("Consumer is waking up");
-                                    }
+                            // Nothing in memory
+                            if (frame == null && (spiller == null || spiller.switchToMemory())) {
+                                if (poisoned) {
+                                    break;
+                                }
+                                if (DEBUG) {
+                                    LOGGER.info("Consumer is going to sleep");
+                                }
+                                // Nothing in disk
+                                mutex.wait();
+                                if (DEBUG) {
+                                    LOGGER.info("Consumer is waking up");
                                 }
                             }
                         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
index a2f19bb..09e03a3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
@@ -28,10 +28,10 @@ import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.ArrayDeque;
 
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
 /**
@@ -49,28 +49,30 @@ public class FrameSpiller {
     private final String fileNamePrefix;
     private final ArrayDeque<File> files = new ArrayDeque<>();
     private final VSizeFrame frame;
-    private final int budget;           // Max current frames in disk allowed
-    private BufferedOutputStream bos;   // Current output stream
-    private BufferedInputStream bis;    // Current input stream
-    private File currentWriteFile;      // Current write file
-    private File currentReadFile;       // Current read file
-    private int currentWriteCount = 0;  // Current file write count
-    private int currentReadCount = 0;   // Current file read count
-    private int totalWriteCount = 0;    // Total frames spilled
-    private int totalReadCount = 0;     // Total frames read
-    private int fileCount = 0;          // How many spill files?
+    private final int budget; // Max current frames in disk allowed
+    private BufferedOutputStream bos; // Current output stream
+    private BufferedInputStream bis; // Current input stream
+    private File currentWriteFile; // Current write file
+    private File currentReadFile; // Current read file
+    private int currentWriteCount = 0; // Current file write count
+    private int currentReadCount = 0; // Current file read count
+    private int totalWriteCount = 0; // Total frames spilled
+    private int totalReadCount = 0; // Total frames read
+    private int fileCount = 0; // How many spill files?
 
     public FrameSpiller(IHyracksTaskContext ctx, String fileNamePrefix, long budgetInBytes)
             throws HyracksDataException {
         this.frame = new VSizeFrame(ctx);
         this.fileNamePrefix = fileNamePrefix;
-        this.budget = (int) (budgetInBytes / ctx.getInitialFrameSize());
-
+        this.budget = (int) Math.min(budgetInBytes / ctx.getInitialFrameSize(), Integer.MAX_VALUE);
+        if (budget <= 0) {
+            throw new HyracksDataException("Invalid budget " + budgetInBytes + ". Budget must be larger than 0");
+        }
     }
 
     public void open() throws HyracksDataException {
         try {
-            this.currentWriteFile = createFile();
+            this.currentWriteFile = StoragePathUtil.createFile(fileNamePrefix, fileCount++);
             this.currentReadFile = currentWriteFile;
             this.bos = new BufferedOutputStream(new FileOutputStream(currentWriteFile));
             this.bis = new BufferedInputStream(new FileInputStream(currentReadFile));
@@ -135,7 +137,7 @@ public class FrameSpiller {
     }
 
     public double usedBudget() {
-        return ((double) (totalWriteCount - totalReadCount) / (double) budget);
+        return (double) (totalWriteCount - totalReadCount) / (double) budget;
     }
 
     public synchronized boolean spill(ByteBuffer frame) throws HyracksDataException {
@@ -150,7 +152,7 @@ public class FrameSpiller {
             if (currentWriteCount >= FRAMES_PER_FILE) {
                 bos.close();
                 currentWriteCount = 0;
-                currentWriteFile = createFile();
+                currentWriteFile = StoragePathUtil.createFile(fileNamePrefix, fileCount++);
                 files.add(currentWriteFile);
                 bos = new BufferedOutputStream(new FileOutputStream(currentWriteFile));
             }
@@ -161,26 +163,6 @@ public class FrameSpiller {
         }
     }
 
-    private File createFile() throws HyracksDataException {
-        try {
-            String fileName = fileNamePrefix + "_" + fileCount++;
-            File file = new File(fileName);
-            if (!file.exists()) {
-                boolean success = file.createNewFile();
-                if (!success) {
-                    throw new HyracksDataException("Unable to create spill file " + fileName);
-                } else {
-                    if (LOGGER.isEnabledFor(Level.INFO)) {
-                        LOGGER.info("Created spill file " + file.getAbsolutePath());
-                    }
-                }
-            }
-            return file;
-        } catch (Throwable th) {
-            throw new HyracksDataException(th);
-        }
-    }
-
     public synchronized void close() {
         // Do proper cleanup
         if (bos != null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index 8ee3e2b..9661890 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -28,6 +28,8 @@ import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 
 public class IngestionRuntime extends SubscribableRuntime {
 
@@ -48,8 +50,9 @@ public class IngestionRuntime extends SubscribableRuntime {
         dWriter.subscribe(collector);
         subscribers.add(collectionRuntime);
         if (numSubscribers == 0) {
-            ctx.setSharedObject(new VSizeFrame(ctx));
-            collectionRuntime.getCtx().setSharedObject(ctx.getSharedObject());
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+            TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE,
+                    TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx), collectionRuntime.getCtx());
             adapterRuntimeManager.start();
         }
         numSubscribers++;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 54e17ef..37a42a7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -40,7 +40,9 @@ import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 /*
@@ -112,7 +114,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
         this.feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
                 .getApplicationObject()).getFeedManager();
         this.message = new VSizeFrame(ctx);
-        ctx.setSharedObject(message);
+        TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
         this.opDesc = feedMetaOperatorDescriptor;
         this.recordDescProvider = recordDescProvider;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 6f679f7..95bebad 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -41,7 +41,9 @@ import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -107,7 +109,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
                 .getApplicationObject()).getFeedManager();
         this.targetId = targetId;
         this.message = new VSizeFrame(ctx);
-        ctx.setSharedObject(message);
+        TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
         this.recordDescProvider = recordDescProvider;
         this.opDesc = feedMetaOperatorDescriptor;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 4ad08b3..98cb4b0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -59,7 +59,7 @@ public class DataflowControllerProvider {
     public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
             int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
             Map<String, String> configuration, boolean indexingOp, boolean isFeed, FeedLogManager feedLogManager)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         try {
             switch (dataSourceFactory.getDataSourceType()) {
                 case RECORDS:
@@ -67,6 +67,7 @@ public class DataflowControllerProvider {
                     IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(ctx, partition);
                     IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
                     IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
+                    boolean sendMarker = ExternalDataUtils.isSendMarker(configuration);
                     if (indexingOp) {
                         return new IndexingDataFlowController(ctx,
                                 DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
@@ -80,18 +81,19 @@ public class DataflowControllerProvider {
                             if (isChangeFeed) {
                                 int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
                                 return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
-                                        numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader);
+                                        numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader,
+                                        sendMarker);
                             } else {
                                 return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2,
-                                        (IRecordWithMetadataParser) dataParser, recordReader);
+                                        (IRecordWithMetadataParser) dataParser, recordReader, sendMarker);
                             }
                         } else if (isChangeFeed) {
                             int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
                             return new ChangeFeedDataFlowController(ctx, tupleForwarder, feedLogManager, numOfKeys + 1,
-                                    (IRecordWithPKDataParser) dataParser, recordReader);
+                                    (IRecordWithPKDataParser) dataParser, recordReader, sendMarker);
                         } else {
                             return new FeedRecordDataFlowController(ctx, tupleForwarder, feedLogManager, 1, dataParser,
-                                    recordReader);
+                                    recordReader, sendMarker);
                         }
                     } else {
                         return new RecordDataFlowController(ctx,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
index ad945f2..ed811ad 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -30,6 +30,7 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 public class DataflowUtils {
     public static void addTupleToFrame(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
@@ -67,4 +68,14 @@ public class DataflowUtils {
                 throw new HyracksDataException("Unknown tuple forward policy");
         }
     }
+
+    public static void addTupleToFrame(FrameTupleAppender appender, ITupleReference tuple, IFrameWriter writer)
+            throws HyracksDataException {
+        if (!appender.append(tuple)) {
+            appender.write(writer, true);
+            if (!appender.append(tuple)) {
+                throw new HyracksDataException("Tuple is too large for a frame");
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 55dee04..e251f32 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -46,6 +46,8 @@ public class ExternalDataConstants {
     public static final String KEY_FILESYSTEM = "fs";
     // specifies the address of the HDFS name node
     public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
+    // specifies whether a feed sends progress markers or not
+    public static final String KEY_SEND_MARKER = "send-marker";
     // specifies the class implementation of the accessed instance of HDFS
     public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
     public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 19781f9..23cd39c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -332,4 +332,8 @@ public class ExternalDataUtils {
         }
         return intIndicators;
     }
+
+    public static boolean isSendMarker(Map<String, String> configuration) {
+        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_SEND_MARKER));
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 6b7eb31..6e1b9e8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -64,6 +64,9 @@ public class FeedUtils {
         SPILL,              // Memory budget has been consumed. Now we're writing to disk
         DISCARD             // Memory and Disk space budgets have been consumed. Now we're discarding
     }
+    
+    private FeedUtils() {
+    }
 
     private static String prepareDataverseFeedName(String dataverseName, String feedName) {
         return dataverseName + File.separator + feedName;
@@ -87,7 +90,7 @@ public class FeedUtils {
             throw new AsterixException("Can't create file splits for adapter with count partitioning constraints");
         }
         String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
-        List<FileSplit> splits = new ArrayList<FileSplit>();
+        List<FileSplit> splits = new ArrayList<>();
         for (String nd : locations) {
             splits.add(splitsForAdapter(dataverseName, feedName, nd,
                     AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0]));
@@ -120,6 +123,7 @@ public class FeedUtils {
         int offset = fta.getTupleStartOffset(tc);
         int len = fta.getTupleLength(tc);
         int newSize = FrameHelper.calcAlignedFrameSizeToStore(1, len, message.getMinSize());
+        message.reset();
         message.ensureFrameSize(newSize);
         message.getBuffer().clear();
         message.getBuffer().put(input.array(), offset, len);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
index 820ae5f..7a5b722 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
@@ -22,6 +22,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
 
 public class CreateDataverseStatement extends Statement {
 
@@ -31,11 +32,7 @@ public class CreateDataverseStatement extends Statement {
 
     public CreateDataverseStatement(Identifier dataverseName, String format, boolean ifNotExists) {
         this.dataverseName = dataverseName;
-        if (format == null) {
-            this.format = "org.apache.asterix.runtime.formats.NonTaggedDataFormat";
-        } else {
-            this.format = format;
-        }
+        this.format = (format == null) ? NonTaggedDataFormat.class.getName() : format;
         this.ifNotExists = ifNotExists;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java
index 8738c97..000339f 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java
@@ -58,7 +58,7 @@ public class AInt64SerializerDeserializer implements ISerializerDeserializer<AIn
         return (((long) (bytes[offset] & 0xff)) << 56) + (((long) (bytes[offset + 1] & 0xff)) << 48)
                 + (((long) (bytes[offset + 2] & 0xff)) << 40) + (((long) (bytes[offset + 3] & 0xff)) << 32)
                 + (((long) (bytes[offset + 4] & 0xff)) << 24) + (((long) (bytes[offset + 5] & 0xff)) << 16)
-                + (((long) (bytes[offset + 6] & 0xff)) << 8) + (((long) (bytes[offset + 7] & 0xff)) << 0);
+                + (((long) (bytes[offset + 6] & 0xff)) << 8) + (bytes[offset + 7] & 0xff);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index cabfc77..d247490 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -119,8 +119,9 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         lsmComponentLSNMappingService = new LSMComponentsSyncService();
         replicationNotifier = new ReplicationNotifier();
         replicationThreads = Executors.newCachedThreadPool(appContext.getThreadFactory());
-        Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
-                .getAppContext()).getMetadataProperties().getNodePartitions();
+        Map<String, ClusterPartition[]> nodePartitions =
+                ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider.getAppContext()).getMetadataProperties()
+                        .getNodePartitions();
         Set<String> nodeReplicationClients = replicationProperties.getNodeReplicationClients(nodeId);
         List<Integer> clientsPartitions = new ArrayList<>();
         for (String clientId : nodeReplicationClients) {
@@ -141,8 +142,8 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         try {
             serverSocketChannel = ServerSocketChannel.open();
             serverSocketChannel.configureBlocking(true);
-            InetSocketAddress replicationChannelAddress = new InetSocketAddress(InetAddress.getByName(nodeIP),
-                    dataPort);
+            InetSocketAddress replicationChannelAddress =
+                    new InetSocketAddress(InetAddress.getByName(nodeIP), dataPort);
             serverSocketChannel.socket().bind(replicationChannelAddress);
             lsmComponentLSNMappingService.start();
             replicationNotifier.start();
@@ -169,8 +170,9 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         if (remainingFile == 0) {
             if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null
                     && replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
-                int remainingIndexes = replicaUniqueLSN2RemoteMapping
-                        .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
+                int remainingIndexes =
+                        replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes
+                                .decrementAndGet();
                 if (remainingIndexes == 0) {
                     /**
                      * Note: there is a chance that this will never be removed because some
@@ -216,8 +218,8 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         public void run() {
             Thread.currentThread().setName("Replication Thread");
             try {
-                ReplicationRequestType replicationFunction = ReplicationProtocol.getRequestType(socketChannel,
-                        inBuffer);
+                ReplicationRequestType replicationFunction =
+                        ReplicationProtocol.getRequestType(socketChannel, inBuffer);
                 while (replicationFunction != ReplicationRequestType.GOODBYE) {
                     switch (replicationFunction) {
                         case REPLICATE_LOG:
@@ -281,8 +283,8 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             Set<Integer> datasetsToForceFlush = new HashSet<>();
             for (IndexInfo iInfo : openIndexesInfo) {
                 if (requestedIndexesToBeFlushed.contains(iInfo.getResourceId())) {
-                    AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
-                            .getIOOperationCallback();
+                    AbstractLSMIOOperationCallback ioCallback =
+                            (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
                     //if an index has a pending flush, then the request to flush it will succeed.
                     if (ioCallback.hasPendingFlush()) {
                         //remove index to indicate that it will be flushed
@@ -373,8 +375,9 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             List<String> filesList;
             Set<String> replicaIds = request.getReplicaIds();
             Set<String> requesterExistingFiles = request.getExistingFiles();
-            Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) appContextProvider
-                    .getAppContext()).getMetadataProperties().getNodePartitions();
+            Map<String, ClusterPartition[]> nodePartitions =
+                    ((IAsterixPropertiesProvider) appContextProvider.getAppContext()).getMetadataProperties()
+                            .getNodePartitions();
             for (String replicaId : replicaIds) {
                 //get replica partitions
                 ClusterPartition[] replicaPatitions = nodePartitions.get(replicaId);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index 857f1e2..afd6019 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.transactions.ILogMarkerCallback;
+import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
@@ -41,6 +43,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
@@ -114,8 +117,11 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
         writer.open();
         indexHelper.open();
         index = indexHelper.getIndexInstance();
-
         try {
+            if (ctx.getSharedObject() != null) {
+                PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) index);
+                TaskUtils.putInSharedMap(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+            }
             missingTupleBuilder = new ArrayTupleBuilder(1);
             DataOutput out = missingTupleBuilder.getDataOutput();
             try {
@@ -135,8 +141,8 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
                     .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
             cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
-            IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                    .getApplicationContext().getApplicationObject();
+            IAsterixAppRuntimeContext runtimeCtx =
+                    (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
             AsterixLSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
                     runtimeCtx.getTransactionSubsystem().getLogManager());
         } catch (Exception e) {
@@ -241,10 +247,7 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
                 writeOutput(i, recordWasInserted);
                 i++;
             }
-            if (tupleCount > 0) {
-                // All tuples has to move forward to maintain the correctness of the transaction pipeline
-                appender.write(writer, true);
-            }
+            appender.write(writer, true);
         } catch (IndexException | IOException | AsterixException e) {
             throw new HyracksDataException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 94d2a8c..9a66aa5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -80,8 +80,8 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
 
     public LogManager(TransactionSubsystem txnSubsystem) {
         this.txnSubsystem = txnSubsystem;
-        logManagerProperties = new LogManagerProperties(this.txnSubsystem.getTransactionProperties(),
-                this.txnSubsystem.getId());
+        logManagerProperties =
+                new LogManagerProperties(this.txnSubsystem.getTransactionProperties(), this.txnSubsystem.getId());
         logFileSize = logManagerProperties.getLogPartitionSize();
         logPageSize = logManagerProperties.getLogPageSize();
         numLogPages = logManagerProperties.getNumLogPages();
@@ -172,6 +172,9 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
         if (logRecord.getLogType() == LogType.FLUSH) {
             logRecord.setLSN(appendLSN.get());
         }
+        if (logRecord.isMarker()) {
+            logRecord.logAppended(appendLSN.get());
+        }
         appendLSN.addAndGet(logRecord.getLogSize());
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index afb926b..0183b29 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -92,7 +92,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     private final long SHARP_CHECKPOINT_LSN = -1;
     private final boolean replicationEnabled;
     public static final long NON_SHARP_CHECKPOINT_TARGET_LSN = -1;
-    private final static String RECOVERY_FILES_DIR_NAME = "recovery_temp";
+    private static final String RECOVERY_FILES_DIR_NAME = "recovery_temp";
     private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
     private final long cachedEntityCommitsPerJobSize;
     private final PersistentLocalResourceRepository localResourceRepository;
@@ -108,8 +108,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         this.txnSubsystem = txnSubsystem;
         logMgr = (LogManager) txnSubsystem.getLogManager();
         checkpointHistory = txnSubsystem.getTransactionProperties().getCheckpointHistory();
-        IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) txnSubsystem
-                .getAsterixAppRuntimeContextProvider().getAppContext();
+        IAsterixPropertiesProvider propertiesProvider =
+                (IAsterixPropertiesProvider) txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext();
         replicationEnabled = propertiesProvider.getReplicationProperties().isReplicationEnabled();
         localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getLocalResourceRepository();
@@ -271,6 +271,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                     break;
                 case LogType.FLUSH:
                 case LogType.WAIT:
+                case LogType.MARKER:
                     break;
                 default:
                     throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
@@ -361,10 +362,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                                 //if index is not registered into IndexLifeCycleManager,
                                 //create the index using LocalMetadata stored in LocalResourceRepository
                                 //get partition path in this node
-                                String partitionIODevicePath = localResourceRepository
-                                        .getPartitionPath(localResource.getPartition());
-                                String resourceAbsolutePath = partitionIODevicePath + File.separator
-                                        + localResource.getResourceName();
+                                String partitionIODevicePath =
+                                        localResourceRepository.getPartitionPath(localResource.getPartition());
+                                String resourceAbsolutePath =
+                                        partitionIODevicePath + File.separator + localResource.getResourceName();
                                 localResource.setResourcePath(resourceAbsolutePath);
                                 index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
                                 if (index == null) {
@@ -379,8 +380,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                                     //#. get maxDiskLastLSN
                                     ILSMIndex lsmIndex = index;
                                     try {
-                                        maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex
-                                                .getIOOperationCallback())
+                                        maxDiskLastLsn =
+                                                ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
                                                         .getComponentLSN(lsmIndex.getImmutableComponents());
                                     } catch (HyracksDataException e) {
                                         datasetLifecycleManager.close(resourceAbsolutePath);
@@ -405,6 +406,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                     case LogType.ABORT:
                     case LogType.FLUSH:
                     case LogType.UPSERT_ENTITY_COMMIT:
+                    case LogType.MARKER:
                         //do nothing
                         break;
                     default:
@@ -443,8 +445,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         //right after the new checkpoint file is written.
         File[] prevCheckpointFiles = getPreviousCheckpointFiles();
 
-        IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
+        IDatasetLifecycleManager datasetLifecycleManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
         //flush all in-memory components if it is the sharp checkpoint
         if (isSharpCheckpoint) {
             datasetLifecycleManager.flushAllDatasets();
@@ -467,8 +469,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                         Set<Integer> deadReplicasPartitions = new HashSet<>();
                         //get partitions of the dead replicas that are not active on this node
                         for (String deadReplicaId : deadReplicaIds) {
-                            ClusterPartition[] nodePartitons = metadataProperties.getNodePartitions()
-                                    .get(deadReplicaId);
+                            ClusterPartition[] nodePartitons =
+                                    metadataProperties.getNodePartitions().get(deadReplicaId);
                             for (ClusterPartition partition : nodePartitons) {
                                 if (!localResourceRepository.getActivePartitions()
                                         .contains(partition.getPartitionId())) {
@@ -492,8 +494,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                 datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(nonSharpCheckpointTargetLSN);
                 if (replicationEnabled) {
                     //request remote replicas to flush lagging indexes
-                    IReplicationManager replicationManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                            .getAppContext().getReplicationManager();
+                    IReplicationManager replicationManager =
+                            txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicationManager();
                     try {
                         replicationManager.requestFlushLaggingReplicaIndexes(nonSharpCheckpointTargetLSN);
                     } catch (IOException e) {
@@ -564,16 +566,16 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
     @Override
     public long getLocalMinFirstLSN() throws HyracksDataException {
-        IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
+        IDatasetLifecycleManager datasetLifecycleManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
         List<IIndex> openIndexList = datasetLifecycleManager.getOpenIndexes();
         long firstLSN;
         //the min first lsn can only be the current append or smaller
         long minFirstLSN = logMgr.getAppendLSN();
         if (openIndexList.size() > 0) {
             for (IIndex index : openIndexList) {
-                AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index)
-                        .getIOOperationCallback();
+                AbstractLSMIOOperationCallback ioCallback =
+                        (AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback();
 
                 if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
                     firstLSN = ioCallback.getFirstLSN();
@@ -585,8 +587,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     }
 
     private long getRemoteMinFirstLSN() {
-        IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getAppContext().getReplicaResourcesManager();
+        IReplicaResourcesManager remoteResourcesManager =
+                txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
         long minRemoteLSN = remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions());
         return minRemoteLSN;
     }
@@ -783,6 +785,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                     case LogType.ABORT:
                     case LogType.FLUSH:
                     case LogType.WAIT:
+                    case LogType.MARKER:
                         //ignore
                         break;
                     default:
@@ -798,8 +801,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             //undo loserTxn's effect
             LOGGER.log(Level.INFO, "undoing loser transaction's effect");
 
-            IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
-                    .getDatasetLifecycleManager();
+            IDatasetLifecycleManager datasetLifecycleManager =
+                    txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
             //TODO sort loser entities by smallest LSN to undo in one pass.
             Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator();
             int undoCount = 0;
@@ -855,10 +858,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
     private static void undo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) {
         try {
-            ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
-                    logRecord.getResourceId());
-            ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
+            ILSMIndex index =
+                    (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
+            ILSMIndexAccessor indexAccessor =
+                    index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
                 indexAccessor.forceDelete(logRecord.getNewValue());
             } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
@@ -873,10 +876,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
     private static void redo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) {
         try {
-            ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
-                    logRecord.getResourceId());
-            ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
+            ILSMIndex index =
+                    (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
+            ILSMIndexAccessor indexAccessor =
+                    index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
                 indexAccessor.forceInsert(logRecord.getNewValue());
             } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
index d94f933..19d7afb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
@@ -64,6 +64,7 @@ public interface IFrameWriter {
 
     /**
      * Provide data to the stream of this {@link IFrameWriter}.
+     *
      * @param buffer
      *            - Buffer containing data.
      * @throws HyracksDataException
@@ -72,21 +73,24 @@ public interface IFrameWriter {
 
     /**
      * request the frame to push its content forward and flush its consumers
+     *
      * @throws HyracksDataException
      */
     public default void flush() throws HyracksDataException {
-        throw new HyracksDataException("flush() is not supported in this IFrameWriter");
+        // No Op
     }
 
     /**
      * Indicate that a failure was encountered and the current stream is to be
      * aborted.
+     *
      * @throws HyracksDataException
      */
     public void fail() throws HyracksDataException;
 
     /**
      * Close this {@link IFrameWriter} and give up all resources.
+     *
      * @throws HyracksDataException
      */
     public void close() throws HyracksDataException;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index 3781489..4eb3ebf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -45,7 +45,7 @@ public interface IHyracksTaskContext
 
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId) throws Exception;
 
-    public void setSharedObject(Object sharedObject);
+    public void setSharedObject(Object object);
 
     public Object getSharedObject();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
new file mode 100644
index 0000000..8d55235
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.util;
+
+public class HyracksConstants {
+    public static final String KEY_MESSAGE = "HYX:MSG";
+
+    private HyracksConstants() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 2f8def1..43cac74 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -260,7 +260,8 @@ public class NodeControllerService implements IControllerService {
         init();
 
         datasetNetworkManager.start();
-        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries);
+        IIPCHandle ccIPCHandle =
+                ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries);
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
@@ -270,12 +271,11 @@ public class NodeControllerService implements IControllerService {
         // Use "public" versions of network addresses and ports
         NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
-        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress,
-                datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean
-                        .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
-                        .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
-                        .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
-                runtimeMXBean.getSystemProperties(), hbSchema));
+        ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
+                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
 
         synchronized (this) {
             while (registrationPending) {
@@ -490,7 +490,8 @@ public class NodeControllerService implements IControllerService {
             CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
             switch (fn.getFunctionId()) {
                 case SEND_APPLICATION_MESSAGE: {
-                    CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+                    CCNCFunctions.SendApplicationMessageFunction amf =
+                            (CCNCFunctions.SendApplicationMessageFunction) fn;
                     queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
                             amf.getDeploymentId(), amf.getNodeId()));
                     return;
@@ -515,7 +516,8 @@ public class NodeControllerService implements IControllerService {
                 }
 
                 case REPORT_PARTITION_AVAILABILITY: {
-                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
+                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
+                            (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
                     queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
                             rpaf.getPartitionId(), rpaf.getNetworkAddress()));
                     return;
@@ -528,7 +530,8 @@ public class NodeControllerService implements IControllerService {
                 }
 
                 case GET_NODE_CONTROLLERS_INFO_RESPONSE: {
-                    CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
+                    CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf =
+                            (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
                     setNodeControllersInfo(gncirf.getNodeControllerInfos());
                     return;
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 134154c..f463bfa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -394,8 +394,8 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
     }
 
     @Override
-    public void setSharedObject(Object sharedObject) {
-        this.sharedObject = sharedObject;
+    public void setSharedObject(Object object) {
+        this.sharedObject = object;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 7d12296..7d90a8b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -106,20 +106,20 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
                                 ByteBuffer buffer = ctx.allocateFrame();
                                 boolean fail = false;
                                 boolean done = false;
-                                boolean flush = false;
                                 while (!fail && !done) {
                                     synchronized (MaterializingPipelinedPartition.this) {
-                                        if (flushRequest) {
-                                            flushRequest = false;
-                                            flush = true;
-                                        }
-                                        while (offset >= size && !eos && !failed && !flush) {
+                                        while (offset >= size && !eos && !failed) {
+                                            if (flushRequest) {
+                                                flushRequest = false;
+                                                writer.flush();
+                                            }
                                             try {
                                                 MaterializingPipelinedPartition.this.wait();
                                             } catch (InterruptedException e) {
                                                 throw new HyracksDataException(e);
                                             }
                                         }
+                                        flushRequest = false;
                                         fail = failed;
                                         done = eos && offset >= size;
                                     }
@@ -134,10 +134,6 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
                                         offset += readLen;
                                         buffer.flip();
                                         writer.nextFrame(buffer);
-                                        if (flush) {
-                                            writer.flush();
-                                            flush = false;
-                                        }
                                     }
                                 }
                             }
@@ -213,4 +209,4 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
         flushRequest = true;
         notifyAll();
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index e7131d5..57f8072 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -89,9 +89,7 @@ public class AbstractFrameAppender implements IFrameAppender {
     @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
         getBuffer().clear();
-        if (getTupleCount() > 0) {
-            outWriter.nextFrame(getBuffer());
-        }
+        outWriter.nextFrame(getBuffer());
         if (clearFrame) {
             frame.reset();
             reset(getBuffer(), true);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index ef11b5b..3ef8b28 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -45,13 +45,13 @@ public class FrameTupleAppender extends AbstractFrameAppender implements IFrameT
      * append fieldSlots and bytes to the current frame
      */
     @Override
-    public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException {
-        if (canHoldNewTuple(fieldSlots.length, length)) {
-            for (int i = 0; i < fieldSlots.length; ++i) {
-                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fieldSlots[i]);
+    public boolean append(int[] fieldEndOffsets, byte[] bytes, int offset, int length) throws HyracksDataException {
+        if (canHoldNewTuple(fieldEndOffsets.length, length)) {
+            for (int i = 0; i < fieldEndOffsets.length; ++i) {
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fieldEndOffsets[i]);
             }
-            System.arraycopy(bytes, offset, array, tupleDataEndOffset + fieldSlots.length * 4, length);
-            tupleDataEndOffset += fieldSlots.length * 4 + length;
+            System.arraycopy(bytes, offset, array, tupleDataEndOffset + fieldEndOffsets.length * 4, length);
+            tupleDataEndOffset += fieldEndOffsets.length * 4 + length;
             IntSerDeUtils.putInt(getBuffer().array(),
                     FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;
@@ -63,19 +63,24 @@ public class FrameTupleAppender extends AbstractFrameAppender implements IFrameT
     }
 
     public boolean append(ITupleReference tuple) throws HyracksDataException {
-        int tupleSize = 0;
+        int length = 0;
         for (int i = 0; i < tuple.getFieldCount(); i++) {
-            tupleSize += tuple.getFieldLength(i);
+            length += tuple.getFieldLength(i);
         }
-        if (canHoldNewTuple(tuple.getFieldCount(), tupleSize)) {
-            int offset = 0;
+
+        if (canHoldNewTuple(tuple.getFieldCount(), length)) {
+            length = 0;
+            for (int i = 0; i < tuple.getFieldCount(); ++i) {
+                length += tuple.getFieldLength(i);
+                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, length);
+            }
+            length = 0;
             for (int i = 0; i < tuple.getFieldCount(); ++i) {
-                IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, offset);
                 System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), array,
-                        tupleDataEndOffset + tuple.getFieldCount() * 4, tuple.getFieldLength(i));
-                offset += tuple.getFieldLength(i);
+                        tupleDataEndOffset + tuple.getFieldCount() * 4 + length, tuple.getFieldLength(i));
+                length += tuple.getFieldLength(i);
             }
-            tupleDataEndOffset += tuple.getFieldCount() * 4 + tupleSize;
+            tupleDataEndOffset += tuple.getFieldCount() * 4 + length;
             IntSerDeUtils.putInt(getBuffer().array(),
                     FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
             ++tupleCount;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index cae659d..7f518cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -26,7 +26,9 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.util.IntSerDeUtils;
 
 /**
@@ -39,7 +41,9 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
     private static final int NULL_MESSAGE_SIZE = 1;
     public static final byte NULL_FEED_MESSAGE = 0x01;
     public static final byte ACK_REQ_FEED_MESSAGE = 0x02;
-    public static final byte SNAPSHOT_MESSAGE = 0x03;
+    public static final byte MARKER_MESSAGE = 0x03;
+    private boolean initialized = false;
+    private VSizeFrame message;
 
     public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
         this.ctx = ctx;
@@ -59,8 +63,8 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
             case ACK_REQ_FEED_MESSAGE:
                 aString.append("Ack Request, ");
                 break;
-            case SNAPSHOT_MESSAGE:
-                aString.append("Snapshot, ");
+            case MARKER_MESSAGE:
+                aString.append("Marker, ");
                 break;
             default:
                 aString.append("Unknown, ");
@@ -78,8 +82,8 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
                 return NULL_FEED_MESSAGE;
             case ACK_REQ_FEED_MESSAGE:
                 return ACK_REQ_FEED_MESSAGE;
-            case SNAPSHOT_MESSAGE:
-                return SNAPSHOT_MESSAGE;
+            case MARKER_MESSAGE:
+                return MARKER_MESSAGE;
             default:
                 throw new HyracksDataException("Unknown message type");
         }
@@ -101,24 +105,35 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
 
     @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
+        if (!initialized) {
+            message = TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
+            initialized = true;
+        }
         // If message fits, we append it, otherwise, we append a null message, then send a message only
         // frame with the message
-        ByteBuffer message = ((VSizeFrame) ctx.getSharedObject()).getBuffer();
-        int messageSize = message.limit() - message.position();
-        if (hasEnoughSpace(1, messageSize)) {
-            appendMessage(message);
-            forward(outWriter);
-        } else {
+        if (message == null) {
             if (tupleCount > 0) {
                 appendNullMessage();
                 forward(outWriter);
             }
-            if (!hasEnoughSpace(1, messageSize)) {
-                frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize()));
-                reset(frame.getBuffer(), true);
+        } else {
+            ByteBuffer buffer = message.getBuffer();
+            int messageSize = buffer.limit() - buffer.position();
+            if (hasEnoughSpace(1, messageSize)) {
+                appendMessage(buffer);
+                forward(outWriter);
+            } else {
+                if (tupleCount > 0) {
+                    appendNullMessage();
+                    forward(outWriter);
+                }
+                if (!hasEnoughSpace(1, messageSize)) {
+                    frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize()));
+                    reset(frame.getBuffer(), true);
+                }
+                appendMessage(buffer);
+                forward(outWriter);
             }
-            appendMessage(message);
-            forward(outWriter);
         }
     }
 
@@ -130,8 +145,9 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
     }
 
     private void appendMessage(ByteBuffer message) {
-        System.arraycopy(message.array(), message.position(), array, tupleDataEndOffset, message.limit());
-        tupleDataEndOffset += message.limit();
+        int messageLength = message.limit() - message.position();
+        System.arraycopy(message.array(), message.position(), array, tupleDataEndOffset, messageLength);
+        tupleDataEndOffset += messageLength;
         IntSerDeUtils.putInt(getBuffer().array(),
                 FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
         ++tupleCount;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/TaskUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/TaskUtils.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/TaskUtils.java
new file mode 100644
index 0000000..4f27d79
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/TaskUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A utility class for common operations on the shared object of a Hyracks task; not instantiable.
+ */
+public class TaskUtils {
+    private TaskUtils() {
+    }
+
+    /**
+     * Get the shared object of a task as a {@code Map<String, Object>}; the cast is unchecked.
+     *
+     * @param ctx
+     *            the task context
+     * @param create if true and the task has no shared object, create a new map and set it on the task
+     * @return the task shared map, or null when there is no shared object and create is false
+     */
+    @SuppressWarnings("unchecked")
+    public static Map<String, Object> getSharedMap(IHyracksTaskContext ctx, boolean create) {
+        if (ctx.getSharedObject() != null) {
+            return (Map<String, Object>) ctx.getSharedObject();
+        } else if (create) {
+            Map<String, Object> taskMap = new HashMap<>();
+            ctx.setSharedObject(taskMap);
+            return taskMap;
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Put the key value pair in the task shared map, creating the map if the task has none.
+     *
+     * @param key the key under which the object is stored
+     * @param object the value to store
+     * @param ctx the task context
+     */
+    public static void putInSharedMap(String key, Object object, IHyracksTaskContext ctx) {
+        TaskUtils.getSharedMap(ctx, true).put(key, object);
+    }
+
+    /**
+     * Get a {@code T} object from the shared map of the task.
+     *
+     * @param key the key to look up
+     * @param ctx the task context
+     * @return the value cast to T (unchecked), or null if the task has no shared object or no such key
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T get(String key, IHyracksTaskContext ctx) {
+        Map<String, Object> sharedMap = TaskUtils.getSharedMap(ctx, false);
+        return sharedMap == null ? null : (T) sharedMap.get(key);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 30ee3c0..b4e51be 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -65,8 +65,8 @@ public class PreclusteredGroupWriter implements IFrameWriter {
             RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
         this.groupFields = groupFields;
         this.comparators = comparators;
-        this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields,
-                writer);
+        this.aggregator =
+                aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields, writer);
         this.aggregateState = aggregator.createAggregateStates();
         copyFrame = new VSizeFrame(ctx);
         inFrameAccessor = new FrameTupleAccessor(inRecordDesc);
@@ -91,29 +91,32 @@ public class PreclusteredGroupWriter implements IFrameWriter {
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         inFrameAccessor.reset(buffer);
         int nTuples = inFrameAccessor.getTupleCount();
-        for (int i = 0; i < nTuples; ++i) {
-            if (first) {
+        if (nTuples != 0) {
+            for (int i = 0; i < nTuples; ++i) {
+                if (first) {
 
-                tupleBuilder.reset();
-                for (int j = 0; j < groupFields.length; j++) {
-                    tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
-                }
-                aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
+                    tupleBuilder.reset();
+                    for (int j = 0; j < groupFields.length; j++) {
+                        tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
+                    }
+                    aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
 
-                first = false;
+                    first = false;
 
-            } else {
-                if (i == 0) {
-                    switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
                 } else {
-                    switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
-                }
+                    if (i == 0) {
+                        switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor,
+                                i);
+                    } else {
+                        switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+                    }
 
+                }
             }
+            copyFrame.ensureFrameSize(buffer.capacity());
+            FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
+            copyFrameAccessor.reset(copyFrame.getBuffer());
         }
-        copyFrame.ensureFrameSize(buffer.capacity());
-        FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
-        copyFrameAccessor.reset(copyFrame.getBuffer());
     }
 
     private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml b/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
index 7267cb7..7784199 100644
--- a/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
@@ -33,4 +33,35 @@
     <root.dir>${basedir}/../../..</root.dir>
   </properties>
 
+  <build>
+      <pluginManagement>
+          <plugins>
+              <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+              <plugin>
+                  <groupId>org.eclipse.m2e</groupId>
+                  <artifactId>lifecycle-mapping</artifactId>
+                  <version>1.0.0</version>
+                  <configuration>
+                      <lifecycleMappingMetadata>
+                          <pluginExecutions>
+                              <pluginExecution>
+                                  <pluginExecutionFilter>
+                                      <groupId>org.apache.maven.plugins</groupId>
+                                      <artifactId>maven-plugin-plugin</artifactId>
+                                      <versionRange>[3.3,)</versionRange>
+                                      <goals>
+                                          <goal>descriptor</goal>
+                                      </goals>
+                                  </pluginExecutionFilter>
+                                  <action>
+                                      <ignore></ignore>
+                                  </action>
+                              </pluginExecution>
+                          </pluginExecutions>
+                      </lifecycleMappingMetadata>
+                  </configuration>
+              </plugin>
+          </plugins>
+      </pluginManagement>
+  </build>
 </project>