You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/02/06 20:17:31 UTC

[03/17] incubator-metron git commit: METRON-678: Multithread the flat file loader closes apache/incubator-metron#428

METRON-678: Multithread the flat file loader closes apache/incubator-metron#428


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/ad8724ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/ad8724ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/ad8724ee

Branch: refs/heads/Metron_0.3.1
Commit: ad8724eed0af784fcd6a822a11842d86aefc8832
Parents: cc29dca
Author: cstella <ce...@gmail.com>
Authored: Tue Jan 31 15:19:55 2017 -0500
Committer: cstella <ce...@gmail.com>
Committed: Tue Jan 31 15:19:55 2017 -0500

----------------------------------------------------------------------
 .../common/utils/file/ReaderSpliterator.java    | 232 +++++++++++++++++++
 .../utils/file/ReaderSpliteratorTest.java       | 185 +++++++++++++++
 .../metron-data-management/README.md            |  21 +-
 .../nonbulk/flatfile/ExtractorState.java        |  46 ++++
 .../SimpleEnrichmentFlatFileLoader.java         | 116 ++++++++--
 .../SimpleEnrichmentFlatFileLoaderTest.java     | 180 +++++++-------
 .../ElasticsearchIndexingIntegrationTest.java   |   1 +
 .../integration/IndexingIntegrationTest.java    |   2 +
 8 files changed, 669 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java
new file mode 100644
index 0000000..20a40fa
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils.file;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static java.util.Spliterators.spliterator;
+
+/**
+ * A Spliterator which works well on sequential streams by constructing a
+ * fixed batch size split rather than inheriting the spliterator from BufferedReader.lines()
+ * which gives up and reports no size and has no strategy for batching.  This is a bug
+ * in Java 8 and will be fixed in Java 9.
+ *
+ * The ideas have been informed by https://www.airpair.com/java/posts/parallel-processing-of-io-based-data-with-java-streams
+ * except more specific to strings and motivated by a JDK 8 bug as
+ * described at http://bytefish.de/blog/jdk8_files_lines_parallel_stream/
+ */
+public class ReaderSpliterator implements Spliterator<String> {
+  private static int characteristics = NONNULL | ORDERED | IMMUTABLE;
+  private int batchSize ;
+  private BufferedReader reader;
+  public ReaderSpliterator(BufferedReader reader) {
+    this(reader, 128);
+  }
+
+  public ReaderSpliterator(BufferedReader reader, int batchSize) {
+    this.batchSize = batchSize;
+    this.reader = reader;
+  }
+
+  @Override
+  public void forEachRemaining(Consumer<? super String> action) {
+    if (action == null) {
+      throw new NullPointerException();
+    }
+    try {
+      for (String line = null; (line = reader.readLine()) != null;) {
+        action.accept(line);
+      }
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+  /**
+   * If a remaining element exists, performs the given action on it,
+   * returning {@code true}; else returns {@code false}.  If this
+   * Spliterator is {@link #ORDERED} the action is performed on the
+   * next element in encounter order.  Exceptions thrown by the
+   * action are relayed to the caller.
+   *
+   * @param action The action
+   * @return {@code false} if no remaining elements existed
+   * upon entry to this method, else {@code true}.
+   * @throws NullPointerException if the specified action is null
+   */
+  @Override
+  public boolean tryAdvance(Consumer<? super String> action) {
+    if (action == null) {
+      throw new NullPointerException();
+    }
+    try {
+      final String line = reader.readLine();
+      if (line == null) {
+        return false;
+      }
+      action.accept(line);
+      return true;
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * If this spliterator can be partitioned, returns a Spliterator
+   * covering elements, that will, upon return from this method, not
+   * be covered by this Spliterator.
+   * <p>
+   * <p>If this Spliterator is {@link #ORDERED}, the returned Spliterator
+   * must cover a strict prefix of the elements.
+   * <p>
+   * <p>Unless this Spliterator covers an infinite number of elements,
+   * repeated calls to {@code trySplit()} must eventually return {@code null}.
+   * Upon non-null return:
+   * <ul>
+   * <li>the value reported for {@code estimateSize()} before splitting,
+   * must, after splitting, be greater than or equal to {@code estimateSize()}
+   * for this and the returned Spliterator; and</li>
+   * <li>if this Spliterator is {@code SUBSIZED}, then {@code estimateSize()}
+   * for this spliterator before splitting must be equal to the sum of
+   * {@code estimateSize()} for this and the returned Spliterator after
+   * splitting.</li>
+   * </ul>
+   * <p>
+   * <p>This method may return {@code null} for any reason,
+   * including emptiness, inability to split after traversal has
+   * commenced, data structure constraints, and efficiency
+   * considerations.
+   *
+   * @return a {@code Spliterator} covering some portion of the
+   * elements, or {@code null} if this spliterator cannot be split
+   * @apiNote An ideal {@code trySplit} method efficiently (without
+   * traversal) divides its elements exactly in half, allowing
+   * balanced parallel computation.  Many departures from this ideal
+   * remain highly effective; for example, only approximately
+   * splitting an approximately balanced tree, or for a tree in
+   * which leaf nodes may contain either one or two elements,
+   * failing to further split these nodes.  However, large
+   * deviations in balance and/or overly inefficient {@code
+   * trySplit} mechanics typically result in poor parallel
+   * performance.
+   */
+  @Override
+  public Spliterator<String> trySplit() {
+    final ConsumerWithLookback holder = new ConsumerWithLookback();
+    if (!tryAdvance(holder)) {
+      return null;
+    }
+    final String[] batch = new String[batchSize];
+    int j = 0;
+    do {
+      batch[j] = holder.value;
+    }
+    while (++j < batchSize && tryAdvance(holder));
+    return spliterator(batch, 0, j, characteristics() | SIZED);
+  }
+
+  /**
+   * Returns an estimate of the number of elements that would be
+   * encountered by a {@link #forEachRemaining} traversal, or returns {@link
+   * Long#MAX_VALUE} if infinite, unknown, or too expensive to compute.
+   * <p>
+   * <p>If this Spliterator is {@link #SIZED} and has not yet been partially
+   * traversed or split, or this Spliterator is {@link #SUBSIZED} and has
+   * not yet been partially traversed, this estimate must be an accurate
+   * count of elements that would be encountered by a complete traversal.
+   * Otherwise, this estimate may be arbitrarily inaccurate, but must decrease
+   * as specified across invocations of {@link #trySplit}.
+   *
+   * @return the estimated size, or {@code Long.MAX_VALUE} if infinite,
+   * unknown, or too expensive to compute.
+   * @apiNote Even an inexact estimate is often useful and inexpensive to compute.
+   * For example, a sub-spliterator of an approximately balanced binary tree
+   * may return a value that estimates the number of elements to be half of
+   * that of its parent; if the root Spliterator does not maintain an
+   * accurate count, it could estimate size to be the power of two
+   * corresponding to its maximum depth.
+   */
+  @Override
+  public long estimateSize() {
+    return Long.MAX_VALUE;
+  }
+
+  /**
+   * Returns a set of characteristics of this Spliterator and its
+   * elements. The result is represented as ORed values from {@link
+   * #ORDERED}, {@link #DISTINCT}, {@link #SORTED}, {@link #SIZED},
+   * {@link #NONNULL}, {@link #IMMUTABLE}, {@link #CONCURRENT},
+   * {@link #SUBSIZED}.  Repeated calls to {@code characteristics()} on
+   * a given spliterator, prior to or in-between calls to {@code trySplit},
+   * should always return the same result.
+   * <p>
+   * <p>If a Spliterator reports an inconsistent set of
+   * characteristics (either those returned from a single invocation
+   * or across multiple invocations), no guarantees can be made
+   * about any computation using this Spliterator.
+   *
+   * @return a representation of characteristics
+   * @apiNote The characteristics of a given spliterator before splitting
+   * may differ from the characteristics after splitting.  For specific
+   * examples see the characteristic values {@link #SIZED}, {@link #SUBSIZED}
+   * and {@link #CONCURRENT}.
+   */
+  @Override
+  public int characteristics() {
+    return characteristics;
+  }
+
+  static class ConsumerWithLookback implements Consumer<String> {
+    String value;
+    /**
+     * Performs this operation on the given argument.
+     *
+     * @param string the input argument
+     */
+    @Override
+    public void accept(String string) {
+      this.value = string;
+    }
+  }
+
+  public static Stream<String> lineStream(BufferedReader in, int batchSize) {
+    return lineStream(in, batchSize, false);
+  }
+
+  public static Stream<String> lineStream(BufferedReader in, int batchSize, boolean isParallel) {
+    return StreamSupport.stream(new ReaderSpliterator(in, batchSize), isParallel)
+                        .onClose(() -> {
+                          try {
+                            in.close();
+                          } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                          }
+                                       }
+                                );
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java
new file mode 100644
index 0000000..965840f
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils.file;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.StandardOpenOption;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ReaderSpliteratorTest {
+  /**
+   foo
+   bar
+   grok
+   foo
+   the
+   and
+   grok
+   foo
+   bar
+   */
+  @Multiline
+  public static String data;
+  public static final File dataFile = new File("target/readerspliteratortest.data");
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    // With no explicit open options Files.write creates the file or truncates an existing one,
+    // so a stale file left over from an earlier run cannot make the setup fail.
+    Files.write(dataFile.toPath(), data.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    dataFile.deleteOnExit();
+  }
+
+  public static BufferedReader getReader() throws FileNotFoundException {
+    return new BufferedReader(new InputStreamReader(new FileInputStream(dataFile), java.nio.charset.StandardCharsets.UTF_8));
+  }
+
+  /**
+   * Counts the trimmed lines of the stream and checks the counts against the test data.
+   */
+  private static void assertWordCounts(Stream<String> stream) {
+    Map<String, Integer> count =
+            stream.map(s -> s.trim())
+                    .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
+    Assert.assertEquals(5, count.size());
+    Assert.assertEquals(3, (int) count.get("foo"));
+    Assert.assertEquals(2, (int) count.get("bar"));
+    Assert.assertEquals(1, (int) count.get("and"));
+    Assert.assertEquals(1, (int) count.get("the"));
+  }
+
+  /**
+   * Consumes the stream inside a dedicated pool of the given size and returns, per thread name,
+   * the number of lines that thread handled.  The pool is shut down before returning so that
+   * the tests do not leak worker threads.
+   */
+  private static Map<String, Integer> threadsUsed(Stream<String> stream, int poolSize)
+          throws ExecutionException, InterruptedException {
+    ForkJoinPool forkJoinPool = new ForkJoinPool(poolSize);
+    try {
+      return forkJoinPool.submit(() -> {
+                return stream.map(s -> Thread.currentThread().getName())
+                        .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
+              }
+      ).get();
+    } finally {
+      forkJoinPool.shutdown();
+    }
+  }
+
+  @Test
+  public void testParallelStreamSmallBatch() throws FileNotFoundException {
+    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
+      assertWordCounts(stream.parallel());
+    }
+  }
+
+  @Test
+  public void testParallelStreamLargeBatch() throws FileNotFoundException {
+    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 100)) {
+      assertWordCounts(stream.parallel());
+    }
+  }
+
+  @Test
+  public void testSequentialStreamLargeBatch() throws FileNotFoundException {
+    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 100)) {
+      assertWordCounts(stream);
+    }
+  }
+
+  @Test
+  public void testActuallyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
+    //With 9 elements and a batch of 2, we should only have ceil(9/2) = 5 batches, so at most min(5, 2) = 2 threads will be used
+    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
+      Map<String, Integer> threads = threadsUsed(stream.parallel(), 2);
+      Assert.assertTrue(threads.size() <= 2);
+    }
+  }
+
+  @Test
+  public void testActuallyParallel_mediumBatch() throws ExecutionException, InterruptedException, FileNotFoundException {
+    //With 9 elements and a batch of 2, we should only have ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used
+    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
+      Map<String, Integer> threads = threadsUsed(stream.parallel(), 10);
+      Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1);
+    }
+  }
+
+  @Test
+  public void testActuallyParallel_mediumBatchNotImplicitlyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
+    //Since this is not parallel and we're not making the stream itself parallel, we should only use one thread from the thread pool.
+    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2, false)) {
+      Map<String, Integer> threads = threadsUsed(stream, 10);
+      Assert.assertTrue(threads.size() == 1);
+    }
+  }
+
+  @Test
+  public void testActuallyParallel_mediumBatchImplicitlyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
+    //With 9 elements and a batch of 2, we should only have ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used
+    //despite not calling .parallel() on the stream, we are constructing the stream to be implicitly parallel
+    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2, true)) {
+      Map<String, Integer> threads = threadsUsed(stream, 10);
+      Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1);
+    }
+  }
+
+  @Test
+  public void testActuallyParallel_bigBatch() throws ExecutionException, InterruptedException, FileNotFoundException {
+    //With 9 elements and a batch of 10, we should only have one batch, so only one thread will be used
+    //despite the thread pool size of 2.
+    try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 10)) {
+      Map<String, Integer> threads = threadsUsed(stream.parallel(), 2);
+      Assert.assertEquals(1, threads.size());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-data-management/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/README.md b/metron-platform/metron-data-management/README.md
index a0c0164..26dd472 100644
--- a/metron-platform/metron-data-management/README.md
+++ b/metron-platform/metron-data-management/README.md
@@ -240,16 +240,17 @@ each document to be considered as input to the Extractor.
 
 The parameters for the utility are as follows:
 
-| Short Code | Long Code           | Is Required? | Description                                                                                                                                                                          |
-|------------|---------------------|--------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| -h         |                     | No           | Generate the help screen/set of options                                                                                                                                              |
-| -e         | --extractor_config  | Yes          | JSON Document describing the extractor for this input data source                                                                                                                    |
-| -t         | --hbase_table       | Yes          | The HBase table to import into                                                                                                                                                       |
-| -c         | --hbase_cf          | Yes          | The HBase table column family to import into                                                                                                                                         |
-| -i         | --input             | Yes          | The input data location on local disk.  If this is a file, then that file will be loaded.  If this is a directory, then the files will be loaded recursively under that directory. |
-| -l         | --log4j             | No           | The log4j properties file to load                                                                                                                                                    |
-| -n         | --enrichment_config | No           | The JSON document describing the enrichments to configure.  Unlike other loaders, this is run first if specified.                                                                    |
-
+| Short Code | Long Code           | Is Required? | Description                                                                                                                                                                         |
+|------------|---------------------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| -h         |                     | No           | Generate the help screen/set of options                                                                                                                                             |
+| -e         | --extractor_config  | Yes          | JSON Document describing the extractor for this input data source                                                                                                                   |
+| -t         | --hbase_table       | Yes          | The HBase table to import into                                                                                                                                                      |
+| -c         | --hbase_cf          | Yes          | The HBase table column family to import into                                                                                                                                        |
+| -i         | --input             | Yes          | The input data location on local disk.  If this is a file, then that file will be loaded.  If this is a directory, then the files will be loaded recursively under that directory. |
+| -l         | --log4j             | No           | The log4j properties file to load                                                                                                                                                   |
+| -n         | --enrichment_config | No           | The JSON document describing the enrichments to configure.  Unlike other loaders, this is run first if specified.                                                                   |
+| -p         | --threads           | No           | The number of threads to use when extracting data.  The default is the number of cores.                                                                                             |
+| -b         | --batchSize         | No           | The batch size to use for HBase puts                                                                                                                                                |
 ### GeoLite2 Loader
 
 The shell script `$METRON_HOME/bin/geo_enrichment_load.sh` will retrieve MaxMind GeoLite2 data and load data into HDFS, and update the configuration.

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
new file mode 100644
index 0000000..e44eb27
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.enrichment.converter.HbaseConverter;
+
+/**
+ * An immutable holder for the HBase table, the extractor and the converter
+ * which are used together when loading extracted data.
+ */
+public class ExtractorState {
+  private final HTableInterface table;
+  private final Extractor extractor;
+  private final HbaseConverter converter;
+
+  /**
+   * @param table The HBase table to write to
+   * @param extractor The extractor to apply to the input
+   * @param converter The converter used alongside the extractor
+   */
+  public ExtractorState(HTableInterface table, Extractor extractor, HbaseConverter converter) {
+    this.table = table;
+    this.extractor = extractor;
+    this.converter = converter;
+  }
+
+  /**
+   * @return The HBase table passed at construction
+   */
+  public HTableInterface getTable() {
+    return table;
+  }
+
+  /**
+   * @return The extractor passed at construction
+   */
+  public Extractor getExtractor() {
+    return extractor;
+  }
+
+  /**
+   * @return The converter passed at construction
+   */
+  public HbaseConverter getConverter() {
+    return converter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
index 0c7501a..9992422 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
@@ -20,6 +20,7 @@ package org.apache.metron.dataloads.nonbulk.flatfile;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import org.apache.commons.cli.*;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.log4j.PropertyConfigurator;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.file.ReaderSpliterator;
 import org.apache.metron.dataloads.extractor.Extractor;
 import org.apache.metron.dataloads.extractor.ExtractorHandler;
 import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
@@ -39,13 +42,13 @@ import org.apache.metron.enrichment.lookup.LookupKV;
 import org.apache.metron.common.utils.JSONUtils;
 
 import javax.annotation.Nullable;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
+import java.io.*;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Stack;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Stream;
 
 public class SimpleEnrichmentFlatFileLoader {
   private static abstract class OptionHandler implements Function<String, Option> {}
@@ -111,6 +114,26 @@ public class SimpleEnrichmentFlatFileLoader {
         return o;
       }
     })
+    // Optional -p/--threads: how many threads extract data.
+    ,NUM_THREADS("p", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String shortCode) {
+        final Option option = new Option( shortCode
+                                        , "threads"
+                                        , true
+                                        , "The number of threads to use when extracting data.  The default is the number of cores of your machine."
+                                        );
+        option.setRequired(false);
+        option.setArgName("NUM_THREADS");
+        return option;
+      }
+    })
+    // Optional -b/--batchSize: the batch size for HBase puts.
+    ,BATCH_SIZE("b", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String shortCode) {
+        final Option option = new Option( shortCode
+                                        , "batchSize"
+                                        , true
+                                        , "The batch size to use for HBase puts"
+                                        );
+        option.setRequired(false);
+        option.setArgName("SIZE");
+        return option;
+      }
+    })
     ,INPUT("i", new OptionHandler() {
       @Nullable
       @Override
@@ -207,25 +230,55 @@ public class SimpleEnrichmentFlatFileLoader {
     return ret;
   }
 
-
-  public void loadFile( File inputFile
-                      , Extractor extractor
-                      , HTableInterface table
-                      , String cf
-                      , HbaseConverter converter
-                      , boolean lineByLine
-                      ) throws IOException
+  /**
+   * Extracts every element of each stream and puts the result into HBase,
+   * processing the elements of a stream in parallel.
+   *
+   * A single pool of {@code numThreads} workers is shared by all streams and is
+   * shut down before this method returns.  Each stream is closed once it has
+   * been processed, whether or not processing succeeded.
+   *
+   * @param streams the streams to load; each element is handed to the extractor
+   * @param state supplies the table, extractor and converter used by the calling thread
+   * @param cf the column family passed to the extraction
+   * @param numThreads the parallelism of the pool the streams are processed on
+   * @throws IllegalStateException if a put fails, a worker fails or the load is interrupted
+   */
+  public void load( final Iterable<Stream<String>> streams
+                  , final ThreadLocal<ExtractorState> state
+                  , final String cf
+                  , int numThreads
+                  )
   {
+    // Created once and shut down in the outer finally so that the worker
+    // threads do not outlive the load.
+    ForkJoinPool forkJoinPool = new ForkJoinPool(numThreads);
+    try {
+      for(Stream<String> stream : streams) {
+        try {
+          forkJoinPool.submit(() ->
+            stream.parallel().forEach(input -> {
+              ExtractorState es = state.get();
+              try {
+                es.getTable().put(extract(input, es.getExtractor(), cf, es.getConverter()));
+              } catch (IOException e) {
+                throw new IllegalStateException("Unable to continue: " + e.getMessage(), e);
+              }
+            })
+          ).get();
+        } catch (InterruptedException e) {
+          // Restore the interrupt status so callers can still observe the interruption.
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(e.getMessage(), e);
+        } catch (ExecutionException e) {
+          throw new IllegalStateException(e.getMessage(), e);
+        } finally {
+          stream.close();
+        }
+      }
+    } finally {
+      forkJoinPool.shutdown();
+    }
+  }
+
+  /**
+   * Turns the input files into the streams consumed by {@code load}.
+   *
+   * When {@code lineByLine} is false a single stream is returned whose elements
+   * are the full contents of each file; the files are read when the stream is
+   * consumed.  Otherwise one stream per file is returned, created by
+   * {@code ReaderSpliterator.lineStream} with the given batch size.
+   *
+   * @param files the files to read
+   * @param batchSize the batch size handed to {@code ReaderSpliterator.lineStream}
+   * @param lineByLine whether each file is read line by line rather than as a whole
+   * @return the streams to load
+   * @throws FileNotFoundException if a file cannot be opened in line by line mode
+   * @throws IllegalStateException (when the stream is consumed) if a whole file cannot be read
+   */
+  private static Iterable<Stream<String>> streamify(List<File> files, int batchSize, boolean lineByLine) throws FileNotFoundException {
+    List<Stream<String>> ret = new ArrayList<>();
     if(!lineByLine) {
-      table.put(extract(FileUtils.readFileToString(inputFile), extractor, cf, converter));
+      ret.add(files.stream().map(f -> {
+        try {
+          return FileUtils.readFileToString(f);
+        } catch (IOException e) {
+          // Any read failure ends up here, not only a missing file; keep the cause.
+          throw new IllegalStateException("Unable to read file " + f.getName() + ": " + e.getMessage(), e);
+        }
+      }));
     }
     else {
-      BufferedReader br = new BufferedReader(new FileReader(inputFile));
-      for(String line = null;(line = br.readLine()) != null;) {
-        table.put(extract(line, extractor, cf, converter));
+      for(File f : files) {
+        ret.add(ReaderSpliterator.lineStream(new BufferedReader(new FileReader(f)), batchSize));
       }
     }
+    return ret;
   }
+
   public static void main(String... argv) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
@@ -237,23 +290,40 @@ public class SimpleEnrichmentFlatFileLoader {
     ExtractorHandler handler = ExtractorHandler.load(
             FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli)))
     );
+    int batchSize = 128;
+    if(LoadOptions.BATCH_SIZE.has(cli)) {
+      batchSize = ConversionUtils.convert(LoadOptions.BATCH_SIZE.get(cli), Integer.class);
+    }
+    int numThreads = Runtime.getRuntime().availableProcessors();
+    if(LoadOptions.NUM_THREADS.has(cli)) {
+      numThreads = ConversionUtils.convert(LoadOptions.NUM_THREADS.get(cli), Integer.class);
+    }
     boolean lineByLine = !handler.getInputFormatHandler().getClass().equals(WholeFileFormat.class);
-    Extractor e = handler.getExtractor();
     SensorEnrichmentUpdateConfig sensorEnrichmentUpdateConfig = null;
     if(LoadOptions.ENRICHMENT_CONFIG.has(cli)) {
       sensorEnrichmentUpdateConfig = JSONUtils.INSTANCE.load( new File(LoadOptions.ENRICHMENT_CONFIG.get(cli))
               , SensorEnrichmentUpdateConfig.class
       );
     }
-    HbaseConverter converter = new EnrichmentConverter();
     List<File> inputFiles = getFiles(new File(LoadOptions.INPUT.get(cli)));
     SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader();
-    HTableInterface table = loader.getProvider()
-            .getTable(conf, LoadOptions.HBASE_TABLE.get(cli));
+    ThreadLocal<ExtractorState> state = new ThreadLocal<ExtractorState>() {
+      /**
+       * Creates the state for the calling thread: the extractor is loaded
+       * from the extractor config file and the table is obtained from the
+       * loader's provider, so nothing is shared with other threads.
+       *
+       * @return the table, extractor and converter for this thread
+       * @throws IllegalStateException if the config cannot be read or the table cannot be obtained
+       */
+      @Override
+      protected ExtractorState initialValue() {
+        try {
+          ExtractorHandler handler = ExtractorHandler.load(
+            FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli)))
+          );
+          HTableInterface table = loader.getProvider().getTable(conf, LoadOptions.HBASE_TABLE.get(cli));
+          return new ExtractorState(table, handler.getExtractor(), new EnrichmentConverter());
+        } catch (IOException e1) {
+          // Chain the cause so the original stack trace is not lost.
+          throw new IllegalStateException("Unable to get table: " + e1, e1);
+        }
+      }
+    };
+
+    loader.load(streamify(inputFiles, batchSize, lineByLine), state, LoadOptions.HBASE_CF.get(cli), numThreads);
 
-    for (File f : inputFiles) {
-      loader.loadFile(f, e, table, LoadOptions.HBASE_CF.get(cli), converter, lineByLine);
-    }
     if(sensorEnrichmentUpdateConfig != null) {
       sensorEnrichmentUpdateConfig.updateSensorConfigs();
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java
index b4891aa..4ffb91a 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.dataloads.nonbulk.flatfile;
 
+import com.google.common.collect.ImmutableList;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.PosixParser;
@@ -56,91 +57,108 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 public class SimpleEnrichmentFlatFileLoaderTest {
 
-    private HBaseTestingUtility testUtil;
-
-    /** The test table. */
-    private HTable testTable;
-    private String tableName = "enrichment";
-    private String cf = "cf";
-    private String csvFile="input.csv";
-    private String extractorJson = "extractor.json";
-    private String enrichmentJson = "enrichment_config.json";
-    private String log4jProperty = "log4j";
-
-    Configuration config = null;
-    /**
-     {
-        "config" : {
-            "columns" : {
-                "host" : 0,
-                "meta" : 2
-            },
-            "indicator_column" : "host",
-            "separator" : ",",
-            "type" : "enrichment"
-        },
-        "extractor" : "CSV"
-     }
-     */
-    @Multiline
-    private static String extractorConfig;
-
-    @Before
-    public void setup() throws Exception {
-       Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
-        config = kv.getValue();
-        testUtil = kv.getKey();
-        testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
+  private HBaseTestingUtility testUtil;
+
+  /** The test table. */
+  private HTable testTable;
+  private String tableName = "enrichment";
+  private String cf = "cf";
+  private String csvFile="input.csv";
+  private String extractorJson = "extractor.json";
+  private String enrichmentJson = "enrichment_config.json";
+  private String log4jProperty = "log4j";
+
+  Configuration config = null;
+  /**
+   {
+      "config" : {
+        "columns" : {
+          "host" : 0,
+          "meta" : 2
+                    },
+        "indicator_column" : "host",
+        "separator" : ",",
+        "type" : "enrichment"
+                 },
+      "extractor" : "CSV"
    }
-
-    @After
-    public void teardown() throws Exception {
-        HBaseUtil.INSTANCE.teardown(testUtil);
-    }
-
-    @Test
-    public void testCommandLine() throws Exception {
-        Configuration conf = HBaseConfiguration.create();
-
-        String[] argv = {"-c cf", "-t enrichment", "-e extractor.json", "-n enrichment_config.json", "-l log4j", "-i input.csv"};
-        String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
-
-        CommandLine cli = SimpleEnrichmentFlatFileLoader.LoadOptions.parse(new PosixParser(), otherArgs);
-        Assert.assertEquals(extractorJson,SimpleEnrichmentFlatFileLoader.LoadOptions.EXTRACTOR_CONFIG.get(cli).trim());
-        Assert.assertEquals(cf, SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_CF.get(cli).trim());
-        Assert.assertEquals(tableName,SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_TABLE.get(cli).trim());
-        Assert.assertEquals(enrichmentJson,SimpleEnrichmentFlatFileLoader.LoadOptions.ENRICHMENT_CONFIG.get(cli).trim());
-        Assert.assertEquals(csvFile,SimpleEnrichmentFlatFileLoader.LoadOptions.INPUT.get(cli).trim());
-        Assert.assertEquals(log4jProperty, SimpleEnrichmentFlatFileLoader.LoadOptions.LOG4J_PROPERTIES.get(cli).trim());
-    }
-
-    @Test
-    public void test() throws Exception {
-
-        Assert.assertNotNull(testTable);
-        String contents = "google.com,1,foo";
-
-        EnrichmentConverter converter = new EnrichmentConverter();
-        ExtractorHandler handler = ExtractorHandler.load(extractorConfig);
-        Extractor e = handler.getExtractor();
-        File file = new File (contents);
-        SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader();
-        testTable.put(loader.extract(contents, e, cf, converter));
-
-        ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
-        List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
-        for(Result r : scanner) {
-            results.add(converter.fromResult(r, cf));
-        }
-        Assert.assertEquals(1, results.size());
-        Assert.assertEquals(results.get(0).getKey().indicator, "google.com");
-        Assert.assertEquals(results.get(0).getKey().type, "enrichment");
-        Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
-        Assert.assertEquals(results.get(0).getValue().getMetadata().get("meta"), "foo");
-        Assert.assertEquals(results.get(0).getValue().getMetadata().get("host"), "google.com");
+   */
+  @Multiline
+  private static String extractorConfig;
+
+  @Before
+  public void setup() throws Exception {
+    Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
+    config = kv.getValue();
+    testUtil = kv.getKey();
+    testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
+  }
+
+  /** Hands the HBase testing utility created in setup back to HBaseUtil for teardown. */
+  @After
+  public void teardown() throws Exception {
+    HBaseUtil.INSTANCE.teardown(testUtil);
+  }
+
+  /**
+   * Parses a command line that sets every loader option, including the
+   * thread count and batch size, and checks that each option yields the value
+   * it was given.  Each flag is passed together with its value as a single
+   * argument, which is presumably why the parsed values are trimmed before
+   * they are compared.
+   */
+  @Test
+  public void testCommandLine() throws Exception {
+    String[] args = {
+            "-c cf"
+            , "-t enrichment"
+            , "-e extractor.json"
+            , "-n enrichment_config.json"
+            , "-l log4j"
+            , "-i input.csv"
+            , "-p 2"
+            , "-b 128"
+    };
+    Configuration hbaseConf = HBaseConfiguration.create();
+    String[] remaining = new GenericOptionsParser(hbaseConf, args).getRemainingArgs();
+    CommandLine parsed = SimpleEnrichmentFlatFileLoader.LoadOptions.parse(new PosixParser(), remaining);
+
+    Assert.assertEquals(extractorJson, SimpleEnrichmentFlatFileLoader.LoadOptions.EXTRACTOR_CONFIG.get(parsed).trim());
+    Assert.assertEquals(cf, SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_CF.get(parsed).trim());
+    Assert.assertEquals(tableName, SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_TABLE.get(parsed).trim());
+    Assert.assertEquals(enrichmentJson, SimpleEnrichmentFlatFileLoader.LoadOptions.ENRICHMENT_CONFIG.get(parsed).trim());
+    Assert.assertEquals(csvFile, SimpleEnrichmentFlatFileLoader.LoadOptions.INPUT.get(parsed).trim());
+    Assert.assertEquals(log4jProperty, SimpleEnrichmentFlatFileLoader.LoadOptions.LOG4J_PROPERTIES.get(parsed).trim());
+    Assert.assertEquals("2", SimpleEnrichmentFlatFileLoader.LoadOptions.NUM_THREADS.get(parsed).trim());
+    Assert.assertEquals("128", SimpleEnrichmentFlatFileLoader.LoadOptions.BATCH_SIZE.get(parsed).trim());
+  }
+
+  /**
+   * Loads a single CSV record through {@code load} with two threads and
+   * checks the enrichment that ends up in the test table: one row, keyed by
+   * the host, with both configured columns in its metadata.
+   */
+  @Test
+  public void test() throws Exception {
+
+    Assert.assertNotNull(testTable);
+    String contents = "google.com,1,foo";
+
+    EnrichmentConverter converter = new EnrichmentConverter();
+    ExtractorHandler handler = ExtractorHandler.load(extractorConfig);
+    Extractor e = handler.getExtractor();
+    SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader();
+    Stream<String> contentStreams = ImmutableList.of(contents).stream();
+    // Every worker thread gets a state built from the same test table, extractor and converter.
+    ThreadLocal<ExtractorState> state = new ThreadLocal<ExtractorState>() {
+      @Override
+      protected ExtractorState initialValue() {
+        return new ExtractorState(testTable, e, converter);
+      }
+    };
+    loader.load(ImmutableList.of(contentStreams)
+               , state
+               , cf
+               , 2
+               );
+
+    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
+    // Close the scanner as soon as the rows have been read.
+    try(ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf))) {
+      for(Result r : scanner) {
+        results.add(converter.fromResult(r, cf));
+      }
     }
+    // Expected value first, so failure messages report expected and actual correctly.
+    Assert.assertEquals(1, results.size());
+    Assert.assertEquals("google.com", results.get(0).getKey().indicator);
+    Assert.assertEquals("enrichment", results.get(0).getKey().type);
+    Assert.assertEquals(2, results.get(0).getValue().getMetadata().size());
+    Assert.assertEquals("foo", results.get(0).getValue().getMetadata().get("meta"));
+    Assert.assertEquals("google.com", results.get(0).getValue().getMetadata().get("host"));
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
index 7e9f231..acc1565 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
@@ -85,6 +85,7 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes
             return ReadinessState.READY;
           }
         } else {
+          System.out.println("Missed index...");
           return ReadinessState.NOT_READY;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index 03ae9ff..a93c442 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -205,6 +205,7 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
   private void waitForIndex(String zookeeperQuorum) throws Exception {
     try(CuratorFramework client = getClient(zookeeperQuorum)) {
       client.start();
+      System.out.println("Waiting for zookeeper...");
       byte[] bytes = null;
       do {
         try {
@@ -216,6 +217,7 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
         }
       }
       while(bytes == null || bytes.length == 0);
+      System.out.println("Found index config in zookeeper...");
     }
   }