You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2017/03/08 20:53:17 UTC

[2/3] asterixdb git commit: Cleanup and bug fixes in Feeds pipeline

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a89d13e..3b6e7ff 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -41,8 +41,6 @@ public class ExternalDataConstants {
     public static final String KEY_FILESYSTEM = "fs";
     // specifies the address of the HDFS name node
     public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
-    // specifies whether a feed sends progress markers or not
-    public static final String KEY_SEND_MARKER = "send-marker";
     // specifies the class implementation of the accessed instance of HDFS
     public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
     public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index d009960..a09ff9b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.external.util;
 
-import java.util.HashMap;
+import java.util.EnumMap;
 import java.util.Map;
 
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -162,7 +162,7 @@ public class ExternalDataUtils {
     private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = initializeValueParserFactoryMap();
 
     private static Map<ATypeTag, IValueParserFactory> initializeValueParserFactoryMap() {
-        Map<ATypeTag, IValueParserFactory> m = new HashMap<ATypeTag, IValueParserFactory>();
+        Map<ATypeTag, IValueParserFactory> m = new EnumMap<>(ATypeTag.class);
         m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
         m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
         m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
@@ -339,8 +339,4 @@ public class ExternalDataUtils {
         }
         return intIndicators;
     }
-
-    public static boolean isSendMarker(Map<String, String> configuration) {
-        return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_SEND_MARKER));
-    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index d286ff9..3863920 100644
--- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -79,12 +79,10 @@ public class TestTypedAdapterFactory implements IAdapterFactory {
                 ITupleForwarder forwarder;
                 ArrayTupleBuilder tb;
                 IPropertiesProvider propertiesProvider =
-                        (IPropertiesProvider) ((NodeControllerService) ctx
-                                .getJobletContext().getApplicationContext().getControllerService())
-                                        .getApplicationContext()
-                                        .getApplicationObject();
-                ClusterPartition nodePartition = propertiesProvider.getMetadataProperties().getNodePartitions()
-                        .get(nodeId)[0];
+                        (IPropertiesProvider) ((NodeControllerService) ctx.getJobletContext().getApplicationContext()
+                                .getControllerService()).getApplicationContext().getApplicationObject();
+                ClusterPartition nodePartition =
+                        propertiesProvider.getMetadataProperties().getNodePartitions().get(nodeId)[0];
                 parser = new ADMDataParser(outputType, true);
                 forwarder = DataflowUtils.getTupleForwarder(configuration,
                         FeedUtils.getFeedLogManager(ctx,
@@ -144,5 +142,4 @@ public class TestTypedAdapterFactory implements IAdapterFactory {
     public ARecordType getMetaType() {
         return null;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f5c6d9a..6901e1d 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -310,8 +310,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     }
 
     public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
-        String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName())
-                : dataverse;
+        String dv =
+                dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName()) : dataverse;
         if (dv == null) {
             return null;
         }
@@ -353,25 +353,15 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             throws AlgebricksException {
         DataSource source = findDataSource(dataSourceId);
         Dataset dataset = ((DatasetDataSource) source).getDataset();
-        try {
-            String indexName = indexId;
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
-            if (secondaryIndex != null) {
-                return new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
-            } else {
-                Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                        dataset.getDatasetName(), dataset.getDatasetName());
-                if (primaryIndex.getIndexName().equals(indexId)) {
-                    return new DataSourceIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(),
-                            this);
-                } else {
-                    return null;
-                }
-            }
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
+        String indexName = indexId;
+        Index secondaryIndex = getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+        return (secondaryIndex != null)
+                ? new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this)
+                : null;
+    }
+
+    public Index getIndex(String dataverseName, String datasetName, String indexName) throws AlgebricksException {
+        return MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
     }
 
     public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
@@ -405,8 +395,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan(
             JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc)
             throws AlgebricksException {
-        ExternalScanOperatorDescriptor dataScanner =
-                new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
+        ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
         AlgebricksPartitionConstraint constraint;
         try {
             constraint = adapterFactory.getPartitionConstraint();
@@ -462,8 +451,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
             IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput,
             boolean retainMissing, Dataset dataset, String indexName, int[] lowKeyFields, int[] highKeyFields,
-            boolean lowKeyInclusive, boolean highKeyInclusive, int[] minFilterFieldIndexes,
-            int[] maxFilterFieldIndexes) throws AlgebricksException {
+            boolean lowKeyInclusive, boolean highKeyInclusive, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes)
+            throws AlgebricksException {
         boolean isSecondary = true;
         int numSecondaryKeys = 0;
         try {
@@ -530,8 +519,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
             IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-            spPc = getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(), indexName,
-                    temp);
+            spPc = getSplitProviderAndConstraints(dataset, theIndex.getIndexName());
 
             ISearchOperationCallbackFactory searchCallbackFactory;
             if (isSecondary) {
@@ -586,8 +574,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName,
             int[] keyFields, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes) throws AlgebricksException {
         try {
-            ARecordType recType =
-                    (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+            ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
             int numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
 
             boolean temp = dataset.getDatasetDetails().isTemp();
@@ -630,8 +617,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
                     numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
             IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = getSplitProviderAndConstraints(
-                    dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
+                    getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
             ARecordType metaType = null;
             if (dataset.hasMetaPart()) {
                 metaType =
@@ -751,7 +738,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
             Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
                     dataset.getDatasetName(), dataset.getDatasetName());
-            String indexName = primaryIndex.getIndexName();
             ARecordType metaType = dataset.hasMetaPart()
                     ? (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName())
                     : null;
@@ -763,8 +749,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     itemType, metaType, context.getBinaryComparatorFactoryProvider());
 
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName,
-                            temp);
+                    getSplitProviderAndConstraints(dataset);
             IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
 
@@ -779,9 +764,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     new TreeIndexBulkLoadOperatorDescriptor(spec, null, appContext.getStorageManager(),
                             appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                             comparatorFactories, bloomFilterKeyFields, fieldPermutation,
-                            GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false,
-                            numElementsHint, true, dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
-                                    metaType, compactionInfo.first, compactionInfo.second),
+                            GlobalConfig.DEFAULT_TREE_FILL_FACTOR,
+                            false, numElementsHint, true, dataset.getIndexDataflowHelperFactory(this, primaryIndex,
+                                    itemType, metaType, compactionInfo.first, compactionInfo.second),
                             metadataPageManagerFactory);
             return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
         } catch (MetadataException me) {
@@ -951,12 +936,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 numKeyFields / 2);
     }
 
-    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(String dataverseName,
-            String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
-        FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
-        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
-    }
-
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitAndConstraints(String dataverse) {
         return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse);
     }
@@ -970,8 +949,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             throws MetadataException {
         DatasourceAdapter adapter;
         // search in default namespace (built-in adapter)
-        adapter =
-                MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+        adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
 
         // search in dataverse (user-defined adapter)
         if (adapter == null) {
@@ -985,8 +963,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     }
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForFilesIndex(
-            String dataverseName, String datasetName, String targetIdxName, boolean create)
-            throws AlgebricksException {
+            String dataverseName, String datasetName, String targetIdxName, boolean create) throws AlgebricksException {
         return SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(mdTxnCtx, dataverseName, datasetName,
                 targetIdxName, create);
     }
@@ -1104,8 +1081,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
                     itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName,
-                            temp);
+                    getSplitProviderAndConstraints(dataset);
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
             int datasetId = dataset.getDatasetId();
@@ -1171,7 +1147,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                     comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, null, true, indexName,
                     context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory, null,
-                    metadataPageManagerFactory);
+                    metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
             op.setType(itemType);
             op.setFilterIndex(fieldIdx);
             return new Pair<>(op, splitsAndConstraint.second);
@@ -1222,8 +1198,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), sidxKeyFieldNames.get(i),
                             (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
             IAType keyType = keyPairType.first;
-            comparatorFactories[i] =
-                    BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+            comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
             typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
         }
 
@@ -1245,8 +1220,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             } catch (AsterixException e) {
                 throw new AlgebricksException(e);
             }
-            comparatorFactories[i] =
-                    BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+            comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
             typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
         }
 
@@ -1295,8 +1269,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     dataset.getDatasetName(), dataset.getDatasetName());
             String indexName = primaryIndex.getIndexName();
             ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName())
-                    .getDatatype();
+                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), dataset.getItemTypeName()).getDatatype();
             ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
             ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
 
@@ -1304,8 +1277,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             IBinaryComparatorFactory[] comparatorFactories = DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
                     itemType, metaItemType, context.getBinaryComparatorFactoryProvider());
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), datasetName, indexName,
-                            temp);
+                    getSplitProviderAndConstraints(dataset);
 
             // prepare callback
             int datasetId = dataset.getDatasetId();
@@ -1462,10 +1434,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             itemType = (ARecordType) MetadataManager.INSTANCE
                     .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
             validateRecordType(itemType);
-            ARecordType metaType = dataset.hasMetaPart()
-                    ? (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
-                            dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()).getDatatype()
-                    : null;
+            ARecordType metaType =
+                    dataset.hasMetaPart()
+                            ? (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
+                                    dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()).getDatatype()
+                            : null;
 
             // Index parameters.
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
@@ -1488,8 +1461,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
             IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
             for (i = 0; i < secondaryKeys.size(); ++i) {
-                Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
-                        secondaryKeyNames.get(i), itemType);
+                Pair<IAType, Boolean> keyPairType =
+                        Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i), secondaryKeyNames.get(i), itemType);
                 IAType keyType = keyPairType.first;
                 comparatorFactories[i] =
                         BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
@@ -1506,18 +1479,17 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
             IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
+                    getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
             int datasetId = dataset.getDatasetId();
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_BTREE)
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_BTREE, dataset.hasMetaPart());
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE,
+                            dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1536,7 +1508,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                         appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                         comparatorFactories, bloomFilterKeyFields, fieldPermutation, idfh, filterFactory, false,
                         indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
-                        prevFieldPermutation, metadataPageManagerFactory);
+                        prevFieldPermutation, metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
@@ -1648,7 +1620,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     dataset, recType, metaItemType, context.getBinaryComparatorFactoryProvider());
             IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
+                    getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
             int[] btreeFields = new int[primaryComparatorFactories.length];
             for (int k = 0; k < btreeFields.length; k++) {
                 btreeFields[k] = k + numSecondaryKeys;
@@ -1671,11 +1643,10 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
             int datasetId = dataset.getDatasetId();
             IModificationOperationCallbackFactory modificationCallbackFactory = temp
                     ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_RTREE)
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE)
                     : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_RTREE, dataset.hasMetaPart());
+                            modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE,
+                            dataset.hasMetaPart());
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1694,13 +1665,13 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                         appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
                         comparatorFactories, null, fieldPermutation, indexDataflowHelperFactory, filterFactory, false,
                         indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
-                        prevFieldPermutation, metadataPageManagerFactory);
+                        prevFieldPermutation, metadataPageManagerFactory, dataset.getFrameOpCallbackFactory());
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory,
-                        filterFactory, false, indexName, null, modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE, metadataPageManagerFactory);
+                        comparatorFactories, null, fieldPermutation, indexOp, indexDataflowHelperFactory, filterFactory,
+                        false, indexName, null, modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+                        metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
         } catch (MetadataException e) {
@@ -1874,7 +1845,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
             IApplicationContextInfo appContext = (IApplicationContextInfo) context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
+                    getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -2055,8 +2026,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                     secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
 
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataverseName, datasetName, indexName,
-                            dataset.getDatasetDetails().isTemp());
+                    getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName());
 
             // Generate Output Record format
             ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
@@ -2125,4 +2095,18 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
     public IStorageComponentProvider getStorageComponentProvider() {
         return storaegComponentProvider;
     }
+
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds)
+            throws AlgebricksException {
+        FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), ds.getDatasetName(), ds.getDatasetName(),
+                ds.getDatasetDetails().isTemp());
+        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+    }
+
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds,
+            String indexName) throws AlgebricksException {
+        FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), ds.getDatasetName(), indexName,
+                ds.getDatasetDetails().isTemp());
+        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 2e328f9..34faf63 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -29,14 +29,15 @@ import java.util.logging.Logger;
 import org.apache.asterix.active.ActiveJobNotificationHandler;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.utils.JobUtils;
@@ -84,6 +85,7 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
@@ -409,8 +411,8 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
             case LENGTH_PARTITIONED_WORD_INVIX:
             case SINGLE_PARTITION_NGRAM_INVIX:
             case SINGLE_PARTITION_WORD_INVIX:
-                return invertedIndexDataflowHelperFactoryProvider.getIndexDataflowHelperFactory(mdProvider, this,
-                        index, recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterTypeTraits,
+                return invertedIndexDataflowHelperFactoryProvider.getIndexDataflowHelperFactory(mdProvider, this, index,
+                        recordType, metaType, mergePolicyFactory, mergePolicyProperties, filterTypeTraits,
                         filterCmpFactories);
             default:
                 throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE,
@@ -577,4 +579,30 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
                 metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
                 datasetPartitions, isSink);
     }
+
+    /**
+     * Get the index dataflow helper factory for the dataset's primary index
+     *
+     * @param mdProvider
+     *            an instance of metadata provider that is used to fetch metadata information
+     * @throws AlgebricksException
+     */
+    public IIndexDataflowHelperFactory getIndexDataflowHelperFactory(MetadataProvider mdProvider)
+            throws AlgebricksException {
+        if (getDatasetType() != DatasetType.INTERNAL) {
+            throw new AlgebricksException(ErrorCode.ASTERIX,
+                    ErrorCode.COMPILATION_DATASET_TYPE_DOES_NOT_HAVE_PRIMARY_INDEX, getDatasetType());
+        }
+        Index index = mdProvider.getIndex(getDataverseName(), getDatasetName(), getDatasetName());
+        ARecordType recordType = (ARecordType) mdProvider.findType(getItemTypeDataverseName(), getItemTypeName());
+        ARecordType metaType = (ARecordType) mdProvider.findType(getMetaItemTypeDataverseName(), getMetaItemTypeName());
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(this, mdProvider.getMetadataTxnContext());
+        return getIndexDataflowHelperFactory(mdProvider, index, recordType, metaType, compactionInfo.first,
+                compactionInfo.second);
+    }
+
+    public IFrameOperationCallbackFactory getFrameOpCallbackFactory() {
+        return NoOpFrameOperationCallbackFactory.INSTANCE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 80792b5..572cc75 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -330,14 +330,12 @@ public class DatasetUtil {
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             return RuntimeUtils.createJobSpecification();
         }
-        boolean temp = dataset.getDatasetDetails().isTemp();
         ARecordType itemType =
                 (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
         ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
         JobSpecification specPrimary = RuntimeUtils.createJobSpecification();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(),
-                        dataset.getDatasetName(), temp);
+                metadataProvider.getSplitProviderAndConstraints(dataset);
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
         IIndexDataflowHelperFactory indexDataflowHelperFactory = dataset.getIndexDataflowHelperFactory(
@@ -386,15 +384,12 @@ public class DatasetUtil {
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             return RuntimeUtils.createJobSpecification();
         }
-
-        boolean temp = dataset.getDatasetDetails().isTemp();
         ARecordType itemType =
                 (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
         ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
         JobSpecification specPrimary = RuntimeUtils.createJobSpecification();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(), dataset.getDatasetName(),
-                        dataset.getDatasetName(), temp);
+                metadataProvider.getSplitProviderAndConstraints(dataset);
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
 
@@ -429,7 +424,6 @@ public class DatasetUtil {
         }
         Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
                 datasetName, datasetName);
-        boolean temp = dataset.getDatasetDetails().isTemp();
         ARecordType itemType =
                 (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
         // get meta item type
@@ -451,7 +445,7 @@ public class DatasetUtil {
         int[] btreeFields = DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset);
 
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, datasetName, temp);
+                metadataProvider.getSplitProviderAndConstraints(dataset);
         FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < fs.length; i++) {
@@ -495,7 +489,6 @@ public class DatasetUtil {
         if (dataset == null) {
             throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
         }
-        boolean temp = dataset.getDatasetDetails().isTemp();
         ARecordType itemType =
                 (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
         ARecordType metaItemType = DatasetUtil.getMetaType(metadataProvider, dataset);
@@ -505,7 +498,7 @@ public class DatasetUtil {
         ITypeTraits[] typeTraits = DatasetUtil.computeTupleTypeTraits(dataset, itemType, metaItemType);
         int[] blooFilterKeyFields = DatasetUtil.createBloomFilterKeyFields(dataset);
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataverseName, datasetName, datasetName, temp);
+                metadataProvider.getSplitProviderAndConstraints(dataset);
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
         Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
index 249f035..edaa73e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
@@ -452,12 +452,11 @@ public class ExternalIndexingOperations {
         JobSpecification spec = RuntimeUtils.createJobSpecification();
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
-        boolean temp = ds.getDatasetDetails().isTemp();
         ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
         Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
-                        IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+                metadataProvider.getSplitProviderAndConstraints(ds,
+                        IndexingConstants.getFilesIndexName(ds.getDatasetName()));
         IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
         String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
         Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
@@ -472,8 +471,7 @@ public class ExternalIndexingOperations {
         for (Index index : indexes) {
             if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
                 Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
-                        metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
+                        metadataProvider.getSplitProviderAndConstraints(ds, index.getIndexName());
                 IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
                         metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
                 treeDataflowHelperFactories.add(indexDataflowHelperFactory);
@@ -499,11 +497,9 @@ public class ExternalIndexingOperations {
                 DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
         ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
         Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-
-        boolean temp = ds.getDatasetDetails().isTemp();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
-                        IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+                metadataProvider.getSplitProviderAndConstraints(ds,
+                        IndexingConstants.getFilesIndexName(ds.getDatasetName()));
         IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
         String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
         Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
@@ -518,8 +514,7 @@ public class ExternalIndexingOperations {
         for (Index index : indexes) {
             if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
                 Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
-                        metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
+                        metadataProvider.getSplitProviderAndConstraints(ds, index.getIndexName());
                 IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
                         metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
                 treeDataflowHelperFactories.add(indexDataflowHelperFactory);
@@ -546,11 +541,9 @@ public class ExternalIndexingOperations {
                 DatasetUtil.getMergePolicyFactory(ds, metadataProvider.getMetadataTxnContext());
         ILSMMergePolicyFactory mergePolicyFactory = compactionInfo.first;
         Map<String, String> mergePolicyFactoryProperties = compactionInfo.second;
-        boolean temp = ds.getDatasetDetails().isTemp();
-
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> filesIndexSplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
-                        IndexingConstants.getFilesIndexName(ds.getDatasetName()), temp);
+                metadataProvider.getSplitProviderAndConstraints(ds,
+                        IndexingConstants.getFilesIndexName(ds.getDatasetName()));
         IFileSplitProvider filesIndexSplitProvider = filesIndexSplitsAndConstraint.first;
         String fileIndexName = BTreeDataflowHelperFactoryProvider.externalFileIndexName(ds);
         Index fileIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
@@ -565,8 +558,7 @@ public class ExternalIndexingOperations {
         for (Index index : indexes) {
             if (isValidIndexName(index.getDatasetName(), index.getIndexName())) {
                 Pair<IFileSplitProvider, AlgebricksPartitionConstraint> indexSplitsAndConstraint =
-                        metadataProvider.getSplitProviderAndConstraints(ds.getDataverseName(), ds.getDatasetName(),
-                                index.getIndexName(), temp);
+                        metadataProvider.getSplitProviderAndConstraints(ds, index.getIndexName());
                 IIndexDataflowHelperFactory indexDataflowHelperFactory = ds.getIndexDataflowHelperFactory(
                         metadataProvider, index, null, null, mergePolicyFactory, mergePolicyFactoryProperties);
                 treeDataflowHelperFactories.add(indexDataflowHelperFactory);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index c6e0a6b..701d0d6 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -101,10 +101,8 @@ public class IndexUtil {
             throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification();
         IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
-        boolean temp = dataset.getDatasetDetails().isTemp();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
-                        index.getIndexName(), temp);
+                metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
         ARecordType recordType =
@@ -153,11 +151,9 @@ public class IndexUtil {
     public static JobSpecification buildDropSecondaryIndexJobSpec(Index index, MetadataProvider metadataProvider,
             Dataset dataset) throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification();
-        boolean temp = dataset.getDatasetDetails().isTemp();
         IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
-                        index.getIndexName(), temp);
+                metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
         Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                 DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
         ARecordType recordType =

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index f7e569c..d731603 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -190,8 +190,7 @@ public abstract class SecondaryIndexOperationsHelper {
         metaSerde =
                 metaType == null ? null : SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(index.getDataverseName(), index.getDatasetName(),
-                        index.getIndexName(), dataset.getDatasetDetails().isTemp());
+                metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName());
         secondaryFileSplitProvider = secondarySplitsAndConstraint.first;
         secondaryPartitionConstraint = secondarySplitsAndConstraint.second;
         numPrimaryKeys = DatasetUtil.getPartitioningKeys(dataset).size();
@@ -203,8 +202,7 @@ public abstract class SecondaryIndexOperationsHelper {
                 numFilterFields = 0;
             }
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
-                    metadataProvider.getSplitProviderAndConstraints(dataset.getDataverseName(),
-                            dataset.getDatasetName(), dataset.getDatasetName(), dataset.getDatasetDetails().isTemp());
+                    metadataProvider.getSplitProviderAndConstraints(dataset);
             primaryFileSplitProvider = primarySplitsAndConstraint.first;
             primaryPartitionConstraint = primarySplitsAndConstraint.second;
             setPrimaryRecDescAndComparators();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
index 2fae304..190a3b2 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
@@ -71,6 +71,7 @@ import org.apache.hyracks.util.string.UTF8StringWriter;
 public class ARecordPointable extends AbstractPointable {
 
     private final UTF8StringWriter utf8Writer = new UTF8StringWriter();
+    public static final ARecordPointableFactory FACTORY = new ARecordPointableFactory();
 
     public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
         private static final long serialVersionUID = 1L;
@@ -86,11 +87,15 @@ public class ARecordPointable extends AbstractPointable {
         }
     };
 
-    public static final IPointableFactory FACTORY = new IPointableFactory() {
+    public static class ARecordPointableFactory implements IPointableFactory {
+
         private static final long serialVersionUID = 1L;
 
+        private ARecordPointableFactory() {
+        }
+
         @Override
-        public IPointable createPointable() {
+        public ARecordPointable createPointable() {
             return new ARecordPointable();
         }
 
@@ -98,7 +103,8 @@ public class ARecordPointable extends AbstractPointable {
         public ITypeTraits getTypeTraits() {
             return TYPE_TRAITS;
         }
-    };
+
+    }
 
     public static final IObjectFactory<IPointable, ATypeTag> ALLOCATOR = new IObjectFactory<IPointable, ATypeTag>() {
         @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
index dd7335a..042837b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -22,7 +22,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
@@ -65,7 +64,7 @@ public class ReportMaxResourceIdMessage implements IApplicationMessage {
             ((INCMessageBroker) ncs.getApplicationContext().getMessageBroker()).sendMessageToCC(maxResourceIdMsg);
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Unable to report max local resource id", e);
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 2fedcca..8739948 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -20,7 +20,6 @@ package org.apache.asterix.runtime.message;
 
 import java.util.Set;
 
-import org.apache.asterix.common.exceptions.ExceptionUtils;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.transactions.IResourceIdManager;
@@ -57,7 +56,7 @@ public class ResourceIdRequestMessage implements IApplicationMessage {
             }
             broker.sendApplicationMessageToNC(reponse, src);
         } catch (Exception e) {
-            throw ExceptionUtils.convertToHyracksDataException(e);
+            throw HyracksDataException.create(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 6869523..037945a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -56,6 +56,9 @@ import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
@@ -83,13 +86,17 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
     private final int filterFieldIndex;
     private final int metaFieldIndex;
     private LockThenSearchOperationCallback searchCallback;
+    private IFrameOperationCallback frameOpCallback;
+    private final IFrameOperationCallbackFactory frameOpCallbackFactory;
 
-    public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
-            int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
-            ARecordType recordType, int filterFieldIndex) throws HyracksDataException {
+    public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
+            int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
+            ARecordType recordType, int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory)
+            throws HyracksDataException {
         super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
         this.key = new PermutingFrameTupleReference();
         this.numOfPrimaryKeys = numOfPrimaryKeys;
+        this.frameOpCallbackFactory = frameOpCallbackFactory;
         missingWriter = opDesc.getMissingWriterFactory().createMissingWriter();
         int[] searchKeyPermutations = new int[numOfPrimaryKeys];
         for (int i = 0; i < searchKeyPermutations.length; i++) {
@@ -104,7 +111,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
             isFiltered = true;
             this.recordType = recordType;
             this.presetFieldIndex = filterFieldIndex;
-            this.recPointable = (ARecordPointable) ARecordPointable.FACTORY.createPointable();
+            this.recPointable = ARecordPointable.FACTORY.createPointable();
             this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
             this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
         }
@@ -140,17 +147,19 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
             tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
             dos = tb.getDataOutput();
             appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
-            modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
-                    indexHelper.getResource(), ctx, this);
+            modCallback = opDesc.getModificationOpCallbackFactory()
+                    .createModificationOperationCallback(indexHelper.getResource(), ctx, this);
             searchCallback = (LockThenSearchOperationCallback) opDesc.getSearchOpCallbackFactory()
                     .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
             indexAccessor = index.createAccessor(modCallback, searchCallback);
             cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
-            IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) ctx.getJobletContext()
-                    .getApplicationContext().getApplicationObject();
+            IAppRuntimeContext runtimeCtx =
+                    (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
             LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
                     runtimeCtx.getTransactionSubsystem().getLogManager());
+            frameOpCallback =
+                    frameOpCallbackFactory.createFrameOperationCallback(ctx, (ILSMIndexAccessor) indexAccessor);
         } catch (Exception e) {
             indexHelper.close();
             throw new HyracksDataException(e);
@@ -188,7 +197,6 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
         tb.addFieldEndOffset();
     }
 
-    //TODO: use tryDelete/tryInsert in order to prevent deadlocks
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
@@ -221,8 +229,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
                     }
                     // if with filters, append the filter
                     if (isFiltered) {
-                        dos.write(prevTuple.getFieldData(filterFieldIndex),
-                                prevTuple.getFieldStart(filterFieldIndex),
+                        dos.write(prevTuple.getFieldData(filterFieldIndex), prevTuple.getFieldStart(filterFieldIndex),
                                 prevTuple.getFieldLength(filterFieldIndex));
                         tb.addFieldEndOffset();
                     }
@@ -258,6 +265,8 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
                 writeOutput(i, recordWasInserted, prevTuple != null);
                 i++;
             }
+            // callback here before calling nextFrame on the next operator
+            frameOpCallback.frameCompleted(!firstModification);
             appender.write(writer, true);
         } catch (IndexException | IOException | AsterixException e) {
             throw new HyracksDataException(e);
@@ -318,6 +327,6 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe
 
     @Override
     public void flush() throws HyracksDataException {
-        writer.flush();
+        // No op since nextFrame flushes by default
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
index fe69a04..b37ecae 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
@@ -37,12 +37,14 @@ import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
 import org.apache.hyracks.storage.common.IStorageManager;
 
 public class LSMTreeUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
     private final int[] prevValuePermutation;
+    private final IFrameOperationCallbackFactory frameOpCallbackFactory;
     private ARecordType type;
     private int filterIndex = -1;
 
@@ -54,12 +56,13 @@ public class LSMTreeUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperator
             boolean isPrimary, String indexName, IMissingWriterFactory missingWriterFactory,
             IModificationOperationCallbackFactory modificationOpCallbackProvider,
             ISearchOperationCallbackFactory searchOpCallbackProvider, int[] prevValuePermutation,
-            IPageManagerFactory pageManagerFactory) {
+            IPageManagerFactory pageManagerFactory, IFrameOperationCallbackFactory frameOpCallbackFactory) {
         super(spec, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
                 comparatorFactories, bloomFilterKeyFields, fieldPermutation, IndexOperation.UPSERT,
                 dataflowHelperFactory, tupleFilterFactory, isPrimary, indexName, missingWriterFactory,
                 modificationOpCallbackProvider, searchOpCallbackProvider, pageManagerFactory);
         this.prevValuePermutation = prevValuePermutation;
+        this.frameOpCallbackFactory = frameOpCallbackFactory;
     }
 
     @Override
@@ -67,7 +70,7 @@ public class LSMTreeUpsertOperatorDescriptor extends LSMTreeInsertDeleteOperator
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return isPrimary()
                 ? new LSMPrimaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
-                        recordDescProvider, comparatorFactories.length, type, filterIndex)
+                        recordDescProvider, comparatorFactories.length, type, filterIndex, frameOpCallbackFactory)
                 : new LSMSecondaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
                         recordDescProvider, prevValuePermutation);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 33078ff..90f6bbf 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -32,7 +32,7 @@ import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -46,7 +46,7 @@ import org.apache.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
 
 public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
 
-    private final static long SEED = 0L;
+    protected static final long SEED = 0L;
 
     protected final ITransactionManager transactionManager;
     protected final ILogManager logMgr;
@@ -85,8 +85,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
         try {
             transactionContext = transactionManager.getTransactionContext(jobId, false);
             transactionContext.setWriteTxn(isWriteTransaction);
-            ILogMarkerCallback callback =
-                    TaskUtil.<ILogMarkerCallback>get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+            ILogMarkerCallback callback = TaskUtil.get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
             logRecord = new LogRecord(callback);
             if (isSink) {
                 return;
@@ -112,6 +111,8 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
                  * active operation count of PrimaryIndexOptracker. By maintaining the count correctly and only allowing
                  * flushing when the count is 0, it can guarantee the no-steal policy for temporary datasets, too.
                  */
+                // TODO: Fix this for upserts. an upsert tuple right now expect to notify the opTracker twice (one for
+                // delete and one for insert)
                 transactionContext.notifyOptracker(false);
             } else {
                 tRef.reset(tAccess, t);
@@ -126,7 +127,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
                 }
             }
         }
-        VSizeFrame message = TaskUtil.<VSizeFrame>get(HyracksConstants.KEY_MESSAGE, ctx);
+        IFrame message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx);
         if (message != null
                 && MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) {
             try {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 536e657..cfe2a25 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -29,14 +29,14 @@ public class CommitRuntimeFactory implements IPushRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
 
-    private final JobId jobId;
-    private final int datasetId;
-    private final int[] primaryKeyFields;
-    private final boolean isTemporaryDatasetWriteJob;
-    private final boolean isWriteTransaction;
-    private final int upsertVarIdx;
-    private int[] datasetPartitions;
-    private final boolean isSink;
+    protected final JobId jobId;
+    protected final int datasetId;
+    protected final int[] primaryKeyFields;
+    protected final boolean isTemporaryDatasetWriteJob;
+    protected final boolean isWriteTransaction;
+    protected final int upsertVarIdx;
+    protected int[] datasetPartitions;
+    protected final boolean isSink;
 
     public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob,
             boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
index 06538af..80dd19b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
@@ -43,6 +43,7 @@ public class VSizeFrame implements IFrame {
         buffer = ctx.allocateFrame(frameSize);
     }
 
+    @Override
     public ByteBuffer getBuffer() {
         return buffer;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 6c581f0..4c0eb1b 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -28,8 +28,10 @@ import org.apache.hyracks.api.util.ErrorMessageUtil;
  */
 public class HyracksDataException extends HyracksException {
 
+    private static final long serialVersionUID = 1L;
+
     public static HyracksDataException create(Throwable cause) {
-        if (cause instanceof HyracksDataException) {
+        if (cause instanceof HyracksDataException || cause == null) {
             return (HyracksDataException) cause;
         }
         return new HyracksDataException(cause);
@@ -48,6 +50,14 @@ public class HyracksDataException extends HyracksException {
                 .getParams());
     }
 
+    public static HyracksDataException suppress(HyracksDataException root, Throwable th) {
+        if (root == null) {
+            return HyracksDataException.create(th);
+        }
+        root.addSuppressed(th);
+        return root;
+    }
+
     public HyracksDataException(String component, int errorCode, String message, Throwable cause, String nodeId,
             Serializable... params) {
         super(component, errorCode, message, cause, nodeId, params);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 21b9dcf..77404b5 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -463,4 +463,8 @@ public class ClusterControllerService implements IControllerService {
             return CCApplicationEntryPoint.INSTANCE;
         }
     }
+
+    public ICCApplicationEntryPoint getApplication() {
+        return aep;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
index 05417a8..77f18ea 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
@@ -25,6 +25,19 @@ public abstract class AbstractPointable implements IPointable {
 
     protected int length;
 
+    /**
+     * copies the content of this pointable to the passed byte array.
+     * the array is expected to be at least of length = length of this pointable
+     *
+     * @param copy
+     *            the array to write into
+     * @throws ArrayIndexOutOfBoundsException
+     *             if the passed array size is smaller than length
+     */
+    public void copyInto(byte[] copy) {
+        System.arraycopy(bytes, start, copy, 0, length);
+    }
+
     @Override
     public void set(byte[] bytes, int start, int length) {
         this.bytes = bytes;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
index 74ced4f..2e8071c 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
@@ -18,8 +18,27 @@
  */
 package org.apache.hyracks.data.std.api;
 
+/**
+ * Point to range over byte array
+ */
 public interface IPointable extends IValueReference {
-    public void set(byte[] bytes, int start, int length);
+    /**
+     * Point to the range from position = start with length = length over the byte array bytes
+     *
+     * @param bytes
+     *            the byte array
+     * @param start
+     *            the start offset
+     * @param length
+     *            the length of the range
+     */
+    void set(byte[] bytes, int start, int length);
 
-    public void set(IValueReference pointer);
+    /**
+     * Point to the same range pointed to by the passed pointer
+     *
+     * @param pointer
+     *            the pointer to the targetted range
+     */
+    void set(IValueReference pointer);
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
index ee00163..51c155e 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
@@ -20,10 +20,10 @@ package org.apache.hyracks.data.std.primitive;
 
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.data.std.api.AbstractPointable;
-import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.api.IPointableFactory;
 
 public final class VoidPointable extends AbstractPointable {
+    public static final VoidPointableFactory FACTORY = new VoidPointableFactory();
     public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
         private static final long serialVersionUID = 1L;
 
@@ -38,11 +38,14 @@ public final class VoidPointable extends AbstractPointable {
         }
     };
 
-    public static final IPointableFactory FACTORY = new IPointableFactory() {
+    public static class VoidPointableFactory implements IPointableFactory {
         private static final long serialVersionUID = 1L;
 
+        private VoidPointableFactory() {
+        }
+
         @Override
-        public IPointable createPointable() {
+        public VoidPointable createPointable() {
             return new VoidPointable();
         }
 
@@ -50,5 +53,5 @@ public final class VoidPointable extends AbstractPointable {
         public ITypeTraits getTypeTraits() {
             return TYPE_TRAITS;
         }
-    };
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index 57f8072..efdd963 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -108,4 +108,11 @@ public class AbstractFrameAppender implements IFrameAppender {
         return false;
     }
 
+    @Override
+    public void flush(IFrameWriter writer) throws HyracksDataException {
+        if (tupleCount > 0) {
+            write(writer, true);
+        }
+        writer.flush();
+    }
 }