You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/02/06 16:05:26 UTC
[1/2] incubator-metron git commit: METRON-682: Unify and Improve the
Flat File Loader closes apache/incubator-metron#432
Repository: incubator-metron
Updated Branches:
refs/heads/master a11e85c55 -> 1be4fcb02
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/RawLocation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/RawLocation.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/RawLocation.java
new file mode 100644
index 0000000..5f2db33
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/RawLocation.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.location;
+
+import java.io.*;
+import java.util.List;
+import java.util.Optional;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/**
+ * Abstraction over a place that raw input can be read from, addressed by a string location.
+ *
+ * @param <T> type of the state object handed to {@link #init(Object)}
+ */
+public interface RawLocation<T> {
+  /**
+   * Lists what is found at {@code loc}.
+   * NOTE(review): the meaning of an absent Optional versus an empty list is defined by the
+   * implementations, not here - confirm against them.
+   */
+  Optional<List<String>> list(String loc) throws IOException;
+
+  /** Reports whether {@code loc} exists. */
+  boolean exists(String loc) throws IOException;
+
+  /** Reports whether {@code loc} is a directory. */
+  boolean isDirectory(String loc) throws IOException;
+
+  /** Opens the raw (still compressed, if applicable) bytes stored at {@code loc}. */
+  InputStream openInputStream(String loc) throws IOException;
+
+  /** Reports whether this implementation can handle {@code loc}. */
+  boolean match(String loc);
+
+  /** Optional initialisation hook; does nothing unless an implementation overrides it. */
+  default void init(T state) {
+
+  }
+
+  /**
+   * Opens {@code loc} as text, choosing a decompressor from the file suffix:
+   * {@code .gz} is read through gzip, {@code .zip} yields the first entry of the archive
+   * (or an empty reader when the archive has no entries), anything else is read as is.
+   * Characters are decoded with the platform default charset.
+   *
+   * @param loc the location to open
+   * @return a reader over the decoded content; closing it closes the underlying stream
+   * @throws IOException if the location cannot be opened or its compressed header is invalid;
+   *         the underlying stream is closed before the exception propagates
+   */
+  default BufferedReader openReader(String loc) throws IOException {
+    InputStream is = openInputStream(loc);
+    try {
+      if(loc.endsWith(".gz")) {
+        return new BufferedReader(new InputStreamReader(new GZIPInputStream(is)));
+      }
+      if(loc.endsWith(".zip")) {
+        ZipInputStream zis = new ZipInputStream(is);
+        ZipEntry entry = zis.getNextEntry();
+        if(entry != null) {
+          return new BufferedReader(new InputStreamReader(zis));
+        }
+        // Nothing to read: release the archive stream instead of abandoning it open.
+        zis.close();
+        return new BufferedReader(new InputStreamReader(new ByteArrayInputStream(new byte[] {})));
+      }
+      return new BufferedReader(new InputStreamReader(is));
+    }
+    catch(IOException | RuntimeException e) {
+      // The caller never receives a reader on this path, so nobody else can close the stream.
+      try {
+        is.close();
+      }
+      catch(IOException closeFailure) {
+        e.addSuppressed(closeFailure);
+      }
+      throw e;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/URLLocation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/URLLocation.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/URLLocation.java
new file mode 100644
index 0000000..cc8edbe
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/URLLocation.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.location;
+
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+import java.util.zip.ZipInputStream;
+
+/**
+ * {@link RawLocation} for anything {@link URL} can parse. The location is treated as a
+ * single readable resource: it is reported as existing, never as a directory, and has
+ * no children to list.
+ */
+public class URLLocation implements RawLocation {
+
+  /** Always yields a present, empty list; {@code loc} is not inspected. */
+  @Override
+  public Optional<List<String>> list(String loc) throws IOException {
+    List<String> noChildren = Collections.emptyList();
+    return Optional.of(noChildren);
+  }
+
+  /** Always {@code true}; no connection is attempted to verify it. */
+  @Override
+  public boolean exists(String loc) throws IOException {
+    return true;
+  }
+
+  /** Always {@code false}. */
+  @Override
+  public boolean isDirectory(String loc) throws IOException {
+    return false;
+  }
+
+  /** Opens a connection to {@code loc} and returns its response stream. */
+  @Override
+  public InputStream openInputStream(String loc) throws IOException {
+    URL target = new URL(loc);
+    return target.openConnection().getInputStream();
+  }
+
+  /** Accepts {@code loc} exactly when the {@link URL} constructor can parse it. */
+  @Override
+  public boolean match(String loc) {
+    boolean parseable;
+    try {
+      new URL(loc);
+      parseable = true;
+    } catch (MalformedURLException e) {
+      parseable = false;
+    }
+    return parseable;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/scripts/flatfile_loader.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/scripts/flatfile_loader.sh b/metron-platform/metron-data-management/src/main/scripts/flatfile_loader.sh
index bba7f8e..b9e2746 100755
--- a/metron-platform/metron-data-management/src/main/scripts/flatfile_loader.sh
+++ b/metron-platform/metron-data-management/src/main/scripts/flatfile_loader.sh
@@ -27,9 +27,25 @@ elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
. /usr/lib/bigtop-utils/bigtop-detect-javahome
fi
-export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
export METRON_VERSION=${project.version}
export METRON_HOME=/usr/metron/$METRON_VERSION
+export CLASSNAME="org.apache.metron.dataloads.nonbulk.flatfile.SimpleEnrichmentFlatFileLoader"
export DM_JAR=${project.artifactId}-$METRON_VERSION.jar
-CP=$METRON_HOME/lib/$DM_JAR:/usr/metron/${METRON_VERSION}/lib/taxii-1.1.0.1.jar:`${HBASE_HOME}/bin/hbase classpath`
-java -cp $CP org.apache.metron.dataloads.nonbulk.flatfile.SimpleEnrichmentFlatFileLoader "$@"
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+
+# Launch through `hadoop jar` when a hadoop client is on the PATH, otherwise fall back to a
+# plain JVM. `command -v` is used instead of `[ $(which hadoop) ]` because the unquoted
+# substitution breaks the test when `which` prints more than one word.
+if command -v hadoop > /dev/null 2>&1
+then
+ HADOOP_CLASSPATH="${HBASE_HOME}/lib/hbase-server.jar:$("${HBASE_HOME}/bin/hbase" classpath)"
+ # Collect every classpath entry that is an existing file into the comma separated -libjars value.
+ # The substitution is deliberately unquoted: it must split into one word per entry.
+ for jar in $(echo "$HADOOP_CLASSPATH" | sed 's/:/ /g');do
+ if [ -f "$jar" ];then
+ LIBJARS="$jar,$LIBJARS"
+ fi
+ done
+ export HADOOP_CLASSPATH
+ hadoop jar "$METRON_HOME/lib/$DM_JAR" "$CLASSNAME" -libjars "${LIBJARS}" "$@"
+else
+ echo "Warning: Metron cannot find the hadoop client on this node. This means that loading via Map Reduce will NOT function."
+ CP="$METRON_HOME/lib/$DM_JAR:/usr/metron/${METRON_VERSION}/lib/taxii-1.1.0.1.jar:$("${HBASE_HOME}/bin/hbase" classpath)"
+ java -cp "$CP" "$CLASSNAME" "$@"
+fi
+
+
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/scripts/threatintel_bulk_load.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/scripts/threatintel_bulk_load.sh b/metron-platform/metron-data-management/src/main/scripts/threatintel_bulk_load.sh
deleted file mode 100755
index 865d0ad..0000000
--- a/metron-platform/metron-data-management/src/main/scripts/threatintel_bulk_load.sh
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
-[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
-
-# Autodetect JAVA_HOME if not defined
-if [ -e /usr/libexec/bigtop-detect-javahome ]; then
- . /usr/libexec/bigtop-detect-javahome
-elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
- . /usr/lib/bigtop-utils/bigtop-detect-javahome
-fi
-
-export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
-HADOOP_CLASSPATH=${HBASE_HOME}/lib/hbase-server.jar:`${HBASE_HOME}/bin/hbase classpath`
-for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
- if [ -f $jar ];then
- LIBJARS="$jar,$LIBJARS"
- fi
-done
-export HADOOP_CLASSPATH
-export METRON_VERSION=${project.version}
-export METRON_HOME=/usr/metron/$METRON_VERSION
-export DM_JAR=${project.artifactId}-$METRON_VERSION.jar
-hadoop jar $METRON_HOME/lib/$DM_JAR org.apache.metron.dataloads.bulk.ThreatIntelBulkLoader -libjars ${LIBJARS} "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
deleted file mode 100644
index b7a753b..0000000
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads.hbase.mr;
-
-import com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.PosixParser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.metron.dataloads.bulk.ThreatIntelBulkLoader;
-import org.apache.metron.enrichment.converter.EnrichmentConverter;
-import org.apache.metron.enrichment.converter.EnrichmentKey;
-import org.apache.metron.enrichment.converter.EnrichmentValue;
-import org.apache.metron.enrichment.lookup.LookupKV;
-import org.apache.metron.test.utils.UnitTestHelper;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-
-public class BulkLoadMapperIntegrationTest {
- /** The test util. */
- private HBaseTestingUtility testUtil;
-
- /** The test table. */
- private HTable testTable;
- private String tableName = "malicious_domains";
- private String cf = "cf";
- private String csvFile="input.csv";
- private String extractorJson = "extractor.json";
- private String enrichmentJson = "enrichment_config.json";
- private String asOf = "04/15/2016";
- private String asOfFormat = "georgia";
- private String convertClass = "threadIntel.class";
- private Configuration config = null;
-
-
- @Before
- public void setup() throws Exception {
- UnitTestHelper.setJavaLoggingLevel(Level.SEVERE);
- Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
- config = kv.getValue();
- testUtil = kv.getKey();
- testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
- }
-
- @After
- public void teardown() throws Exception {
- HBaseUtil.INSTANCE.teardown(testUtil);
- }
- /**
- {
- "config" : {
- "columns" : {
- "host" : 0
- ,"meta" : 2
- }
- ,"indicator_column" : "host"
- ,"separator" : ","
- ,"type" : "threat"
- }
- ,"extractor" : "CSV"
- }
- */
- @Multiline
- private static String extractorConfig;
-
- @Test
- public void testCommandLine() throws Exception {
- UnitTestHelper.setJavaLoggingLevel(GuiceComponentProviderFactory.class, Level.WARNING);
- Configuration conf = HBaseConfiguration.create();
-
- String[] argv = {"-f cf", "-t malicious_domains", "-e extractor.json", "-n enrichment_config.json", "-a 04/15/2016", "-i input.csv", "-z georgia", "-c threadIntel.class"};
- String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
-
- CommandLine cli = ThreatIntelBulkLoader.BulkLoadOptions.parse(new PosixParser(), otherArgs);
- Assert.assertEquals(extractorJson,ThreatIntelBulkLoader.BulkLoadOptions.EXTRACTOR_CONFIG.get(cli).trim());
- Assert.assertEquals(cf, ThreatIntelBulkLoader.BulkLoadOptions.COLUMN_FAMILY.get(cli).trim());
- Assert.assertEquals(tableName,ThreatIntelBulkLoader.BulkLoadOptions.TABLE.get(cli).trim());
- Assert.assertEquals(enrichmentJson,ThreatIntelBulkLoader.BulkLoadOptions.ENRICHMENT_CONFIG.get(cli).trim());
- Assert.assertEquals(csvFile,ThreatIntelBulkLoader.BulkLoadOptions.INPUT_DATA.get(cli).trim());
- Assert.assertEquals(asOf, ThreatIntelBulkLoader.BulkLoadOptions.AS_OF_TIME.get(cli).trim());
- Assert.assertEquals(asOfFormat, ThreatIntelBulkLoader.BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli).trim());
- Assert.assertEquals(convertClass, ThreatIntelBulkLoader.BulkLoadOptions.CONVERTER.get(cli).trim());
- }
-
- @Test
- public void test() throws IOException, ClassNotFoundException, InterruptedException {
-
- Assert.assertNotNull(testTable);
- FileSystem fs = FileSystem.get(config);
- String contents = "google.com,1,foo";
- EnrichmentConverter converter = new EnrichmentConverter();
- HBaseUtil.INSTANCE.writeFile(contents, new Path("input.csv"), fs);
- Job job = ThreatIntelBulkLoader.createJob(config, "input.csv", tableName, cf, extractorConfig, 0L, new EnrichmentConverter());
- Assert.assertTrue(job.waitForCompletion(true));
- ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
- List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
- for(Result r : scanner) {
- results.add(converter.fromResult(r, cf));
- }
- Assert.assertEquals(1, results.size());
- Assert.assertEquals(results.get(0).getKey().indicator, "google.com");
- Assert.assertEquals(results.get(0).getKey().type, "threat");
- Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
- Assert.assertEquals(results.get(0).getValue().getMetadata().get("meta"), "foo");
- Assert.assertEquals(results.get(0).getValue().getMetadata().get("host"), "google.com");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
index 14a5143..d82be9d 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
@@ -36,10 +36,7 @@ import org.apache.metron.enrichment.lookup.LookupKey;
import org.apache.metron.enrichment.lookup.accesstracker.BloomAccessTracker;
import org.apache.metron.enrichment.lookup.accesstracker.PersistentAccessTracker;
import org.apache.metron.test.utils.UnitTestHelper;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
import java.util.ArrayList;
import java.util.HashMap;
@@ -49,21 +46,21 @@ import java.util.logging.Level;
public class LeastRecentlyUsedPrunerIntegrationTest {
/** The test util. */
- private HBaseTestingUtility testUtil;
+ private static HBaseTestingUtility testUtil;
/** The test table. */
- private HTable testTable;
- private HTable atTable;
- private String tableName = "malicious_domains";
- private String cf = "cf";
- private String atTableName = "access_trackers";
- private String atCF= "cf";
- private String beginTime = "04/14/2016 12:00:00";
- private String timeFormat = "georgia";
- private Configuration config = null;
+ private static HTable testTable;
+ private static HTable atTable;
+ private static final String tableName = "malicious_domains";
+ private static final String cf = "cf";
+ private static final String atTableName = "access_trackers";
+ private static final String atCF= "cf";
+ private static final String beginTime = "04/14/2016 12:00:00";
+ private static final String timeFormat = "georgia";
+ private static Configuration config = null;
- @Before
- public void setup() throws Exception {
+ @BeforeClass
+ public static void setup() throws Exception {
UnitTestHelper.setJavaLoggingLevel(Level.SEVERE);
Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
config = kv.getValue();
@@ -71,10 +68,12 @@ public class LeastRecentlyUsedPrunerIntegrationTest {
testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
atTable = testUtil.createTable(Bytes.toBytes(atTableName), Bytes.toBytes(atCF));
}
- @After
- public void teardown() throws Exception {
+
+ @AfterClass
+ public static void teardown() throws Exception {
HBaseUtil.INSTANCE.teardown(testUtil);
}
+
public List<LookupKey> getKeys(int start, int end) {
List<LookupKey> keys = new ArrayList<>();
for(int i = start;i < end;++i) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java
new file mode 100644
index 0000000..d0d637d
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import com.google.common.collect.ImmutableList;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.hbase.mr.HBaseUtil;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.junit.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.stream.Stream;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * Runs {@link SimpleEnrichmentFlatFileLoader} against an HBase test table and checks what was
+ * written. Input files and extractor configs are generated under {@code target/} once per class
+ * and removed afterwards; every test empties the table while reading its results back.
+ */
+public class SimpleEnrichmentFlatFileLoaderIntegrationTest {
+
+  /** HBase testing utility obtained from {@code HBaseUtil.INSTANCE.create(true)}. */
+  private static HBaseTestingUtility testUtil;
+
+  /** The test table. */
+  private static HTable testTable;
+  private static Configuration config = null;
+  private static final String tableName = "enrichment";
+  private static final String cf = "cf";
+  private static final String csvFile="input.csv";
+  private static final String extractorJson = "extractor.json";
+  private static final String enrichmentJson = "enrichment_config.json";
+  private static final String log4jProperty = "log4j";
+  private static final File file1 = new File("target/sefflt_data_1.csv");
+  private static final File file2 = new File("target/sefflt_data_2.csv");
+  // NOTE(review): same path as file2, so the multi-line data written in setup() replaces
+  // file2's single line - confirm whether a distinct file name was intended.
+  private static final File multilineFile= new File("target/sefflt_data_2.csv");
+  private static final File multilineZipFile= new File("target/sefflt_data_2.csv.zip");
+  private static final File multilineGzFile= new File("target/sefflt_data_2.csv.gz");
+  private static final File lineByLineExtractorConfigFile = new File("target/sefflt_extractorConfig_lbl.json");
+  private static final File wholeFileExtractorConfigFile = new File("target/sefflt_extractorConfig_wf.json");
+  /** Number of rows written to each of the multi-line input files. */
+  private static final int NUM_LINES = 1000;
+
+ /**
+ {
+ "config" : {
+ "columns" : {
+ "host" : 0,
+ "meta" : 2
+ },
+ "indicator_column" : "host",
+ "separator" : ",",
+ "type" : "enrichment"
+ },
+ "extractor" : "CSV"
+ }
+ */
+ @Multiline
+ private static String lineByLineExtractorConfig;
+
+ /**
+ {
+ "config" : {
+ "columns" : {
+ "host" : 0,
+ "meta" : 2
+ },
+ "indicator_column" : "host",
+ "separator" : ",",
+ "type" : "enrichment"
+ },
+ "extractor" : "CSV",
+ "inputFormat" : "WHOLE_FILE"
+ }
+ */
+ @Multiline
+ private static String wholeFileExtractorConfig;
+
+  /** Creates the table, then generates the extractor configs and the plain, zip and gzip inputs. */
+  @BeforeClass
+  public static void setup() throws Exception {
+    UnitTestHelper.setJavaLoggingLevel(Level.SEVERE);
+    Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
+    config = kv.getValue();
+    testUtil = kv.getKey();
+    testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
+
+    // Make sure the table starts out empty.
+    try (ResultScanner existing = testTable.getScanner(Bytes.toBytes(cf))) {
+      for(Result r : existing) {
+        testTable.delete(new Delete(r.getRow()));
+      }
+    }
+
+    writeFresh(lineByLineExtractorConfigFile, lineByLineExtractorConfig);
+    writeFresh(wholeFileExtractorConfigFile, wholeFileExtractorConfig);
+    writeFresh(file1, "google1.com,1,foo2\n");
+    writeFresh(file2, "google2.com,2,foo2\n");
+
+    // Each file is checked under its own name; the zip file used to be guarded by the
+    // gzip file's existence.
+    for(File stale : new File[] { multilineFile, multilineGzFile, multilineZipFile }) {
+      if(stale.exists()) {
+        stale.delete();
+      }
+    }
+
+    // Resources close in reverse order: zipWriter flushes into and closes zos before the
+    // trailing (no-op) close of zos itself.
+    try (ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(multilineZipFile));
+         PrintWriter plainWriter = new PrintWriter(multilineFile);
+         PrintWriter zipWriter = new PrintWriter(zos);
+         PrintWriter gzWriter = new PrintWriter(new GZIPOutputStream(new FileOutputStream(multilineGzFile)))) {
+      zos.putNextEntry(new ZipEntry("file"));
+      for(int i = 0;i < NUM_LINES;++i) {
+        String line = "google" + i + ".com," + i + ",foo" + i;
+        plainWriter.println(line);
+        zipWriter.println(line);
+        gzWriter.println(line);
+      }
+    }
+  }
+
+  /** Shuts HBase down and removes every generated file. */
+  @AfterClass
+  public static void teardown() throws Exception {
+    HBaseUtil.INSTANCE.teardown(testUtil);
+    file1.delete();
+    file2.delete();
+    multilineFile.delete();
+    multilineGzFile.delete();
+    multilineZipFile.delete();
+    lineByLineExtractorConfigFile.delete();
+    wholeFileExtractorConfigFile.delete();
+  }
+
+  /** Replaces {@code target} with a new file holding {@code contents}. */
+  private static void writeFresh(File target, String contents) throws Exception {
+    if(target.exists()) {
+      target.delete();
+    }
+    Files.write( target.toPath()
+               , contents.getBytes()
+               , StandardOpenOption.CREATE_NEW , StandardOpenOption.TRUNCATE_EXISTING
+               );
+  }
+
+  /**
+   * Runs the loader with {@code argv}, then reads back every row of the test table and
+   * deletes it so the next test starts from an empty table.
+   */
+  private static List<LookupKV<EnrichmentKey, EnrichmentValue>> loadAndDrain(String[] argv) throws Exception {
+    SimpleEnrichmentFlatFileLoader.main(config, argv);
+    EnrichmentConverter converter = new EnrichmentConverter();
+    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
+    try (ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf))) {
+      for (Result r : scanner) {
+        results.add(converter.fromResult(r, cf));
+        testTable.delete(new Delete(r.getRow()));
+      }
+    }
+    return results;
+  }
+
+  /** Checks the row count and spot-checks the first row's key and metadata. */
+  private static void assertEnrichmentResults(int expectedCount, List<LookupKV<EnrichmentKey, EnrichmentValue>> results) {
+    Assert.assertEquals(expectedCount, results.size());
+    LookupKV<EnrichmentKey, EnrichmentValue> first = results.get(0);
+    Assert.assertTrue(first.getKey().indicator.startsWith("google"));
+    Assert.assertEquals("enrichment", first.getKey().type);
+    Assert.assertEquals(2, first.getValue().getMetadata().size());
+    Assert.assertTrue(first.getValue().getMetadata().get("meta").toString().startsWith("foo"));
+    Assert.assertTrue(first.getValue().getMetadata().get("host").toString().startsWith("google"));
+  }
+
+  /** Every command line option is parsed back to the value it was given. */
+  @Test
+  public void testArgs() throws Exception {
+    String[] argv = {"-c cf", "-t enrichment"
+            , "-e extractor.json", "-n enrichment_config.json"
+            , "-l log4j", "-i input.csv"
+            , "-p 2", "-b 128", "-q"
+    };
+
+    String[] otherArgs = new GenericOptionsParser(config, argv).getRemainingArgs();
+
+    CommandLine cli = LoadOptions.parse(new PosixParser(), otherArgs);
+    Assert.assertEquals(extractorJson, LoadOptions.EXTRACTOR_CONFIG.get(cli).trim());
+    Assert.assertEquals(cf, LoadOptions.HBASE_CF.get(cli).trim());
+    Assert.assertEquals(tableName, LoadOptions.HBASE_TABLE.get(cli).trim());
+    Assert.assertEquals(enrichmentJson, LoadOptions.ENRICHMENT_CONFIG.get(cli).trim());
+    Assert.assertEquals(csvFile, LoadOptions.INPUT.get(cli).trim());
+    Assert.assertEquals(log4jProperty, LoadOptions.LOG4J_PROPERTIES.get(cli).trim());
+    Assert.assertEquals("2", LoadOptions.NUM_THREADS.get(cli).trim());
+    Assert.assertEquals("128", LoadOptions.BATCH_SIZE.get(cli).trim());
+  }
+
+  /** Uncompressed local file, one record per line. */
+  @Test
+  public void testLocalLineByLine() throws Exception {
+    String[] argv = {"-c cf", "-t enrichment"
+            , "-e " + lineByLineExtractorConfigFile.getPath()
+            , "-i " + multilineFile.getPath()
+            , "-p 2", "-b 128", "-q"
+    };
+    assertEnrichmentResults(NUM_LINES, loadAndDrain(argv));
+  }
+
+  /** Gzip-compressed local file, one record per line. */
+  @Test
+  public void testLocalLineByLine_gz() throws Exception {
+    String[] argv = {"-c cf", "-t enrichment"
+            , "-e " + lineByLineExtractorConfigFile.getPath()
+            , "-i " + multilineGzFile.getPath()
+            , "-p 2", "-b 128", "-q"
+    };
+    assertEnrichmentResults(NUM_LINES, loadAndDrain(argv));
+  }
+
+  /** Zip-compressed local file, one record per line. */
+  @Test
+  public void testLocalLineByLine_zip() throws Exception {
+    String[] argv = {"-c cf", "-t enrichment"
+            , "-e " + lineByLineExtractorConfigFile.getPath()
+            , "-i " + multilineZipFile.getPath()
+            , "-p 2", "-b 128", "-q"
+    };
+    assertEnrichmentResults(NUM_LINES, loadAndDrain(argv));
+  }
+
+  /** Two local files loaded with the WHOLE_FILE input format; two rows are expected. */
+  @Test
+  public void testLocalWholeFile() throws Exception {
+    String[] argv = { "-c cf", "-t enrichment"
+            , "-e " + wholeFileExtractorConfigFile.getPath()
+            , "-i " + file1.getPath() + "," + file2.getPath()
+            , "-p 2", "-b 128", "-q"
+    };
+    assertEnrichmentResults(2, loadAndDrain(argv));
+  }
+
+  /** Same data as the local line-by-line test, copied to the test file system and loaded with {@code -m MR}. */
+  @Test
+  public void testMRLineByLine() throws Exception {
+    String[] argv = {"-c cf", "-t enrichment"
+            , "-e " + lineByLineExtractorConfigFile.getPath()
+            , "-i " + multilineFile.getName()
+            , "-m MR"
+            , "-p 2", "-b 128", "-q"
+    };
+    FileSystem fs = FileSystem.get(config);
+    HBaseUtil.INSTANCE.writeFile(new String(Files.readAllBytes(multilineFile.toPath())), new Path(multilineFile.getName()), fs);
+    assertEnrichmentResults(NUM_LINES, loadAndDrain(argv));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java
deleted file mode 100644
index 4ffb91a..0000000
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads.nonbulk.flatfile;
-
-import com.google.common.collect.ImmutableList;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.metron.dataloads.bulk.ThreatIntelBulkLoader;
-import org.apache.metron.dataloads.extractor.Extractor;
-import org.apache.metron.dataloads.extractor.ExtractorHandler;
-import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
-import org.apache.metron.dataloads.nonbulk.flatfile.SimpleEnrichmentFlatFileLoader;
-import org.apache.metron.dataloads.hbase.mr.HBaseUtil;
-import org.apache.metron.enrichment.converter.HbaseConverter;
-import org.apache.metron.enrichment.converter.EnrichmentConverter;
-import org.apache.metron.enrichment.converter.EnrichmentKey;
-import org.apache.metron.enrichment.converter.EnrichmentValue;
-import org.apache.metron.enrichment.lookup.LookupKV;
-import org.apache.metron.common.utils.JSONUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Stream;
-
-public class SimpleEnrichmentFlatFileLoaderTest {
-
- private HBaseTestingUtility testUtil;
-
- /** The test table. */
- private HTable testTable;
- private String tableName = "enrichment";
- private String cf = "cf";
- private String csvFile="input.csv";
- private String extractorJson = "extractor.json";
- private String enrichmentJson = "enrichment_config.json";
- private String log4jProperty = "log4j";
-
- Configuration config = null;
- /**
- {
- "config" : {
- "columns" : {
- "host" : 0,
- "meta" : 2
- },
- "indicator_column" : "host",
- "separator" : ",",
- "type" : "enrichment"
- },
- "extractor" : "CSV"
- }
- */
- @Multiline
- private static String extractorConfig;
-
- @Before
- public void setup() throws Exception {
- Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
- config = kv.getValue();
- testUtil = kv.getKey();
- testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
- }
-
- @After
- public void teardown() throws Exception {
- HBaseUtil.INSTANCE.teardown(testUtil);
- }
-
- @Test
- public void testCommandLine() throws Exception {
- Configuration conf = HBaseConfiguration.create();
-
- String[] argv = { "-c cf", "-t enrichment"
- , "-e extractor.json", "-n enrichment_config.json"
- , "-l log4j", "-i input.csv"
- , "-p 2", "-b 128"
- };
- String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
-
- CommandLine cli = SimpleEnrichmentFlatFileLoader.LoadOptions.parse(new PosixParser(), otherArgs);
- Assert.assertEquals(extractorJson,SimpleEnrichmentFlatFileLoader.LoadOptions.EXTRACTOR_CONFIG.get(cli).trim());
- Assert.assertEquals(cf, SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_CF.get(cli).trim());
- Assert.assertEquals(tableName,SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_TABLE.get(cli).trim());
- Assert.assertEquals(enrichmentJson,SimpleEnrichmentFlatFileLoader.LoadOptions.ENRICHMENT_CONFIG.get(cli).trim());
- Assert.assertEquals(csvFile,SimpleEnrichmentFlatFileLoader.LoadOptions.INPUT.get(cli).trim());
- Assert.assertEquals(log4jProperty, SimpleEnrichmentFlatFileLoader.LoadOptions.LOG4J_PROPERTIES.get(cli).trim());
- Assert.assertEquals("2", SimpleEnrichmentFlatFileLoader.LoadOptions.NUM_THREADS.get(cli).trim());
- Assert.assertEquals("128", SimpleEnrichmentFlatFileLoader.LoadOptions.BATCH_SIZE.get(cli).trim());
- }
-
- @Test
- public void test() throws Exception {
-
- Assert.assertNotNull(testTable);
- String contents = "google.com,1,foo";
-
- EnrichmentConverter converter = new EnrichmentConverter();
- ExtractorHandler handler = ExtractorHandler.load(extractorConfig);
- Extractor e = handler.getExtractor();
- SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader();
- Stream<String> contentStreams = ImmutableList.of(contents).stream();
- ThreadLocal<ExtractorState> state = new ThreadLocal<ExtractorState>() {
- @Override
- protected ExtractorState initialValue() {
- return new ExtractorState(testTable, e, converter);
- }
- };
- loader.load(ImmutableList.of(contentStreams)
- , state
- , cf
- , 2
- );
-
- ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
- List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
- for(Result r : scanner) {
- results.add(converter.fromResult(r, cf));
- }
- Assert.assertEquals(1, results.size());
- Assert.assertEquals(results.get(0).getKey().indicator, "google.com");
- Assert.assertEquals(results.get(0).getKey().type, "enrichment");
- Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
- Assert.assertEquals(results.get(0).getValue().getMetadata().get("meta"), "foo");
- Assert.assertEquals(results.get(0).getValue().getMetadata().get("host"), "google.com");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
index 1cb58d8..0223514 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
@@ -33,10 +33,7 @@ import org.apache.metron.enrichment.converter.EnrichmentKey;
import org.apache.metron.enrichment.converter.EnrichmentValue;
import org.apache.metron.test.mock.MockHTable;
import org.apache.metron.enrichment.lookup.LookupKV;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
import java.io.IOException;
import java.util.HashSet;
@@ -44,13 +41,13 @@ import java.util.Set;
public class TaxiiIntegrationTest {
- @Before
- public void setup() throws IOException {
+ @BeforeClass
+ public static void setup() throws IOException {
MockTaxiiService.start(8282);
}
- @After
- public void teardown() {
+ @AfterClass
+ public static void teardown() {
MockTaxiiService.shutdown();
MockHTable.Provider.clear();
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index a93c442..ae04e43 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -175,8 +175,8 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
.withComponent("config", configUploadComponent)
.withComponent("storm", fluxComponent)
.withComponent("search", getSearchComponent(topologyProperties))
- .withMillisecondsBetweenAttempts(15000)
- .withNumRetries(10)
+ .withMillisecondsBetweenAttempts(1500)
+ .withNumRetries(100)
.withMaxTimeMS(150000)
.withCustomShutdownOrder(new String[] {"search","storm","config","kafka","zk"})
.build();
[2/2] incubator-metron git commit: METRON-682: Unify and Improve the
Flat File Loader closes apache/incubator-metron#432
Posted by ce...@apache.org.
METRON-682: Unify and Improve the Flat File Loader closes apache/incubator-metron#432
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/1be4fcb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/1be4fcb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/1be4fcb0
Branch: refs/heads/master
Commit: 1be4fcb0243453863b6aefe0213fe9f0afed5718
Parents: a11e85c
Author: cstella <ce...@gmail.com>
Authored: Mon Feb 6 11:04:32 2017 -0500
Committer: cstella <ce...@gmail.com>
Committed: Mon Feb 6 11:04:32 2017 -0500
----------------------------------------------------------------------
metron-analytics/metron-statistics/README.md | 2 +-
.../docker/rpm-docker/SPECS/metron.spec | 1 -
.../metron/common/utils/cli/OptionHandler.java | 31 ++
.../metron-data-management/README.md | 22 +-
.../dataloads/bulk/ThreatIntelBulkLoader.java | 260 --------------
.../dataloads/extractor/ExtractorHandler.java | 10 +-
.../extractor/inputformat/Formats.java | 50 +--
.../inputformat/InputFormatHandler.java | 7 +-
.../extractor/inputformat/WholeFileFormat.java | 123 +++----
.../nonbulk/flatfile/ExtractorState.java | 16 +-
.../dataloads/nonbulk/flatfile/LoadOptions.java | 261 ++++++++++++++
.../SimpleEnrichmentFlatFileLoader.java | 290 +--------------
.../flatfile/importer/ImportStrategy.java | 47 +++
.../nonbulk/flatfile/importer/Importer.java | 34 ++
.../flatfile/importer/LocalImporter.java | 177 ++++++++++
.../flatfile/importer/MapReduceImporter.java | 75 ++++
.../nonbulk/flatfile/location/FileLocation.java | 57 +++
.../nonbulk/flatfile/location/HDFSLocation.java | 75 ++++
.../nonbulk/flatfile/location/Location.java | 99 ++++++
.../flatfile/location/LocationStrategy.java | 67 ++++
.../nonbulk/flatfile/location/RawLocation.java | 57 +++
.../nonbulk/flatfile/location/URLLocation.java | 63 ++++
.../src/main/scripts/flatfile_loader.sh | 22 +-
.../src/main/scripts/threatintel_bulk_load.sh | 41 ---
.../hbase/mr/BulkLoadMapperIntegrationTest.java | 140 --------
.../LeastRecentlyUsedPrunerIntegrationTest.java | 35 +-
...EnrichmentFlatFileLoaderIntegrationTest.java | 349 +++++++++++++++++++
.../SimpleEnrichmentFlatFileLoaderTest.java | 164 ---------
.../nonbulk/taxii/TaxiiIntegrationTest.java | 13 +-
.../integration/IndexingIntegrationTest.java | 4 +-
30 files changed, 1567 insertions(+), 1025 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-analytics/metron-statistics/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-statistics/README.md b/metron-analytics/metron-statistics/README.md
index 4d78839..cfd44f2 100644
--- a/metron-analytics/metron-statistics/README.md
+++ b/metron-analytics/metron-statistics/README.md
@@ -45,7 +45,7 @@ functions can be used from everywhere where Stellar is used.
* Input:
* number - The number to take the absolute value of
* Returns: The absolute value of the number passed in.
-*
+
#### `BIN`
* Description: Computes the bin that the value is in given a set of bounds.
* Input:
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
index 5c5881c..9466b68 100644
--- a/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
+++ b/metron-deployment/packaging/docker/rpm-docker/SPECS/metron.spec
@@ -181,7 +181,6 @@ This package installs the Metron Parser files
%{metron_home}/bin/flatfile_loader.sh
%{metron_home}/bin/prune_elasticsearch_indices.sh
%{metron_home}/bin/prune_hdfs_files.sh
-%{metron_home}/bin/threatintel_bulk_load.sh
%{metron_home}/bin/threatintel_bulk_prune.sh
%{metron_home}/bin/threatintel_taxii_load.sh
%attr(0644,root,root) %{metron_home}/lib/metron-data-management-%{full_version}.jar
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java
new file mode 100644
index 0000000..85e7520
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/cli/OptionHandler.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils.cli;
+
+import com.google.common.base.Function;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+
+import java.util.Optional;
+
+public abstract class OptionHandler<OPT_T extends Enum<OPT_T>> implements Function<String, Option>
+{
+ public Optional<Object> getValue(OPT_T option, CommandLine cli) {
+ return Optional.empty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/README.md b/metron-platform/metron-data-management/README.md
index 26dd472..eaafda4 100644
--- a/metron-platform/metron-data-management/README.md
+++ b/metron-platform/metron-data-management/README.md
@@ -206,32 +206,16 @@ The parameters for the utility are as follows:
| -n | --enrichment_config | No | The JSON document describing the enrichments to configure. Unlike other loaders, this is run first if specified. |
-### Bulk Load from HDFS
-
-The shell script `$METRON_HOME/bin/threatintel_bulk_load.sh` will kick off a MR job to load data staged in HDFS into an HBase table. Note: despite what
-the naming may suggest, this utility works for enrichment as well as threat intel due to the underlying infrastructure being the same.
-
-The parameters for the utility are as follows:
-
-| Short Code | Long Code | Is Required? | Description |
-|------------|---------------------|--------------|-------------------------------------------------------------------------------------------------------------------|
-| -h | | No | Generate the help screen/set of options |
-| -e | --extractor_config | Yes | JSON Document describing the extractor for this input data source |
-| -t | --table | Yes | The HBase table to import into |
-| -f | --column_family | Yes | The HBase table column family to import into |
-| -i | --input | Yes | The input data location on HDFS |
-| -n | --enrichment_config | No | The JSON document describing the enrichments to configure. Unlike other loaders, this is run first if specified. |
-or threat intel.
### Flatfile Loader
-The shell script `$METRON_HOME/bin/flatfile_loader.sh` will read data from local disk and load the enrichment or threat intel data into an HBase table.
+The shell script `$METRON_HOME/bin/flatfile_loader.sh` will read data from local disk, HDFS or URLs and load the enrichment or threat intel data into an HBase table.
Note: This utility works for enrichment as well as threat intel due to the underlying infrastructure being the same.
One special thing to note here is that there is a special configuration
parameter to the Extractor config that is only considered during this
loader:
-* inputFormatHandler : This specifies how to consider the data. The two implementations are `BY_LINE` and `org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat`.
+* inputFormat : This specifies how to consider the data. The two implementations are `BY_LINE` and `WHOLE_FILE`.
The default is `BY_LINE`, which makes sense for a list of CSVs where
each line indicates a unit of information which can be imported.
@@ -243,7 +227,9 @@ The parameters for the utility are as follows:
| Short Code | Long Code | Is Required? | Description | |
|------------|---------------------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---|
| -h | | No | Generate the help screen/set of options | |
+| -q | --quiet | No | Do not update progress | |
| -e | --extractor_config | Yes | JSON Document describing the extractor for this input data source | |
+| -m | --import_mode | No | The Import mode to use: LOCAL, MR. Default: LOCAL | |
| -t | --hbase_table | Yes | The HBase table to import into | |
| -c | --hbase_cf | Yes | The HBase table column family to import into | |
| -i | --input | Yes | The input data location on local disk. If this is a file, then that file will be loaded. If this is a directory, then the files will be loaded recursively under that directory. | |
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
deleted file mode 100644
index 5ba0a91..0000000
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ThreatIntelBulkLoader.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads.bulk;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.io.Files;
-import org.apache.commons.cli.*;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.metron.dataloads.extractor.ExtractorHandler;
-import org.apache.metron.dataloads.hbase.mr.BulkLoadMapper;
-import org.apache.metron.common.configuration.enrichment.SensorEnrichmentUpdateConfig;
-import org.apache.metron.enrichment.converter.HbaseConverter;
-import org.apache.metron.enrichment.converter.EnrichmentConverter;
-import org.apache.metron.common.utils.JSONUtils;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.text.*;
-import java.util.Date;
-
-public class ThreatIntelBulkLoader {
- private static abstract class OptionHandler implements Function<String, Option> {}
- public enum BulkLoadOptions {
- HELP("h", new OptionHandler() {
-
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- return new Option(s, "help", false, "Generate Help screen");
- }
- })
- ,TABLE("t", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "table", true, "HBase table to import data into");
- o.setRequired(true);
- o.setArgName("HBASE_TABLE");
- return o;
- }
- })
- ,COLUMN_FAMILY("f", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "column_family", true, "Column family of the HBase table to import into");
- o.setRequired(true);
- o.setArgName("CF_NAME");
- return o;
- }
- })
- ,EXTRACTOR_CONFIG("e", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
- o.setArgName("JSON_FILE");
- o.setRequired(true);
- return o;
- }
- })
- ,INPUT_DATA("i", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "input", true, "Input directory in HDFS for the data to import into HBase");
- o.setArgName("DIR");
- o.setRequired(true);
- return o;
- }
- })
- ,AS_OF_TIME("a", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "as_of", true, "The last read timestamp to mark the records with (omit for time of execution)");
- o.setArgName("datetime");
- o.setRequired(false);
- return o;
- }
- })
- ,AS_OF_TIME_FORMAT("z", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "as_of_format", true, "The format of the as_of time (only used in conjunction with the as_of option)");
- o.setArgName("format");
- o.setRequired(false);
- return o;
- }
- })
- ,CONVERTER("c", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "converter", true, "The HBase converter class to use (Default is threat intel)");
- o.setArgName("class");
- o.setRequired(false);
- return o;
- }
- })
- ,ENRICHMENT_CONFIG("n", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "enrichment_config", true
- , "JSON Document describing the enrichment configuration details." +
- " This is used to associate an enrichment type with a field type in zookeeper."
- );
- o.setArgName("JSON_FILE");
- o.setRequired(false);
- return o;
- }
- })
- ;
- Option option;
- String shortCode;
- BulkLoadOptions(String shortCode, OptionHandler optionHandler) {
- this.shortCode = shortCode;
- this.option = optionHandler.apply(shortCode);
- }
-
- public boolean has(CommandLine cli) {
- return cli.hasOption(shortCode);
- }
-
- public String get(CommandLine cli) {
- return cli.getOptionValue(shortCode);
- }
-
- public static CommandLine parse(CommandLineParser parser, String[] args) {
- try {
- CommandLine cli = parser.parse(getOptions(), args);
- if(ThreatIntelBulkLoader.BulkLoadOptions.HELP.has(cli)) {
- printHelp();
- System.exit(0);
- }
- return cli;
- } catch (ParseException e) {
- System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
- e.printStackTrace(System.err);
- printHelp();
- System.exit(-1);
- return null;
- }
- }
-
- public static void printHelp() {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp( "ThreatIntelBulkLoader", getOptions());
- }
-
- public static Options getOptions() {
- Options ret = new Options();
- for(BulkLoadOptions o : BulkLoadOptions.values()) {
- ret.addOption(o.option);
- }
- return ret;
- }
- }
-
- private static long getTimestamp(CommandLine cli) throws java.text.ParseException {
- if(BulkLoadOptions.AS_OF_TIME.has(cli)) {
- if(!BulkLoadOptions.AS_OF_TIME_FORMAT.has(cli)) {
- throw new IllegalStateException("Unable to proceed: Specified as_of_time without an associated format.");
- }
- else {
- DateFormat format = new SimpleDateFormat(BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli));
- Date d = format.parse(BulkLoadOptions.AS_OF_TIME.get(cli));
- return d.getTime();
- }
- }
- else {
- return System.currentTimeMillis();
- }
- }
- private static String readExtractorConfig(File configFile) throws IOException {
- return Joiner.on("\n").join(Files.readLines(configFile, Charset.defaultCharset()));
- }
-
- public static Job createJob(Configuration conf, String input, String table, String cf, String extractorConfigContents, long ts, HbaseConverter converter) throws IOException {
- Job job = new Job(conf);
- job.setJobName("ThreatIntelBulkLoader: " + input + " => " + table + ":" + cf);
- System.out.println("Configuring " + job.getJobName());
- job.setJarByClass(ThreatIntelBulkLoader.class);
- job.setMapperClass(org.apache.metron.dataloads.hbase.mr.BulkLoadMapper.class);
- job.setOutputFormatClass(TableOutputFormat.class);
- job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
- job.getConfiguration().set(BulkLoadMapper.COLUMN_FAMILY_KEY, cf);
- job.getConfiguration().set(BulkLoadMapper.CONFIG_KEY, extractorConfigContents);
- job.getConfiguration().set(BulkLoadMapper.LAST_SEEN_KEY, "" + ts);
- job.getConfiguration().set(BulkLoadMapper.CONVERTER_KEY, converter.getClass().getName());
- job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(Put.class);
- job.setNumReduceTasks(0);
- ExtractorHandler handler = ExtractorHandler.load(extractorConfigContents);
- handler.getInputFormatHandler().set(job, new Path(input), handler.getConfig());
- return job;
- }
-
- public static void main(String... argv) throws Exception {
- Configuration conf = HBaseConfiguration.create();
- String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
-
- CommandLine cli = BulkLoadOptions.parse(new PosixParser(), otherArgs);
- Long ts = getTimestamp(cli);
- String input = BulkLoadOptions.INPUT_DATA.get(cli);
- String table = BulkLoadOptions.TABLE.get(cli);
- String cf = BulkLoadOptions.COLUMN_FAMILY.get(cli);
- String extractorConfigContents = readExtractorConfig(new File(BulkLoadOptions.EXTRACTOR_CONFIG.get(cli)));
- String converterClass = EnrichmentConverter.class.getName();
- if(BulkLoadOptions.CONVERTER.has(cli)) {
- converterClass = BulkLoadOptions.CONVERTER.get(cli);
- }
- SensorEnrichmentUpdateConfig sensorEnrichmentUpdateConfig = null;
- if(BulkLoadOptions.ENRICHMENT_CONFIG.has(cli)) {
- sensorEnrichmentUpdateConfig = JSONUtils.INSTANCE.load( new File(BulkLoadOptions.ENRICHMENT_CONFIG.get(cli))
- , SensorEnrichmentUpdateConfig.class
- );
- }
-
- HbaseConverter converter = (HbaseConverter) Class.forName(converterClass).getConstructor().newInstance();
- Job job = createJob(conf, input, table, cf, extractorConfigContents, ts, converter);
- System.out.println(conf);
- boolean jobRet = job.waitForCompletion(true);
- if(!jobRet) {
- System.exit(1);
- }
- if(sensorEnrichmentUpdateConfig != null) {
- sensorEnrichmentUpdateConfig.updateSensorConfigs();
- }
- System.exit(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
index 89477d8..2e2f799 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/ExtractorHandler.java
@@ -33,7 +33,7 @@ public class ExtractorHandler {
final static ObjectMapper _mapper = new ObjectMapper();
private Map<String, Object> config;
private Extractor extractor;
- private InputFormatHandler inputFormatHandler = Formats.BY_LINE;
+ private InputFormatHandler inputFormat = Formats.BY_LINE;
public Map<String, Object> getConfig() {
return config;
@@ -43,13 +43,13 @@ public class ExtractorHandler {
this.config = config;
}
- public InputFormatHandler getInputFormatHandler() {
- return inputFormatHandler;
+ public InputFormatHandler getInputFormat() {
+ return inputFormat;
}
- public void setInputFormatHandler(String handler) {
+ public void setInputFormat(String handler) {
try {
- this.inputFormatHandler= Formats.create(handler);
+ this.inputFormat= Formats.create(handler);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
throw new IllegalStateException("Unable to create an inputformathandler", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
index b8be233..961e7d3 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/Formats.java
@@ -23,34 +23,34 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
-public enum Formats implements InputFormatHandler{
- BY_LINE(new InputFormatHandler() {
- @Override
- public void set(Job job, Path input, Map<String, Object> config) throws IOException {
+public enum Formats implements InputFormatHandler {
+ BY_LINE( (job, inputs, config) -> {
+ for(Path input : inputs) {
+ FileInputFormat.addInputPath(job, input);
+ }
+ }),
+ WHOLE_FILE( new WholeFileFormat());
+ InputFormatHandler _handler = null;
+ Formats(InputFormatHandler handler) {
+ this._handler = handler;
+ }
+ @Override
+ public void set(Job job, List<Path> path, Map<String, Object> config) throws IOException {
+ _handler.set(job, path, config);
+ }
- FileInputFormat.addInputPath(job, input);
- }
- })
- ;
- InputFormatHandler _handler = null;
- Formats(InputFormatHandler handler) {
- this._handler = handler;
+ public static InputFormatHandler create(String handlerName) throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
+ try {
+ InputFormatHandler ec = Formats.valueOf(handlerName)._handler;
+ return ec;
}
- @Override
- public void set(Job job, Path path, Map<String, Object> config) throws IOException {
- _handler.set(job, path, config);
- }
-
- public static InputFormatHandler create(String handlerName) throws ClassNotFoundException, IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
- try {
- InputFormatHandler ec = Formats.valueOf(handlerName);
- return ec;
- }
- catch(IllegalArgumentException iae) {
- InputFormatHandler ex = (InputFormatHandler) Class.forName(handlerName).getConstructor().newInstance();
- return ex;
- }
+ catch(IllegalArgumentException iae) {
+ InputFormatHandler ex = (InputFormatHandler) Class.forName(handlerName).getConstructor().newInstance();
+ return ex;
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
index 2287969..00e89c0 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/InputFormatHandler.java
@@ -17,12 +17,17 @@
*/
package org.apache.metron.dataloads.extractor.inputformat;
+import com.google.common.collect.ImmutableList;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
public interface InputFormatHandler {
- void set(Job job, Path input, Map<String, Object> config) throws IOException;
+ void set(Job job, List<Path> input, Map<String, Object> config) throws IOException;
+ default void set(Job job, Path input, Map<String, Object> config) throws IOException {
+ set(job, ImmutableList.of(input), config);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
index e0a58ef..5dc8b53 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/inputformat/WholeFileFormat.java
@@ -30,80 +30,83 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
public class WholeFileFormat implements InputFormatHandler {
- public static class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
- private FileSplit fileSplit;
- private Configuration conf;
- private Text value = new Text();
- private boolean processed = false;
+ public static class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
+ private FileSplit fileSplit;
+ private Configuration conf;
+ private Text value = new Text();
+ private boolean processed = false;
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- this.fileSplit = (FileSplit) split;
- this.conf = context.getConfiguration();
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (!processed) {
- byte[] contents = new byte[(int) fileSplit.getLength()];
- Path file = fileSplit.getPath();
- FileSystem fs = file.getFileSystem(conf);
- FSDataInputStream in = null;
- try {
- in = fs.open(file);
- IOUtils.readFully(in, contents, 0, contents.length);
- value.set(contents, 0, contents.length);
- } finally {
- IOUtils.closeStream(in);
- }
- processed = true;
- return true;
- }
- return false;
- }
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ this.fileSplit = (FileSplit) split;
+ this.conf = context.getConfiguration();
+ }
- @Override
- public NullWritable getCurrentKey() throws IOException, InterruptedException {
- return NullWritable.get();
- }
- @Override
- public Text getCurrentValue() throws IOException, InterruptedException{
- return value;
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (!processed) {
+ byte[] contents = new byte[(int) fileSplit.getLength()];
+ Path file = fileSplit.getPath();
+ FileSystem fs = file.getFileSystem(conf);
+ FSDataInputStream in = null;
+ try {
+ in = fs.open(file);
+ IOUtils.readFully(in, contents, 0, contents.length);
+ value.set(contents, 0, contents.length);
+ } finally {
+ IOUtils.closeStream(in);
}
+ processed = true;
+ return true;
+ }
+ return false;
+ }
- @Override
- public float getProgress() throws IOException {
- return processed ? 1.0f : 0.0f;
- }
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+ @Override
+ public Text getCurrentValue() throws IOException, InterruptedException{
+ return value;
+ }
- @Override
- public void close() throws IOException{
- //do nothing :)
- }
+ @Override
+ public float getProgress() throws IOException {
+ return processed ? 1.0f : 0.0f;
}
- public static class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> {
+ @Override
+ public void close() throws IOException{
+ //do nothing :)
+ }
+ }
- @Override
- protected boolean isSplitable(JobContext context, Path file) {
- return false;
- }
+ public static class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> {
- @Override
- public RecordReader<NullWritable, Text> createRecordReader(
- InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- WholeFileRecordReader reader = new WholeFileRecordReader();
- reader.initialize(split, context);
- return reader;
- }
+ @Override
+ protected boolean isSplitable(JobContext context, Path file) {
+ return false;
}
+
@Override
- public void set(Job job, Path input, Map<String, Object> config) throws IOException {
- WholeFileInputFormat.setInputPaths(job, input);
- job.setInputFormatClass(WholeFileInputFormat.class);
+ public RecordReader<NullWritable, Text> createRecordReader(
+ InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ WholeFileRecordReader reader = new WholeFileRecordReader();
+ reader.initialize(split, context);
+ return reader;
+ }
+ }
+ @Override
+ public void set(Job job, List<Path> inputs, Map<String, Object> config) throws IOException {
+ for(Path input : inputs) {
+ WholeFileInputFormat.addInputPath(job, input);
}
+ job.setInputFormatClass(WholeFileInputFormat.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
index e44eb27..168d251 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/ExtractorState.java
@@ -17,19 +17,29 @@
*/
package org.apache.metron.dataloads.nonbulk.flatfile;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.metron.dataloads.extractor.Extractor;
import org.apache.metron.enrichment.converter.HbaseConverter;
+import java.io.IOException;
+
public class ExtractorState {
private HTableInterface table;
private Extractor extractor;
private HbaseConverter converter;
+ private FileSystem fs;
- public ExtractorState(HTableInterface table, Extractor extractor, HbaseConverter converter) {
+ public ExtractorState(HTableInterface table, Extractor extractor, HbaseConverter converter, Configuration config) {
this.table = table;
this.extractor = extractor;
this.converter = converter;
+ try {
+ this.fs = FileSystem.get(config);
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to retrieve hadoop file system: " + e.getMessage(), e);
+ }
}
public HTableInterface getTable() {
@@ -43,4 +53,8 @@ public class ExtractorState {
public HbaseConverter getConverter() {
return converter;
}
+
+ public FileSystem getFileSystem() {
+ return fs;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java
new file mode 100644
index 0000000..ddaf6a6
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/LoadOptions.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import org.apache.commons.cli.*;
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.cli.OptionHandler;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.ImportStrategy;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Command line options accepted by the flat file loader.
+ *
+ * Each constant pairs a short option code with an {@link OptionHandler} that builds the
+ * commons-cli {@link Option} and, where {@code getValue} is overridden, turns the raw argument
+ * into the typed value stored by {@link #createConfig(CommandLine)}. HELP, ENRICHMENT_CONFIG and
+ * LOG4J_PROPERTIES do not override {@code getValue}, so their config entry is whatever the
+ * {@link OptionHandler} default yields (defined outside this file).
+ */
+public enum LoadOptions {
+  HELP("h", new OptionHandler<LoadOptions>() {
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      return new Option(s, "help", false, "Generate Help screen");
+    }
+  })
+  ,QUIET("q", new OptionHandler<LoadOptions>() {
+
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      return new Option(s, "quiet", false, "Do not update progress");
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      // Flag option: the value is whether -q was supplied.
+      return Optional.of(option.has(cli));
+    }
+  })
+  , IMPORT_MODE("m", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "import_mode", true
+                           , "The Import mode to use: " + Joiner.on(",").join(ImportStrategy.values())
+                           + ". Default: " + ImportStrategy.LOCAL
+                           );
+      o.setArgName("MODE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      // A missing or unrecognised mode falls back to LOCAL.
+      String mode = option.get(cli);
+      return Optional.of(ImportStrategy.getStrategy(mode).orElse(ImportStrategy.LOCAL));
+    }
+  })
+  ,HBASE_TABLE("t", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "hbase_table", true, "HBase table to ingest the data into.");
+      o.setArgName("TABLE");
+      o.setRequired(true);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      return LoadOptions.trimmedValue(option.get(cli));
+    }
+  })
+  ,HBASE_CF("c", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "hbase_cf", true, "HBase column family to ingest the data into.");
+      o.setArgName("CF");
+      o.setRequired(true);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      return LoadOptions.trimmedValue(option.get(cli));
+    }
+  })
+  ,EXTRACTOR_CONFIG("e", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
+      o.setArgName("JSON_FILE");
+      o.setRequired(true);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      String path = option.get(cli);
+      if(path == null) {
+        // Nothing to read; avoids a NullPointerException from trim() on a missing argument.
+        return Optional.empty();
+      }
+      try {
+        return Optional.ofNullable(FileUtils.readFileToString(new File(path.trim())));
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to retrieve extractor config from " + path + ": " + e.getMessage(), e);
+      }
+    }
+  })
+  ,ENRICHMENT_CONFIG("n", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "enrichment_config", true
+                           , "JSON Document describing the enrichment configuration details." +
+                             " This is used to associate an enrichment type with a field type in zookeeper."
+                           );
+      o.setArgName("JSON_FILE");
+      o.setRequired(false);
+      return o;
+    }
+  })
+  ,LOG4J_PROPERTIES("l", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "log4j", true, "The log4j properties file to load");
+      o.setArgName("FILE");
+      o.setRequired(false);
+      return o;
+    }
+  })
+  ,NUM_THREADS("p", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "threads", true, "The number of threads to use when extracting data. The default is the number of cores of your machine.");
+      o.setArgName("NUM_THREADS");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      int numThreads = Runtime.getRuntime().availableProcessors();
+      if(option.has(cli)) {
+        numThreads = ConversionUtils.convert(option.get(cli), Integer.class);
+      }
+      return Optional.of(numThreads);
+    }
+  })
+  ,BATCH_SIZE("b", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "batchSize", true, "The batch size to use for HBase puts");
+      o.setArgName("SIZE");
+      o.setRequired(false);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      int batchSize = 128;
+      if(option.has(cli)) {
+        batchSize = ConversionUtils.convert(option.get(cli), Integer.class);
+      }
+      return Optional.of(batchSize);
+    }
+  })
+  ,INPUT("i", new OptionHandler<LoadOptions>() {
+    @Nullable
+    @Override
+    public Option apply(@Nullable String s) {
+      Option o = new Option(s, "input", true, "The CSV File to load");
+      o.setArgName("FILE");
+      o.setRequired(true);
+      return o;
+    }
+
+    @Override
+    public Optional<Object> getValue(LoadOptions option, CommandLine cli) {
+      // Comma separated list of inputs. Blank entries (empty argument, "a,,b", trailing
+      // comma) are dropped so that no empty path is handed to the importer.
+      String raw = Optional.ofNullable(option.get(cli)).orElse("");
+      List<String> inputs = new ArrayList<>();
+      for(String input : Splitter.on(",").trimResults().omitEmptyStrings().split(raw)) {
+        inputs.add(input);
+      }
+      return Optional.of(inputs);
+    }
+  })
+  ;
+  Option option;
+  String shortCode;
+  OptionHandler<LoadOptions> handler;
+
+  LoadOptions(String shortCode, OptionHandler<LoadOptions> optionHandler) {
+    this.shortCode = shortCode;
+    this.handler = optionHandler;
+    this.option = optionHandler.apply(shortCode);
+  }
+
+  /**
+   * Wraps a raw argument, trimmed, or returns empty when the argument is absent.
+   */
+  private static Optional<Object> trimmedValue(String raw) {
+    if(raw == null) {
+      return Optional.empty();
+    }
+    return Optional.of(raw.trim());
+  }
+
+  /**
+   * @return true if this option was supplied on the command line
+   */
+  public boolean has(CommandLine cli) {
+    return cli.hasOption(shortCode);
+  }
+
+  /**
+   * @return the raw argument of this option as reported by commons-cli
+   */
+  public String get(CommandLine cli) {
+    return cli.getOptionValue(shortCode);
+  }
+
+  /**
+   * Parses the arguments against all options. Prints the help and exits the JVM with status 0
+   * when help is requested, or with status -1 when parsing fails.
+   *
+   * @param parser the commons-cli parser to use
+   * @param args the command line arguments
+   * @return the parsed command line
+   */
+  public static CommandLine parse(CommandLineParser parser, String[] args) {
+    try {
+      CommandLine cli = parser.parse(getOptions(), args);
+      if(HELP.has(cli)) {
+        printHelp();
+        System.exit(0);
+      }
+      return cli;
+    } catch (ParseException e) {
+      System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+      e.printStackTrace(System.err);
+      printHelp();
+      System.exit(-1);
+      return null;
+    }
+  }
+
+  /**
+   * Evaluates every option's handler against the parsed command line.
+   *
+   * @param cli the parsed command line
+   * @return a map from each option to the value its handler produced
+   */
+  public static EnumMap<LoadOptions, Optional<Object> > createConfig(CommandLine cli) {
+    EnumMap<LoadOptions, Optional<Object> > ret = new EnumMap<>(LoadOptions.class);
+    for(LoadOptions option : values()) {
+      ret.put(option, option.handler.getValue(option, cli));
+    }
+    return ret;
+  }
+
+  /**
+   * Prints the usage text for all options.
+   */
+  public static void printHelp() {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp( "SimpleEnrichmentFlatFileLoader", getOptions());
+  }
+
+  /**
+   * @return a fresh commons-cli {@link Options} holding the option of every constant
+   */
+  public static Options getOptions() {
+    Options ret = new Options();
+    for(LoadOptions o : LoadOptions.values()) {
+      ret.addOption(o.option);
+    }
+    return ret;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
index 9992422..8ee11aa 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoader.java
@@ -17,312 +17,48 @@
*/
package org.apache.metron.dataloads.nonbulk.flatfile;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import org.apache.commons.cli.*;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.PropertyConfigurator;
-import org.apache.metron.common.utils.ConversionUtils;
-import org.apache.metron.common.utils.file.ReaderSpliterator;
-import org.apache.metron.dataloads.extractor.Extractor;
import org.apache.metron.dataloads.extractor.ExtractorHandler;
-import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
import org.apache.metron.common.configuration.enrichment.SensorEnrichmentUpdateConfig;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.enrichment.converter.HbaseConverter;
-import org.apache.metron.enrichment.converter.EnrichmentConverter;
-import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.ImportStrategy;
import org.apache.metron.common.utils.JSONUtils;
-import javax.annotation.Nullable;
import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Stack;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.stream.Stream;
+import java.util.*;
public class SimpleEnrichmentFlatFileLoader {
- private static abstract class OptionHandler implements Function<String, Option> {}
- public static enum LoadOptions {
- HELP("h", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- return new Option(s, "help", false, "Generate Help screen");
- }
- })
- ,HBASE_TABLE("t", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "hbase_table", true, "HBase table to ingest the data into.");
- o.setArgName("TABLE");
- o.setRequired(true);
- return o;
- }
- })
- ,HBASE_CF("c", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "hbase_cf", true, "HBase column family to ingest the data into.");
- o.setArgName("CF");
- o.setRequired(true);
- return o;
- }
- })
- ,EXTRACTOR_CONFIG("e", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "extractor_config", true, "JSON Document describing the extractor for this input data source");
- o.setArgName("JSON_FILE");
- o.setRequired(true);
- return o;
- }
- })
- ,ENRICHMENT_CONFIG("n", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "enrichment_config", true
- , "JSON Document describing the enrichment configuration details." +
- " This is used to associate an enrichment type with a field type in zookeeper."
- );
- o.setArgName("JSON_FILE");
- o.setRequired(false);
- return o;
- }
- })
- ,LOG4J_PROPERTIES("l", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "log4j", true, "The log4j properties file to load");
- o.setArgName("FILE");
- o.setRequired(false);
- return o;
- }
- })
- ,NUM_THREADS("p", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "threads", true, "The number of threads to use when extracting data. The default is the number of cores of your machine.");
- o.setArgName("NUM_THREADS");
- o.setRequired(false);
- return o;
- }
- })
- ,BATCH_SIZE("b", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "batchSize", true, "The batch size to use for HBase puts");
- o.setArgName("SIZE");
- o.setRequired(false);
- return o;
- }
- })
- ,INPUT("i", new OptionHandler() {
- @Nullable
- @Override
- public Option apply(@Nullable String s) {
- Option o = new Option(s, "input", true, "The CSV File to load");
- o.setArgName("FILE");
- o.setRequired(true);
- return o;
- }
- })
- ;
- Option option;
- String shortCode;
- LoadOptions(String shortCode, OptionHandler optionHandler) {
- this.shortCode = shortCode;
- this.option = optionHandler.apply(shortCode);
- }
-
- public boolean has(CommandLine cli) {
- return cli.hasOption(shortCode);
- }
-
- public String get(CommandLine cli) {
- return cli.getOptionValue(shortCode);
- }
-
- public static CommandLine parse(CommandLineParser parser, String[] args) {
- try {
- CommandLine cli = parser.parse(getOptions(), args);
- if(HELP.has(cli)) {
- printHelp();
- System.exit(0);
- }
- return cli;
- } catch (ParseException e) {
- System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
- e.printStackTrace(System.err);
- printHelp();
- System.exit(-1);
- return null;
- }
- }
-
- public static void printHelp() {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp( "SimpleEnrichmentFlatFileLoader", getOptions());
- }
-
- public static Options getOptions() {
- Options ret = new Options();
- for(LoadOptions o : LoadOptions.values()) {
- ret.addOption(o.option);
- }
- return ret;
- }
- }
- public static List<File> getFiles(File root) {
- if(!root.isDirectory()) {
- return ImmutableList.of(root);
- }
- List<File> ret = new ArrayList<>();
- Stack<File> stack = new Stack<File>();
- stack.push(root);
- while(!stack.isEmpty()) {
- File f = stack.pop();
- if(f.isDirectory()) {
- for(File child : f.listFiles()) {
- stack.push(child);
- }
- }
- else {
- ret.add(f);
- }
- }
- return ret;
- }
- public HTableProvider getProvider() {
- return new HTableProvider();
- }
-
- public List<Put> extract( String line
- , Extractor extractor
- , String cf
- , HbaseConverter converter
- ) throws IOException
- {
- List<Put> ret = new ArrayList<>();
- Iterable<LookupKV> kvs = extractor.extract(line);
- for(LookupKV kv : kvs) {
- Put put = converter.toPut(cf, kv.getKey(), kv.getValue());
- ret.add(put);
- }
- return ret;
- }
-
- public void load( final Iterable<Stream<String>> streams
- , final ThreadLocal<ExtractorState> state
- , final String cf
- , int numThreads
- )
- {
- for(Stream<String> stream : streams) {
- try {
- ForkJoinPool forkJoinPool = new ForkJoinPool(numThreads);
- forkJoinPool.submit(() ->
- stream.parallel().forEach(input -> {
- ExtractorState es = state.get();
- try {
- es.getTable().put(extract(input, es.getExtractor(), cf, es.getConverter()));
- } catch (IOException e) {
- throw new IllegalStateException("Unable to continue: " + e.getMessage(), e);
- }
- }
- )
- ).get();
- } catch (InterruptedException e) {
- throw new IllegalStateException(e.getMessage(), e);
- } catch (ExecutionException e) {
- throw new IllegalStateException(e.getMessage(), e);
- } finally {
- stream.close();
- }
- }
- }
-
- private static Iterable<Stream<String>> streamify(List<File> files, int batchSize, boolean lineByLine) throws FileNotFoundException {
- List<Stream<String>> ret = new ArrayList<>();
- if(!lineByLine) {
- ret.add(files.stream().map(f -> {
- try {
- return FileUtils.readFileToString(f);
- } catch (IOException e) {
- throw new IllegalStateException("File " + f.getName() + " not found.");
- }
- }));
- }
- else {
- for(File f : files) {
- ret.add(ReaderSpliterator.lineStream(new BufferedReader(new FileReader(f)), batchSize));
- }
- }
- return ret;
+ public static void main(String... argv) throws Exception {
+ Configuration hadoopConfig = HBaseConfiguration.create();
+ String[] otherArgs = new GenericOptionsParser(hadoopConfig, argv).getRemainingArgs();
+ main(hadoopConfig, otherArgs);
}
- public static void main(String... argv) throws Exception {
- Configuration conf = HBaseConfiguration.create();
- String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+ public static void main(Configuration hadoopConfig, String[] argv) throws Exception {
- CommandLine cli = LoadOptions.parse(new PosixParser(), otherArgs);
+ CommandLine cli = LoadOptions.parse(new PosixParser(), argv);
+ EnumMap<LoadOptions, Optional<Object>> config = LoadOptions.createConfig(cli);
if(LoadOptions.LOG4J_PROPERTIES.has(cli)) {
PropertyConfigurator.configure(LoadOptions.LOG4J_PROPERTIES.get(cli));
}
ExtractorHandler handler = ExtractorHandler.load(
- FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli)))
+ FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli).trim()))
);
- int batchSize = 128;
- if(LoadOptions.BATCH_SIZE.has(cli)) {
- batchSize = ConversionUtils.convert(LoadOptions.BATCH_SIZE.get(cli), Integer.class);
- }
- int numThreads = Runtime.getRuntime().availableProcessors();
- if(LoadOptions.NUM_THREADS.has(cli)) {
- numThreads = ConversionUtils.convert(LoadOptions.NUM_THREADS.get(cli), Integer.class);
- }
- boolean lineByLine = !handler.getInputFormatHandler().getClass().equals(WholeFileFormat.class);
+ ImportStrategy strategy = (ImportStrategy) config.get(LoadOptions.IMPORT_MODE).get();
+ strategy.getImporter().importData(config, handler, hadoopConfig);
+
SensorEnrichmentUpdateConfig sensorEnrichmentUpdateConfig = null;
if(LoadOptions.ENRICHMENT_CONFIG.has(cli)) {
sensorEnrichmentUpdateConfig = JSONUtils.INSTANCE.load( new File(LoadOptions.ENRICHMENT_CONFIG.get(cli))
, SensorEnrichmentUpdateConfig.class
);
}
- List<File> inputFiles = getFiles(new File(LoadOptions.INPUT.get(cli)));
- SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader();
- ThreadLocal<ExtractorState> state = new ThreadLocal<ExtractorState>() {
- @Override
- protected ExtractorState initialValue() {
- try {
- ExtractorHandler handler = ExtractorHandler.load(
- FileUtils.readFileToString(new File(LoadOptions.EXTRACTOR_CONFIG.get(cli)))
- );
- HTableInterface table = loader.getProvider().getTable(conf, LoadOptions.HBASE_TABLE.get(cli));
- return new ExtractorState(table, handler.getExtractor(), new EnrichmentConverter());
- } catch (IOException e1) {
- throw new IllegalStateException("Unable to get table: " + e1);
- }
- }
- };
-
- loader.load(streamify(inputFiles, batchSize, lineByLine), state, LoadOptions.HBASE_CF.get(cli), numThreads);
if(sensorEnrichmentUpdateConfig != null) {
sensorEnrichmentUpdateConfig.updateSensorConfigs();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java
new file mode 100644
index 0000000..df88640
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/ImportStrategy.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import java.util.Optional;
+
+/**
+ * Enumerates the available import modes and binds each one to the
+ * {@link Importer} singleton that carries it out.
+ */
+public enum ImportStrategy {
+  LOCAL(LocalImporter.INSTANCE),
+  MR(MapReduceImporter.INSTANCE)
+  ;
+
+  /** The importer backing this mode; assigned once in the constructor. */
+  private final Importer importer;
+
+  ImportStrategy(Importer importer) {
+    this.importer = importer;
+  }
+
+  /**
+   * @return the importer bound to this mode
+   */
+  public Importer getImporter() {
+    return importer;
+  }
+
+  /**
+   * Resolves a mode by name, ignoring case and surrounding whitespace.
+   *
+   * @param strategyName the name to look up; may be {@code null}
+   * @return the matching mode, or an empty Optional when the name is
+   *         {@code null} or matches no constant
+   */
+  public static Optional<ImportStrategy> getStrategy(String strategyName) {
+    if(strategyName != null) {
+      // Normalise once rather than on every comparison.
+      final String wanted = strategyName.trim();
+      for(ImportStrategy candidate : values()) {
+        if(candidate.name().equalsIgnoreCase(wanted)) {
+          return Optional.of(candidate);
+        }
+      }
+    }
+    return Optional.empty();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java
new file mode 100644
index 0000000..81ede08
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/Importer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+
+import java.io.IOException;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Contract for a data import run. In this commit it is implemented by
+ * LocalImporter and MapReduceImporter, and an implementation is selected
+ * through ImportStrategy.
+ */
+public interface Importer {
+ /**
+  * Performs the import.
+  *
+  * @param config option values keyed by {@link LoadOptions}; every value is wrapped in an Optional
+  * @param handler the extractor handler; the implementations in this commit read its extractor,
+  *                input format and config
+  * @param hadoopConfig the Hadoop configuration handed to the implementation
+  * @throws IOException declared for implementations to signal I/O failures
+  */
+ void importData(EnumMap<LoadOptions, Optional<Object>> config, ExtractorHandler handler , final Configuration hadoopConfig) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java
new file mode 100644
index 0000000..652a4c3
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/LocalImporter.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.metron.common.utils.file.ReaderSpliterator;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
+import org.apache.metron.dataloads.nonbulk.flatfile.ExtractorState;
+import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
+import org.apache.metron.dataloads.nonbulk.flatfile.location.Location;
+import org.apache.metron.dataloads.nonbulk.flatfile.location.LocationStrategy;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.HbaseConverter;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.hbase.HTableProvider;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * {@link Importer} that performs the load inside the calling JVM: input is read
+ * through {@link Location}s, run through the configured extractor, converted to
+ * HBase {@link Put}s and written to the table named in the options.
+ *
+ * Every worker thread obtains its own {@link ExtractorState} through a
+ * {@link ThreadLocal}, so the table handle, extractor and converter are not
+ * shared between threads.
+ */
+public enum LocalImporter implements Importer {
+  INSTANCE;
+
+  /**
+   * Supplies the {@link HTableProvider} used to open the target table, so a
+   * caller can substitute a different provider.
+   */
+  public interface HTableProviderRetriever {
+    HTableProvider retrieve();
+  }
+
+  /**
+   * Imports using a freshly constructed {@link HTableProvider}.
+   *
+   * @param config option values keyed by {@link LoadOptions}
+   * @param handler source of the extractor and the input format
+   * @param hadoopConfig Hadoop configuration used to open the table
+   * @throws IOException if the import fails with an I/O error
+   */
+  @Override
+  public void importData( final EnumMap<LoadOptions, Optional<Object>> config
+                        , final ExtractorHandler handler
+                        , final Configuration hadoopConfig
+                        ) throws IOException {
+    importData(config, handler, hadoopConfig, () -> new HTableProvider());
+  }
+
+  /**
+   * Imports using the table provider returned by {@code provider}. Inputs are
+   * loaded as whole files when the handler's input format is exactly
+   * {@link WholeFileFormat}; otherwise they are loaded line by line.
+   *
+   * @param config option values keyed by {@link LoadOptions}
+   * @param handler source of the extractor and the input format
+   * @param hadoopConfig Hadoop configuration used to open the table
+   * @param provider supplies the {@link HTableProvider}
+   * @throws IOException if the import fails with an I/O error
+   * @throws IllegalStateException if the table cannot be opened or a record cannot be processed
+   */
+  public void importData( final EnumMap<LoadOptions, Optional<Object>> config
+                        , final ExtractorHandler handler
+                        , final Configuration hadoopConfig
+                        , final HTableProviderRetriever provider
+                        ) throws IOException {
+    ThreadLocal<ExtractorState> state = new ThreadLocal<ExtractorState>() {
+      @Override
+      protected ExtractorState initialValue() {
+        try {
+          HTableInterface table = provider.retrieve().getTable(hadoopConfig, (String) config.get(LoadOptions.HBASE_TABLE).get());
+          return new ExtractorState(table, handler.getExtractor(), new EnrichmentConverter(), hadoopConfig);
+        } catch (IOException e1) {
+          // Keep the original exception as the cause so its stack trace is not lost.
+          throw new IllegalStateException("Unable to get table: " + e1, e1);
+        }
+      }
+    };
+    boolean quiet = (boolean) config.get(LoadOptions.QUIET).get();
+    boolean lineByLine = !handler.getInputFormat().getClass().equals(WholeFileFormat.class);
+    @SuppressWarnings("unchecked") // the INPUT option is read as a list of location strings
+    List<String> inputs = (List<String>) config.get(LoadOptions.INPUT).get();
+    String cf = (String) config.get(LoadOptions.HBASE_CF).get();
+    if(!lineByLine) {
+      extractWholeFiles(inputs, state, cf, quiet);
+    }
+    else {
+      int batchSize = (int) config.get(LoadOptions.BATCH_SIZE).get();
+      int numThreads = (int) config.get(LoadOptions.NUM_THREADS).get();
+      extractLineByLine(inputs, state, cf, batchSize, numThreads, quiet);
+    }
+  }
+
+  /**
+   * Loads each input location line by line. The lines of one location are
+   * processed in parallel on a dedicated pool of {@code numThreads} threads,
+   * which is shut down once that location is finished.
+   *
+   * @param inputs the locations to read
+   * @param state per-thread extractor state
+   * @param cf the HBase column family to write to
+   * @param batchSize batch size handed to {@link ReaderSpliterator#lineStream}
+   * @param numThreads parallelism of the per-location pool
+   * @param quiet when {@code true}, no progress output is printed
+   * @throws IOException declared for callers; failures inside the stream surface as {@link IllegalStateException}
+   */
+  public void extractLineByLine( List<String> inputs
+                               , ThreadLocal<ExtractorState> state
+                               , String cf
+                               , int batchSize
+                               , int numThreads
+                               , boolean quiet
+                               ) throws IOException {
+    inputs.stream().map(input -> LocationStrategy.getLocation(input, state.get().getFileSystem()))
+                   .forEach( loc -> {
+                      final Progress progress = new Progress();
+                      if(!quiet) {
+                        System.out.println("\nProcessing " + loc.toString());
+                      }
+                      try (Stream<String> stream = ReaderSpliterator.lineStream(loc.openReader(), batchSize)) {
+                        ForkJoinPool forkJoinPool = new ForkJoinPool(numThreads);
+                        try {
+                          forkJoinPool.submit(() ->
+                            stream.parallel().forEach(input -> {
+                              ExtractorState es = state.get();
+                              try {
+                                es.getTable().put(extract(input, es.getExtractor(), cf, es.getConverter(), progress, quiet));
+                              } catch (IOException e) {
+                                throw new IllegalStateException("Unable to continue: " + e.getMessage(), e);
+                              }
+                            })
+                          ).get();
+                        } finally {
+                          // One pool is created per location; release its worker threads.
+                          forkJoinPool.shutdown();
+                        }
+                      } catch (Exception e) {
+                        if(e instanceof InterruptedException) {
+                          // Restore the interrupt flag for callers further up the stack.
+                          Thread.currentThread().interrupt();
+                        }
+                        throw new IllegalStateException(e.getMessage(), e);
+                      }
+                    }
+                   );
+  }
+
+  /**
+   * Loads every file reachable from {@code inputs} as a single record: the
+   * file's lines are concatenated without separators and passed to the
+   * extractor as one string.
+   *
+   * @param inputs files or directories to visit
+   * @param state per-thread extractor state
+   * @param cf the HBase column family to write to
+   * @param quiet when {@code true}, no progress output is printed
+   * @throws IOException if visiting the inputs fails
+   * @throws IllegalStateException if a file cannot be read or written to the table
+   */
+  public void extractWholeFiles( List<String> inputs, ThreadLocal<ExtractorState> state, String cf, boolean quiet) throws IOException {
+    final Progress progress = new Progress();
+    final List<Location> locations = new ArrayList<>();
+    Location.fileVisitor(inputs, loc -> locations.add(loc), state.get().getFileSystem());
+    locations.parallelStream().forEach(loc -> {
+      try(BufferedReader br = loc.openReader()) {
+        String s = br.lines().collect(Collectors.joining());
+        state.get().getTable().put(extract( s
+                                          , state.get().getExtractor()
+                                          , cf, state.get().getConverter()
+                                          , progress
+                                          , quiet
+                                          )
+                                  );
+      } catch (IOException e) {
+        throw new IllegalStateException("Unable to read " + loc + ": " + e.getMessage(), e);
+      }
+    });
+  }
+
+  /**
+   * Runs the extractor over one record and converts each resulting key/value
+   * pair into a {@link Put}.
+   *
+   * @param line the record to extract from
+   * @param extractor the extractor to apply
+   * @param cf the HBase column family for the puts
+   * @param converter converts a key/value pair into a {@link Put}
+   * @param progress progress reporter, updated once per call unless {@code quiet}
+   * @param quiet when {@code true}, progress is not updated
+   * @return the puts produced for this record, possibly empty
+   * @throws IOException if extraction or conversion fails
+   */
+  public List<Put> extract(String line
+                     , Extractor extractor
+                     , String cf
+                     , HbaseConverter converter
+                     , final Progress progress
+                     , final boolean quiet
+                     ) throws IOException
+  {
+    List<Put> ret = new ArrayList<>();
+    Iterable<LookupKV> kvs = extractor.extract(line);
+    for(LookupKV kv : kvs) {
+      Put put = converter.toPut(cf, kv.getKey(), kv.getValue());
+      ret.add(put);
+    }
+    if(!quiet) {
+      progress.update();
+    }
+    return ret;
+  }
+
+  /**
+   * Console progress indicator that rewrites a single line with a running
+   * count and a spinner character. {@link #update()} is synchronized so it can
+   * be called from several worker threads.
+   */
+  public static class Progress {
+    private int count = 0;
+    private String anim= "|/-\\";
+
+    public synchronized void update() {
+      int currentCount = count++;
+      System.out.print("\rProcessed " + currentCount + " - " + anim.charAt(currentCount % anim.length()));
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
new file mode 100644
index 0000000..e83bdd6
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.importer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.Logger;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.hbase.mr.BulkLoadMapper;
+import org.apache.metron.dataloads.nonbulk.flatfile.LoadOptions;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+
+import java.io.IOException;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+
+/**
+ * {@link Importer} that performs the load as a map-only MapReduce job using
+ * {@link BulkLoadMapper} and {@link TableOutputFormat}.
+ */
+public enum MapReduceImporter implements Importer{
+  INSTANCE
+  ;
+
+  private static final Logger LOG = Logger.getLogger(MapReduceImporter.class);
+
+  /**
+   * Configures the job from the options, submits it and blocks until it
+   * finishes.
+   *
+   * @param config option values keyed by {@link LoadOptions}; the table, column family,
+   *               extractor config contents and inputs are read from it
+   * @param handler supplies the input format, which registers the input paths on the job
+   * @param hadoopConfig Hadoop configuration the job is created from
+   * @throws IOException if the job cannot be created or configured
+   * @throws IllegalStateException if waiting for the job fails or the job reports failure
+   */
+  @Override
+  public void importData(EnumMap<LoadOptions, Optional<Object>> config
+                        , ExtractorHandler handler
+                        , Configuration hadoopConfig
+                        ) throws IOException {
+    String table = (String) config.get(LoadOptions.HBASE_TABLE).get();
+    String cf = (String) config.get(LoadOptions.HBASE_CF).get();
+    String extractorConfigContents  = (String) config.get(LoadOptions.EXTRACTOR_CONFIG).get();
+    Job job = Job.getInstance(hadoopConfig);
+    @SuppressWarnings("unchecked") // the INPUT option is read as a list of path strings
+    List<String> inputs = (List<String>) config.get(LoadOptions.INPUT).get();
+    job.setJobName("MapReduceImporter: " + inputs.stream().collect(Collectors.joining(",")) + " => " +  table + ":" + cf);
+    LOG.info("Configuring " + job.getJobName());
+    job.setJarByClass(MapReduceImporter.class);
+    job.setMapperClass(BulkLoadMapper.class);
+    job.setOutputFormatClass(TableOutputFormat.class);
+    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
+    job.getConfiguration().set(BulkLoadMapper.COLUMN_FAMILY_KEY, cf);
+    job.getConfiguration().set(BulkLoadMapper.CONFIG_KEY, extractorConfigContents);
+    job.getConfiguration().set(BulkLoadMapper.CONVERTER_KEY, EnrichmentConverter.class.getName());
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(Put.class);
+    // Map-only job: the mapper output goes straight to the output format.
+    job.setNumReduceTasks(0);
+    List<Path> paths = inputs.stream().map(p -> new Path(p)).collect(Collectors.toList());
+    handler.getInputFormat().set(job, paths, handler.getConfig());
+    boolean succeeded;
+    try {
+      succeeded = job.waitForCompletion(true);
+    } catch (Exception e) {
+      if(e instanceof InterruptedException) {
+        // Restore the interrupt flag for callers further up the stack.
+        Thread.currentThread().interrupt();
+      }
+      throw new IllegalStateException("Unable to complete job: " + e.getMessage(), e);
+    }
+    if(!succeeded) {
+      // waitForCompletion signals a failed job through its return value, not an exception.
+      throw new IllegalStateException("Unable to complete job: " + job.getJobName() + " reported failure");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/FileLocation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/FileLocation.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/FileLocation.java
new file mode 100644
index 0000000..267a6fb
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/FileLocation.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.location;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/**
+ * {@link RawLocation} backed by the local filesystem through {@link File}.
+ */
+public class FileLocation implements RawLocation {
+  /**
+   * Lists the paths of the entries directly under {@code loc}.
+   *
+   * @param loc path of the directory to list
+   * @return the child paths, or an empty Optional when {@code loc} cannot be
+   *         listed (it is not a directory or an I/O error occurred)
+   */
+  @Override
+  public Optional<List<String>> list(String loc)  {
+    // File.listFiles() returns null rather than an empty array when the path
+    // is not a directory or cannot be read; iterating it directly would throw.
+    File[] files = new File(loc).listFiles();
+    if(files == null) {
+      return Optional.empty();
+    }
+    List<String> children = new ArrayList<>(files.length);
+    for(File f : files) {
+      children.add(f.getPath());
+    }
+    return Optional.of(children);
+  }
+
+  /**
+   * @param loc local path to test
+   * @return {@code true} if a file or directory exists at {@code loc}
+   */
+  @Override
+  public boolean exists(String loc) throws IOException {
+    return new File(loc).exists();
+  }
+
+  /**
+   * @param loc local path to test
+   * @return {@code true} if {@code loc} is an existing directory
+   */
+  @Override
+  public boolean isDirectory(String loc) throws IOException {
+    return new File(loc).isDirectory();
+  }
+
+  /**
+   * Opens {@code loc} for reading; the caller is responsible for closing the stream.
+   *
+   * @param loc local path to open
+   * @return a new input stream over the file
+   * @throws IOException if the file cannot be opened
+   */
+  @Override
+  public InputStream openInputStream(String loc) throws IOException {
+    return new FileInputStream(loc);
+  }
+
+  /**
+   * A string is treated as a local location when something exists at that path.
+   *
+   * @param loc candidate location string
+   * @return {@code true} if {@code loc} names an existing local file or directory
+   */
+  @Override
+  public boolean match(String loc) {
+    return new File(loc).exists();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/HDFSLocation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/HDFSLocation.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/HDFSLocation.java
new file mode 100644
index 0000000..bae6a82
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/HDFSLocation.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.location;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * {@link RawLocation} that resolves location strings against a Hadoop
+ * {@link FileSystem} handed in through {@link #init(FileSystem)}.
+ */
+public class HDFSLocation implements RawLocation<FileSystem> {
+
+  FileSystem fs = null;
+
+  /**
+   * Stores the filesystem that all other methods operate on.
+   *
+   * @param state the filesystem to use
+   */
+  @Override
+  public void init(FileSystem state) {
+    this.fs = state;
+  }
+
+  /**
+   * A string is treated as an HDFS location when it starts with
+   * {@code hdfs://} and exists on the filesystem; an I/O error while checking
+   * counts as no match.
+   *
+   * @param loc candidate location string
+   * @return {@code true} if this class should handle {@code loc}
+   */
+  @Override
+  public boolean match(String loc) {
+    try {
+      if(!loc.startsWith("hdfs://")) {
+        return false;
+      }
+      return exists(loc);
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  /**
+   * @param loc path to test
+   * @return {@code true} if the path exists on the filesystem
+   */
+  @Override
+  public boolean exists(String loc) throws IOException {
+    final Path target = new Path(loc);
+    return fs.exists(target);
+  }
+
+  /**
+   * @param loc path to test
+   * @return {@code true} if the path is a directory on the filesystem
+   */
+  @Override
+  public boolean isDirectory(String loc) throws IOException {
+    final Path target = new Path(loc);
+    return fs.isDirectory(target);
+  }
+
+  /**
+   * Lists the entries directly under {@code loc}.
+   *
+   * @param loc path of the directory to list
+   * @return the string form of every child path
+   * @throws IOException if the filesystem cannot list the path
+   */
+  @Override
+  public Optional<List<String>> list(String loc) throws IOException {
+    final List<String> names = new ArrayList<>();
+    final FileStatus[] statuses = fs.listStatus(new Path(loc));
+    for(FileStatus status : statuses) {
+      names.add(status.getPath().toString());
+    }
+    return Optional.of(names);
+  }
+
+  /**
+   * Opens {@code loc} for reading; the caller is responsible for closing the stream.
+   *
+   * @param loc path to open
+   * @return an input stream over the file
+   * @throws IOException if the file cannot be opened
+   */
+  @Override
+  public InputStream openInputStream(String loc) throws IOException {
+    final Path target = new Path(loc);
+    return fs.open(target);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/Location.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/Location.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/Location.java
new file mode 100644
index 0000000..81eada6
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/Location.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.location;
+
+import org.apache.hadoop.fs.*;
+import org.apache.metron.dataloads.nonbulk.flatfile.importer.LocalImporter;
+
+import java.io.*;
+import java.util.*;
+import java.util.function.Consumer;
+
+/**
+ * Location can be either a local file or a file on HDFS.
+ */
+public class Location {
+
+  /** The location string as it was supplied. */
+  private String loc;
+  /** The backend every operation on {@link #loc} is delegated to. */
+  private RawLocation<?> rawLocation;
+
+  /**
+   * @param loc the location string
+   * @param rawLocation the backend that knows how to access {@code loc}
+   */
+  public Location(String loc, RawLocation rawLocation) {
+    this.rawLocation = rawLocation;
+    this.loc = loc;
+  }
+
+  /**
+   * @return the backend this location delegates to
+   */
+  public RawLocation<?> getRawLocation() {
+    return rawLocation;
+  }
+
+  /**
+   * Lists the direct children of this location.
+   *
+   * @return the children wrapped as {@link Location}s sharing this backend, or
+   *         an empty Optional when this location does not exist or is not a directory
+   * @throws IOException if the backend fails
+   */
+  public Optional<List<Location>> getChildren() throws IOException {
+    if(!exists() || !isDirectory()) {
+      return Optional.empty();
+    }
+    final List<String> names = rawLocation.list(loc).orElse(new ArrayList<>());
+    final List<Location> wrapped = new ArrayList<>();
+    for(String name : names) {
+      wrapped.add(new Location(name, rawLocation));
+    }
+    return Optional.of(wrapped);
+  }
+
+  /**
+   * @return whether the backend reports that this location exists
+   */
+  public boolean exists() throws IOException {
+    return rawLocation.exists(loc);
+  }
+
+  /**
+   * @return whether the backend reports that this location is a directory
+   */
+  public boolean isDirectory() throws IOException {
+    return rawLocation.isDirectory(loc);
+  }
+
+  /**
+   * @return a reader over this location, as opened by the backend
+   */
+  public BufferedReader openReader() throws IOException {
+    return rawLocation.openReader(loc);
+  }
+
+  /**
+   * @return the location string
+   */
+  @Override
+  public String toString() {
+    return loc;
+  }
+
+  /**
+   * Walks every input depth-first and hands each non-directory location to
+   * {@code importConsumer}. Inputs that do not exist are skipped.
+   *
+   * @param inputs the location strings to start from
+   * @param importConsumer receives every non-directory location found
+   * @param fs filesystem passed to {@link LocationStrategy#getLocation}
+   * @throws IOException if a backend fails while walking
+   */
+  public static void fileVisitor(List<String> inputs
+                         , final Consumer<Location> importConsumer
+                         , final FileSystem fs
+                         ) throws IOException {
+    final Stack<Location> pending = new Stack<>();
+    for(String input : inputs) {
+      final Location root = LocationStrategy.getLocation(input, fs);
+      if(root.exists()) {
+        pending.add(root);
+      }
+    }
+    while(!pending.empty()) {
+      final Location current = pending.pop();
+      if(!current.isDirectory()) {
+        importConsumer.accept(current);
+        continue;
+      }
+      // Directories are expanded in place; their children are visited later.
+      for(Location child : current.getChildren().orElse(Collections.emptyList())) {
+        pending.push(child);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/LocationStrategy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/LocationStrategy.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/LocationStrategy.java
new file mode 100644
index 0000000..338a1e2
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/LocationStrategy.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.location;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * The kinds of location the loader understands. Each constant knows how to
+ * build and initialise its {@link RawLocation}; the constants are tried in
+ * declaration order when resolving a location string.
+ */
+public enum LocationStrategy {
+  HDFS(fileSystem -> {
+    HDFSLocation hdfs = new HDFSLocation();
+    hdfs.init(fileSystem);
+    return hdfs;
+  })
+  ,FILE(fileSystem -> {
+    FileLocation local = new FileLocation();
+    local.init(fileSystem);
+    return local;
+  })
+  ,URL(fileSystem -> {
+    URLLocation url = new URLLocation();
+    url.init(fileSystem);
+    return url;
+  })
+  ;
+  Function<FileSystem, RawLocation<?>> locationCreator;
+
+  LocationStrategy(Function<FileSystem, RawLocation<?>> locationCreator) {
+    this.locationCreator = locationCreator;
+  }
+
+  /**
+   * Finds the first strategy whose freshly created raw location matches
+   * {@code loc}.
+   *
+   * @param loc the location string to resolve
+   * @param fs filesystem handed to every raw location's {@code init}
+   * @return the matching raw location, or an empty Optional if none matches
+   */
+  public static Optional<RawLocation<?>> getRawLocation(String loc, FileSystem fs) {
+    for(LocationStrategy candidate : values()) {
+      final RawLocation<?> raw = candidate.locationCreator.apply(fs);
+      if(!raw.match(loc)) {
+        continue;
+      }
+      return Optional.of(raw);
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Resolves {@code loc} to a {@link Location}.
+   *
+   * @param loc the location string to resolve
+   * @param fs filesystem handed to every raw location's {@code init}
+   * @return a location bound to the matching raw location
+   * @throws IllegalStateException if no strategy matches {@code loc}
+   */
+  public static Location getLocation(String loc, FileSystem fs) {
+    return getRawLocation(loc, fs)
+            .map(raw -> new Location(loc, raw))
+            .orElseThrow(() -> new IllegalStateException("Unsupported type: " + loc));
+  }
+}