You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/02/06 20:17:31 UTC
[03/17] incubator-metron git commit: METRON-678: Multithread the flat
file loader closes apache/incubator-metron#428
METRON-678: Multithread the flat file loader closes apache/incubator-metron#428
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/ad8724ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/ad8724ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/ad8724ee
Branch: refs/heads/Metron_0.3.1
Commit: ad8724eed0af784fcd6a822a11842d86aefc8832
Parents: cc29dca
Author: cstella <ce...@gmail.com>
Authored: Tue Jan 31 15:19:55 2017 -0500
Committer: cstella <ce...@gmail.com>
Committed: Tue Jan 31 15:19:55 2017 -0500
----------------------------------------------------------------------
.../common/utils/file/ReaderSpliterator.java | 232 +++++++++++++++++++
.../utils/file/ReaderSpliteratorTest.java | 185 +++++++++++++++
.../metron-data-management/README.md | 21 +-
.../nonbulk/flatfile/ExtractorState.java | 46 ++++
.../SimpleEnrichmentFlatFileLoader.java | 116 ++++++++--
.../SimpleEnrichmentFlatFileLoaderTest.java | 180 +++++++-------
.../ElasticsearchIndexingIntegrationTest.java | 1 +
.../integration/IndexingIntegrationTest.java | 2 +
8 files changed, 669 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java
new file mode 100644
index 0000000..20a40fa
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/file/ReaderSpliterator.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils.file;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static java.util.Spliterators.spliterator;
+
/**
 * A {@link Spliterator} over the lines of a {@link BufferedReader} that lets a stream over a
 * sequential I/O source be processed in parallel by splitting off fixed-size batches of lines.
 *
 * <p>In Java 8 the spliterator behind {@code BufferedReader.lines()} reports no size and has no
 * batching strategy, so parallel streams built on it parallelize poorly (fixed in Java 9).
 * Here every {@link #trySplit()} eagerly reads up to {@code batchSize} lines into an array and
 * returns a sized spliterator over that batch, while this spliterator keeps the rest of the reader.
 *
 * <p>The ideas have been informed by
 * https://www.airpair.com/java/posts/parallel-processing-of-io-based-data-with-java-streams
 * except more specific to strings and motivated by a JDK 8 bug as
 * described at http://bytefish.de/blog/jdk8_files_lines_parallel_stream/
 *
 * <p>As with spliterators in general, an instance is not thread-safe.
 */
public class ReaderSpliterator implements Spliterator<String> {
  /** Characteristics of this spliterator; the batches split off additionally report SIZED. */
  private static final int CHARACTERISTICS = NONNULL | ORDERED | IMMUTABLE;
  /** Number of lines per split batch used by the single-argument constructor. */
  private static final int DEFAULT_BATCH_SIZE = 128;

  private final int batchSize;
  private final BufferedReader reader;

  /**
   * Creates a spliterator over {@code reader} that splits off batches of 128 lines.
   *
   * @param reader the source of lines; never closed by this spliterator
   */
  public ReaderSpliterator(BufferedReader reader) {
    this(reader, DEFAULT_BATCH_SIZE);
  }

  /**
   * Creates a spliterator over {@code reader} that splits off batches of {@code batchSize} lines.
   *
   * @param reader    the source of lines; never closed by this spliterator
   * @param batchSize the maximum number of lines per split batch; must be at least 1
   * @throws NullPointerException     if {@code reader} is null
   * @throws IllegalArgumentException if {@code batchSize} is less than 1
   */
  public ReaderSpliterator(BufferedReader reader, int batchSize) {
    if (reader == null) {
      throw new NullPointerException("reader");
    }
    // A batch must hold at least the line trySplit() has already consumed, otherwise that
    // line would be read and then lost when storing it into the batch fails.
    if (batchSize < 1) {
      throw new IllegalArgumentException("batchSize must be at least 1, but was " + batchSize);
    }
    this.batchSize = batchSize;
    this.reader = reader;
  }

  /**
   * Feeds every remaining line of the reader to {@code action}, in order.
   *
   * @throws NullPointerException  if {@code action} is null
   * @throws IllegalStateException wrapping any {@link IOException} raised while reading
   */
  @Override
  public void forEachRemaining(Consumer<? super String> action) {
    if (action == null) {
      throw new NullPointerException();
    }
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        action.accept(line);
      }
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  /**
   * Reads the next line and, if there is one, passes it to {@code action}.
   *
   * @return {@code false} if the reader was exhausted, else {@code true}
   * @throws NullPointerException  if {@code action} is null
   * @throws IllegalStateException wrapping any {@link IOException} raised while reading
   */
  @Override
  public boolean tryAdvance(Consumer<? super String> action) {
    if (action == null) {
      throw new NullPointerException();
    }
    try {
      final String line = reader.readLine();
      if (line == null) {
        return false;
      }
      action.accept(line);
      return true;
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  /**
   * Eagerly reads up to {@code batchSize} lines and returns an array-backed, SIZED spliterator
   * covering them. This spliterator keeps the lines after the batch, so the returned one is a
   * strict prefix in encounter order.
   *
   * @return a spliterator over the next batch, or {@code null} once the reader is exhausted
   */
  @Override
  public Spliterator<String> trySplit() {
    final ConsumerWithLookback holder = new ConsumerWithLookback();
    if (!tryAdvance(holder)) {
      return null;
    }
    final String[] batch = new String[batchSize];
    int size = 0;
    do {
      batch[size++] = holder.value;
    } while (size < batchSize && tryAdvance(holder));
    return spliterator(batch, 0, size, characteristics() | SIZED);
  }

  /**
   * The number of remaining lines is unknown without reading the rest of the reader, so this
   * always reports {@link Long#MAX_VALUE}.
   */
  @Override
  public long estimateSize() {
    return Long.MAX_VALUE;
  }

  /** Reports NONNULL, ORDERED and IMMUTABLE. */
  @Override
  public int characteristics() {
    return CHARACTERISTICS;
  }

  /** A consumer that simply remembers the last value it was given. */
  static class ConsumerWithLookback implements Consumer<String> {
    String value;

    /**
     * Stores {@code string} as the most recently seen value.
     *
     * @param string the input argument
     */
    @Override
    public void accept(String string) {
      this.value = string;
    }
  }

  /**
   * Creates a sequential stream over the lines of {@code in}, split in batches of
   * {@code batchSize} lines when made parallel. Closing the stream closes {@code in}.
   */
  public static Stream<String> lineStream(BufferedReader in, int batchSize) {
    return lineStream(in, batchSize, false);
  }

  /**
   * Creates a stream over the lines of {@code in}, split in batches of {@code batchSize} lines.
   * Closing the stream closes {@code in}; an {@link IOException} raised by that close is
   * rethrown as an {@link UncheckedIOException}.
   *
   * @param in         the source of lines
   * @param batchSize  the maximum number of lines per split batch; must be at least 1
   * @param isParallel whether the returned stream is parallel
   */
  public static Stream<String> lineStream(BufferedReader in, int batchSize, boolean isParallel) {
    return StreamSupport.stream(new ReaderSpliterator(in, batchSize), isParallel)
        .onClose(() -> {
          try {
            in.close();
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        });
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java
new file mode 100644
index 0000000..965840f
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/file/ReaderSpliteratorTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils.file;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.StandardOpenOption;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ReaderSpliteratorTest {
+ /**
+ foo
+ bar
+ grok
+ foo
+ the
+ and
+ grok
+ foo
+ bar
+ */
+ @Multiline
+ public static String data;
+ public static final File dataFile = new File("target/readerspliteratortest.data");
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ if(dataFile.exists()) {
+ dataFile.delete();
+ }
+ Files.write(dataFile.toPath(), data.getBytes(), StandardOpenOption.CREATE_NEW, StandardOpenOption.TRUNCATE_EXISTING);
+ dataFile.deleteOnExit();
+ }
+
+ public static BufferedReader getReader() throws FileNotFoundException {
+ return new BufferedReader(new FileReader(dataFile));
+ }
+
+ @Test
+ public void testParallelStreamSmallBatch() throws FileNotFoundException {
+ try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
+
+ Map<String, Integer> count =
+ stream.parallel().map( s -> s.trim())
+ .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
+ Assert.assertEquals(5, count.size());
+ Assert.assertEquals(3, (int)count.get("foo"));
+ Assert.assertEquals(2, (int)count.get("bar"));
+ Assert.assertEquals(1, (int)count.get("and"));
+ Assert.assertEquals(1, (int)count.get("the"));
+ }
+ }
+
+ @Test
+ public void testParallelStreamLargeBatch() throws FileNotFoundException {
+ try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 100)) {
+ Map<String, Integer> count =
+ stream.parallel().map(s -> s.trim())
+ .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
+ Assert.assertEquals(5, count.size());
+ Assert.assertEquals(3, (int) count.get("foo"));
+ Assert.assertEquals(2, (int) count.get("bar"));
+ Assert.assertEquals(1, (int) count.get("and"));
+ Assert.assertEquals(1, (int) count.get("the"));
+ }
+ }
+
+ @Test
+ public void testSequentialStreamLargeBatch() throws FileNotFoundException {
+ try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 100)) {
+ Map<String, Integer> count =
+ stream.map(s -> s.trim())
+ .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
+ Assert.assertEquals(5, count.size());
+ Assert.assertEquals(3, (int) count.get("foo"));
+ Assert.assertEquals(2, (int) count.get("bar"));
+ Assert.assertEquals(1, (int) count.get("and"));
+ Assert.assertEquals(1, (int) count.get("the"));
+ }
+ }
+
+ @Test
+ public void testActuallyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
+ //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most min(5, 2) = 2 threads will be used
+ try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
+ ForkJoinPool forkJoinPool = new ForkJoinPool(2);
+ forkJoinPool.submit(() -> {
+ Map<String, Integer> threads =
+ stream.parallel().map(s -> Thread.currentThread().getName())
+ .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
+ Assert.assertTrue(threads.size() <= 2);
+ }
+ ).get();
+ }
+ }
+
+ @Test
+ public void testActuallyParallel_mediumBatch() throws ExecutionException, InterruptedException, FileNotFoundException {
+ //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used
+ try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2)) {
+ ForkJoinPool forkJoinPool = new ForkJoinPool(10);
+ forkJoinPool.submit(() -> {
+ Map<String, Integer> threads =
+ stream.parallel().map(s -> Thread.currentThread().getName())
+ .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
+ Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1);
+ }
+ ).get();
+ }
+ }
+
+ @Test
+ public void testActuallyParallel_mediumBatchNotImplicitlyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
+ //Since this is not parallel and we're not making the stream itself parallel, we should only use one thread from the thread pool.
+ try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2, false)) {
+ ForkJoinPool forkJoinPool = new ForkJoinPool(10);
+ forkJoinPool.submit(() -> {
+ Map<String, Integer> threads =
+ stream.map(s -> Thread.currentThread().getName())
+ .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
+ Assert.assertTrue(threads.size() == 1);
+ }
+ ).get();
+ }
+ }
+
+ @Test
+ public void testActuallyParallel_mediumBatchImplicitlyParallel() throws ExecutionException, InterruptedException, FileNotFoundException {
+ //With 9 elements and a batch of 2, we should only ceil(9/2) = 5 batches, so at most 5 threads of the pool of 10 will be used
+ //despite not calling .parallel() on the stream, we are constructing the stream to be implicitly parallel
+ try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 2, true)) {
+ ForkJoinPool forkJoinPool = new ForkJoinPool(10);
+ forkJoinPool.submit(() -> {
+ Map<String, Integer> threads =
+ stream.map(s -> Thread.currentThread().getName())
+ .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
+ Assert.assertTrue(threads.size() <= (int) Math.ceil(9.0 / 2) && threads.size() > 1);
+ }
+ ).get();
+ }
+ }
+
+ @Test
+ public void testActuallyParallel_bigBatch() throws ExecutionException, InterruptedException, FileNotFoundException {
+ //With 9 elements and a batch of 10, we should only have one batch, so only one thread will be used
+ //despite the thread pool size of 2.
+ try( Stream<String> stream = ReaderSpliterator.lineStream(getReader(), 10)) {
+ ForkJoinPool forkJoinPool = new ForkJoinPool(2);
+ forkJoinPool.submit(() -> {
+ Map<String, Integer> threads =
+ stream.parallel().map(s -> Thread.currentThread().getName())
+ .collect(Collectors.toMap(s -> s, s -> 1, Integer::sum));
+ Assert.assertEquals(1, threads.size());
+ }
+ ).get();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-data-management/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/README.md b/metron-platform/metron-data-management/README.md
index a0c0164..26dd472 100644
--- a/metron-platform/metron-data-management/README.md
+++ b/metron-platform/metron-data-management/README.md
@@ -240,16 +240,17 @@ each document to be considered as input to the Extractor.
The parameters for the utility are as follows:
-| Short Code | Long Code | Is Required? | Description |
-|------------|---------------------|--------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| -h | | No | Generate the help screen/set of options |
-| -e | --extractor_config | Yes | JSON Document describing the extractor for this input data source |
-| -t | --hbase_table | Yes | The HBase table to import into |
-| -c | --hbase_cf | Yes | The HBase table column family to import into |
-| -i | --input | Yes | The input data location on local disk. If this is a file, then that file will be loaded. If this is a directory, then the files will be loaded recursively under that directory. |
-| -l | --log4j | No | The log4j properties file to load |
-| -n | --enrichment_config | No | The JSON document describing the enrichments to configure. Unlike other loaders, this is run first if specified. |
-
+| Short Code | Long Code           | Is Required? | Description |
+|------------|---------------------|--------------|-------------|
+| -h         |                     | No           | Generate the help screen/set of options |
+| -e         | --extractor_config  | Yes          | JSON Document describing the extractor for this input data source |
+| -t         | --hbase_table       | Yes          | The HBase table to import into |
+| -c         | --hbase_cf          | Yes          | The HBase table column family to import into |
+| -i         | --input             | Yes          | The input data location on local disk. If this is a file, then that file will be loaded. If this is a directory, then the files will be loaded recursively under that directory. |
+| -l         | --log4j             | No           | The log4j properties file to load |
+| -n         | --enrichment_config | No           | The JSON document describing the enrichments to configure. Unlike other loaders, this is run first if specified. |
+| -p         | --threads           | No           | The number of threads to use when extracting data. The default is the number of cores. |
+| -b         | --batchSize         | No           | The number of input lines read into each batch handed to a worker thread when loading line by line. The default is 128. |
### GeoLite2 Loader
The shell script `$METRON_HOME/bin/geo_enrichment_load.sh` will retrieve MaxMind GeoLite2 data and load data into HDFS, and update the configuration.
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
new file mode 100644
index 0000000..e44eb27
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.enrichment.converter.HbaseConverter;
+
+public class ExtractorState {
+ private HTableInterface table;
+ private Extractor extractor;
+ private HbaseConverter converter;
+
+ public ExtractorState(HTableInterface table, Extractor extractor, HbaseConverter converter) {
+ this.table = table;
+ this.extractor = extractor;
+ this.converter = converter;
+ }
+
+ public HTableInterface getTable() {
+ return table;
+ }
+
+ public Extractor getExtractor() {
+ return extractor;
+ }
+
+ public HbaseConverter getConverter() {
+ return converter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
index 0c7501a..9992422 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
@@ -20,6 +20,7 @@ package org.apache.metron.dataloads.nonbulk.flatfile;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import org.apache.commons.cli.*;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
@@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.PropertyConfigurator;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.file.ReaderSpliterator;
import org.apache.metron.dataloads.extractor.Extractor;
import org.apache.metron.dataloads.extractor.ExtractorHandler;
import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
@@ -39,13 +42,13 @@ import org.apache.metron.enrichment.lookup.LookupKV;
import org.apache.metron.common.utils.JSONUtils;
import javax.annotation.Nullable;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
+import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Stream;
public class SimpleEnrichmentFlatFileLoader {
private static abstract class OptionHandler implements Function<String, Option> {}
@@ -111,6 +114,26 @@ public class SimpleEnrichmentFlatFileLoader {
return o;
}
})
,NUM_THREADS("p", new OptionHandler() {
  // -p/--threads: parallelism of the pool used to extract and load the input.
  @Nullable
  @Override
  public Option apply(@Nullable String s) {
    Option o = new Option(s, "threads", true, "The number of threads to use when extracting data. The default is the number of cores of your machine.");
    o.setArgName("NUM_THREADS");
    o.setRequired(false);
    return o;
  }
})
,BATCH_SIZE("b", new OptionHandler() {
  // -b/--batchSize: only controls how many input lines are read into each chunk handed to the
  // worker threads when reading line by line; HBase puts are still issued once per input.
  @Nullable
  @Override
  public Option apply(@Nullable String s) {
    Option o = new Option(s, "batchSize", true, "The number of input lines read into each batch handed to a worker thread when loading line by line (default 128)");
    o.setArgName("SIZE");
    o.setRequired(false);
    return o;
  }
})
,INPUT("i", new OptionHandler() {
@Nullable
@Override
@@ -207,25 +230,55 @@ public class SimpleEnrichmentFlatFileLoader {
return ret;
}
-
- public void loadFile( File inputFile
- , Extractor extractor
- , HTableInterface table
- , String cf
- , HbaseConverter converter
- , boolean lineByLine
- ) throws IOException
+ public void load( final Iterable<Stream<String>> streams
+ , final ThreadLocal<ExtractorState> state
+ , final String cf
+ , int numThreads
+ )
{
+ for(Stream<String> stream : streams) {
+ try {
+ ForkJoinPool forkJoinPool = new ForkJoinPool(numThreads);
+ forkJoinPool.submit(() ->
+ stream.parallel().forEach(input -> {
+ ExtractorState es = state.get();
+ try {
+ es.getTable().put(extract(input, es.getExtractor(), cf, es.getConverter()));
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to continue: " + e.getMessage(), e);
+ }
+ }
+ )
+ ).get();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ } finally {
+ stream.close();
+ }
+ }
+ }
+
+ private static Iterable<Stream<String>> streamify(List<File> files, int batchSize, boolean lineByLine) throws FileNotFoundException {
+ List<Stream<String>> ret = new ArrayList<>();
if(!lineByLine) {
- table.put(extract(FileUtils.readFileToString(inputFile), extractor, cf, converter));
+ ret.add(files.stream().map(f -> {
+ try {
+ return FileUtils.readFileToString(f);
+ } catch (IOException e) {
+ throw new IllegalStateException("File " + f.getName() + " not found.");
+ }
+ }));
}
else {
- BufferedReader br = new BufferedReader(new FileReader(inputFile));
- for(String line = null;(line = br.readLine()) != null;) {
- table.put(extract(line, extractor, cf, converter));
+ for(File f : files) {
+ ret.add(ReaderSpliterator.lineStream(new BufferedReader(new FileReader(f)), batchSize));
}
}
+ return ret;
}
+
public static void main(String... argv) throws Exception {
Configuration conf = HBaseConfiguration.create();
String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
@@ -237,23 +290,40 @@ public class SimpleEnrichmentFlatFileLoader {
ExtractorHandler handler = ExtractorHandler.load(
FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli)))
);
+ int batchSize = 128;
+ if(LoadOptions.BATCH_SIZE.has(cli)) {
+ batchSize = ConversionUtils.convert(LoadOptions.BATCH_SIZE.get(cli), Integer.class);
+ }
+ int numThreads = Runtime.getRuntime().availableProcessors();
+ if(LoadOptions.NUM_THREADS.has(cli)) {
+ numThreads = ConversionUtils.convert(LoadOptions.NUM_THREADS.get(cli), Integer.class);
+ }
boolean lineByLine = !handler.getInputFormatHandler().getClass().equals(WholeFileFormat.class);
- Extractor e = handler.getExtractor();
SensorEnrichmentUpdateConfig sensorEnrichmentUpdateConfig = null;
if(LoadOptions.ENRICHMENT_CONFIG.has(cli)) {
sensorEnrichmentUpdateConfig = JSONUtils.INSTANCE.load( new File(LoadOptions.ENRICHMENT_CONFIG.get(cli))
, SensorEnrichmentUpdateConfig.class
);
}
- HbaseConverter converter = new EnrichmentConverter();
List<File> inputFiles = getFiles(new File(LoadOptions.INPUT.get(cli)));
SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader();
- HTableInterface table = loader.getProvider()
- .getTable(conf, LoadOptions.HBASE_TABLE.get(cli));
+ ThreadLocal<ExtractorState> state = new ThreadLocal<ExtractorState>() {
+ @Override
+ protected ExtractorState initialValue() {
+ try {
+ ExtractorHandler handler = ExtractorHandler.load(
+ FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli)))
+ );
+ HTableInterface table = loader.getProvider().getTable(conf, LoadOptions.HBASE_TABLE.get(cli));
+ return new ExtractorState(table, handler.getExtractor(), new EnrichmentConverter());
+ } catch (IOException e1) {
+ throw new IllegalStateException("Unable to get table: " + e1);
+ }
+ }
+ };
+
+ loader.load(streamify(inputFiles, batchSize, lineByLine), state, LoadOptions.HBASE_CF.get(cli), numThreads);
- for (File f : inputFiles) {
- loader.loadFile(f, e, table, LoadOptions.HBASE_CF.get(cli), converter, lineByLine);
- }
if(sensorEnrichmentUpdateConfig != null) {
sensorEnrichmentUpdateConfig.updateSensorConfigs();
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java
index b4891aa..4ffb91a 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.metron.dataloads.nonbulk.flatfile;
+import com.google.common.collect.ImmutableList;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.PosixParser;
@@ -56,91 +57,108 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
public class SimpleEnrichmentFlatFileLoaderTest {
- private HBaseTestingUtility testUtil;
-
- /** The test table. */
- private HTable testTable;
- private String tableName = "enrichment";
- private String cf = "cf";
- private String csvFile="input.csv";
- private String extractorJson = "extractor.json";
- private String enrichmentJson = "enrichment_config.json";
- private String log4jProperty = "log4j";
-
- Configuration config = null;
- /**
- {
- "config" : {
- "columns" : {
- "host" : 0,
- "meta" : 2
- },
- "indicator_column" : "host",
- "separator" : ",",
- "type" : "enrichment"
- },
- "extractor" : "CSV"
- }
- */
- @Multiline
- private static String extractorConfig;
-
- @Before
- public void setup() throws Exception {
- Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
- config = kv.getValue();
- testUtil = kv.getKey();
- testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
+ private HBaseTestingUtility testUtil;
+
+ /** The test table. */
+ private HTable testTable;
+ private String tableName = "enrichment";
+ private String cf = "cf";
+ private String csvFile="input.csv";
+ private String extractorJson = "extractor.json";
+ private String enrichmentJson = "enrichment_config.json";
+ private String log4jProperty = "log4j";
+
+ Configuration config = null;
+ /**
+ {
+ "config" : {
+ "columns" : {
+ "host" : 0,
+ "meta" : 2
+ },
+ "indicator_column" : "host",
+ "separator" : ",",
+ "type" : "enrichment"
+ },
+ "extractor" : "CSV"
}
-
- @After
- public void teardown() throws Exception {
- HBaseUtil.INSTANCE.teardown(testUtil);
- }
-
- @Test
- public void testCommandLine() throws Exception {
- Configuration conf = HBaseConfiguration.create();
-
- String[] argv = {"-c cf", "-t enrichment", "-e extractor.json", "-n enrichment_config.json", "-l log4j", "-i input.csv"};
- String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
-
- CommandLine cli = SimpleEnrichmentFlatFileLoader.LoadOptions.parse(new PosixParser(), otherArgs);
- Assert.assertEquals(extractorJson,SimpleEnrichmentFlatFileLoader.LoadOptions.EXTRACTOR_CONFIG.get(cli).trim());
- Assert.assertEquals(cf, SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_CF.get(cli).trim());
- Assert.assertEquals(tableName,SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_TABLE.get(cli).trim());
- Assert.assertEquals(enrichmentJson,SimpleEnrichmentFlatFileLoader.LoadOptions.ENRICHMENT_CONFIG.get(cli).trim());
- Assert.assertEquals(csvFile,SimpleEnrichmentFlatFileLoader.LoadOptions.INPUT.get(cli).trim());
- Assert.assertEquals(log4jProperty, SimpleEnrichmentFlatFileLoader.LoadOptions.LOG4J_PROPERTIES.get(cli).trim());
- }
-
- @Test
- public void test() throws Exception {
-
- Assert.assertNotNull(testTable);
- String contents = "google.com,1,foo";
-
- EnrichmentConverter converter = new EnrichmentConverter();
- ExtractorHandler handler = ExtractorHandler.load(extractorConfig);
- Extractor e = handler.getExtractor();
- File file = new File (contents);
- SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader();
- testTable.put(loader.extract(contents, e, cf, converter));
-
- ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
- List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
- for(Result r : scanner) {
- results.add(converter.fromResult(r, cf));
- }
- Assert.assertEquals(1, results.size());
- Assert.assertEquals(results.get(0).getKey().indicator, "google.com");
- Assert.assertEquals(results.get(0).getKey().type, "enrichment");
- Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
- Assert.assertEquals(results.get(0).getValue().getMetadata().get("meta"), "foo");
- Assert.assertEquals(results.get(0).getValue().getMetadata().get("host"), "google.com");
+ */
+ @Multiline
+ private static String extractorConfig;
+
+ @Before
+ public void setup() throws Exception { // obtain the HBase test utility + configuration, then create table `tableName` with family `cf`
+ Map.Entry<HBaseTestingUtility, Configuration> utilAndConf = HBaseUtil.INSTANCE.create(true);
+ testUtil = utilAndConf.getKey();
+ config = utilAndConf.getValue();
+ testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
+ }
+
+ @After
+ public void teardown() throws Exception { // hand the test utility created in setup() back to HBaseUtil for teardown
+ HBaseUtil.INSTANCE.teardown(testUtil);
+ }
+
+ @Test
+ public void testCommandLine() throws Exception { // every LoadOptions flag, including -p (NUM_THREADS) and -b (BATCH_SIZE), parses to its value
+ Configuration conf = HBaseConfiguration.create();
+
+ String[] argv = { "-c cf", "-t enrichment" // each option and its value are passed as one token
+ , "-e extractor.json", "-n enrichment_config.json"
+ , "-l log4j", "-i input.csv"
+ , "-p 2", "-b 128"
+ };
+ String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+
+ CommandLine cli = SimpleEnrichmentFlatFileLoader.LoadOptions.parse(new PosixParser(), otherArgs); // values are trim()med below because of the single-token args
+ Assert.assertEquals(extractorJson,SimpleEnrichmentFlatFileLoader.LoadOptions.EXTRACTOR_CONFIG.get(cli).trim());
+ Assert.assertEquals(cf, SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_CF.get(cli).trim());
+ Assert.assertEquals(tableName,SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_TABLE.get(cli).trim());
+ Assert.assertEquals(enrichmentJson,SimpleEnrichmentFlatFileLoader.LoadOptions.ENRICHMENT_CONFIG.get(cli).trim());
+ Assert.assertEquals(csvFile,SimpleEnrichmentFlatFileLoader.LoadOptions.INPUT.get(cli).trim());
+ Assert.assertEquals(log4jProperty, SimpleEnrichmentFlatFileLoader.LoadOptions.LOG4J_PROPERTIES.get(cli).trim());
+ Assert.assertEquals("2", SimpleEnrichmentFlatFileLoader.LoadOptions.NUM_THREADS.get(cli).trim());
+ Assert.assertEquals("128", SimpleEnrichmentFlatFileLoader.LoadOptions.BATCH_SIZE.get(cli).trim());
+ }
+
+ @Test
+ public void test() throws Exception { // load one CSV line through the multithreaded loader, then read it back from testTable
+
+ Assert.assertNotNull(testTable);
+ String contents = "google.com,1,foo";
+
+ EnrichmentConverter converter = new EnrichmentConverter();
+ ExtractorHandler handler = ExtractorHandler.load(extractorConfig);
+ Extractor e = handler.getExtractor();
+ SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader();
+ Stream<String> contentStreams = ImmutableList.of(contents).stream();
+ ThreadLocal<ExtractorState> state = new ThreadLocal<ExtractorState>() { // every worker thread reuses the same testTable, extractor and converter
+ @Override
+ protected ExtractorState initialValue() {
+ return new ExtractorState(testTable, e, converter);
+ }
+ };
+ loader.load(ImmutableList.of(contentStreams)
+ , state
+ , cf
+ , 2 // number of loader threads
+ );
+
+ ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
+ List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
+ for(Result r : scanner) {
+ results.add(converter.fromResult(r, cf));
}
+ Assert.assertEquals(1, results.size());
+ Assert.assertEquals("google.com", results.get(0).getKey().indicator);
+ Assert.assertEquals("enrichment", results.get(0).getKey().type);
+ Assert.assertEquals(2, results.get(0).getValue().getMetadata().size());
+ Assert.assertEquals("foo", results.get(0).getValue().getMetadata().get("meta"));
+ Assert.assertEquals("google.com", results.get(0).getValue().getMetadata().get("host"));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
index 7e9f231..acc1565 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
@@ -85,6 +85,7 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes
return ReadinessState.READY;
}
} else {
+ System.out.println("Missed index...");
return ReadinessState.NOT_READY;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ad8724ee/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index 03ae9ff..a93c442 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -205,6 +205,7 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
private void waitForIndex(String zookeeperQuorum) throws Exception {
try(CuratorFramework client = getClient(zookeeperQuorum)) {
client.start();
+ System.out.println("Waiting for zookeeper...");
byte[] bytes = null;
do {
try {
@@ -216,6 +217,7 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
}
}
while(bytes == null || bytes.length == 0);
+ System.out.println("Found index config in zookeeper...");
}
}