You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/07/22 19:37:03 UTC

[4/4] asterixdb git commit: Add Test NodeController, Test Data Generator, and Marker Logs

Add Test NodeController, Test Data Generator, and Marker Logs

This change enables creating a node controller for unit test purposes.
The test node controller is identical to the regular node controller
except that it doesn't communicate with a cluster controller at all.

This change also introduces a Test Data Generator, which should
facilitate writing unit test cases that require data generation.
The change also enables feeds to send progress data. Progress
information can then be sent through the pipeline and persisted in the
transaction logs and primary index component. A unit test case has
been created to test adding progress markers to logs and index
components and then reading them.

The last part of this change is the addition of marker logs and their
callbacks. They enable components to create arbitrary logs and get a
callback when they are written to the transaction logs. An initial set of
unit tests was added for marker logs.

Change-Id: I3b9aa8de758b7d26ca34868b16e5ce693e0c0243
Reviewed-on: https://asterix-gerrit.ics.uci.edu/962
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/7575785a
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/7575785a
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/7575785a

Branch: refs/heads/master
Commit: 7575785accf54f0a1bf0857c9add6365f44d386e
Parents: 973a0d3
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Fri Jul 22 20:52:47 2016 +0300
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Fri Jul 22 12:35:47 2016 -0700

----------------------------------------------------------------------
 .../operators/physical/CommitRuntime.java       |  37 +-
 .../app/bootstrap/TestNodeController.java       | 481 +++++++++++++++++++
 .../data/gen/ABooleanFieldValueGenerator.java   |  94 ++++
 .../data/gen/ADoubleFieldValueGenerator.java    | 153 ++++++
 .../app/data/gen/AInt32FieldValueGenerator.java | 152 ++++++
 .../app/data/gen/AInt64FieldValueGenerator.java | 152 ++++++
 .../app/data/gen/ARecordValueGenerator.java     | 119 +++++
 .../data/gen/AStringFieldValueGenerator.java    | 159 ++++++
 .../data/gen/IAsterixFieldValueGenerator.java   |  48 ++
 .../data/gen/TestTupleCounterFrameWriter.java   |  51 ++
 .../asterix/app/data/gen/TupleGenerator.java    | 126 +++++
 .../asterix/test/common/TestTupleGenerator.java |  29 --
 .../asterix/test/common/TestTupleReference.java |  74 +++
 .../ConnectorDescriptorWithMessagingTest.java   |  18 +-
 .../asterix/test/dataflow/LogMarkerTest.java    | 211 ++++++++
 .../dataflow/TestRecordDescriptorFactory.java   |  28 --
 asterixdb/asterix-common/pom.xml                |   5 +
 .../context/PrimaryIndexOperationTracker.java   |  10 +-
 ...erixLSMInsertDeleteOperatorNodePushable.java |  17 +-
 .../common/transactions/ILogMarkerCallback.java |  45 ++
 .../asterix/common/transactions/ILogRecord.java |  49 +-
 .../asterix/common/transactions/LogRecord.java  | 239 +++++----
 .../asterix/common/transactions/LogType.java    |   9 +-
 .../PrimaryIndexLogMarkerCallback.java          |  51 ++
 .../asterix/common/utils/AsterixConstants.java  |  30 ++
 .../apache/asterix/common/utils/FrameStack.java | 149 ++++++
 .../asterix/common/utils/StoragePathUtil.java   |  39 ++
 .../asterix/common/utils/TransactionUtil.java   |  16 +
 .../asterix/event/util/AsterixConstants.java    |  25 -
 .../asterix/event/util/PatternCreator.java      |  17 +-
 .../asterix/external/api/IFeedMarker.java       |  35 ++
 .../asterix/external/api/IRecordConverter.java  |   1 +
 .../asterix/external/api/IRecordReader.java     |   4 +
 .../dataflow/ChangeFeedDataFlowController.java  |   4 +-
 .../ChangeFeedWithMetaDataFlowController.java   |   6 +-
 .../dataflow/FeedRecordDataFlowController.java  |  99 +++-
 .../external/dataflow/FeedTupleForwarder.java   |   4 +-
 .../FeedWithMetaDataFlowController.java         |   4 +-
 .../feed/dataflow/FeedRuntimeInputHandler.java  |  51 +-
 .../external/feed/dataflow/FrameSpiller.java    |  54 +--
 .../external/feed/runtime/IngestionRuntime.java |   7 +-
 .../operators/FeedMetaComputeNodePushable.java  |   4 +-
 .../operators/FeedMetaStoreNodePushable.java    |   4 +-
 .../provider/DataflowControllerProvider.java    |  12 +-
 .../asterix/external/util/DataflowUtils.java    |  11 +
 .../external/util/ExternalDataConstants.java    |   2 +
 .../external/util/ExternalDataUtils.java        |   4 +
 .../apache/asterix/external/util/FeedUtils.java |   6 +-
 .../statement/CreateDataverseStatement.java     |   7 +-
 .../serde/AInt64SerializerDeserializer.java     |   2 +-
 .../management/ReplicationChannel.java          |  27 +-
 ...rixLSMPrimaryUpsertOperatorNodePushable.java |  17 +-
 .../management/service/logging/LogManager.java  |   7 +-
 .../service/recovery/RecoveryManager.java       |  65 +--
 .../apache/hyracks/api/comm/IFrameWriter.java   |   6 +-
 .../api/context/IHyracksTaskContext.java        |   2 +-
 .../hyracks/api/util/HyracksConstants.java      |  26 +
 .../control/nc/NodeControllerService.java       |  23 +-
 .../org/apache/hyracks/control/nc/Task.java     |   4 +-
 .../MaterializingPipelinedPartition.java        |  18 +-
 .../common/comm/io/AbstractFrameAppender.java   |   4 +-
 .../common/comm/io/FrameTupleAppender.java      |  33 +-
 .../common/io/MessagingFrameTupleAppender.java  |  52 +-
 .../hyracks/dataflow/common/util/TaskUtils.java |  77 +++
 .../preclustered/PreclusteredGroupWriter.java   |  39 +-
 .../hyracks-virtualcluster-maven-plugin/pom.xml |  31 ++
 .../am/common/api/IMetaDataPageManager.java     |  43 +-
 .../am/common/api/ITreeIndexMetaDataFrame.java  |   5 +
 .../IndexSearchOperatorNodePushable.java        |   4 +-
 .../am/common/frames/LIFOMetaDataFrame.java     |  90 ++--
 .../freepage/LinkedMetaDataPageManager.java     |  26 +-
 .../storage/am/lsm/btree/impls/LSMBTree.java    |  81 ++--
 .../lsm/btree/impls/LSMBTreeDiskComponent.java  |   6 +
 .../lsm/btree/impls/LSMBTreeFlushOperation.java |   8 +-
 .../btree/impls/LSMBTreeMemoryComponent.java    |   9 +-
 .../am/lsm/common/api/ILSMComponent.java        |   4 +
 .../freepage/VirtualMetaDataPageManager.java    |  10 +
 .../lsm/common/impls/AbstractLSMComponent.java  |  18 +-
 .../am/lsm/common/impls/AbstractLSMIndex.java   |   8 +-
 .../impls/AbstractMemoryLSMComponent.java       |   9 +-
 .../storage/am/lsm/common/impls/LSMHarness.java |   4 +
 .../lsm/common/impls/LSMTreeIndexAccessor.java  |   3 +-
 .../hyracks/test/support/TestTaskContext.java   |   9 +-
 83 files changed, 3194 insertions(+), 528 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
index 0f31935..c6c71f6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.JobId;
@@ -32,12 +33,16 @@ import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
 import org.apache.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit;
 
 public class CommitRuntime implements IPushRuntime {
@@ -52,17 +57,18 @@ public class CommitRuntime implements IPushRuntime {
     protected final boolean isTemporaryDatasetWriteJob;
     protected final boolean isWriteTransaction;
     protected final long[] longHashes;
-    protected final LogRecord logRecord;
     protected final FrameTupleReference frameTupleReference;
-
+    protected final IHyracksTaskContext ctx;
+    protected final int resourcePartition;
     protected ITransactionContext transactionContext;
+    protected LogRecord logRecord;
     protected FrameTupleAccessor frameTupleAccessor;
-    protected final int resourcePartition;
 
     public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
             boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition) {
-        IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
+        this.ctx = ctx;
+        IAsterixAppRuntimeContext runtimeCtx =
+                (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
         this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
         this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager();
         this.jobId = jobId;
@@ -73,7 +79,6 @@ public class CommitRuntime implements IPushRuntime {
         this.isWriteTransaction = isWriteTransaction;
         this.resourcePartition = resourcePartition;
         longHashes = new long[2];
-        logRecord = new LogRecord();
     }
 
     @Override
@@ -81,6 +86,9 @@ public class CommitRuntime implements IPushRuntime {
         try {
             transactionContext = transactionManager.getTransactionContext(jobId, false);
             transactionContext.setWriteTxn(isWriteTransaction);
+            ILogMarkerCallback callback =
+                    TaskUtils.<ILogMarkerCallback> get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+            logRecord = new LogRecord(callback);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
@@ -113,6 +121,23 @@ public class CommitRuntime implements IPushRuntime {
                 }
             }
         }
+        VSizeFrame message = TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
+        if (message != null
+                && MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) {
+            try {
+                formMarkerLogRecords(message.getBuffer());
+                logMgr.log(logRecord);
+            } catch (ACIDException e) {
+                throw new HyracksDataException(e);
+            }
+            message.reset();
+            message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+            message.getBuffer().flip();
+        }
+    }
+
+    private void formMarkerLogRecords(ByteBuffer marker) {
+        TransactionUtil.formMarkerLogRecord(logRecord, transactionContext, datasetId, resourcePartition, marker);
     }
 
     protected void formLogRecord(ByteBuffer buffer, int t) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
new file mode 100644
index 0000000..5c3aefe
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.bootstrap;
+
+import java.io.File;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.asterix.algebra.operators.physical.CommitRuntime;
+import org.apache.asterix.api.common.AsterixAppRuntimeContext;
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.app.external.TestLibrarian;
+import org.apache.asterix.common.api.ILocalResourceMetadata;
+import org.apache.asterix.common.config.AsterixTransactionProperties;
+import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.TransactionSubsystemProvider;
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.dataflow.AsterixLSMTreeInsertDeleteOperatorDescriptor;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
+import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.AqlTypeTraitProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtils;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
+import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadata;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.asterix.transaction.management.service.logging.LogReader;
+import org.apache.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
+import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelper;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelperFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.apache.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import org.apache.hyracks.storage.common.file.LocalResource;
+import org.apache.hyracks.test.support.TestUtils;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestNodeController {
+    protected static final Logger LOGGER = Logger.getLogger(TestNodeController.class.getName());
+
+    protected static final String PATH_ACTUAL = "unittest" + File.separator;
+    protected static final String PATH_BASE =
+            StringUtils.join(new String[] { "src", "test", "resources", "nodetests" }, File.separator);
+
+    protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+    protected static AsterixTransactionProperties txnProperties;
+    private static final boolean cleanupOnStart = true;
+    private static final boolean cleanupOnStop = true;
+
+    // Constants
+    public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
+    public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
+    public static final int KB32 = 32768;
+    public static final int PARTITION = 0;
+    public static final double BLOOM_FILTER_FALSE_POSITIVE_RATE = 0.01;
+    public static final TransactionSubsystemProvider TXN_SUBSYSTEM_PROVIDER = new TransactionSubsystemProvider();
+    // Mutables
+    private JobId jobId;
+    private long jobCounter = 0L;
+    private IHyracksJobletContext jobletCtx;
+
+    public TestNodeController() throws AsterixException, HyracksException, ACIDException {
+    }
+
+    public void init() throws Exception {
+        try {
+            File outdir = new File(PATH_ACTUAL);
+            outdir.mkdirs();
+            // remove library directory
+            TestLibrarian.removeLibraryDir();
+            ExecutionTestUtil.setUp(cleanupOnStart);
+        } catch (Throwable th) {
+            th.printStackTrace();
+            throw th;
+        }
+        jobletCtx = Mockito.mock(IHyracksJobletContext.class);
+        Mockito.when(jobletCtx.getApplicationContext())
+                .thenReturn(AsterixHyracksIntegrationUtil.ncs[0].getApplicationContext());
+        Mockito.when(jobletCtx.getJobId()).thenAnswer(new Answer<JobId>() {
+            @Override
+            public JobId answer(InvocationOnMock invocation) throws Throwable {
+                return jobId;
+            }
+        });
+    }
+
+    public void deInit() throws Exception {
+        TestLibrarian.removeLibraryDir();
+        ExecutionTestUtil.tearDown(cleanupOnStop);
+    }
+
+    public org.apache.asterix.common.transactions.JobId getTxnJobId() {
+        return new org.apache.asterix.common.transactions.JobId((int) jobId.getId());
+    }
+
+    public AsterixLSMInsertDeleteOperatorNodePushable getInsertPipeline(IHyracksTaskContext ctx, Dataset dataset,
+            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+            ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields)
+            throws AlgebricksException {
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        IndexOperation op = IndexOperation.INSERT;
+        IModificationOperationCallbackFactory modOpCallbackFactory =
+                new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(), dataset.getDatasetId(),
+                        primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, op, ResourceType.LSM_BTREE, true);
+        AsterixLSMTreeInsertDeleteOperatorDescriptor indexOpDesc =
+                getInsertOpratorDesc(primaryIndexInfo, modOpCallbackFactory);
+        LSMBTreeDataflowHelperFactory dataflowHelperFactory =
+                getPrimaryIndexDataflowHelperFactory(ctx, primaryIndexInfo);
+        Mockito.when(indexOpDesc.getIndexDataflowHelperFactory()).thenReturn(dataflowHelperFactory);
+        IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
+        AsterixLSMInsertDeleteOperatorNodePushable insertOp =
+                new AsterixLSMInsertDeleteOperatorNodePushable(indexOpDesc, ctx, PARTITION,
+                        primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDescProvider, op, true);
+        CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(), dataset.getDatasetId(),
+                primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION);
+        insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
+        commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
+        return insertOp;
+    }
+
+    public IPushRuntime getFullScanPipeline(IFrameWriter countOp, IHyracksTaskContext ctx, Dataset dataset,
+            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+            NoMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields)
+            throws HyracksDataException, AlgebricksException {
+        IPushRuntime emptyTupleOp = new EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx);
+        JobSpecification spec = new JobSpecification();
+        PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        LSMBTreeDataflowHelperFactory indexDataflowHelperFactory =
+                getPrimaryIndexDataflowHelperFactory(ctx, primaryIndexInfo);
+        BTreeSearchOperatorDescriptor searchOpDesc = new BTreeSearchOperatorDescriptor(spec, primaryIndexInfo.rDesc,
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
+                primaryIndexInfo.fileSplitProvider, primaryIndexInfo.primaryIndexTypeTraits,
+                primaryIndexInfo.primaryIndexComparatorFactories, primaryIndexInfo.primaryIndexBloomFilterKeyFields,
+                primaryIndexInfo.primaryKeyIndexes, primaryIndexInfo.primaryKeyIndexes, true, true,
+                indexDataflowHelperFactory, false, false, null, NoOpOperationCallbackFactory.INSTANCE, filterFields,
+                filterFields);
+        BTreeSearchOperatorNodePushable searchOp = new BTreeSearchOperatorNodePushable(searchOpDesc, ctx, 0,
+                primaryIndexInfo.getSearchRecordDescriptorProvider(), /*primaryIndexInfo.primaryKeyIndexes*/null,
+                /*primaryIndexInfo.primaryKeyIndexes*/null, true, true, filterFields, filterFields);
+        emptyTupleOp.setFrameWriter(0, searchOp,
+                primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0));
+        searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc);
+        return emptyTupleOp;
+    }
+
+    public LogReader getTransactionLogReader(boolean isRecoveryMode) {
+        return (LogReader) getTransactionSubsystem().getLogManager().getLogReader(isRecoveryMode);
+    }
+
+    public JobId newJobId() {
+        jobId = new JobId(jobCounter++);
+        return jobId;
+    }
+
+    public AsterixLSMTreeInsertDeleteOperatorDescriptor getInsertOpratorDesc(PrimaryIndexInfo primaryIndexInfo,
+            IModificationOperationCallbackFactory modOpCallbackFactory) {
+        AsterixLSMTreeInsertDeleteOperatorDescriptor indexOpDesc =
+                Mockito.mock(AsterixLSMTreeInsertDeleteOperatorDescriptor.class);
+        Mockito.when(indexOpDesc.getLifecycleManagerProvider())
+                .thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+        Mockito.when(indexOpDesc.getStorageManager()).thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+        Mockito.when(indexOpDesc.getFileSplitProvider()).thenReturn(primaryIndexInfo.fileSplitProvider);
+        Mockito.when(indexOpDesc.getLocalResourceFactoryProvider())
+                .thenReturn(primaryIndexInfo.localResourceFactoryProvider);
+        Mockito.when(indexOpDesc.getTreeIndexTypeTraits()).thenReturn(primaryIndexInfo.primaryIndexTypeTraits);
+        Mockito.when(indexOpDesc.getTreeIndexComparatorFactories())
+                .thenReturn(primaryIndexInfo.primaryIndexComparatorFactories);
+        Mockito.when(indexOpDesc.getTreeIndexBloomFilterKeyFields())
+                .thenReturn(primaryIndexInfo.primaryIndexBloomFilterKeyFields);
+        Mockito.when(indexOpDesc.getModificationOpCallbackFactory()).thenReturn(modOpCallbackFactory);
+        return indexOpDesc;
+    }
+
+    public TreeIndexCreateOperatorDescriptor getIndexCreateOpDesc(PrimaryIndexInfo primaryIndexInfo) {
+        TreeIndexCreateOperatorDescriptor indexOpDesc = Mockito.mock(TreeIndexCreateOperatorDescriptor.class);
+        Mockito.when(indexOpDesc.getLifecycleManagerProvider())
+                .thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+        Mockito.when(indexOpDesc.getStorageManager()).thenReturn(AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER);
+        Mockito.when(indexOpDesc.getFileSplitProvider()).thenReturn(primaryIndexInfo.fileSplitProvider);
+        Mockito.when(indexOpDesc.getLocalResourceFactoryProvider())
+                .thenReturn(primaryIndexInfo.localResourceFactoryProvider);
+        Mockito.when(indexOpDesc.getTreeIndexTypeTraits()).thenReturn(primaryIndexInfo.primaryIndexTypeTraits);
+        Mockito.when(indexOpDesc.getTreeIndexComparatorFactories())
+                .thenReturn(primaryIndexInfo.primaryIndexComparatorFactories);
+        Mockito.when(indexOpDesc.getTreeIndexBloomFilterKeyFields())
+                .thenReturn(primaryIndexInfo.primaryIndexBloomFilterKeyFields);
+        return indexOpDesc;
+    }
+
+    public ConstantFileSplitProvider getFileSplitProvider(Dataset dataset) {
+        FileSplit fileSplit = new FileSplit(AsterixHyracksIntegrationUtil.ncs[0].getId(),
+                dataset.getDataverseName() + File.separator + dataset.getDatasetName());
+        return new ConstantFileSplitProvider(new FileSplit[] { fileSplit });
+    }
+
+    public ILocalResourceFactoryProvider getPrimaryIndexLocalResourceMetadataProvider(Dataset dataset,
+            ITypeTraits[] primaryIndexTypeTraits, IBinaryComparatorFactory[] primaryIndexComparatorFactories,
+            int[] primaryIndexBloomFilterKeyFields, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits,
+            IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields) {
+        ILocalResourceMetadata localResourceMetadata =
+                new LSMBTreeLocalResourceMetadata(primaryIndexTypeTraits, primaryIndexComparatorFactories,
+                        primaryIndexBloomFilterKeyFields, true, dataset.getDatasetId(), mergePolicyFactory,
+                        mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields);
+        ILocalResourceFactoryProvider localResourceFactoryProvider =
+                new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMBTreeResource);
+        return localResourceFactoryProvider;
+    }
+
+    public LSMBTreeDataflowHelper getPrimaryIndexDataflowHelper(IHyracksTaskContext ctx,
+            PrimaryIndexInfo primaryIndexInfo, TreeIndexCreateOperatorDescriptor indexOpDesc)
+            throws AlgebricksException {
+        LSMBTreeDataflowHelperFactory dataflowHelperFactory = new LSMBTreeDataflowHelperFactory(
+                new AsterixVirtualBufferCacheProvider(primaryIndexInfo.dataset.getDatasetId()),
+                primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties,
+                new PrimaryIndexOperationTrackerProvider(primaryIndexInfo.dataset.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                BLOOM_FILTER_FALSE_POSITIVE_RATE, true, primaryIndexInfo.filterTypeTraits,
+                primaryIndexInfo.filterCmpFactories, primaryIndexInfo.btreeFields, primaryIndexInfo.filterFields, true);
+        IndexDataflowHelper dataflowHelper =
+                dataflowHelperFactory.createIndexDataflowHelper(indexOpDesc, ctx, PARTITION);
+        return (LSMBTreeDataflowHelper) dataflowHelper;
+    }
+
+    public LSMBTreeDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(IHyracksTaskContext ctx,
+            PrimaryIndexInfo primaryIndexInfo) throws AlgebricksException {
+        return new LSMBTreeDataflowHelperFactory(
+                new AsterixVirtualBufferCacheProvider(primaryIndexInfo.dataset.getDatasetId()),
+                primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties,
+                new PrimaryIndexOperationTrackerProvider(primaryIndexInfo.dataset.getDatasetId()),
+                AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
+                BLOOM_FILTER_FALSE_POSITIVE_RATE, true, primaryIndexInfo.filterTypeTraits,
+                primaryIndexInfo.filterCmpFactories, primaryIndexInfo.btreeFields, primaryIndexInfo.filterFields, true);
+    }
+
+    /**
+     * Convenience overload: derives the primary index information and the index-create operator
+     * descriptor from the dataset definition, then builds the dataflow helper on a fresh test
+     * context.
+     *
+     * @return the LSM B-tree dataflow helper for the dataset's primary index
+     * @throws AlgebricksException
+     *             if the primary index information cannot be derived
+     * @throws HyracksDataException
+     *             if the test context cannot be created
+     */
+    public LSMBTreeDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset, IAType[] primaryKeyTypes,
+            ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
+            Map<String, String> mergePolicyProperties, int[] filterFields)
+            throws AlgebricksException, HyracksDataException {
+        PrimaryIndexInfo indexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        TreeIndexCreateOperatorDescriptor createOpDesc = getIndexCreateOpDesc(indexInfo);
+        IHyracksTaskContext testCtx = createTestContext();
+        return getPrimaryIndexDataflowHelper(testCtx, indexInfo, createOpDesc);
+    }
+
+    /**
+     * Creates the primary index of {@code dataset}: derives the index information and create
+     * operator descriptor, obtains the dataflow helper on a fresh test context, and invokes
+     * {@code create()} on it.
+     *
+     * @throws AlgebricksException
+     *             if the primary index information cannot be derived
+     * @throws HyracksDataException
+     *             if the test context or the index cannot be created
+     */
+    public void createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
+            ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+            int[] filterFields) throws AlgebricksException, HyracksDataException {
+        PrimaryIndexInfo indexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType,
+                mergePolicyFactory, mergePolicyProperties, filterFields);
+        TreeIndexCreateOperatorDescriptor createOpDesc = getIndexCreateOpDesc(indexInfo);
+        getPrimaryIndexDataflowHelper(createTestContext(), indexInfo, createOpDesc).create();
+    }
+
+    /**
+     * Returns the identity array {@code [0, 1, ..., length - 1]}, used as the bloom filter key
+     * field positions of the primary index.
+     *
+     * @param length
+     *            number of entries to produce
+     */
+    private int[] createPrimaryIndexBloomFilterFields(int length) {
+        int[] keyFieldPositions = new int[length];
+        int position = 0;
+        while (position < length) {
+            keyFieldPositions[position] = position;
+            position++;
+        }
+        return keyFieldPositions;
+    }
+
+    /**
+     * Creates one binary comparator factory per primary key type, in key order.
+     *
+     * @param primaryKeyTypes
+     *            types of the primary key fields
+     * @return an array parallel to {@code primaryKeyTypes}
+     */
+    private IBinaryComparatorFactory[] createPrimaryIndexComparatorFactories(IAType[] primaryKeyTypes) {
+        IBinaryComparatorFactory[] cmpFactories = new IBinaryComparatorFactory[primaryKeyTypes.length];
+        int keyIdx = 0;
+        for (IAType keyType : primaryKeyTypes) {
+            cmpFactories[keyIdx++] =
+                    AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
+        }
+        return cmpFactories;
+    }
+
+    private ISerializerDeserializer<?>[] createPrimaryIndexSerdes(int primaryIndexNumOfTupleFields,
+            IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType) {
+        int i = 0;
+        ISerializerDeserializer<?>[] primaryIndexSerdes = new ISerializerDeserializer<?>[primaryIndexNumOfTupleFields];
+        for (; i < primaryKeyTypes.length; i++) {
+            primaryIndexSerdes[i] =
+                    AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(primaryKeyTypes[i]);
+        }
+        primaryIndexSerdes[i++] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(recordType);
+        if (metaType != null) {
+            primaryIndexSerdes[i] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
+        }
+        return primaryIndexSerdes;
+    }
+
+    private ITypeTraits[] createPrimaryIndexTypeTraits(int primaryIndexNumOfTupleFields, IAType[] primaryKeyTypes,
+            ARecordType recordType, ARecordType metaType) {
+        ITypeTraits[] primaryIndexTypeTraits = new ITypeTraits[primaryIndexNumOfTupleFields];
+        int i = 0;
+        for (; i < primaryKeyTypes.length; i++) {
+            primaryIndexTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]);
+        }
+        primaryIndexTypeTraits[i++] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(recordType);
+        if (metaType != null) {
+            primaryIndexTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(metaType);
+        }
+        return primaryIndexTypeTraits;
+    }
+
+    /**
+     * Creates a task context for tests. The context comes from {@code TestUtils.create(KB32)}, gets
+     * a new {@link VSizeFrame} registered in its shared map under
+     * {@code HyracksConstants.KEY_MESSAGE}, and is then wrapped in a Mockito spy whose
+     * {@code getJobletContext()} returns {@code jobletCtx} and whose {@code getIOManager()} returns
+     * the IO manager of the first node controller's root context.
+     *
+     * @return the spied task context
+     * @throws HyracksDataException
+     *             if the message frame cannot be allocated
+     */
+    public IHyracksTaskContext createTestContext() throws HyracksDataException {
+        IHyracksTaskContext realCtx = TestUtils.create(KB32);
+        // Register the message frame on the real context before it is wrapped by the spy.
+        TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(realCtx), realCtx);
+        IHyracksTaskContext spiedCtx = Mockito.spy(realCtx);
+        Mockito.when(spiedCtx.getJobletContext()).thenReturn(jobletCtx);
+        Mockito.when(spiedCtx.getIOManager())
+                .thenReturn(AsterixHyracksIntegrationUtil.ncs[0].getRootContext().getIOManager());
+        return spiedCtx;
+    }
+
+    /**
+     * Returns the transaction subsystem of the first node controller's application runtime context.
+     */
+    public TransactionSubsystem getTransactionSubsystem() {
+        AsterixAppRuntimeContext runtimeCtx = (AsterixAppRuntimeContext) AsterixHyracksIntegrationUtil.ncs[0]
+                .getApplicationContext().getApplicationObject();
+        return (TransactionSubsystem) runtimeCtx.getTransactionSubsystem();
+    }
+
+    public ITransactionManager getTransactionManager() {
+        return getTransactionSubsystem().getTransactionManager();
+    }
+
+    public AsterixAppRuntimeContext getAppRuntimeContext() {
+        return (AsterixAppRuntimeContext) AsterixHyracksIntegrationUtil.ncs[0].getApplicationContext()
+                .getApplicationObject();
+    }
+
+    public DatasetLifecycleManager getDatasetLifecycleManager() {
+        return (DatasetLifecycleManager) getAppRuntimeContext().getDatasetLifecycleManager();
+    }
+
+    /**
+     * Holds the artifacts derived from a dataset definition that the enclosing class needs to work
+     * with the dataset's primary index: type traits, comparator factories, serializers, bloom
+     * filter key fields, filter settings, the local resource factory provider, the file split
+     * provider, the tuple record descriptor, and the identity permutations for insert fields and
+     * primary key positions. Everything is computed once in the constructor.
+     */
+    @SuppressWarnings("unused")
+    private class PrimaryIndexInfo {
+        private Dataset dataset;
+        private IAType[] primaryKeyTypes;
+        private ARecordType recordType;
+        private ARecordType metaType;
+        private ILSMMergePolicyFactory mergePolicyFactory;
+        private Map<String, String> mergePolicyProperties;
+        private int[] filterFields;
+        private int primaryIndexNumOfTupleFields;
+        private IBinaryComparatorFactory[] primaryIndexComparatorFactories;
+        private ITypeTraits[] primaryIndexTypeTraits;
+        private ISerializerDeserializer<?>[] primaryIndexSerdes;
+        private int[] primaryIndexBloomFilterKeyFields;
+        private ITypeTraits[] filterTypeTraits;
+        private IBinaryComparatorFactory[] filterCmpFactories;
+        private int[] btreeFields;
+        private ILocalResourceFactoryProvider localResourceFactoryProvider;
+        private ConstantFileSplitProvider fileSplitProvider;
+        private RecordDescriptor rDesc;
+        private int[] primaryIndexInsertFieldsPermutations;
+        private int[] primaryKeyIndexes;
+
+        /**
+         * Stores the given definition and derives all dependent artifacts from it.
+         *
+         * @throws AlgebricksException
+         *             if any of the derived artifacts cannot be computed
+         */
+        public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType,
+                ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties,
+                int[] filterFields) throws AlgebricksException {
+            this.dataset = dataset;
+            this.primaryKeyTypes = primaryKeyTypes;
+            this.recordType = recordType;
+            this.metaType = metaType;
+            this.mergePolicyFactory = mergePolicyFactory;
+            this.mergePolicyProperties = mergePolicyProperties;
+            this.filterFields = filterFields;
+
+            // A tuple has one field per primary key, one for the record, and one more for the meta
+            // record when there is one.
+            final int numKeys = primaryKeyTypes.length;
+            this.primaryIndexNumOfTupleFields = (metaType == null) ? numKeys + 1 : numKeys + 2;
+
+            this.primaryIndexTypeTraits =
+                    createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
+            this.primaryIndexComparatorFactories = createPrimaryIndexComparatorFactories(primaryKeyTypes);
+            this.primaryIndexBloomFilterKeyFields = createPrimaryIndexBloomFilterFields(primaryKeyTypes.length);
+            this.filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recordType);
+            this.filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, recordType,
+                    NonTaggedDataFormat.INSTANCE.getBinaryComparatorFactoryProvider());
+            this.btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
+            // The resource factory provider consumes the traits, comparators and filter settings
+            // computed above, so it must come after them.
+            this.localResourceFactoryProvider = getPrimaryIndexLocalResourceMetadataProvider(dataset,
+                    primaryIndexTypeTraits, primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields,
+                    mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields,
+                    filterFields);
+            this.fileSplitProvider = getFileSplitProvider(dataset);
+            this.primaryIndexSerdes =
+                    createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
+            this.rDesc = new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
+            this.primaryIndexInsertFieldsPermutations = identityPermutation(primaryIndexNumOfTupleFields);
+            this.primaryKeyIndexes = identityPermutation(primaryKeyTypes.length);
+        }
+
+        /** Returns the array {@code [0, 1, ..., length - 1]}. */
+        private int[] identityPermutation(int length) {
+            int[] permutation = new int[length];
+            for (int pos = 0; pos < length; pos++) {
+                permutation[pos] = pos;
+            }
+            return permutation;
+        }
+
+        /**
+         * Returns a mocked provider that answers every input record descriptor request with
+         * {@code inputDesc}.
+         */
+        private IRecordDescriptorProvider mockInputRecordDescriptorProvider(RecordDescriptor inputDesc) {
+            IRecordDescriptorProvider provider = Mockito.mock(IRecordDescriptorProvider.class);
+            Mockito.when(provider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt())).thenReturn(inputDesc);
+            return provider;
+        }
+
+        /**
+         * Returns a mocked provider whose input record descriptor is the full primary index tuple
+         * descriptor (keys, record, and optional meta record).
+         */
+        public IRecordDescriptorProvider getInsertRecordDescriptorProvider() {
+            return mockInputRecordDescriptorProvider(rDesc);
+        }
+
+        /**
+         * Returns a mocked provider whose input record descriptor contains only the primary key
+         * fields.
+         */
+        public IRecordDescriptorProvider getSearchRecordDescriptorProvider() {
+            final int numKeys = primaryKeyTypes.length;
+            ITypeTraits[] keyTypeTraits = new ITypeTraits[numKeys];
+            ISerializerDeserializer<?>[] keySerdes = new ISerializerDeserializer<?>[numKeys];
+            for (int keyIdx = 0; keyIdx < numKeys; keyIdx++) {
+                IAType keyType = primaryKeyTypes[keyIdx];
+                keyTypeTraits[keyIdx] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+                keySerdes[keyIdx] = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(keyType);
+            }
+            RecordDescriptor searchRecDesc = new RecordDescriptor(keySerdes, keyTypeTraits);
+            return mockInputRecordDescriptorProvider(searchRecDesc);
+        }
+    }
+
+    /**
+     * Builds the record descriptor of a primary index tuple: the key fields, the record, and the
+     * meta record when {@code metaType} is not null.
+     *
+     * @param keyTypes
+     *            types of the primary key fields
+     * @param recordType
+     *            type of the record field
+     * @param metaType
+     *            type of the meta record field, or null if there is none
+     * @return a descriptor pairing the tuple's serializers with its type traits
+     */
+    public RecordDescriptor getSearchOutputDesc(IAType[] keyTypes, ARecordType recordType, ARecordType metaType) {
+        int tupleFieldCount = keyTypes.length + 1;
+        if (metaType != null) {
+            tupleFieldCount++;
+        }
+        ITypeTraits[] typeTraits = createPrimaryIndexTypeTraits(tupleFieldCount, keyTypes, recordType, metaType);
+        ISerializerDeserializer<?>[] serdes = createPrimaryIndexSerdes(tupleFieldCount, keyTypes, recordType, metaType);
+        return new RecordDescriptor(serdes, typeTraits);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ABooleanFieldValueGenerator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ABooleanFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ABooleanFieldValueGenerator.java
new file mode 100644
index 0000000..2eba473
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ABooleanFieldValueGenerator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+
+/**
+ * Test data generator for boolean field values, driven by a {@link GenerationFunction}:
+ * <ul>
+ * <li>DECREASING: the value starts as {@code true} and never changes.</li>
+ * <li>INCREASING: the value starts as {@code false} and never changes.</li>
+ * <li>DETERMINISTIC: the value starts as {@code false} and is flipped each time a value is generated.</li>
+ * <li>RANDOM: every value is drawn from {@link Random#nextBoolean()}.</li>
+ * </ul>
+ * When {@code tagged} is true, serialized values are preceded by the boolean type tag byte.
+ */
+public class ABooleanFieldValueGenerator implements IAsterixFieldValueGenerator<Boolean> {
+    private final GenerationFunction generationFunction;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private boolean value;
+
+    /**
+     * @param generationFunction
+     *            how successive values are produced
+     * @param tagged
+     *            whether serialized values are prefixed with the type tag
+     */
+    public ABooleanFieldValueGenerator(GenerationFunction generationFunction, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = true;
+                break;
+            case RANDOM:
+                value = rand.nextBoolean();
+                break;
+            case DETERMINISTIC:
+            case INCREASING:
+                value = false;
+                break;
+            default:
+                break;
+        }
+    }
+
+    /** Writes the type tag to {@code out} when this generator is tagged. */
+    private void writeTagIfNeeded(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+        }
+    }
+
+    /** Advances {@code value}; only DETERMINISTIC and RANDOM change it. */
+    private void generate() {
+        switch (generationFunction) {
+            case RANDOM:
+                value = rand.nextBoolean();
+                break;
+            case DETERMINISTIC:
+                value = !value;
+                break;
+            default:
+                break;
+        }
+    }
+
+    /** Generates the next value and writes it (with the optional tag first) to {@code out}. */
+    @Override
+    public void next(DataOutput out) throws IOException {
+        writeTagIfNeeded(out);
+        generate();
+        out.writeBoolean(value);
+    }
+
+    /** Generates and returns the next value. */
+    @Override
+    public Boolean next() throws IOException {
+        generate();
+        return value;
+    }
+
+    /** Writes the current value (with the optional tag first) to {@code out} without advancing. */
+    @Override
+    public void get(DataOutput out) throws IOException {
+        writeTagIfNeeded(out);
+        out.writeBoolean(value);
+    }
+
+    /** Returns the current value without advancing. */
+    @Override
+    public Boolean get() throws IOException {
+        return value;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ADoubleFieldValueGenerator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ADoubleFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ADoubleFieldValueGenerator.java
new file mode 100644
index 0000000..e698676
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ADoubleFieldValueGenerator.java
@@ -0,0 +1,153 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+
+/**
+ * Test data generator for double field values, driven by a {@link GenerationFunction}:
+ * <ul>
+ * <li>DECREASING: starts at {@code Integer.MAX_VALUE} and drops by {@code INCREMENT} per value.</li>
+ * <li>INCREASING: starts at 0 and grows by {@code INCREMENT} per value.</li>
+ * <li>DETERMINISTIC: alternates below and above {@code START}, moving one {@code INCREMENT}
+ * further away after each pair.</li>
+ * <li>RANDOM: when {@code unique} is set, values come from shuffled batches of {@code BATCH_SIZE}
+ * consecutive increments, with batch ranges alternating below and above {@code START};
+ * otherwise each value is {@link Random#nextDouble()}.</li>
+ * </ul>
+ * When {@code tagged} is true, serialized values are preceded by the double type tag byte.
+ */
+public class ADoubleFieldValueGenerator implements IAsterixFieldValueGenerator<Double> {
+    private static final double START = 1000000000.0;
+    private static final int BATCH_SIZE = 1000;
+    private static final double INCREMENT = 0.1;
+    private final GenerationFunction generationFunction;
+    private final boolean unique;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private double value;
+    private int cycle;
+    private List<Double> uniques;
+    private Iterator<Double> iterator;
+
+    /**
+     * @param generationFunction
+     *            how successive values are produced
+     * @param unique
+     *            for RANDOM only: whether generated values must not repeat
+     * @param tagged
+     *            whether serialized values are prefixed with the type tag
+     */
+    public ADoubleFieldValueGenerator(GenerationFunction generationFunction, boolean unique, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.unique = unique;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = Integer.MAX_VALUE;
+                break;
+            case DETERMINISTIC:
+                value = START;
+                break;
+            case INCREASING:
+                value = 0;
+                break;
+            case RANDOM:
+                if (unique) {
+                    uniques = new ArrayList<>();
+                    refillUniques(START);
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Replaces the pending unique values with a shuffled batch covering
+     * {@code [lowerBound, lowerBound + BATCH_SIZE * INCREMENT)} and resets the iterator.
+     */
+    private void refillUniques(double lowerBound) {
+        double upperBound = lowerBound + (BATCH_SIZE * INCREMENT);
+        uniques.clear();
+        for (double candidate = lowerBound; candidate < upperBound; candidate += INCREMENT) {
+            uniques.add(candidate);
+        }
+        Collections.shuffle(uniques);
+        iterator = uniques.iterator();
+    }
+
+    /**
+     * Advances to the next batch and returns its lower bound: odd cycles go below {@code START},
+     * even cycles go above it, each one batch further out than the previous one on that side.
+     */
+    private double nextBatchLowerBound() {
+        cycle++;
+        double batchSpan = BATCH_SIZE * INCREMENT;
+        if (cycle % 2 == 0) {
+            return START + ((cycle / 2) * batchSpan);
+        }
+        return START - ((cycle / 2 + 1) * batchSpan);
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG);
+        }
+        generate();
+        out.writeDouble(value);
+    }
+
+    /** Advances {@code value} according to the generation function. */
+    private void generate() {
+        switch (generationFunction) {
+            case DECREASING:
+                value -= INCREMENT;
+                // Must not fall through: the DETERMINISTIC branch would overwrite the decrement
+                // with a value oscillating around START.
+                break;
+            case DETERMINISTIC:
+                if (value >= START) {
+                    cycle++;
+                    value = START - (cycle * INCREMENT);
+                } else {
+                    value = START + (cycle * INCREMENT);
+                }
+                break;
+            case INCREASING:
+                value += INCREMENT;
+                break;
+            case RANDOM:
+                if (unique) {
+                    if (!iterator.hasNext()) {
+                        // Current batch exhausted: generate the next batch.
+                        refillUniques(nextBatchLowerBound());
+                    }
+                    value = iterator.next();
+                } else {
+                    value = rand.nextDouble();
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    /** Generates and returns the next value. */
+    @Override
+    public Double next() throws IOException {
+        generate();
+        return value;
+    }
+
+    /** Writes the current value (with the optional tag first) to {@code out} without advancing. */
+    @Override
+    public void get(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG);
+        }
+        out.writeDouble(value);
+    }
+
+    /** Returns the current value without advancing. */
+    @Override
+    public Double get() throws IOException {
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java
new file mode 100644
index 0000000..7c6556b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt32FieldValueGenerator.java
@@ -0,0 +1,152 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+
+/**
+ * Test data generator for 32-bit integer field values, driven by a {@link GenerationFunction}:
+ * <ul>
+ * <li>DECREASING: starts at {@code Integer.MAX_VALUE} and drops by one per value.</li>
+ * <li>INCREASING: starts at 0 and grows by one per value.</li>
+ * <li>DETERMINISTIC: alternates below and above {@code START}, moving one step further away
+ * after each pair.</li>
+ * <li>RANDOM: when {@code unique} is set, values come from shuffled batches of {@code BATCH_SIZE}
+ * consecutive integers, with batch ranges alternating below and above {@code START};
+ * otherwise each value is {@link Random#nextInt()}.</li>
+ * </ul>
+ * When {@code tagged} is true, serialized values are preceded by the int32 type tag byte.
+ */
+public class AInt32FieldValueGenerator implements IAsterixFieldValueGenerator<Integer> {
+    private static final int START = 1000000000;
+    private static final int BATCH_SIZE = 1000;
+    private final GenerationFunction generationFunction;
+    private final boolean unique;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private int value;
+    private int cycle;
+    private List<Integer> uniques;
+    private Iterator<Integer> iterator;
+
+    /**
+     * @param generationFunction
+     *            how successive values are produced
+     * @param unique
+     *            for RANDOM only: whether generated values must not repeat
+     * @param tagged
+     *            whether serialized values are prefixed with the type tag
+     */
+    public AInt32FieldValueGenerator(GenerationFunction generationFunction, boolean unique, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.unique = unique;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = Integer.MAX_VALUE;
+                break;
+            case DETERMINISTIC:
+                value = START;
+                break;
+            case INCREASING:
+                value = 0;
+                break;
+            case RANDOM:
+                if (unique) {
+                    uniques = new ArrayList<>();
+                    refillUniques(START);
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Replaces the pending unique values with a shuffled batch covering
+     * {@code [lowerBound, lowerBound + BATCH_SIZE)} and resets the iterator.
+     */
+    private void refillUniques(int lowerBound) {
+        int upperBound = lowerBound + BATCH_SIZE;
+        uniques.clear();
+        for (int candidate = lowerBound; candidate < upperBound; candidate++) {
+            uniques.add(candidate);
+        }
+        Collections.shuffle(uniques);
+        iterator = uniques.iterator();
+    }
+
+    /**
+     * Advances to the next batch and returns its lower bound: odd cycles go below {@code START},
+     * even cycles go above it, each one batch further out than the previous one on that side.
+     */
+    private int nextBatchLowerBound() {
+        cycle++;
+        if (cycle % 2 == 0) {
+            return START + ((cycle / 2) * BATCH_SIZE);
+        }
+        return START - ((cycle / 2 + 1) * BATCH_SIZE);
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+        }
+        generate();
+        out.writeInt(value);
+    }
+
+    /** Advances {@code value} according to the generation function. */
+    private void generate() {
+        switch (generationFunction) {
+            case DECREASING:
+                value--;
+                // Must not fall through: the DETERMINISTIC branch would overwrite the decrement
+                // with a value oscillating around START.
+                break;
+            case DETERMINISTIC:
+                if (value >= START) {
+                    cycle++;
+                    value = START - cycle;
+                } else {
+                    value = START + cycle;
+                }
+                break;
+            case INCREASING:
+                value++;
+                break;
+            case RANDOM:
+                if (unique) {
+                    if (!iterator.hasNext()) {
+                        // Current batch exhausted: generate the next batch.
+                        refillUniques(nextBatchLowerBound());
+                    }
+                    value = iterator.next();
+                } else {
+                    value = rand.nextInt();
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    /** Generates and returns the next value. */
+    @Override
+    public Integer next() throws IOException {
+        generate();
+        return value;
+    }
+
+    /** Writes the current value (with the optional tag first) to {@code out} without advancing. */
+    @Override
+    public void get(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+        }
+        out.writeInt(value);
+    }
+
+    /** Returns the current value without advancing. */
+    @Override
+    public Integer get() throws IOException {
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt64FieldValueGenerator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt64FieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt64FieldValueGenerator.java
new file mode 100644
index 0000000..2a2496e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AInt64FieldValueGenerator.java
@@ -0,0 +1,152 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+
+/**
+ * Test data generator for 64-bit integer field values, driven by a {@link GenerationFunction}:
+ * <ul>
+ * <li>DECREASING: starts at {@code Long.MAX_VALUE} and drops by one per value.</li>
+ * <li>INCREASING: starts at 0 and grows by one per value.</li>
+ * <li>DETERMINISTIC: alternates below and above {@code START}, moving one step further away
+ * after each pair.</li>
+ * <li>RANDOM: when {@code unique} is set, values come from shuffled batches of {@code BATCH_SIZE}
+ * consecutive integers, with batch ranges alternating below and above {@code START};
+ * otherwise each value is {@link Random#nextLong()}.</li>
+ * </ul>
+ * When {@code tagged} is true, serialized values are preceded by the int64 type tag byte.
+ */
+public class AInt64FieldValueGenerator implements IAsterixFieldValueGenerator<Long> {
+    private static final long START = 4500000000000000000L;
+    private static final long BATCH_SIZE = 1000L;
+    private final GenerationFunction generationFunction;
+    private final boolean unique;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private long value;
+    private int cycle;
+    private List<Long> uniques;
+    private Iterator<Long> iterator;
+
+    /**
+     * @param generationFunction
+     *            how successive values are produced
+     * @param unique
+     *            for RANDOM only: whether generated values must not repeat
+     * @param tagged
+     *            whether serialized values are prefixed with the type tag
+     */
+    public AInt64FieldValueGenerator(GenerationFunction generationFunction, boolean unique, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.unique = unique;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = Long.MAX_VALUE;
+                break;
+            case DETERMINISTIC:
+                value = START;
+                break;
+            case INCREASING:
+                value = 0L;
+                break;
+            case RANDOM:
+                if (unique) {
+                    uniques = new ArrayList<>();
+                    refillUniques(START);
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Replaces the pending unique values with a shuffled batch covering
+     * {@code [lowerBound, lowerBound + BATCH_SIZE)} and resets the iterator.
+     */
+    private void refillUniques(long lowerBound) {
+        long upperBound = lowerBound + BATCH_SIZE;
+        uniques.clear();
+        for (long candidate = lowerBound; candidate < upperBound; candidate++) {
+            uniques.add(candidate);
+        }
+        Collections.shuffle(uniques);
+        iterator = uniques.iterator();
+    }
+
+    /**
+     * Advances to the next batch and returns its lower bound: odd cycles go below {@code START},
+     * even cycles go above it, each one batch further out than the previous one on that side.
+     */
+    private long nextBatchLowerBound() {
+        cycle++;
+        if (cycle % 2 == 0) {
+            return START + ((cycle / 2) * BATCH_SIZE);
+        }
+        return START - ((cycle / 2 + 1) * BATCH_SIZE);
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+        }
+        generate();
+        out.writeLong(value);
+    }
+
+    /** Advances {@code value} according to the generation function. */
+    private void generate() {
+        switch (generationFunction) {
+            case DECREASING:
+                value--;
+                // Must not fall through: the DETERMINISTIC branch would overwrite the decrement
+                // with a value oscillating around START.
+                break;
+            case DETERMINISTIC:
+                if (value >= START) {
+                    cycle++;
+                    value = START - cycle;
+                } else {
+                    value = START + cycle;
+                }
+                break;
+            case INCREASING:
+                value++;
+                break;
+            case RANDOM:
+                if (unique) {
+                    if (!iterator.hasNext()) {
+                        // Current batch exhausted: generate the next batch.
+                        refillUniques(nextBatchLowerBound());
+                    }
+                    value = iterator.next();
+                } else {
+                    value = rand.nextLong();
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    /** Generates and returns the next value. */
+    @Override
+    public Long next() throws IOException {
+        generate();
+        return value;
+    }
+
+    /** Writes the current value (with the optional tag first) to {@code out} without advancing. */
+    @Override
+    public void get(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+        }
+        out.writeLong(value);
+    }
+
+    /** Returns the current value without advancing. */
+    @Override
+    public Long get() throws IOException {
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ARecordValueGenerator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ARecordValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ARecordValueGenerator.java
new file mode 100644
index 0000000..df717e9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/ARecordValueGenerator.java
@@ -0,0 +1,119 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.test.common.TestTupleReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class ARecordValueGenerator implements IAsterixFieldValueGenerator<ITupleReference> {
+    private final IAsterixFieldValueGenerator<?>[] generators;
+    private final boolean tagged;
+    private final ARecordType recordType;
+    private final RecordBuilder recBuilder;
+    private final ArrayBackedValueStorage fieldValueBuffer;
+    private final TestTupleReference tuple;
+
+    public ARecordValueGenerator(GenerationFunction[] generationFunctions, ARecordType recordType, boolean[] uniques,
+            boolean tagged) {
+        this.tagged = tagged;
+        this.recordType = recordType;
+        tuple = new TestTupleReference(1);
+        fieldValueBuffer = new ArrayBackedValueStorage();
+        recBuilder = new RecordBuilder();
+        recBuilder.reset(recordType);
+        recBuilder.init();
+        generators = new IAsterixFieldValueGenerator<?>[recordType.getFieldTypes().length];
+        for (int i = 0; i < recordType.getFieldTypes().length; i++) {
+            ATypeTag tag = recordType.getFieldTypes()[i].getTypeTag();
+            switch (tag) {
+                case BOOLEAN:
+                    generators[i] = new ABooleanFieldValueGenerator(generationFunctions[i], true);
+                    break;
+                case DOUBLE:
+                    generators[i] = new ADoubleFieldValueGenerator(generationFunctions[i], uniques[i], true);
+                    break;
+                case INT32:
+                    generators[i] = new AInt32FieldValueGenerator(generationFunctions[i], uniques[i], true);
+                    break;
+                case INT64:
+                    generators[i] = new AInt64FieldValueGenerator(generationFunctions[i], uniques[i], true);
+                    break;
+                case STRING:
+                    generators[i] = new AStringFieldValueGenerator(generationFunctions[i], uniques[i], true);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unsupported type " + tag);
+            }
+        }
+    }
+
+    @Override
+    public void next(DataOutput out) throws IOException {
+        recBuilder.reset(recordType);
+        recBuilder.init();
+        for (int i = 0; i < generators.length; i++) {
+            fieldValueBuffer.reset();
+            generators[i].next(fieldValueBuffer.getDataOutput());
+            recBuilder.addField(i, fieldValueBuffer);
+        }
+        recBuilder.write(out, tagged);
+    }
+
+    @Override
+    public ITupleReference next() throws IOException {
+        tuple.reset();
+        next(tuple.getFields()[0].getDataOutput());
+        return tuple;
+    }
+
+    @Override
+    public void get(DataOutput out) throws IOException {
+        recBuilder.reset(recordType);
+        recBuilder.init();
+        for (int i = 0; i < generators.length; i++) {
+            fieldValueBuffer.reset();
+            generators[i].get(fieldValueBuffer.getDataOutput());
+            recBuilder.addField(i, fieldValueBuffer);
+        }
+        recBuilder.write(out, tagged);
+    }
+
+    @Override
+    public ITupleReference get() throws IOException {
+        tuple.reset();
+        get(tuple.getFields()[0].getDataOutput());
+        return tuple;
+    }
+
+    public void get(int i, DataOutput out) throws IOException {
+        generators[i].get(out);
+    }
+
+    public Object get(int i) throws IOException {
+        return generators[i].get();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AStringFieldValueGenerator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AStringFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AStringFieldValueGenerator.java
new file mode 100644
index 0000000..5ee6d40
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/AStringFieldValueGenerator.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+/**
+ * Generates string field values of the form {@code "A String Value #" + number}, where the number
+ * is formatted with {@code %08d} and evolves according to the configured {@link GenerationFunction}:
+ * <ul>
+ * <li>DECREASING: starts at {@link Integer#MAX_VALUE} and is decremented by one per generated value.</li>
+ * <li>INCREASING: starts at 0 and is incremented by one per generated value.</li>
+ * <li>DETERMINISTIC: alternates around {@code START}: START-1, START+1, START-2, START+2, ...</li>
+ * <li>RANDOM: a random int; when {@code unique} is set, values are instead drawn from shuffled
+ * batches of {@code BATCH_SIZE} consecutive numbers so that no number repeats.</li>
+ * </ul>
+ */
+public class AStringFieldValueGenerator implements IAsterixFieldValueGenerator<String> {
+    private static final String PREFIX = "A String Value #";
+    private static final int START = 1000000000;
+    private static final int BATCH_SIZE = 1000;
+    private final GenerationFunction generationFunction;
+    private final boolean unique;
+    private final boolean tagged;
+    private final Random rand = new Random();
+    private final UTF8StringSerializerDeserializer stringSerde =
+            new UTF8StringSerializerDeserializer(new UTF8StringWriter(), new UTF8StringReader());
+    private int value;
+    private int cycle;
+    // Current shuffled batch and its cursor; only used for unique RANDOM generation.
+    private List<Integer> uniques;
+    private Iterator<Integer> iterator;
+    // Last generated string; null until the first call to one of the next() methods.
+    private String aString;
+
+    /**
+     * @param generationFunction
+     *            how successive numbers are derived
+     * @param unique
+     *            only consulted for RANDOM generation: when set, numbers never repeat
+     * @param tagged
+     *            whether the string type tag byte is written ahead of the serialized string
+     */
+    public AStringFieldValueGenerator(GenerationFunction generationFunction, boolean unique, boolean tagged) {
+        this.generationFunction = generationFunction;
+        this.unique = unique;
+        this.tagged = tagged;
+        switch (generationFunction) {
+            case DECREASING:
+                value = Integer.MAX_VALUE;
+                break;
+            case DETERMINISTIC:
+                value = START;
+                break;
+            case INCREASING:
+                value = 0;
+                break;
+            case RANDOM:
+                if (unique) {
+                    uniques = new ArrayList<>(BATCH_SIZE);
+                    refillBatch(START);
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    /**
+     * Generates the next string and writes it to {@code out}, preceded by the string type tag byte
+     * when this generator is tagged.
+     */
+    @Override
+    public void next(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
+        }
+        generate();
+        stringSerde.serialize(aString, out);
+    }
+
+    /**
+     * Replaces the current batch with the shuffled numbers
+     * {@code [lowerBound, lowerBound + BATCH_SIZE)} and rewinds the batch cursor.
+     */
+    private void refillBatch(int lowerBound) {
+        uniques.clear();
+        final int upperBound = lowerBound + BATCH_SIZE;
+        for (int candidate = lowerBound; candidate < upperBound; candidate++) {
+            uniques.add(candidate);
+        }
+        Collections.shuffle(uniques);
+        iterator = uniques.iterator();
+    }
+
+    /**
+     * Advances {@code value} according to the generation function and refreshes {@code aString}.
+     */
+    private void generate() {
+        switch (generationFunction) {
+            case DECREASING:
+                value--;
+                // Without this break the DETERMINISTIC logic below would overwrite the decrement.
+                break;
+            case DETERMINISTIC:
+                if (value >= START) {
+                    cycle++;
+                    value = START - cycle;
+                } else {
+                    value = START + cycle;
+                }
+                break;
+            case INCREASING:
+                value++;
+                break;
+            case RANDOM:
+                if (unique) {
+                    if (!iterator.hasNext()) {
+                        // generate next batch: even cycles extend above START, odd cycles below it
+                        cycle++;
+                        final int lowerBound = (cycle % 2 == 0) ? START + ((cycle / 2) * BATCH_SIZE)
+                                : START - ((cycle / 2 + 1) * BATCH_SIZE);
+                        refillBatch(lowerBound);
+                    }
+                    value = iterator.next();
+                } else {
+                    value = rand.nextInt();
+                }
+                break;
+            default:
+                break;
+        }
+        aString = PREFIX + String.format("%08d", value);
+    }
+
+    /**
+     * Generates and returns the next string.
+     */
+    @Override
+    public String next() throws IOException {
+        generate();
+        return aString;
+    }
+
+    /**
+     * Writes the most recently generated string to {@code out} without generating a new one,
+     * preceded by the string type tag byte when this generator is tagged.
+     */
+    @Override
+    public void get(DataOutput out) throws IOException {
+        if (tagged) {
+            out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
+        }
+        stringSerde.serialize(aString, out);
+    }
+
+    /**
+     * Returns the most recently generated string, or {@code null} if nothing has been generated yet.
+     */
+    @Override
+    public String get() throws IOException {
+        return aString;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/IAsterixFieldValueGenerator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/IAsterixFieldValueGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/IAsterixFieldValueGenerator.java
new file mode 100644
index 0000000..17bdcf8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/IAsterixFieldValueGenerator.java
@@ -0,0 +1,48 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A stateful generator of field values of type {@code T}. The {@code next} methods advance the
+ * generator to a new value; the {@code get} methods expose the value the generator currently holds
+ * without advancing it. Each operation comes in two flavors: one serializing the value to a
+ * {@link DataOutput} and one returning it as an object.
+ *
+ * @param <T>
+ *            the object representation of the generated values
+ */
+public interface IAsterixFieldValueGenerator<T> {
+    /**
+     * Generates the next value and writes its serialized form.
+     *
+     * @param out
+     *            the sink receiving the serialized value
+     * @throws IOException
+     *             if writing to {@code out} fails
+     */
+    void next(DataOutput out) throws IOException;
+
+    /**
+     * Generates the next value and returns it.
+     *
+     * @return the newly generated value
+     * @throws IOException
+     *             if the value cannot be produced
+     */
+    T next() throws IOException;
+
+    /**
+     * Writes the serialized form of the current value without generating a new one.
+     *
+     * @param out
+     *            the sink receiving the serialized value
+     * @throws IOException
+     *             if writing to {@code out} fails
+     */
+    void get(DataOutput out) throws IOException;
+
+    /**
+     * Returns the current value without generating a new one.
+     *
+     * @return the value currently held by the generator
+     * @throws IOException
+     *             if the value cannot be produced
+     */
+    T get() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TestTupleCounterFrameWriter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TestTupleCounterFrameWriter.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TestTupleCounterFrameWriter.java
new file mode 100644
index 0000000..ee3de51
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TestTupleCounterFrameWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.data.gen;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.test.CountAnswer;
+import org.apache.hyracks.api.test.TestFrameWriter;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+
+/**
+ * A {@link TestFrameWriter} that additionally keeps a running total of the tuples contained in the
+ * frames handed to {@link #nextFrame(ByteBuffer)}.
+ */
+public class TestTupleCounterFrameWriter extends TestFrameWriter {
+
+    private final FrameTupleAccessor tupleAccessor;
+    private int tupleCount;
+
+    /**
+     * @param recordDescriptor
+     *            describes the tuples in incoming frames; used to build the frame accessor
+     * @param openAnswer
+     *            passed through to {@link TestFrameWriter}
+     * @param nextAnswer
+     *            passed through to {@link TestFrameWriter}
+     * @param flushAnswer
+     *            passed through to {@link TestFrameWriter}
+     * @param failAnswer
+     *            passed through to {@link TestFrameWriter}
+     * @param closeAnswer
+     *            passed through to {@link TestFrameWriter}
+     * @param deepCopyInputFrames
+     *            passed through to {@link TestFrameWriter}
+     */
+    public TestTupleCounterFrameWriter(RecordDescriptor recordDescriptor, CountAnswer openAnswer,
+            CountAnswer nextAnswer, CountAnswer flushAnswer, CountAnswer failAnswer, CountAnswer closeAnswer,
+            boolean deepCopyInputFrames) {
+        super(openAnswer, nextAnswer, flushAnswer, failAnswer, closeAnswer, deepCopyInputFrames);
+        this.tupleAccessor = new FrameTupleAccessor(recordDescriptor);
+    }
+
+    /**
+     * Delegates to the parent writer first, then adds the number of tuples in {@code buffer} to the
+     * running total.
+     */
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        super.nextFrame(buffer);
+        tupleAccessor.reset(buffer);
+        final int tuplesInFrame = tupleAccessor.getTupleCount();
+        tupleCount = tupleCount + tuplesInFrame;
+    }
+
+    /**
+     * @return the total number of tuples seen across all frames so far
+     */
+    public int getCount() {
+        return tupleCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TupleGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TupleGenerator.java
new file mode 100644
index 0000000..98c57a0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/data/gen/TupleGenerator.java
@@ -0,0 +1,126 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.asterix.app.data.gen;
+
+import java.io.IOException;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class TupleGenerator {
+
+    private final int[] keyIndexes;
+    private final int[] keyIndicators;
+    private final ARecordValueGenerator recordGenerator;
+    private final ARecordValueGenerator metaGenerator;
+    private final TestTupleReference tuple;
+
+    public enum GenerationFunction {
+        RANDOM,
+        DETERMINISTIC,
+        INCREASING,
+        DECREASING
+    }
+
+    /**
+     * @param recordType
+     * @param metaType
+     * @param key
+     * @param keyIndexes
+     * @param keyIndicators
+     * @param recordGeneration
+     * @param uniqueRecordFields
+     * @param metaGeneration
+     * @param uniqueMetaFields
+     */
+    public TupleGenerator(ARecordType recordType, ARecordType metaType, int[] keyIndexes, int[] keyIndicators,
+            GenerationFunction[] recordGeneration, boolean[] uniqueRecordFields, GenerationFunction[] metaGeneration,
+            boolean[] uniqueMetaFields) {
+        this.keyIndexes = keyIndexes;
+        this.keyIndicators = keyIndicators;
+        for (IAType field : recordType.getFieldTypes()) {
+            validate(field);
+        }
+        recordGenerator = new ARecordValueGenerator(recordGeneration, recordType, uniqueRecordFields, true);
+        if (metaType != null) {
+            for (IAType field : metaType.getFieldTypes()) {
+                validate(field);
+            }
+            metaGenerator = new ARecordValueGenerator(metaGeneration, metaType, uniqueMetaFields, true);
+        } else {
+            metaGenerator = null;
+        }
+        int numOfFields = keyIndexes.length + 1 + ((metaType != null) ? 1 : 0);
+        tuple = new TestTupleReference(numOfFields);
+        boolean atLeastOneKeyFieldIsNotRandomAndNotBoolean = false;
+        for (int i = 0; i < keyIndexes.length; i++) {
+            if (keyIndicators[i] < 0 || keyIndicators[i] > 1) {
+                throw new IllegalArgumentException("key field indicator must be either 0 or 1");
+            }
+            atLeastOneKeyFieldIsNotRandomAndNotBoolean = atLeastOneKeyFieldIsNotRandomAndNotBoolean
+                    || validateKey(keyIndexes[i], keyIndicators[i] == 0 ? recordType : metaType,
+                            keyIndicators[i] == 0 ? uniqueRecordFields[i] : uniqueMetaFields[i]);
+        }
+        if (!atLeastOneKeyFieldIsNotRandomAndNotBoolean) {
+            throw new IllegalArgumentException("at least one key field must be unique and not boolean");
+        }
+        if (keyIndexes.length != keyIndicators.length) {
+            throw new IllegalArgumentException("number of key indexes must equals number of key indicators");
+        }
+    }
+
+    private boolean validateKey(int i, ARecordType type, boolean unique) {
+        if (type.getFieldNames().length <= i) {
+            throw new IllegalArgumentException("key index must be less than number of fields");
+        }
+        return unique && type.getFieldTypes()[i].getTypeTag() != ATypeTag.BOOLEAN;
+    }
+
+    public ITupleReference next() throws IOException {
+        tuple.reset();
+        recordGenerator.next(tuple.getFields()[keyIndexes.length].getDataOutput());
+        if (metaGenerator != null) {
+            recordGenerator.next(tuple.getFields()[keyIndexes.length + 1].getDataOutput());
+        }
+        for (int i = 0; i < keyIndexes.length; i++) {
+            if (keyIndicators[i] == 0) {
+                recordGenerator.get(keyIndexes[i], tuple.getFields()[i].getDataOutput());
+            } else {
+                metaGenerator.get(keyIndexes[i], tuple.getFields()[i].getDataOutput());
+            }
+        }
+        return tuple;
+    }
+
+    private void validate(IAType field) {
+        switch (field.getTypeTag()) {
+            case BOOLEAN:
+            case DOUBLE:
+            case INT32:
+            case INT64:
+            case STRING:
+                break;
+            default:
+                throw new IllegalArgumentException("Generating data of type " + field + " is not supported");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
index e267cc7..54bdb1b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleGenerator.java
@@ -96,33 +96,4 @@ public class TestTupleGenerator {
         }
         return tuple;
     }
-
-    private class TestTupleReference implements ITupleReference {
-        private final GrowableArray[] fields;
-
-        private TestTupleReference(GrowableArray[] fields) {
-            this.fields = fields;
-        }
-
-        @Override
-        public int getFieldCount() {
-            return fields.length;
-        }
-
-        @Override
-        public byte[] getFieldData(int fIdx) {
-
-            return fields[fIdx].getByteArray();
-        }
-
-        @Override
-        public int getFieldStart(int fIdx) {
-            return 0;
-        }
-
-        @Override
-        public int getFieldLength(int fIdx) {
-            return fields[fIdx].getLength();
-        }
-    }
 }