You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2019/03/08 23:03:11 UTC

[accumulo-testing] branch master updated: Made CI bulk import configurable and scriptable (#64)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/master by this push:
     new daf1b63  Made CI bulk import configurable and scriptable (#64)
daf1b63 is described below

commit daf1b636567ef0f2c86232a76eaed623a2098d7b
Author: Keith Turner <kt...@apache.org>
AuthorDate: Fri Mar 8 18:03:07 2019 -0500

    Made CI bulk import configurable and scriptable (#64)
---
 README.md                                          |   1 +
 bin/cingest                                        |   6 +-
 conf/accumulo-testing.properties.example           |  11 ++
 docs/bulk-test.md                                  |  38 ++++
 .../org/apache/accumulo/testing/TestProps.java     |   5 +-
 .../accumulo/testing/continuous/BulkIngest.java    | 213 ++++-----------------
 .../accumulo/testing/continuous/ContinuousEnv.java |  12 ++
 .../testing/continuous/ContinuousIngest.java       |  11 +-
 .../testing/continuous/ContinuousInputFormat.java  | 195 +++++++++++++++++++
 9 files changed, 303 insertions(+), 189 deletions(-)

diff --git a/README.md b/README.md
index 463c52c..a2eea4c 100644
--- a/README.md
+++ b/README.md
@@ -131,6 +131,7 @@ the referenced but undefined node.  The MapReduce job produces two other counts:
 UNREFERENCED. It is expected that these two counts are non zero. REFERENCED counts nodes that are
 defined and referenced. UNREFERENCED counts nodes that defined and unreferenced, these are the
 latest nodes inserted.
+* `bulk` - Runs a MapReduce job that generates data for bulk import.  See [bulk-test.md](docs/bulk-test.md).
 * `moru` - Runs a MapReduce job that stresses Accumulo by reading and writing the continuous ingest
 table. This MapReduce job will write out an entry for every entry in the table (except for ones
 created by the MapReduce job itself). Stop ingest before running this MapReduce job. Do not run more
diff --git a/bin/cingest b/bin/cingest
index a004b3b..58ac51e 100755
--- a/bin/cingest
+++ b/bin/cingest
@@ -73,6 +73,10 @@ case "$1" in
     ci_main="${ci_package}.ContinuousMoru"
     ;;
   bulk)
+    if [ "$#" -ne 2 ]; then
+      echo "Usage : $0 $1 <bulk dir>"
+      exit 1
+    fi
     ci_main="${ci_package}.BulkIngest"
     ;;
   *)
@@ -84,7 +88,7 @@ esac
 export CLASSPATH="$TEST_JAR_PATH:$HADOOP_API_JAR:$HADOOP_RUNTIME_JAR:$CLASSPATH"
 
 case "$1" in
-  verify|moru)
+  verify|moru|bulk)
     if [ ! -z $HADOOP_HOME ]; then
       export HADOOP_USE_CLIENT_CLASSLOADER=true
       "$HADOOP_HOME"/bin/yarn jar "$TEST_JAR_PATH" "$ci_main" "${@:2}" "$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS"
diff --git a/conf/accumulo-testing.properties.example b/conf/accumulo-testing.properties.example
index e60e1a7..502bcde 100644
--- a/conf/accumulo-testing.properties.example
+++ b/conf/accumulo-testing.properties.example
@@ -108,3 +108,14 @@ test.ci.verify.scan.offline=false
 test.ci.verify.auths=
 # Location in HDFS to store output. Must not exist.
 test.ci.verify.output.dir=/tmp/ci-verify
+
+# Bulk Ingest
+# -----------
+# The number of map tasks to run.
+test.ci.bulk.map.task=10
+# The number of nodes to generate per map task.
+test.ci.bulk.map.nodes=1000000
+# The number of reducers will be the minimum of this prop and table splits+1.  Each reducer will
+# produce a bulk import file.
+test.ci.bulk.reducers.max=1024
+
diff --git a/docs/bulk-test.md b/docs/bulk-test.md
new file mode 100644
index 0000000..3216882
--- /dev/null
+++ b/docs/bulk-test.md
@@ -0,0 +1,38 @@
+# Running a bulk ingest test
+
+Continuous ingest supports bulk ingest in addition to live ingest. A map reduce
+job that generates rfiles using the table's splits can be run.  This can be run
+in a loop like the following to continually bulk import data.
+
+```bash
+# create the ci table if necessary
+./bin/cingest createtable
+
+for i in $(seq 1 10); do
+   # run map reduce job to generate data for bulk import
+   ./bin/cingest bulk /tmp/bt/$i
+   # ask accumulo to import generated data
+   echo -e "table ci\nimportdirectory /tmp/bt/$i/files true" | accumulo shell -u root -p secret
+done
+./bin/cingest verify
+```
+
+Another way to use this in a test is to generate a lot of data and then bulk import it all at once as follows.
+
+```bash
+for i in $(seq 1 10); do
+  ./bin/cingest bulk /tmp/bt/$i
+done
+
+(
+  echo "table ci"
+  for i in $(seq 1 10); do
+    echo "importdirectory /tmp/bt/$i/files true"
+  done
+) | accumulo shell -u root -p secret
+./bin/cingest verify
+```
+
+Bulk ingest could be run concurrently with live ingest into the same table.  It
+could also be run while the agitator is running.
+
diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java b/src/main/java/org/apache/accumulo/testing/TestProps.java
index 7d7808d..3f2ca15 100644
--- a/src/main/java/org/apache/accumulo/testing/TestProps.java
+++ b/src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -118,8 +118,9 @@ public class TestProps {
   public static final String CI_VERIFY_OUTPUT_DIR = CI_VERIFY + "output.dir";
 
   /** Bulk **/
-  // Bulk ingest Job instance uuid
-  public static final String CI_BULK_UUID = CI_BULK + "uuid";
+  public static final String CI_BULK_MAP_TASK = CI_BULK + "map.task";
+  public static final String CI_BULK_MAP_NODES = CI_BULK + "map.nodes";
+  public static final String CI_BULK_REDUCERS = CI_BULK + "reducers.max";
 
   public static Properties loadFromFile(String propsFilePath) {
     try {
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java
index 4b2e20b..bb3b3af 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java
@@ -1,19 +1,27 @@
-package org.apache.accumulo.testing.continuous;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import static org.apache.accumulo.testing.TestProps.CI_BULK_UUID;
-import static org.apache.accumulo.testing.continuous.ContinuousIngest.genCol;
-import static org.apache.accumulo.testing.continuous.ContinuousIngest.genLong;
+package org.apache.accumulo.testing.continuous;
 
 import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.io.PrintStream;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collection;
-import java.util.List;
-import java.util.Random;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -22,19 +30,10 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
 import org.apache.accumulo.hadoop.mapreduce.partition.KeyRangePartitioner;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
@@ -45,8 +44,6 @@ import org.slf4j.LoggerFactory;
  * by running ContinuousVerify.
  */
 public class BulkIngest extends Configured implements Tool {
-  public static final int NUM_KEYS = 1_000_000;
-  public static final String BULK_CI_DIR = "ci-bulk";
 
   public static final Logger log = LoggerFactory.getLogger(BulkIngest.class);
 
@@ -58,39 +55,41 @@ public class BulkIngest extends Configured implements Tool {
     job.getConfiguration().set("mapreduce.job.classloader", "true");
     FileSystem fs = FileSystem.get(job.getConfiguration());
 
-    final String JOB_DIR = BULK_CI_DIR + "/" + getCurrentJobNumber(fs);
-    final String RFILE_DIR = JOB_DIR + "/rfiles";
-    log.info("Creating new job at {}", JOB_DIR);
-
     String ingestInstanceId = UUID.randomUUID().toString();
-    job.getConfiguration().set(CI_BULK_UUID, ingestInstanceId);
 
     log.info(String.format("UUID %d %s", System.currentTimeMillis(), ingestInstanceId));
 
-    Path outputDir = new Path(RFILE_DIR);
-
-    job.setInputFormatClass(RandomInputFormat.class);
+    job.setInputFormatClass(ContinuousInputFormat.class);
 
     // map the generated random longs to key values
-    job.setMapperClass(RandomMapper.class);
     job.setMapOutputKeyClass(Key.class);
     job.setMapOutputValueClass(Value.class);
 
-    // output RFiles for the import
-    job.setOutputFormatClass(AccumuloFileOutputFormat.class);
-    AccumuloFileOutputFormat.configure().outputPath(outputDir).store(job);
+    String bulkDir = args[0];
+
+    // remove bulk dir from args
+    args = Arrays.asList(args).subList(1, 3).toArray(new String[2]);
 
     try (ContinuousEnv env = new ContinuousEnv(args)) {
+      fs.mkdirs(new Path(bulkDir));
+
+      // output RFiles for the import
+      job.setOutputFormatClass(AccumuloFileOutputFormat.class);
+      AccumuloFileOutputFormat.configure().outputPath(new Path(bulkDir + "/files")).store(job);
+
+      ContinuousInputFormat.configure(job.getConfiguration(), ingestInstanceId, env);
+
       String tableName = env.getAccumuloTableName();
 
       // create splits file for KeyRangePartitioner
-      String splitsFile = JOB_DIR + "/splits.txt";
+      String splitsFile = bulkDir + "/splits.txt";
       try (AccumuloClient client = env.getAccumuloClient()) {
 
         // make sure splits file is closed before continuing
         try (PrintStream out = new PrintStream(
             new BufferedOutputStream(fs.create(new Path(splitsFile))))) {
-          Collection<Text> splits = client.tableOperations().listSplits(tableName, 100);
+          Collection<Text> splits = client.tableOperations().listSplits(tableName,
+              env.getBulkReducers() - 1);
           for (Text split : splits) {
             out.println(Base64.getEncoder().encodeToString(split.copyBytes()));
           }
@@ -103,157 +102,13 @@ public class BulkIngest extends Configured implements Tool {
         job.waitForCompletion(true);
         boolean success = job.isSuccessful();
 
-        // bulk import completed files
-        if (success) {
-          log.info("Sort and create job successful. Bulk importing {} to {}", RFILE_DIR, tableName);
-          client.tableOperations().importDirectory(RFILE_DIR).to(tableName).load();
-        } else {
-          log.error("Job failed, not calling bulk import");
-        }
         return success ? 0 : 1;
       }
     }
   }
 
-  private int getCurrentJobNumber(FileSystem fs) throws Exception {
-    Path jobPath = new Path(BULK_CI_DIR);
-    FileStatus jobDir = fs.getFileStatus(jobPath);
-    if (jobDir.isDirectory()) {
-      FileStatus[] jobs = fs.listStatus(jobPath);
-      return jobs.length;
-    } else {
-      log.info("{} directory doesn't exist yet, first job running will create it.", BULK_CI_DIR);
-      return 0;
-    }
-  }
-
   public static void main(String[] args) throws Exception {
     int ret = ToolRunner.run(new BulkIngest(), args);
     System.exit(ret);
   }
-
-  /**
-   * Mapper that takes the longs from RandomInputFormat and output Key Value pairs
-   */
-  public static class RandomMapper extends Mapper<LongWritable,LongWritable,Key,Value> {
-
-    private String uuid;
-    private Text currentRow;
-    private Text currentValue;
-    private Text emptyCfCq;
-
-    @Override
-    protected void setup(Context context) {
-      uuid = context.getConfiguration().get(CI_BULK_UUID);
-      currentRow = new Text();
-      currentValue = new Text();
-      emptyCfCq = new Text(genCol(0));
-    }
-
-    @Override
-    protected void map(LongWritable key, LongWritable value, Context context)
-        throws IOException, InterruptedException {
-      currentRow.set(ContinuousIngest.genRow(key.get()));
-
-      // hack since we can't pass null - don't set first val (prevRow), we want it to be null
-      long longVal = value.get();
-      if (longVal != 1L) {
-        currentValue.set(ContinuousIngest.genRow(longVal));
-      }
-
-      Key outputKey = new Key(currentRow, emptyCfCq, emptyCfCq);
-      Value outputValue = ContinuousIngest.createValue(uuid.getBytes(), 0, currentValue.copyBytes(),
-          null);
-
-      context.write(outputKey, outputValue);
-    }
-  }
-
-  /**
-   * Generates a million LongWritable keys. The LongWritable value points to the previous key. The
-   * first key value pair has a value of 1L. This is translated to null in RandomMapper
-   */
-  public static class RandomInputFormat extends InputFormat {
-
-    public static class RandomSplit extends InputSplit implements Writable {
-      @Override
-      public void write(DataOutput dataOutput) {}
-
-      @Override
-      public void readFields(DataInput dataInput) {}
-
-      @Override
-      public long getLength() {
-        return 0;
-      }
-
-      @Override
-      public String[] getLocations() {
-        return new String[0];
-      }
-    }
-
-    @Override
-    public List<InputSplit> getSplits(JobContext jobContext) {
-      List<InputSplit> splits = new ArrayList<>();
-      for (int i = 0; i < 10; i++) {
-        splits.add(new RandomSplit());
-      }
-      return splits;
-    }
-
-    @Override
-    public RecordReader createRecordReader(InputSplit inputSplit,
-        TaskAttemptContext taskAttemptContext) {
-      return new RecordReader() {
-        int number;
-        int currentNumber;
-        LongWritable currentKey;
-        LongWritable prevRow;
-        private Random random;
-
-        @Override
-        public void initialize(InputSplit inputSplit, TaskAttemptContext job) {
-          // number = Integer.parseInt(job.getConfiguration().get(NUM_KEYS));
-          number = NUM_KEYS;
-          currentKey = new LongWritable(1);
-          prevRow = new LongWritable(1);
-          random = new Random();
-          currentNumber = 0;
-        }
-
-        @Override
-        public boolean nextKeyValue() {
-          if (currentNumber < number) {
-            prevRow.set(currentKey.get());
-            currentKey.set(genLong(0, Long.MAX_VALUE, random));
-            currentNumber++;
-            return true;
-          } else {
-            return false;
-          }
-        }
-
-        @Override
-        public LongWritable getCurrentKey() {
-          return currentKey;
-        }
-
-        @Override
-        public LongWritable getCurrentValue() {
-          return prevRow;
-        }
-
-        @Override
-        public float getProgress() {
-          return currentNumber * 1.0f / number;
-        }
-
-        @Override
-        public void close() throws IOException {
-
-        }
-      };
-    }
-  }
 }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
index 51ae3d5..0912475 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousEnv.java
@@ -59,6 +59,18 @@ public class ContinuousEnv extends TestEnv {
     return Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_MAX_CQ));
   }
 
+  public int getBulkMapTask() {
+    return Integer.parseInt(testProps.getProperty(TestProps.CI_BULK_MAP_TASK));
+  }
+
+  public long getBulkMapNodes() {
+    return Long.parseLong(testProps.getProperty(TestProps.CI_BULK_MAP_NODES));
+  }
+
+  public int getBulkReducers() {
+    return Integer.parseInt(testProps.getProperty(TestProps.CI_BULK_REDUCERS));
+  }
+
   public String getAccumuloTableName() {
     return testProps.getProperty(TestProps.CI_COMMON_ACCUMULO_TABLE);
   }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 0499b09..85ce6c3 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -33,13 +33,11 @@ import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.trace.TraceSamplers;
 import org.apache.accumulo.core.util.FastFormat;
 import org.apache.accumulo.testing.TestProps;
-import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -260,10 +258,9 @@ public class ContinuousIngest {
       cksum.update(cv.getExpression());
     }
 
-    Mutation m = new Mutation(new Text(rowString));
+    Mutation m = new Mutation(rowString);
 
-    m.put(new Text(cfString), new Text(cqString), cv,
-        createValue(ingestInstanceId, count, prevRow, cksum));
+    m.put(cfString, cqString, cv, createValue(ingestInstanceId, count, prevRow, cksum));
     return m;
   }
 
@@ -283,7 +280,7 @@ public class ContinuousIngest {
     return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES);
   }
 
-  public static Value createValue(byte[] ingestInstanceId, long count, byte[] prevRow,
+  public static byte[] createValue(byte[] ingestInstanceId, long count, byte[] prevRow,
       Checksum cksum) {
     int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3;
     if (cksum != null)
@@ -312,6 +309,6 @@ public class ContinuousIngest {
 
     // System.out.println("val "+new String(val));
 
-    return new Value(val);
+    return val;
   }
 }
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java
new file mode 100644
index 0000000..218e1c5
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousInputFormat.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.continuous;
+
+import static org.apache.accumulo.testing.continuous.ContinuousIngest.genCol;
+import static org.apache.accumulo.testing.continuous.ContinuousIngest.genLong;
+import static org.apache.accumulo.testing.continuous.ContinuousIngest.genRow;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.testing.TestProps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Generates a continuous ingest linked list per map reduce split. Each linked list is of
+ * configurable length.
+ */
+public class ContinuousInputFormat extends InputFormat<Key,Value> {
+
+  private static final String PROP_UUID = "mrbulk.uuid";
+  private static final String PROP_MAP_TASK = "mrbulk.map.task";
+  private static final String PROP_MAP_NODES = "mrbulk.map.nodes";
+  private static final String PROP_ROW_MIN = "mrbulk.row.min";
+  private static final String PROP_ROW_MAX = "mrbulk.row.max";
+  private static final String PROP_FAM_MAX = "mrbulk.fam.max";
+  private static final String PROP_QUAL_MAX = "mrbulk.qual.max";
+  private static final String PROP_CHECKSUM = "mrbulk.checksum";
+
+  private static class RandomSplit extends InputSplit implements Writable {
+    @Override
+    public void write(DataOutput dataOutput) {}
+
+    @Override
+    public void readFields(DataInput dataInput) {}
+
+    @Override
+    public long getLength() {
+      return 0;
+    }
+
+    @Override
+    public String[] getLocations() {
+      return new String[0];
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext) {
+    int numTask = jobContext.getConfiguration().getInt(PROP_MAP_TASK, 1);
+    List<InputSplit> splits = new ArrayList<>();
+    for (int i = 0; i < numTask; i++) {
+      splits.add(new RandomSplit());
+    }
+    return splits;
+  }
+
+  public static void configure(Configuration conf, String uuid, ContinuousEnv env) {
+    conf.set(PROP_UUID, uuid);
+    conf.setInt(PROP_MAP_TASK, env.getBulkMapTask());
+    conf.setLong(PROP_MAP_NODES, env.getBulkMapNodes());
+    conf.setLong(PROP_ROW_MIN, env.getRowMin());
+    conf.setLong(PROP_ROW_MAX, env.getRowMax());
+    conf.setInt(PROP_FAM_MAX, env.getMaxColF());
+    conf.setInt(PROP_QUAL_MAX, env.getMaxColQ());
+    conf.setBoolean(PROP_CHECKSUM,
+        Boolean.parseBoolean(env.getTestProperty(TestProps.CI_INGEST_CHECKSUM)));
+  }
+
+  @Override
+  public RecordReader<Key,Value> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) {
+    return new RecordReader<Key,Value>() {
+      long numNodes;
+      long nodeCount;
+      private Random random;
+
+      private byte[] uuid;
+
+      long minRow;
+      long maxRow;
+      int maxFam;
+      int maxQual;
+      boolean checksum;
+
+      Key prevKey;
+      Key currKey;
+      Value currValue;
+
+      @Override
+      public void initialize(InputSplit inputSplit, TaskAttemptContext job) {
+        numNodes = job.getConfiguration().getLong(PROP_MAP_NODES, 1000000);
+        uuid = job.getConfiguration().get(PROP_UUID).getBytes(StandardCharsets.UTF_8);
+
+        minRow = job.getConfiguration().getLong(PROP_ROW_MIN, 0);
+        maxRow = job.getConfiguration().getLong(PROP_ROW_MAX, Long.MAX_VALUE);
+        maxFam = job.getConfiguration().getInt(PROP_FAM_MAX, Short.MAX_VALUE);
+        maxQual = job.getConfiguration().getInt(PROP_QUAL_MAX, Short.MAX_VALUE);
+        checksum = job.getConfiguration().getBoolean(PROP_CHECKSUM, false);
+
+        random = new Random();
+        nodeCount = 0;
+      }
+
+      private Key genKey(CRC32 cksum) {
+
+        byte[] row = genRow(genLong(minRow, maxRow, random));
+
+        byte[] fam = genCol(random.nextInt(maxFam));
+        byte[] qual = genCol(random.nextInt(maxQual));
+
+        if (cksum != null) {
+          cksum.update(row);
+          cksum.update(fam);
+          cksum.update(qual);
+          cksum.update(new byte[0]); // TODO col vis
+        }
+
+        return new Key(row, fam, qual);
+      }
+
+      private byte[] createValue(byte[] ingestInstanceId, byte[] prevRow, Checksum cksum) {
+        return ContinuousIngest.createValue(ingestInstanceId, nodeCount, prevRow, cksum);
+      }
+
+      @Override
+      public boolean nextKeyValue() {
+
+        if (nodeCount < numNodes) {
+          CRC32 cksum = checksum ? new CRC32() : null;
+          byte[] prevRow = prevKey != null ? prevKey.getRowData().toArray() : null;
+
+          prevKey = currKey;
+          currKey = genKey(cksum);
+          currValue = new Value(createValue(uuid, prevRow, cksum));
+
+          nodeCount++;
+          return true;
+        } else {
+          return false;
+        }
+      }
+
+      @Override
+      public Key getCurrentKey() {
+        return currKey;
+      }
+
+      @Override
+      public Value getCurrentValue() {
+        return currValue;
+      }
+
+      @Override
+      public float getProgress() {
+        return nodeCount * 1.0f / numNodes;
+      }
+
+      @Override
+      public void close() throws IOException {
+
+      }
+    };
+  }
+}