You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/02/06 20:17:40 UTC

[12/17] incubator-metron git commit: METRON-682: Unify and Improve the Flat File Loader closes apache/incubator-metron#432

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/RawLocation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/RawLocation.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/RawLocation.java
new file mode 100644
index 0000000..5f2db33
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/RawLocation.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.location;
+
+import java.io.*;
+import java.util.List;
+import java.util.Optional;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+public interface RawLocation<T> {
+  Optional<List<String>> list(String loc) throws IOException;
+  boolean exists(String loc) throws IOException;
+  boolean isDirectory(String loc) throws IOException;
+
+  InputStream openInputStream(String loc) throws IOException;
+  boolean match(String loc);
+  default void init(T state) {
+
+  }
+
+  default BufferedReader openReader(String loc) throws IOException {
+    InputStream is = openInputStream(loc);
+    if(loc.endsWith(".gz")) {
+      return new BufferedReader(new InputStreamReader(new GZIPInputStream(is)));
+    }
+    else if(loc.endsWith(".zip")) {
+      ZipInputStream zis = new ZipInputStream(is);
+      ZipEntry entry = zis.getNextEntry();
+      if(entry != null) {
+        return new BufferedReader(new InputStreamReader(zis));
+      }
+      else {
+        return new BufferedReader(new InputStreamReader(new ByteArrayInputStream(new byte[] {})));
+      }
+    }
+    else {
+      return new BufferedReader(new InputStreamReader(is));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/URLLocation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/URLLocation.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/URLLocation.java
new file mode 100644
index 0000000..cc8edbe
--- /dev/null
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/location/URLLocation.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile.location;
+
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+import java.util.zip.ZipInputStream;
+
+public class URLLocation implements RawLocation {
+
+  @Override
+  public Optional<List<String>> list(String loc) throws IOException {
+    return Optional.of(Collections.emptyList());
+  }
+
+  @Override
+  public boolean exists(String loc) throws IOException {
+    return true;
+  }
+
+  @Override
+  public boolean isDirectory(String loc) throws IOException {
+    return false;
+  }
+
+  @Override
+  public InputStream openInputStream(String loc) throws IOException {
+    return new URL(loc).openConnection().getInputStream();
+  }
+
+  @Override
+  public boolean match(String loc) {
+    try {
+      new URL(loc);
+      return true;
+    } catch (MalformedURLException e) {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/scripts/flatfile_loader.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/scripts/flatfile_loader.sh b/metron-platform/metron-data-management/src/main/scripts/flatfile_loader.sh
index bba7f8e..b9e2746 100755
--- a/metron-platform/metron-data-management/src/main/scripts/flatfile_loader.sh
+++ b/metron-platform/metron-data-management/src/main/scripts/flatfile_loader.sh
@@ -27,9 +27,25 @@ elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
   . /usr/lib/bigtop-utils/bigtop-detect-javahome
 fi
 
-export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
 export METRON_VERSION=${project.version}
 export METRON_HOME=/usr/metron/$METRON_VERSION
+export CLASSNAME="org.apache.metron.dataloads.nonbulk.flatfile.SimpleEnrichmentFlatFileLoader"
 export DM_JAR=${project.artifactId}-$METRON_VERSION.jar
-CP=$METRON_HOME/lib/$DM_JAR:/usr/metron/${METRON_VERSION}/lib/taxii-1.1.0.1.jar:`${HBASE_HOME}/bin/hbase classpath`
-java -cp $CP org.apache.metron.dataloads.nonbulk.flatfile.SimpleEnrichmentFlatFileLoader "$@"
+export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
+
+if [ $(which hadoop) ]
+then
+  HADOOP_CLASSPATH=${HBASE_HOME}/lib/hbase-server.jar:`${HBASE_HOME}/bin/hbase classpath`
+  for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
+    if [ -f $jar ];then
+      LIBJARS="$jar,$LIBJARS"
+    fi
+  done
+  export HADOOP_CLASSPATH
+  hadoop jar $METRON_HOME/lib/$DM_JAR $CLASSNAME -libjars ${LIBJARS} "$@"
+else
+  echo "Warning: Metron cannot find the hadoop client on this node.  This means that loading via Map Reduce will NOT function."
+  CP=$METRON_HOME/lib/$DM_JAR:/usr/metron/${METRON_VERSION}/lib/taxii-1.1.0.1.jar:`${HBASE_HOME}/bin/hbase classpath`
+  java -cp $CP $CLASSNAME "$@"
+fi
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/main/scripts/threatintel_bulk_load.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/scripts/threatintel_bulk_load.sh b/metron-platform/metron-data-management/src/main/scripts/threatintel_bulk_load.sh
deleted file mode 100755
index 865d0ad..0000000
--- a/metron-platform/metron-data-management/src/main/scripts/threatintel_bulk_load.sh
+++ /dev/null
@@ -1,41 +0,0 @@
-#!/bin/bash
-# 
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-# 
-#     http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# 
-
-BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
-[ -n "${BIGTOP_DEFAULTS_DIR}" -a -r ${BIGTOP_DEFAULTS_DIR}/hbase ] && . ${BIGTOP_DEFAULTS_DIR}/hbase
-
-# Autodetect JAVA_HOME if not defined
-if [ -e /usr/libexec/bigtop-detect-javahome ]; then
-  . /usr/libexec/bigtop-detect-javahome
-elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
-  . /usr/lib/bigtop-utils/bigtop-detect-javahome
-fi
-
-export HBASE_HOME=${HBASE_HOME:-/usr/hdp/current/hbase-client}
-HADOOP_CLASSPATH=${HBASE_HOME}/lib/hbase-server.jar:`${HBASE_HOME}/bin/hbase classpath`
-for jar in $(echo $HADOOP_CLASSPATH | sed 's/:/ /g');do
-  if [ -f $jar ];then
-    LIBJARS="$jar,$LIBJARS"
-  fi
-done
-export HADOOP_CLASSPATH
-export METRON_VERSION=${project.version}
-export METRON_HOME=/usr/metron/$METRON_VERSION
-export DM_JAR=${project.artifactId}-$METRON_VERSION.jar
-hadoop jar $METRON_HOME/lib/$DM_JAR org.apache.metron.dataloads.bulk.ThreatIntelBulkLoader -libjars ${LIBJARS} "$@"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
deleted file mode 100644
index b7a753b..0000000
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/BulkLoadMapperIntegrationTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads.hbase.mr;
-
-import com.sun.jersey.guice.spi.container.GuiceComponentProviderFactory;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.PosixParser;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.metron.dataloads.bulk.ThreatIntelBulkLoader;
-import org.apache.metron.enrichment.converter.EnrichmentConverter;
-import org.apache.metron.enrichment.converter.EnrichmentKey;
-import org.apache.metron.enrichment.converter.EnrichmentValue;
-import org.apache.metron.enrichment.lookup.LookupKV;
-import org.apache.metron.test.utils.UnitTestHelper;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Level;
-
-public class BulkLoadMapperIntegrationTest {
-  /** The test util. */
-  private HBaseTestingUtility testUtil;
-
-  /** The test table. */
-  private HTable testTable;
-  private String tableName = "malicious_domains";
-  private String cf = "cf";
-  private String csvFile="input.csv";
-  private String extractorJson = "extractor.json";
-  private String enrichmentJson = "enrichment_config.json";
-  private String asOf = "04/15/2016";
-  private String asOfFormat = "georgia";
-  private String convertClass = "threadIntel.class";
-  private Configuration config = null;
-
-
-  @Before
-  public void setup() throws Exception {
-    UnitTestHelper.setJavaLoggingLevel(Level.SEVERE);
-    Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
-    config = kv.getValue();
-    testUtil = kv.getKey();
-    testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
-  }
-
-  @After
-  public void teardown() throws Exception {
-    HBaseUtil.INSTANCE.teardown(testUtil);
-  }
- /**
-         {
-            "config" : {
-                        "columns" : {
-                                "host" : 0
-                                ,"meta" : 2
-                                    }
-                       ,"indicator_column" : "host"
-                       ,"separator" : ","
-                       ,"type" : "threat"
-                       }
-            ,"extractor" : "CSV"
-         }
-         */
-  @Multiline
-  private static String extractorConfig;
-
-  @Test
-  public void testCommandLine() throws Exception {
-    UnitTestHelper.setJavaLoggingLevel(GuiceComponentProviderFactory.class, Level.WARNING);
-    Configuration conf = HBaseConfiguration.create();
-
-    String[] argv = {"-f cf", "-t malicious_domains", "-e extractor.json", "-n enrichment_config.json", "-a 04/15/2016", "-i input.csv", "-z georgia", "-c threadIntel.class"};
-    String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
-
-    CommandLine cli = ThreatIntelBulkLoader.BulkLoadOptions.parse(new PosixParser(), otherArgs);
-    Assert.assertEquals(extractorJson,ThreatIntelBulkLoader.BulkLoadOptions.EXTRACTOR_CONFIG.get(cli).trim());
-    Assert.assertEquals(cf, ThreatIntelBulkLoader.BulkLoadOptions.COLUMN_FAMILY.get(cli).trim());
-    Assert.assertEquals(tableName,ThreatIntelBulkLoader.BulkLoadOptions.TABLE.get(cli).trim());
-    Assert.assertEquals(enrichmentJson,ThreatIntelBulkLoader.BulkLoadOptions.ENRICHMENT_CONFIG.get(cli).trim());
-    Assert.assertEquals(csvFile,ThreatIntelBulkLoader.BulkLoadOptions.INPUT_DATA.get(cli).trim());
-    Assert.assertEquals(asOf, ThreatIntelBulkLoader.BulkLoadOptions.AS_OF_TIME.get(cli).trim());
-    Assert.assertEquals(asOfFormat, ThreatIntelBulkLoader.BulkLoadOptions.AS_OF_TIME_FORMAT.get(cli).trim());
-    Assert.assertEquals(convertClass, ThreatIntelBulkLoader.BulkLoadOptions.CONVERTER.get(cli).trim());
-  }
-
-  @Test
-  public void test() throws IOException, ClassNotFoundException, InterruptedException {
-
-    Assert.assertNotNull(testTable);
-    FileSystem fs = FileSystem.get(config);
-    String contents = "google.com,1,foo";
-    EnrichmentConverter converter = new EnrichmentConverter();
-    HBaseUtil.INSTANCE.writeFile(contents, new Path("input.csv"), fs);
-    Job job = ThreatIntelBulkLoader.createJob(config, "input.csv", tableName, cf, extractorConfig, 0L, new EnrichmentConverter());
-    Assert.assertTrue(job.waitForCompletion(true));
-    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
-    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
-    for(Result r : scanner) {
-      results.add(converter.fromResult(r, cf));
-    }
-    Assert.assertEquals(1, results.size());
-    Assert.assertEquals(results.get(0).getKey().indicator, "google.com");
-    Assert.assertEquals(results.get(0).getKey().type, "threat");
-    Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
-    Assert.assertEquals(results.get(0).getValue().getMetadata().get("meta"), "foo");
-    Assert.assertEquals(results.get(0).getValue().getMetadata().get("host"), "google.com");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
index 14a5143..d82be9d 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/mr/LeastRecentlyUsedPrunerIntegrationTest.java
@@ -36,10 +36,7 @@ import org.apache.metron.enrichment.lookup.LookupKey;
 import org.apache.metron.enrichment.lookup.accesstracker.BloomAccessTracker;
 import org.apache.metron.enrichment.lookup.accesstracker.PersistentAccessTracker;
 import org.apache.metron.test.utils.UnitTestHelper;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -49,21 +46,21 @@ import java.util.logging.Level;
 
 public class LeastRecentlyUsedPrunerIntegrationTest {
     /** The test util. */
-    private HBaseTestingUtility testUtil;
+    private static HBaseTestingUtility testUtil;
 
     /** The test table. */
-    private HTable testTable;
-    private HTable atTable;
-    private String tableName = "malicious_domains";
-    private String cf = "cf";
-    private String atTableName = "access_trackers";
-    private String atCF= "cf";
-    private String beginTime = "04/14/2016 12:00:00";
-    private String timeFormat = "georgia";
-    private Configuration config = null;
+    private static HTable testTable;
+    private static HTable atTable;
+    private static final String tableName = "malicious_domains";
+    private static final String cf = "cf";
+    private static final String atTableName = "access_trackers";
+    private static final String atCF= "cf";
+    private static final String beginTime = "04/14/2016 12:00:00";
+    private static final String timeFormat = "georgia";
+    private static Configuration config = null;
 
-    @Before
-    public void setup() throws Exception {
+    @BeforeClass
+    public static void setup() throws Exception {
         UnitTestHelper.setJavaLoggingLevel(Level.SEVERE);
         Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
         config = kv.getValue();
@@ -71,10 +68,12 @@ public class LeastRecentlyUsedPrunerIntegrationTest {
         testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
         atTable = testUtil.createTable(Bytes.toBytes(atTableName), Bytes.toBytes(atCF));
     }
-    @After
-    public void teardown() throws Exception {
+
+    @AfterClass
+    public static void teardown() throws Exception {
         HBaseUtil.INSTANCE.teardown(testUtil);
     }
+
     public List<LookupKey> getKeys(int start, int end) {
         List<LookupKey> keys = new ArrayList<>();
         for(int i = start;i < end;++i) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java
new file mode 100644
index 0000000..d0d637d
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.nonbulk.flatfile;
+
+import com.google.common.collect.ImmutableList;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.dataloads.extractor.Extractor;
+import org.apache.metron.dataloads.extractor.ExtractorHandler;
+import org.apache.metron.dataloads.hbase.mr.HBaseUtil;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.junit.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.stream.Stream;
+import java.util.zip.GZIPOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+import java.util.zip.ZipOutputStream;
+
+public class SimpleEnrichmentFlatFileLoaderIntegrationTest {
+
+  private static HBaseTestingUtility testUtil;
+
+  /** The test table. */
+  private static HTable testTable;
+  private static Configuration config = null;
+  private static final String tableName = "enrichment";
+  private static final String cf = "cf";
+  private static final String csvFile="input.csv";
+  private static final String extractorJson = "extractor.json";
+  private static final String enrichmentJson = "enrichment_config.json";
+  private static final String log4jProperty = "log4j";
+  private static final File file1 = new File("target/sefflt_data_1.csv");
+  private static final File file2 = new File("target/sefflt_data_2.csv");
+  private static final File multilineFile= new File("target/sefflt_data_2.csv");
+  private static final File multilineZipFile= new File("target/sefflt_data_2.csv.zip");
+  private static final File multilineGzFile= new File("target/sefflt_data_2.csv.gz");
+  private static final File lineByLineExtractorConfigFile = new File("target/sefflt_extractorConfig_lbl.json");
+  private static final File wholeFileExtractorConfigFile = new File("target/sefflt_extractorConfig_wf.json");
+  private static final int NUM_LINES = 1000;
+
+  /**
+   {
+      "config" : {
+        "columns" : {
+          "host" : 0,
+          "meta" : 2
+                    },
+        "indicator_column" : "host",
+        "separator" : ",",
+        "type" : "enrichment"
+                 },
+      "extractor" : "CSV"
+   }
+   */
+  @Multiline
+  private static String lineByLineExtractorConfig;
+
+  /**
+   {
+      "config" : {
+        "columns" : {
+          "host" : 0,
+          "meta" : 2
+                    },
+        "indicator_column" : "host",
+        "separator" : ",",
+        "type" : "enrichment"
+                 },
+      "extractor" : "CSV",
+      "inputFormat" : "WHOLE_FILE"
+   }
+   */
+  @Multiline
+  private static String wholeFileExtractorConfig;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    UnitTestHelper.setJavaLoggingLevel(Level.SEVERE);
+    Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
+    config = kv.getValue();
+    testUtil = kv.getKey();
+    testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
+
+    for(Result r : testTable.getScanner(Bytes.toBytes(cf))) {
+      Delete d = new Delete(r.getRow());
+      testTable.delete(d);
+    }
+
+    if(lineByLineExtractorConfigFile.exists()) {
+      lineByLineExtractorConfigFile.delete();
+    }
+    Files.write( lineByLineExtractorConfigFile.toPath()
+               , lineByLineExtractorConfig.getBytes()
+               , StandardOpenOption.CREATE_NEW , StandardOpenOption.TRUNCATE_EXISTING
+    );
+    if(wholeFileExtractorConfigFile.exists()) {
+      wholeFileExtractorConfigFile.delete();
+    }
+    Files.write( wholeFileExtractorConfigFile.toPath()
+               , wholeFileExtractorConfig.getBytes()
+               , StandardOpenOption.CREATE_NEW , StandardOpenOption.TRUNCATE_EXISTING
+    );
+    if(file1.exists()) {
+      file1.delete();
+    }
+    Files.write( file1.toPath()
+               , "google1.com,1,foo2\n".getBytes()
+               , StandardOpenOption.CREATE_NEW , StandardOpenOption.TRUNCATE_EXISTING
+    );
+    if(file2.exists()) {
+      file2.delete();
+    }
+    Files.write( file2.toPath()
+               , "google2.com,2,foo2\n".getBytes()
+               , StandardOpenOption.CREATE_NEW , StandardOpenOption.TRUNCATE_EXISTING
+    );
+
+    if(multilineFile.exists()) {
+      multilineFile.delete();
+    }
+    if(multilineGzFile.exists()) {
+      multilineGzFile.delete();
+    }
+    if(multilineGzFile.exists()) {
+      multilineZipFile.delete();
+    }
+    PrintWriter[] pws =new PrintWriter[] {};
+    try {
+      ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(multilineZipFile));
+      ZipEntry entry = new ZipEntry("file");
+      zos.putNextEntry(entry);
+       pws = new PrintWriter[]{
+         new PrintWriter(multilineFile),
+         new PrintWriter(zos),
+         new PrintWriter(new GZIPOutputStream(new FileOutputStream(multilineGzFile)))
+                              };
+      for(int i = 0;i < NUM_LINES;++i) {
+        for(PrintWriter pw : pws) {
+          pw.println("google" + i + ".com," + i + ",foo" + i);
+        }
+      }
+    }
+    finally {
+      for(PrintWriter pw : pws) {
+        pw.close();
+      }
+    }
+
+  }
+
+  @AfterClass
+  public static void teardown() throws Exception {
+    HBaseUtil.INSTANCE.teardown(testUtil);
+    file1.delete();
+    file2.delete();
+    multilineFile.delete();
+    multilineGzFile.delete();
+    multilineZipFile.delete();
+    lineByLineExtractorConfigFile.delete();
+    wholeFileExtractorConfigFile.delete();
+  }
+
+
+  @Test
+  public void testArgs() throws Exception {
+    String[] argv = {"-c cf", "-t enrichment"
+            , "-e extractor.json", "-n enrichment_config.json"
+            , "-l log4j", "-i input.csv"
+            , "-p 2", "-b 128", "-q"
+    };
+
+    String[] otherArgs = new GenericOptionsParser(config, argv).getRemainingArgs();
+
+    CommandLine cli = LoadOptions.parse(new PosixParser(), otherArgs);
+    Assert.assertEquals(extractorJson, LoadOptions.EXTRACTOR_CONFIG.get(cli).trim());
+    Assert.assertEquals(cf, LoadOptions.HBASE_CF.get(cli).trim());
+    Assert.assertEquals(tableName, LoadOptions.HBASE_TABLE.get(cli).trim());
+    Assert.assertEquals(enrichmentJson, LoadOptions.ENRICHMENT_CONFIG.get(cli).trim());
+    Assert.assertEquals(csvFile, LoadOptions.INPUT.get(cli).trim());
+    Assert.assertEquals(log4jProperty, LoadOptions.LOG4J_PROPERTIES.get(cli).trim());
+    Assert.assertEquals("2", LoadOptions.NUM_THREADS.get(cli).trim());
+    Assert.assertEquals("128", LoadOptions.BATCH_SIZE.get(cli).trim());
+  }
+
+  @Test
+  public void testLocalLineByLine() throws Exception {
+    String[] argv = {"-c cf", "-t enrichment"
+            , "-e " + lineByLineExtractorConfigFile.getPath()
+            , "-i " + multilineFile.getPath()
+            , "-p 2", "-b 128", "-q"
+    };
+    SimpleEnrichmentFlatFileLoader.main(config, argv);
+    EnrichmentConverter converter = new EnrichmentConverter();
+    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
+    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
+    for (Result r : scanner) {
+      results.add(converter.fromResult(r, cf));
+      testTable.delete(new Delete(r.getRow()));
+    }
+    Assert.assertEquals(NUM_LINES, results.size());
+    Assert.assertTrue(results.get(0).getKey().indicator.startsWith("google"));
+    Assert.assertEquals(results.get(0).getKey().type, "enrichment");
+    Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
+    Assert.assertTrue(results.get(0).getValue().getMetadata().get("meta").toString().startsWith("foo"));
+    Assert.assertTrue(results.get(0).getValue().getMetadata().get("host").toString().startsWith("google"));
+
+  }
+
+  @Test
+  public void testLocalLineByLine_gz() throws Exception {
+    String[] argv = {"-c cf", "-t enrichment"
+            , "-e " + lineByLineExtractorConfigFile.getPath()
+            , "-i " + multilineGzFile.getPath()
+            , "-p 2", "-b 128", "-q"
+    };
+    SimpleEnrichmentFlatFileLoader.main(config, argv);
+    EnrichmentConverter converter = new EnrichmentConverter();
+    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
+    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
+    for (Result r : scanner) {
+      results.add(converter.fromResult(r, cf));
+      testTable.delete(new Delete(r.getRow()));
+    }
+    Assert.assertEquals(NUM_LINES, results.size());
+    Assert.assertTrue(results.get(0).getKey().indicator.startsWith("google"));
+    Assert.assertEquals(results.get(0).getKey().type, "enrichment");
+    Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
+    Assert.assertTrue(results.get(0).getValue().getMetadata().get("meta").toString().startsWith("foo"));
+    Assert.assertTrue(results.get(0).getValue().getMetadata().get("host").toString().startsWith("google"));
+
+  }
+
+  @Test
+  public void testLocalLineByLine_zip() throws Exception {
+    String[] argv = {"-c cf", "-t enrichment"
+            , "-e " + lineByLineExtractorConfigFile.getPath()
+            , "-i " + multilineZipFile.getPath()
+            , "-p 2", "-b 128", "-q"
+    };
+    SimpleEnrichmentFlatFileLoader.main(config, argv);
+    EnrichmentConverter converter = new EnrichmentConverter();
+    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
+    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
+    for (Result r : scanner) {
+      results.add(converter.fromResult(r, cf));
+      testTable.delete(new Delete(r.getRow()));
+    }
+    Assert.assertEquals(NUM_LINES, results.size());
+    Assert.assertTrue(results.get(0).getKey().indicator.startsWith("google"));
+    Assert.assertEquals(results.get(0).getKey().type, "enrichment");
+    Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
+    Assert.assertTrue(results.get(0).getValue().getMetadata().get("meta").toString().startsWith("foo"));
+    Assert.assertTrue(results.get(0).getValue().getMetadata().get("host").toString().startsWith("google"));
+
+  }
+
+  @Test
+  public void testLocalWholeFile() throws Exception {
+    String[] argv = { "-c cf", "-t enrichment"
+            , "-e " + wholeFileExtractorConfigFile.getPath()
+            , "-i " + file1.getPath() + "," + file2.getPath()
+            , "-p 2", "-b 128", "-q"
+    };
+    SimpleEnrichmentFlatFileLoader.main(config, argv);
+    EnrichmentConverter converter = new EnrichmentConverter();
+    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
+    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
+    for(Result r : scanner) {
+      results.add(converter.fromResult(r, cf));
+      testTable.delete(new Delete(r.getRow()));
+    }
+    Assert.assertEquals(2, results.size());
+    Assert.assertTrue(results.get(0).getKey().indicator.startsWith("google"));
+    Assert.assertEquals(results.get(0).getKey().type, "enrichment");
+    Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
+    Assert.assertTrue(results.get(0).getValue().getMetadata().get("meta").toString().startsWith("foo"));
+    Assert.assertTrue(results.get(0).getValue().getMetadata().get("host").toString().startsWith( "google"));
+
+  }
+
+  @Test
+  public void testMRLineByLine() throws Exception {
+    String[] argv = {"-c cf", "-t enrichment"
+            , "-e " + lineByLineExtractorConfigFile.getPath()
+            , "-i " + multilineFile.getName()
+            , "-m MR"
+            , "-p 2", "-b 128", "-q"
+    };
+    FileSystem fs = FileSystem.get(config);
+    HBaseUtil.INSTANCE.writeFile(new String(Files.readAllBytes(multilineFile.toPath())), new Path(multilineFile.getName()), fs);
+    SimpleEnrichmentFlatFileLoader.main(config, argv);
+    EnrichmentConverter converter = new EnrichmentConverter();
+    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
+    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
+    for (Result r : scanner) {
+      results.add(converter.fromResult(r, cf));
+      testTable.delete(new Delete(r.getRow()));
+    }
+    Assert.assertEquals(NUM_LINES, results.size());
+    Assert.assertTrue(results.get(0).getKey().indicator.startsWith("google"));
+    Assert.assertEquals(results.get(0).getKey().type, "enrichment");
+    Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
+    Assert.assertTrue(results.get(0).getValue().getMetadata().get("meta").toString().startsWith("foo"));
+    Assert.assertTrue(results.get(0).getValue().getMetadata().get("host").toString().startsWith("google"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java
deleted file mode 100644
index 4ffb91a..0000000
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads.nonbulk.flatfile;
-
-import com.google.common.collect.ImmutableList;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.metron.dataloads.bulk.ThreatIntelBulkLoader;
-import org.apache.metron.dataloads.extractor.Extractor;
-import org.apache.metron.dataloads.extractor.ExtractorHandler;
-import org.apache.metron.dataloads.extractor.inputformat.WholeFileFormat;
-import org.apache.metron.dataloads.nonbulk.flatfile.SimpleEnrichmentFlatFileLoader;
-import org.apache.metron.dataloads.hbase.mr.HBaseUtil;
-import org.apache.metron.enrichment.converter.HbaseConverter;
-import org.apache.metron.enrichment.converter.EnrichmentConverter;
-import org.apache.metron.enrichment.converter.EnrichmentKey;
-import org.apache.metron.enrichment.converter.EnrichmentValue;
-import org.apache.metron.enrichment.lookup.LookupKV;
-import org.apache.metron.common.utils.JSONUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Stream;
-
-public class SimpleEnrichmentFlatFileLoaderTest {
-
-  private HBaseTestingUtility testUtil;
-
-  /** The test table. */
-  private HTable testTable;
-  private String tableName = "enrichment";
-  private String cf = "cf";
-  private String csvFile="input.csv";
-  private String extractorJson = "extractor.json";
-  private String enrichmentJson = "enrichment_config.json";
-  private String log4jProperty = "log4j";
-
-  Configuration config = null;
-  /**
-   {
-      "config" : {
-        "columns" : {
-          "host" : 0,
-          "meta" : 2
-                    },
-        "indicator_column" : "host",
-        "separator" : ",",
-        "type" : "enrichment"
-                 },
-      "extractor" : "CSV"
-   }
-   */
-  @Multiline
-  private static String extractorConfig;
-
-  @Before
-  public void setup() throws Exception {
-    Map.Entry<HBaseTestingUtility, Configuration> kv = HBaseUtil.INSTANCE.create(true);
-    config = kv.getValue();
-    testUtil = kv.getKey();
-    testTable = testUtil.createTable(Bytes.toBytes(tableName), Bytes.toBytes(cf));
-  }
-
-  @After
-  public void teardown() throws Exception {
-    HBaseUtil.INSTANCE.teardown(testUtil);
-  }
-
-  @Test
-  public void testCommandLine() throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-
-    String[] argv = { "-c cf", "-t enrichment"
-            , "-e extractor.json", "-n enrichment_config.json"
-            , "-l log4j", "-i input.csv"
-            , "-p 2", "-b 128"
-    };
-    String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
-
-    CommandLine cli = SimpleEnrichmentFlatFileLoader.LoadOptions.parse(new PosixParser(), otherArgs);
-    Assert.assertEquals(extractorJson,SimpleEnrichmentFlatFileLoader.LoadOptions.EXTRACTOR_CONFIG.get(cli).trim());
-    Assert.assertEquals(cf, SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_CF.get(cli).trim());
-    Assert.assertEquals(tableName,SimpleEnrichmentFlatFileLoader.LoadOptions.HBASE_TABLE.get(cli).trim());
-    Assert.assertEquals(enrichmentJson,SimpleEnrichmentFlatFileLoader.LoadOptions.ENRICHMENT_CONFIG.get(cli).trim());
-    Assert.assertEquals(csvFile,SimpleEnrichmentFlatFileLoader.LoadOptions.INPUT.get(cli).trim());
-    Assert.assertEquals(log4jProperty, SimpleEnrichmentFlatFileLoader.LoadOptions.LOG4J_PROPERTIES.get(cli).trim());
-    Assert.assertEquals("2", SimpleEnrichmentFlatFileLoader.LoadOptions.NUM_THREADS.get(cli).trim());
-    Assert.assertEquals("128", SimpleEnrichmentFlatFileLoader.LoadOptions.BATCH_SIZE.get(cli).trim());
-  }
-
-  @Test
-  public void test() throws Exception {
-
-    Assert.assertNotNull(testTable);
-    String contents = "google.com,1,foo";
-
-    EnrichmentConverter converter = new EnrichmentConverter();
-    ExtractorHandler handler = ExtractorHandler.load(extractorConfig);
-    Extractor e = handler.getExtractor();
-    SimpleEnrichmentFlatFileLoader loader = new SimpleEnrichmentFlatFileLoader();
-    Stream<String> contentStreams = ImmutableList.of(contents).stream();
-    ThreadLocal<ExtractorState> state = new ThreadLocal<ExtractorState>() {
-      @Override
-      protected ExtractorState initialValue() {
-        return new ExtractorState(testTable, e, converter);
-      }
-    };
-    loader.load(ImmutableList.of(contentStreams)
-               , state
-               , cf
-               , 2
-               );
-
-    ResultScanner scanner = testTable.getScanner(Bytes.toBytes(cf));
-    List<LookupKV<EnrichmentKey, EnrichmentValue>> results = new ArrayList<>();
-    for(Result r : scanner) {
-      results.add(converter.fromResult(r, cf));
-    }
-    Assert.assertEquals(1, results.size());
-    Assert.assertEquals(results.get(0).getKey().indicator, "google.com");
-    Assert.assertEquals(results.get(0).getKey().type, "enrichment");
-    Assert.assertEquals(results.get(0).getValue().getMetadata().size(), 2);
-    Assert.assertEquals(results.get(0).getValue().getMetadata().get("meta"), "foo");
-    Assert.assertEquals(results.get(0).getValue().getMetadata().get("host"), "google.com");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
index 1cb58d8..0223514 100644
--- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
@@ -33,10 +33,7 @@ import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
 import org.apache.metron.test.mock.MockHTable;
 import org.apache.metron.enrichment.lookup.LookupKV;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -44,13 +41,13 @@ import java.util.Set;
 
 public class TaxiiIntegrationTest {
 
-    @Before
-    public void setup() throws IOException {
+    @BeforeClass
+    public static void setup() throws IOException {
         MockTaxiiService.start(8282);
     }
 
-    @After
-    public void teardown() {
+    @AfterClass
+    public static void teardown() {
         MockTaxiiService.shutdown();
         MockHTable.Provider.clear();
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/1be4fcb0/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index a93c442..ae04e43 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -175,8 +175,8 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest {
             .withComponent("config", configUploadComponent)
             .withComponent("storm", fluxComponent)
             .withComponent("search", getSearchComponent(topologyProperties))
-            .withMillisecondsBetweenAttempts(15000)
-            .withNumRetries(10)
+            .withMillisecondsBetweenAttempts(1500)
+            .withNumRetries(100)
             .withMaxTimeMS(150000)
             .withCustomShutdownOrder(new String[] {"search","storm","config","kafka","zk"})
             .build();