You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/02/25 17:25:06 UTC

[2/6] incubator-metron git commit: METRON-50: Ingest threat intel data from Taxii feeds closes apache/incubator-metron#29

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
index 8a82828..9db6398 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
@@ -19,10 +19,8 @@ package org.apache.metron.threatintel;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import org.apache.metron.reference.lookup.Lookup;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
 import org.apache.metron.reference.lookup.accesstracker.BloomAccessTracker;
 import org.apache.metron.reference.lookup.accesstracker.PersistentAccessTracker;
 import org.apache.metron.threatintel.hbase.ThreatIntelLookup;
@@ -32,7 +30,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Map;
 import java.util.UUID;
 
 public class ThreatIntelAdapter implements EnrichmentAdapter<String>,Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Indexing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/pom.xml b/metron-streaming/Metron-Indexing/pom.xml
index 2c80a8d..e2a1037 100644
--- a/metron-streaming/Metron-Indexing/pom.xml
+++ b/metron-streaming/Metron-Indexing/pom.xml
@@ -24,7 +24,6 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-        <elastic.search.version>1.3.1</elastic.search.version>
         <http.client.version>4.3.4</http.client.version>
         <jsonsimple.version>1.1.1</jsonsimple.version>
     </properties>
@@ -74,7 +73,7 @@
         <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
-            <version>${elastic.search.version}</version>
+            <version>${global_elasticsearch_version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
@@ -93,11 +92,6 @@
             <version>1.9</version>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>3.8.2</version>
-        </dependency>
 
     </dependencies>
     <reporting>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/pom.xml b/metron-streaming/Metron-Testing/pom.xml
new file mode 100644
index 0000000..d68d81d
--- /dev/null
+++ b/metron-streaming/Metron-Testing/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- 
+  Licensed to the Apache Software 
+  Foundation (ASF) under one or more contributor license agreements. See the 
+  NOTICE file distributed with this work for additional information regarding 
+  copyright ownership. The ASF licenses this file to You under the Apache License, 
+  Version 2.0 (the "License"); you may not use this file except in compliance 
+  with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 
+  Unless required by applicable law or agreed to in writing, software distributed 
+  under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+  OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+  the specific language governing permissions and limitations under the License. 
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.metron</groupId>
+    <artifactId>Metron-Streaming</artifactId>
+    <version>0.1BETA</version>
+  </parent>
+  <artifactId>Metron-Testing</artifactId>
+  <description>Metron Testing Utilities</description>
+  <properties>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.17</version>
+    </dependency>
+    <!-- No test scope: UnitTestHelper under src/main/java imports org.junit.Assert. -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${global_junit_version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.elasticsearch</groupId>
+      <artifactId>elasticsearch</artifactId>
+      <version>${global_elasticsearch_version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>flux-core</artifactId>
+      <version>${global_flux_version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${global_storm_version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${global_hbase_guava_version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${global_hbase_version}</version>
+      <exclusions>
+        <!-- log4j 1.2.17 is already declared directly in this POM. -->
+        <exclusion>
+          <artifactId>log4j</artifactId>
+          <groupId>log4j</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${global_hadoop_version}</version>
+      <exclusions>
+        <!-- Keep the slf4j-log4j12 binding from arriving transitively through hadoop-client. -->
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- NOTE(review): ElasticSearchComponent imports org.apache.commons.io.FileUtils, but commons-io is not declared here; presumably it arrives transitively - confirm. -->
+
+  </dependencies>
+
+  <build>
+  </build>
+  <reporting>
+  </reporting>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/UnitTestHelper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/UnitTestHelper.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/UnitTestHelper.java
new file mode 100644
index 0000000..7af2212
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/UnitTestHelper.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util;
+
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.junit.Assert;
+
+import java.io.File;
+import java.util.Set;
+import java.util.Stack;
+
+/**
+ * Static helpers shared by the in-memory integration tests: directory lookup,
+ * set comparison and console log setup.
+ */
+public class UnitTestHelper {
+    /**
+     * Searches the current working directory tree for an entry called {@code name}.
+     *
+     * @param name entry name to look for, compared case-insensitively
+     * @return absolute path of the first match, or {@code null} when nothing matches
+     */
+    public static String findDir(String name) {
+        return findDir(new File("."), name);
+    }
+
+    /**
+     * Walks the tree under {@code startDir} depth-first and returns the first entry whose
+     * name equals {@code name} ignoring case. Plain files are compared too, not only
+     * directories.
+     *
+     * @param startDir root of the search; it is itself a candidate
+     * @param name     entry name to look for
+     * @return absolute path of the first match, or {@code null} when nothing matches
+     */
+    public static String findDir(File startDir, String name) {
+        Stack<File> s = new Stack<File>();
+        s.push(startDir);
+        while(!s.empty()) {
+            File parent = s.pop();
+            if(parent.getName().equalsIgnoreCase(name)) {
+                return parent.getAbsolutePath();
+            }
+            else {
+                File[] children = parent.listFiles();
+                // listFiles() yields null for plain files and unreadable directories
+                if(children != null) {
+                    for (File child : children) {
+                        s.push(child);
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Asserts that {@code found} and {@code expectedPcapIds} hold exactly the same elements.
+     * Each difference is printed to standard out and is also included in the message of the
+     * resulting {@link AssertionError}, so the failure is diagnosable from the test report alone.
+     *
+     * @param type            label for the kind of element, used only in messages
+     * @param expectedPcapIds elements that should be present
+     * @param found           elements that were actually observed
+     */
+    public static <T> void assertSetEqual(String type, Set<T> expectedPcapIds, Set<T> found) {
+        StringBuilder report = new StringBuilder();
+        for(T f : found) {
+            if(!expectedPcapIds.contains(f)) {
+                String line = "Found " + type + " that I did not expect: " + f;
+                System.out.println(line);
+                report.append(line).append("; ");
+            }
+        }
+        for(T expectedId : expectedPcapIds) {
+            if(!found.contains(expectedId)) {
+                String line = "Expected " + type + " that I did not index: " + expectedId;
+                System.out.println(line);
+                report.append(line).append("; ");
+            }
+        }
+        // Carry the differences in the assertion message instead of failing with a bare error.
+        Assert.assertFalse(report.toString(), report.length() > 0);
+    }
+
+    /**
+     * Attaches a console appender to the root logger at {@link Level#ALL} using a default pattern.
+     */
+    public static void verboseLogging() {
+        verboseLogging("%d [%p|%c|%C{1}] %m%n", Level.ALL);
+    }
+
+    /**
+     * Attaches a new console appender to the root logger.
+     *
+     * @param pattern log4j {@link PatternLayout} conversion pattern
+     * @param level   threshold set on the appender
+     */
+    public static void verboseLogging(String pattern, Level level) {
+        ConsoleAppender console = new ConsoleAppender(); //create appender
+        //configure the appender
+        console.setLayout(new PatternLayout(pattern));
+        console.setThreshold(level);
+        console.activateOptions();
+        //add appender to any Logger (here is root)
+        Logger.getRootLogger().addAppender(console);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java
new file mode 100644
index 0000000..3e5e793
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration;
+
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Starts and stops a named set of {@link InMemoryComponent}s in a configurable order and
+ * polls a {@link Processor} until it reports a result.
+ */
+public class ComponentRunner {
+    /**
+     * Fluent builder. Components are remembered in registration order.
+     */
+    public static class Builder {
+        LinkedHashMap<String, InMemoryComponent> components;
+        String[] startupOrder;
+        String[] shutdownOrder;
+        public Builder() {
+            components = new LinkedHashMap<String, InMemoryComponent>();
+        }
+
+        public Builder withComponent(String name, InMemoryComponent component) {
+            components.put(name, component);
+            return this;
+        }
+
+        public Builder withCustomStartupOrder(String[] startupOrder) {
+            this.startupOrder = startupOrder;
+            return this;
+        }
+        public Builder withCustomShutdownOrder(String[] shutdownOrder) {
+            this.shutdownOrder = shutdownOrder;
+            return this;
+        }
+        /** Returns the component names in registration order. */
+        private static String[] toOrderedList(Map<String, InMemoryComponent> components) {
+            return components.keySet().toArray(new String[components.size()]);
+        }
+        /**
+         * Creates the runner. An order that was not customised falls back to registration
+         * order, for startup and shutdown alike.
+         */
+        public ComponentRunner build() {
+            if(shutdownOrder == null) {
+                shutdownOrder = toOrderedList(components);
+            }
+            if(startupOrder == null) {
+                startupOrder = toOrderedList(components);
+            }
+            return new ComponentRunner(components, startupOrder, shutdownOrder);
+        }
+
+    }
+
+    LinkedHashMap<String, InMemoryComponent> components;
+    String[] startupOrder;
+    String[] shutdownOrder;
+    public ComponentRunner( LinkedHashMap<String, InMemoryComponent> components
+                          , String[] startupOrder
+                          , String[] shutdownOrder
+                          )
+    {
+        this.components = components;
+        this.startupOrder = startupOrder;
+        this.shutdownOrder = shutdownOrder;
+
+    }
+
+    /**
+     * Looks a component up by name and casts it to the requested type.
+     *
+     * @return the component, or {@code null} if no component has that name
+     * @throws ClassCastException if the component is not an instance of {@code clazz}
+     */
+    public <T extends InMemoryComponent> T getComponent(String name, Class<T> clazz) {
+        return clazz.cast(getComponents().get(name));
+    }
+
+    public LinkedHashMap<String, InMemoryComponent> getComponents() {
+        return components;
+    }
+
+    /**
+     * Starts every component named in the startup order, one after another.
+     *
+     * @throws UnableToStartException as soon as one component fails to start
+     */
+    public void start() throws UnableToStartException {
+        for(String componentName : startupOrder) {
+            components.get(componentName).start();
+        }
+    }
+    /**
+     * Stops every component named in the shutdown order, one after another.
+     */
+    public void stop() {
+        for(String componentName : shutdownOrder) {
+            components.get(componentName).stop();
+        }
+    }
+
+    /**
+     * Polls with the defaults: 5 retries, 30000 ms between attempts, 120000 ms overall.
+     */
+    public <T> T process(Processor<T> successState) {
+        return process(successState, 5, 30000, 120000);
+    }
+
+    /**
+     * Calls {@code successState.process(this)} repeatedly until it reports
+     * {@link ReadinessState#READY}, sleeping between attempts.
+     *
+     * @param successState        callback to poll
+     * @param numRetries          maximum number of {@link ReadinessState#NOT_READY} answers
+     *                            tolerated; zero or less disables the limit
+     * @param timeBetweenAttempts pause between attempts, in milliseconds
+     * @param maxTimeMs           overall budget in milliseconds, checked before each attempt
+     * @return {@code successState.getResult()} once the callback is ready
+     * @throws RuntimeException if the time budget or the retry limit is exceeded, or if the
+     *                          thread is interrupted while sleeping (the interrupt flag is
+     *                          restored before throwing)
+     */
+    public <T> T process(Processor<T> successState, int numRetries, long timeBetweenAttempts, long maxTimeMs) {
+        int retryCount = 0;
+        long start = System.currentTimeMillis();
+        while(true) {
+            long duration = System.currentTimeMillis() - start;
+            if(duration > maxTimeMs) {
+                throw new RuntimeException("Took too long to complete: " + duration + " > " + maxTimeMs);
+            }
+            ReadinessState state = successState.process(this);
+            if(state == ReadinessState.READY) {
+                return successState.getResult();
+            }
+            else if(state == ReadinessState.NOT_READY) {
+                retryCount++;
+                if(numRetries > 0 && retryCount > numRetries) {
+                    throw new RuntimeException("Too many retries: " + retryCount);
+                }
+            }
+            try {
+                Thread.sleep(timeBetweenAttempts);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so callers further up the stack can still see it.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Unable to sleep", e);
+            }
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/InMemoryComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/InMemoryComponent.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/InMemoryComponent.java
new file mode 100644
index 0000000..b5224a2
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/InMemoryComponent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration;
+
+/**
+ * Lifecycle contract for the components that {@link ComponentRunner} starts and stops.
+ */
+public interface InMemoryComponent {
+    /**
+     * Brings the component up.
+     *
+     * @throws UnableToStartException if the component cannot be started
+     */
+    void start() throws UnableToStartException;
+
+    /**
+     * Shuts the component down.
+     */
+    void stop();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/Processor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/Processor.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/Processor.java
new file mode 100644
index 0000000..59f3ece
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/Processor.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration;
+
+/**
+ * Callback polled by {@link ComponentRunner#process(Processor)} until it reports
+ * {@link ReadinessState#READY}.
+ *
+ * @param <T> type of the value returned by {@link #getResult()}
+ */
+public interface Processor<T> {
+    /**
+     * Invoked once per polling attempt.
+     *
+     * @param runner the runner doing the polling
+     * @return {@link ReadinessState#READY} when the result is available; otherwise the runner
+     *         keeps polling within its retry and time limits
+     */
+    ReadinessState process(ComponentRunner runner);
+    /**
+     * Read by the runner right after {@link #process(ComponentRunner)} returned
+     * {@link ReadinessState#READY}.
+     */
+    T getResult();
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ReadinessState.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ReadinessState.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ReadinessState.java
new file mode 100644
index 0000000..59ce69a
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ReadinessState.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration;
+
+/**
+ * Outcome of a single {@link Processor#process(ComponentRunner)} poll.
+ */
+public enum ReadinessState {
+    /** The result can be fetched; {@link ComponentRunner} stops polling. */
+    READY,
+    /** Counted as one retry by {@link ComponentRunner}, which then polls again. */
+    NOT_READY
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/UnableToStartException.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/UnableToStartException.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/UnableToStartException.java
new file mode 100644
index 0000000..4e691f4
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/UnableToStartException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration;
+
+/**
+ * Checked exception declared by {@link InMemoryComponent#start()} for startup failures.
+ */
+public class UnableToStartException extends Exception {
+    /**
+     * @param message description of the failure
+     */
+    public UnableToStartException(String message) {
+        super(message);
+    }
+    /**
+     * @param message description of the failure
+     * @param t       underlying cause
+     */
+    public UnableToStartException(String message, Throwable t) {
+        super(message, t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
new file mode 100644
index 0000000..a7991c0
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration.components;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.integration.util.integration.InMemoryComponent;
+import org.apache.metron.integration.util.integration.UnableToStartException;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.ElasticsearchClient;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.elasticsearch.search.SearchHit;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * {@link InMemoryComponent} that runs an Elasticsearch node in the current JVM under the
+ * cluster name "metron" and exposes a transport client connected to it.
+ */
+public class ElasticSearchComponent implements InMemoryComponent {
+
+    /**
+     * Fluent builder for {@link ElasticSearchComponent}.
+     */
+    public static class Builder{
+        private int httpPort;
+        private File indexDir;
+        private Map<String, String> extraElasticSearchSettings = null;
+        public Builder withHttpPort(int httpPort) {
+            this.httpPort = httpPort;
+            return this;
+        }
+        public Builder withIndexDir(File indexDir) {
+            this.indexDir = indexDir;
+            return this;
+        }
+        public Builder withExtraElasticSearchSettings(Map<String, String> extraElasticSearchSettings) {
+            this.extraElasticSearchSettings = extraElasticSearchSettings;
+            return this;
+        }
+        public ElasticSearchComponent build() {
+            return new ElasticSearchComponent(httpPort, indexDir, extraElasticSearchSettings);
+        }
+    }
+
+    private Client client;
+    private Node node;
+    private int httpPort;
+    private File indexDir;
+    private Map<String, String> extraElasticSearchSettings;
+
+    public ElasticSearchComponent(int httpPort, File indexDir) {
+        this(httpPort, indexDir, null);
+    }
+    /**
+     * @param httpPort                   value for the node's {@code http.port} setting
+     * @param indexDir                   parent of the {@code logs} and {@code data} directories
+     * @param extraElasticSearchSettings settings applied on top of the defaults; may be {@code null}
+     */
+    public ElasticSearchComponent(int httpPort, File indexDir, Map<String, String> extraElasticSearchSettings) {
+        this.httpPort = httpPort;
+        this.indexDir = indexDir;
+        this.extraElasticSearchSettings = extraElasticSearchSettings;
+    }
+    /**
+     * @return the transport client, or {@code null} while the component is not started
+     */
+    public Client getClient() {
+        return client;
+    }
+
+    /**
+     * Deletes {@code dir} if it exists and creates it again, empty.
+     *
+     * @throws IOException if the directory cannot be deleted or re-created
+     */
+    private void cleanDir(File dir) throws IOException {
+        if(dir.exists()) {
+            FileUtils.deleteDirectory(dir);
+        }
+        // mkdirs() reports failure only through its return value, so check it explicitly.
+        if(!dir.mkdirs() && !dir.isDirectory()) {
+            throw new IOException("Unable to create directory " + dir.getAbsolutePath());
+        }
+    }
+    /**
+     * Wipes the log and data directories, starts the node, connects the transport client and
+     * waits up to 60000 ms for the cluster to report {@link ClusterHealthStatus#YELLOW}.
+     *
+     * @throws UnableToStartException if the directories cannot be prepared or the cluster
+     *                                does not reach the requested health in time
+     */
+    public void start() throws UnableToStartException {
+        File logDir= new File(indexDir, "/logs");
+        File dataDir= new File(indexDir, "/data");
+        try {
+            cleanDir(logDir);
+            cleanDir(dataDir);
+
+        } catch (IOException e) {
+            throw new UnableToStartException("Unable to clean log or data directories", e);
+        }
+        ImmutableSettings.Builder immutableSettings = ImmutableSettings.settingsBuilder()
+                .put("node.http.enabled", true)
+                .put("http.port", httpPort)
+                .put("cluster.name", "metron")
+                .put("path.logs",logDir.getAbsolutePath())
+                .put("path.data",dataDir.getAbsolutePath())
+                .put("gateway.type", "none")
+                .put("index.store.type", "memory")
+                .put("index.number_of_shards", 1)
+                .put("node.mode", "network")
+                .put("index.number_of_replicas", 1);
+        if(extraElasticSearchSettings != null) {
+            immutableSettings = immutableSettings.put(extraElasticSearchSettings);
+        }
+        Settings settings = immutableSettings.build();
+
+        node = NodeBuilder.nodeBuilder().settings(settings).node();
+        node.start();
+        settings = ImmutableSettings.settingsBuilder()
+                .put("cluster.name", "metron").build();
+        // NOTE(review): the transport address is fixed at localhost:9300 - presumably the
+        // node's default transport port; confirm if extra settings can move it.
+        client = new TransportClient(settings)
+                .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));
+
+        waitForCluster(client, ClusterHealthStatus.YELLOW, new TimeValue(60000));
+    }
+
+    /**
+     * Blocks until the cluster reports {@code status} or {@code timeout} elapses.
+     *
+     * @throws UnableToStartException if the health request times out; a client-side timeout
+     *                                is attached as the cause
+     */
+    public static void waitForCluster(ElasticsearchClient client, ClusterHealthStatus status, TimeValue timeout) throws UnableToStartException {
+        try {
+            ClusterHealthResponse healthResponse =
+                    (ClusterHealthResponse)client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(status).timeout(timeout)).actionGet();
+            if (healthResponse != null && healthResponse.isTimedOut()) {
+                throw new UnableToStartException("cluster state is " + healthResponse.getStatus().name()
+                        + " and not " + status.name()
+                        + ", from here on, everything will fail!");
+            }
+        } catch (ElasticsearchTimeoutException e) {
+            // Keep the original exception as the cause so the underlying timeout stays visible.
+            throw new UnableToStartException("timeout, cluster does not respond to health request, cowardly refusing to continue with operations", e);
+        }
+    }
+
+    /**
+     * Same as {@link #getAllIndexedDocs(String, String)} with {@code "message"} as the sub-message key.
+     */
+    public List<Map<String, Object>> getAllIndexedDocs(String index) throws IOException {
+       return getAllIndexedDocs(index, "message");
+    }
+    /**
+     * Refreshes the indices, then returns up to 1000 hits of type {@code pcap_doc} from {@code index}.
+     *
+     * @param index      index to search
+     * @param subMessage key inside each hit's source whose value is returned; {@code null}
+     *                   returns the whole source instead
+     * @return one map per hit
+     */
+    public List<Map<String, Object>> getAllIndexedDocs(String index, String subMessage) throws IOException {
+        // refresh() is asynchronous; wait for it so the search below sees everything indexed so far.
+        getClient().admin().indices().refresh(new RefreshRequest()).actionGet();
+        SearchResponse response = getClient().prepareSearch(index)
+                .setTypes("pcap_doc")
+                .setSource("message")
+                .setFrom(0)
+                .setSize(1000)
+                .execute().actionGet();
+        List<Map<String, Object>> ret = new ArrayList<Map<String, Object>>();
+        for (SearchHit hit : response.getHits()) {
+            Object o = null;
+            if(subMessage == null) {
+                o = hit.getSource();
+            }
+            else {
+                o = hit.getSource().get(subMessage);
+            }
+            ret.add((Map<String, Object>)(o));
+        }
+        return ret;
+    }
+    /**
+     * @return {@code true} if the indices stats response lists an index called {@code indexName}
+     */
+    public boolean hasIndex(String indexName) {
+        Set<String> indices = getClient().admin()
+                                    .indices()
+                                    .stats(new IndicesStatsRequest())
+                                    .actionGet()
+                                    .getIndices()
+                                    .keySet();
+        return indices.contains(indexName);
+
+    }
+
+    /**
+     * Closes the client and shuts the node down. Safe to call when the component was never
+     * started or has already been stopped.
+     */
+    public void stop() {
+        try {
+            if(client != null) {
+                client.close();
+            }
+        } finally {
+            client = null;
+            if(node != null) {
+                node.stop();
+                // stop() alone leaves the node's resources allocated; close() releases them.
+                node.close();
+                node = null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java
new file mode 100644
index 0000000..2cac4ee
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration.components;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import org.apache.metron.integration.util.integration.InMemoryComponent;
+import org.apache.metron.integration.util.integration.UnableToStartException;
+import org.apache.storm.flux.FluxBuilder;
+import org.apache.storm.flux.model.ExecutionContext;
+import org.apache.storm.flux.model.TopologyDef;
+import org.apache.storm.flux.parser.FluxParser;
+import org.apache.thrift7.TException;
+import org.junit.Assert;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Properties;
+
+/**
+ * In-memory component that owns a Storm {@link LocalCluster} and can submit a
+ * topology described by a Flux YAML file to it.
+ * <p>
+ * {@link #start()} only creates the cluster; the topology itself is submitted
+ * by an explicit call to {@link #submitTopology()}.
+ */
+public class FluxTopologyComponent implements InMemoryComponent {
+    LocalCluster stormCluster;
+    String topologyName;
+    File topologyLocation;
+    Properties topologyProperties;
+
+    /**
+     * Fluent builder for {@link FluxTopologyComponent}.
+     */
+    public static class Builder {
+        String topologyName;
+        File topologyLocation;
+        Properties topologyProperties;
+
+        /** @param name name under which the topology is submitted */
+        public Builder withTopologyName(String name) {
+            this.topologyName = name;
+            return this;
+        }
+
+        /** @param location the Flux YAML file describing the topology */
+        public Builder withTopologyLocation(File location) {
+            this.topologyLocation = location;
+            return this;
+        }
+
+        /** @param properties properties written to a temporary file and handed to the Flux parser */
+        public Builder withTopologyProperties(Properties properties) {
+            this.topologyProperties = properties;
+            return this;
+        }
+
+        /** @return a component configured with the values collected so far */
+        public FluxTopologyComponent build() {
+            return new FluxTopologyComponent(topologyName, topologyLocation, topologyProperties);
+        }
+    }
+
+    /**
+     * @param topologyName       name under which the topology is submitted
+     * @param topologyLocation   the Flux YAML file describing the topology
+     * @param topologyProperties properties handed to the Flux parser; may be null,
+     *                           in which case an empty properties file is used
+     */
+    public FluxTopologyComponent(String topologyName, File topologyLocation, Properties topologyProperties) {
+        this.topologyName = topologyName;
+        this.topologyLocation = topologyLocation;
+        this.topologyProperties = topologyProperties;
+    }
+
+    /** @return the local cluster, or null if {@link #start()} has not succeeded yet */
+    public LocalCluster getStormCluster() {
+        return stormCluster;
+    }
+
+    public String getTopologyName() {
+        return topologyName;
+    }
+
+    public File getTopologyLocation() {
+        return topologyLocation;
+    }
+
+    public Properties getTopologyProperties() {
+        return topologyProperties;
+    }
+
+    /**
+     * Creates the local Storm cluster. No topology is submitted here.
+     *
+     * @throws UnableToStartException if the cluster cannot be created
+     */
+    public void start() throws UnableToStartException{
+        try {
+            stormCluster = new LocalCluster();
+        } catch (Exception e) {
+            throw new UnableToStartException("Unable to start flux topology: " + getTopologyLocation(), e);
+        }
+    }
+
+    /**
+     * Shuts the local cluster down. Does nothing if the cluster was never created,
+     * so it is safe to call from cleanup code after a failed {@link #start()}.
+     */
+    public void stop() {
+        if (stormCluster != null) {
+            stormCluster.shutdown();
+        }
+    }
+
+    /**
+     * Parses the configured Flux file and submits the resulting topology to the
+     * local cluster. {@link #start()} must have been called first.
+     */
+    public void submitTopology() throws NoSuchMethodException, IOException, InstantiationException, TException, IllegalAccessException, InvocationTargetException, ClassNotFoundException {
+        startTopology(getTopologyName(), getTopologyLocation(), getTopologyProperties());
+    }
+
+    /** Builds the topology from its Flux definition, validates it and submits it. */
+    private void startTopology(String topologyName, File topologyLoc, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException {
+        TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, properties);
+        Config conf = FluxBuilder.buildConfig(topologyDef);
+        ExecutionContext context = new ExecutionContext(topologyDef, conf);
+        StormTopology topology = FluxBuilder.buildTopology(context);
+        Assert.assertNotNull(topology);
+        topology.validate();
+        stormCluster.submitTopology(topologyName, conf, topology);
+    }
+
+    /**
+     * Writes the properties to a temporary file (deleted on JVM exit) and parses
+     * the Flux YAML file with that properties file supplied to the parser.
+     *
+     * @param topologyName used as the temp-file prefix and in the properties header comment
+     * @param yamlFile     the Flux YAML file to parse
+     * @param properties   properties to store; null is treated as an empty set
+     * @return the parsed topology definition
+     * @throws IOException if the temporary properties file cannot be created or written,
+     *                     or if the parser fails with an I/O error
+     */
+    private static TopologyDef loadYaml(String topologyName, File yamlFile, Properties properties) throws IOException {
+        File tmpFile = File.createTempFile(topologyName, "props");
+        tmpFile.deleteOnExit();
+        Properties toStore = properties == null ? new Properties() : properties;
+        // The writer must be closed before parsing so the file is fully flushed.
+        // Failures while writing propagate to the caller: returning from a finally
+        // block would silently discard them.
+        try (FileWriter propWriter = new FileWriter(tmpFile)) {
+            toStore.store(propWriter, topologyName + " properties");
+        }
+        return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/mock/MockHTable.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/mock/MockHTable.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/mock/MockHTable.java
new file mode 100644
index 0000000..76558d5
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/mock/MockHTable.java
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.mock;
+
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+/**
+ * MockHTable.
+ *
+ * This implementation is a selected excerpt from https://gist.github.com/agaoglu/613217
+ */
+public class MockHTable implements HTableInterface {
+
+    /**
+     * Hands out mock tables by name from a static cache shared by all Provider
+     * instances. A table has to be registered through {@link #addToCache} before
+     * it can be looked up.
+     */
+    public static class Provider implements Serializable {
+        private static Map<String, HTableInterface> _cache = new HashMap<>();
+
+        /**
+         * Looks up a cached table. The configuration argument is not used.
+         *
+         * @return the cached table, or null if nothing is registered under that name
+         */
+        public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+            return _cache.get(tableName);
+        }
+
+        /** @return the cached table, or null if nothing is registered under that name */
+        public static HTableInterface getFromCache(String tableName) {
+            return _cache.get(tableName);
+        }
+
+        /**
+         * Creates a new mock table with the given column families and registers it,
+         * replacing any table previously cached under the same name.
+         *
+         * @return the newly created table
+         */
+        public static HTableInterface addToCache(String tableName, String... columnFamilies) {
+            MockHTable created = new MockHTable(tableName, columnFamilies);
+            _cache.put(tableName, created);
+            return created;
+        }
+
+        /** Removes every table from the cache. */
+        public static void clear() {
+            _cache.clear();
+        }
+    }
+
+    private final String tableName;
+    private final List<String> columnFamilies = new ArrayList<>();
+    private HColumnDescriptor[] descriptors;
+
+    private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data
+            = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+    /**
+     * Converts a row's stored data to KeyValues without restricting the timestamp:
+     * delegates to the five-argument overload with the range 0 to Long.MAX_VALUE.
+     */
+    private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
+        return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
+    }
+
+    /**
+     * Flattens one row's family -> qualifier -> timestamp -> value maps into KeyValues.
+     * For every qualifier the versions are visited newest first, and up to
+     * maxVersions of the versions whose timestamp lies in
+     * [timestampStart, timestampEnd] (both bounds inclusive) are emitted.
+     *
+     * @param row            row key placed in every KeyValue
+     * @param rowdata        the data stored for that row
+     * @param timestampStart lowest timestamp to include
+     * @param timestampEnd   highest timestamp to include
+     * @param maxVersions    maximum number of versions emitted per qualifier
+     * @return the matching KeyValues in family, qualifier, newest-first order
+     */
+    private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) {
+        List<KeyValue> ret = new ArrayList<KeyValue>();
+        for (byte[] family : rowdata.keySet()) {
+            for (byte[] qualifier : rowdata.get(family).keySet()) {
+                int versionsAdded = 0;
+                for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) {
+                    if (versionsAdded == maxVersions) {
+                        break;
+                    }
+                    Long timestamp = tsToVal.getKey();
+                    if (timestamp < timestampStart || timestamp > timestampEnd) {
+                        // Versions outside the time range must not use up the version budget.
+                        continue;
+                    }
+                    byte[] value = tsToVal.getValue();
+                    ret.add(new KeyValue(row, family, qualifier, timestamp, value));
+                    versionsAdded++;
+                }
+            }
+        }
+        return ret;
+    }
+    /**
+     * Creates an empty mock table with no column families registered.
+     *
+     * @param tableName name reported by this table
+     */
+    public MockHTable(String tableName) {
+        this.tableName = tableName;
+    }
+
+    /**
+     * Creates an empty mock table and registers the given column families in order.
+     *
+     * @param tableName      name reported by this table
+     * @param columnFamilies column family names to register
+     */
+    public MockHTable(String tableName, String... columnFamilies) {
+        this(tableName);
+        for (int idx = 0; idx < columnFamilies.length; idx++) {
+            addColumnFamily(columnFamilies[idx]);
+        }
+    }
+
+    /**
+     * Registers a column family name and rebuilds the descriptor array so that it
+     * holds one descriptor per registered family, in registration order.
+     *
+     * @param columnFamily the family name to add
+     */
+    public void addColumnFamily(String columnFamily) {
+        columnFamilies.add(columnFamily);
+        descriptors = new HColumnDescriptor[columnFamilies.size()];
+        for (int idx = 0; idx < descriptors.length; idx++) {
+            descriptors[idx] = new HColumnDescriptor(columnFamilies.get(idx));
+        }
+    }
+
+
+    /** @return the table name converted to bytes with {@code Bytes.toBytes} */
+    @Override
+    public byte[] getTableName() {
+        return Bytes.toBytes(tableName);
+    }
+
+    /** @return the table name wrapped as a {@code TableName} */
+    @Override
+    public TableName getName() {
+        return TableName.valueOf(tableName);
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public Configuration getConfiguration() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Builds a table descriptor carrying one column descriptor per registered family.
+     *
+     * @return a new descriptor; it has no families if none were registered
+     */
+    @Override
+    public HTableDescriptor getTableDescriptor() throws IOException {
+        HTableDescriptor ret = new HTableDescriptor(tableName);
+        // descriptors is only assigned by addColumnFamily, so it is still null for a
+        // table created without any column family.
+        if (descriptors != null) {
+            for(HColumnDescriptor c : descriptors) {
+                ret.addFamily(c);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Tests whether the Get matches anything stored in this table.
+     * <p>
+     * Without families on the Get, the row merely has to exist. With families,
+     * the row must hold at least one of the requested families; the requested
+     * qualifiers are not inspected.
+     *
+     * @param get the Get to test
+     * @return true if the row exists and (when families are given) holds at least
+     *         one requested family
+     */
+    @Override
+    public boolean exists(Get get) throws IOException {
+        byte[] row = get.getRow();
+        if (!data.containsKey(row)) {
+            return false;
+        }
+        if (get.getFamilyMap() == null || get.getFamilyMap().size() == 0) {
+            return true;
+        }
+        // Check every requested family rather than deciding on the first one only,
+        // so the answer does not depend on family ordering.
+        for (byte[] family : get.getFamilyMap().keySet()) {
+            if (data.get(row).containsKey(family)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Tests each Get for existence and reports the outcomes as primitives.
+     * <p>
+     * The work is done by {@link #exists(List)}; this method only unboxes its result.
+     *
+     * @param gets the Gets
+     * @return one boolean per Get, in the same order; true if that Get matched
+     * @throws IOException e
+     */
+    @Override
+    public boolean[] existsAll(List<Get> gets) throws IOException {
+        boolean[] unboxed = new boolean[gets.size()];
+        Boolean[] boxed = exists(gets);
+        for (int idx = 0; idx < boxed.length; idx++) {
+            unboxed[idx] = boxed[idx];
+        }
+        return unboxed;
+    }
+
+    /**
+     * Runs {@link #exists(Get)} for every Get in the list.
+     *
+     * @param list the Gets to test
+     * @return one result per Get, in list order
+     */
+    @Override
+    public Boolean[] exists(List<Get> list) throws IOException {
+        Boolean[] found = new Boolean[list.size()];
+        int next = 0;
+        for (Get candidate : list) {
+            found[next] = exists(candidate);
+            next++;
+        }
+        return found;
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public void batch(List<? extends Row> list, Object[] objects) throws IOException, InterruptedException {
+        throw new UnsupportedOperationException();
+
+    }
+
+    /**
+     * Applies the actions in order: Deletes and Puts are executed, Gets are executed
+     * and their Results collected. Actions of any other type are ignored.
+     *
+     * @param actions the actions to run
+     * @return the Results of the Get actions only, in the order they were encountered
+     * @deprecated
+     */
+    @Override
+    public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
+        List<Result> getResults = new ArrayList<Result>();
+        for (Row action : actions) {
+            if (action instanceof Delete) {
+                delete((Delete) action);
+            } else if (action instanceof Put) {
+                put((Put) action);
+            } else if (action instanceof Get) {
+                getResults.add(get((Get) action));
+            }
+        }
+        return getResults.toArray();
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public <R> void batchCallback(List<? extends Row> list, Object[] objects, Batch.Callback<R> callback) throws IOException, InterruptedException {
+        throw new UnsupportedOperationException();
+
+    }
+
+    /**
+     * Not implemented by this mock; always throws UnsupportedOperationException.
+     *
+     * @param list     ignored
+     * @param callback ignored
+     * @deprecated
+     */
+    @Override
+    public <R> Object[] batchCallback(List<? extends Row> list, Batch.Callback<R> callback) throws IOException, InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Reads a single row from the in-memory data.
+     * <p>
+     * If the Get names no families, the whole row is returned via toKeyValue,
+     * limited by the Get's max versions. If it names families, only the entry with
+     * the highest timestamp is returned for each selected qualifier; the Get's max
+     * versions setting is not consulted in that branch. A filter on the Get, if any,
+     * is then applied to the collected KeyValues.
+     *
+     * @param get the row, families/qualifiers and optional filter to read
+     * @return the matching KeyValues wrapped in a Result; an empty Result if the row
+     *         is not present
+     */
+    @Override
+    public Result get(Get get) throws IOException {
+        if (!data.containsKey(get.getRow()))
+            return new Result();
+        byte[] row = get.getRow();
+        List<KeyValue> kvs = new ArrayList<KeyValue>();
+        if (!get.hasFamilies()) {
+            kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
+        } else {
+            for (byte[] family : get.getFamilyMap().keySet()){
+                // requested family has no data in this row
+                if (data.get(row).get(family) == null)
+                    continue;
+                NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family);
+                // no explicit qualifiers: use every qualifier stored under the family
+                if (qualifiers == null || qualifiers.isEmpty())
+                    qualifiers = data.get(row).get(family).navigableKeySet();
+                for (byte[] qualifier : qualifiers){
+                    // a null qualifier is looked up as the empty qualifier
+                    if (qualifier == null)
+                        qualifier = "".getBytes();
+                    if (!data.get(row).containsKey(family) ||
+                            !data.get(row).get(family).containsKey(qualifier) ||
+                            data.get(row).get(family).get(qualifier).isEmpty())
+                        continue;
+                    // lastEntry() is the entry with the highest timestamp
+                    Map.Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry();
+                    kvs.add(new KeyValue(row,family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue()));
+                }
+            }
+        }
+        Filter filter = get.getFilter();
+        if (filter != null) {
+            filter.reset();
+            List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
+            for (KeyValue kv : kvs) {
+                if (filter.filterAllRemaining()) {
+                    break;
+                }
+                if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
+                    continue;
+                }
+                // only INCLUDE keeps the KeyValue; every other return code drops it
+                if (filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE) {
+                    nkvs.add(kv);
+                }
+                // ignoring next key hint which is a optimization to reduce file system IO
+            }
+            if (filter.hasFilterRow()) {
+                // NOTE(review): the return value of filterRow() is discarded here, so the
+                // call does not by itself exclude the row from the result
+                filter.filterRow();
+            }
+            kvs = nkvs;
+        }
+
+        return new Result(kvs);
+    }
+
+    /**
+     * Runs {@link #get(Get)} for every Get in the list.
+     *
+     * @param list the Gets to execute
+     * @return one Result per Get, in list order
+     */
+    @Override
+    public Result[] get(List<Get> list) throws IOException {
+        Result[] results = new Result[list.size()];
+        int next = 0;
+        for (Get request : list) {
+            results[next] = get(request);
+            next++;
+        }
+        return results;
+    }
+
+    /**
+     * Not implemented by this mock; always throws UnsupportedOperationException.
+     *
+     * @param bytes  ignored
+     * @param bytes1 ignored
+     * @deprecated
+     */
+    @Override
+    public Result getRowOrBefore(byte[] bytes, byte[] bytes1) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Evaluates the scan eagerly against the in-memory data and returns a scanner
+     * over the precomputed Results.
+     * <p>
+     * Rows are visited in key order. Rows before the start row are skipped and
+     * iteration ends at the first row at or after the stop row; a row equal to the
+     * start row is always emitted, even when start and stop row are the same. For
+     * each qualifier at most {@code scan.getMaxVersions()} versions inside the
+     * scan's time range are collected, newest first. A filter on the scan, if any,
+     * is applied per row, and rows left without KeyValues are omitted.
+     *
+     * @param scan row range, families/qualifiers, time range, max versions and filter
+     * @return a scanner over the matching rows; its close() does nothing
+     */
+    @Override
+    public ResultScanner getScanner(Scan scan) throws IOException {
+        final List<Result> ret = new ArrayList<Result>();
+        byte[] st = scan.getStartRow();
+        byte[] sp = scan.getStopRow();
+        boolean hasStartRow = st != null && st.length > 0;
+        boolean hasStopRow = sp != null && sp.length > 0;
+        Filter filter = scan.getFilter();
+
+        for (byte[] row : data.keySet()){
+            // if row is equal to startRow emit it. When startRow (inclusive) and
+            // stopRow (exclusive) is the same, it should not be excluded which would
+            // happen w/o this control.
+            boolean isStartRow = hasStartRow && Bytes.BYTES_COMPARATOR.compare(st, row) == 0;
+            if (!isStartRow) {
+                // if row is before startRow do not emit, pass to next row
+                if (hasStartRow && Bytes.BYTES_COMPARATOR.compare(st, row) > 0)
+                    continue;
+                // if row is equal to stopRow or after it do not emit, stop iteration.
+                // This also has to apply when the scan has no startRow.
+                if (hasStopRow && Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0)
+                    break;
+            }
+
+            List<KeyValue> kvs = null;
+            if (!scan.hasFamilies()) {
+                kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions());
+            } else {
+                kvs = new ArrayList<KeyValue>();
+                for (byte[] family : scan.getFamilyMap().keySet()){
+                    if (data.get(row).get(family) == null)
+                        continue;
+                    NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
+                    if (qualifiers == null || qualifiers.isEmpty())
+                        qualifiers = data.get(row).get(family).navigableKeySet();
+                    for (byte[] qualifier : qualifiers){
+                        if (data.get(row).get(family).get(qualifier) == null)
+                            continue;
+                        // The version limit is per qualifier, so count the versions added
+                        // for this qualifier instead of looking at the size of the whole list.
+                        int versionsAdded = 0;
+                        for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()){
+                            if (timestamp < scan.getTimeRange().getMin())
+                                continue;
+                            if (timestamp > scan.getTimeRange().getMax())
+                                continue;
+                            byte[] value = data.get(row).get(family).get(qualifier).get(timestamp);
+                            kvs.add(new KeyValue(row, family, qualifier, timestamp, value));
+                            if(++versionsAdded == scan.getMaxVersions()) {
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+            if (filter != null) {
+                filter.reset();
+                List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
+                for (KeyValue kv : kvs) {
+                    if (filter.filterAllRemaining()) {
+                        break;
+                    }
+                    if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
+                        continue;
+                    }
+                    Filter.ReturnCode filterResult = filter.filterKeyValue(kv);
+                    if (filterResult == Filter.ReturnCode.INCLUDE) {
+                        nkvs.add(kv);
+                    } else if (filterResult == Filter.ReturnCode.NEXT_ROW) {
+                        break;
+                    }
+                    // ignoring next key hint which is a optimization to reduce file system IO
+                }
+                if (filter.hasFilterRow()) {
+                    filter.filterRow();
+                }
+                kvs = nkvs;
+            }
+            if (!kvs.isEmpty()) {
+                ret.add(new Result(kvs));
+            }
+        }
+
+        return new ResultScanner() {
+            // a single iterator is shared by iterator(), next() and next(int)
+            private final Iterator<Result> iterator = ret.iterator();
+            public Iterator<Result> iterator() {
+                return iterator;
+            }
+            public Result[] next(int nbRows) throws IOException {
+                ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
+                for(int i = 0; i < nbRows; i++) {
+                    Result next = next();
+                    if (next != null) {
+                        resultSets.add(next);
+                    } else {
+                        break;
+                    }
+                }
+                return resultSets.toArray(new Result[resultSets.size()]);
+            }
+            public Result next() throws IOException {
+                try {
+                    return iterator().next();
+                } catch (NoSuchElementException e) {
+                    // exhausted: signalled to the caller as null
+                    return null;
+                }
+            }
+            public void close() {}
+        };
+    }
+    /**
+     * Scans the whole table restricted to one column family.
+     *
+     * @param family the column family to scan
+     * @return the scanner produced by {@link #getScanner(Scan)} for that family
+     */
+    @Override
+    public ResultScanner getScanner(byte[] family) throws IOException {
+        Scan scan = new Scan();
+        scan.addFamily(family);
+        return getScanner(scan);
+    }
+
+    /**
+     * Scans the whole table restricted to one column.
+     *
+     * @param family    the column family to scan
+     * @param qualifier the column qualifier to scan
+     * @return the scanner produced by {@link #getScanner(Scan)} for that column
+     */
+    @Override
+    public ResultScanner getScanner(byte[] family, byte[] qualifier)
+            throws IOException {
+        Scan scan = new Scan();
+        scan.addColumn(family, qualifier);
+        return getScanner(scan);
+    }
+
+    List<Put> putLog = new ArrayList<>();
+
+    /**
+     * @return the live list of every Put passed to {@link #put(Put)}, in call order
+     *         (the internal list itself, not a copy)
+     */
+    public List<Put> getPutLog() {
+        return putLog;
+    }
+
+    /**
+     * Records the Put in the put log and stores each of its KeyValues in the
+     * in-memory data under row, family, qualifier and timestamp. Missing row,
+     * family and qualifier maps are created on demand.
+     *
+     * @param put the Put to apply
+     */
+    @Override
+    public void put(Put put) throws IOException {
+        putLog.add(put);
+        byte[] row = put.getRow();
+        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
+        for (byte[] family : put.getFamilyMap().keySet()){
+            NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
+            for (KeyValue kv : put.getFamilyMap().get(family)){
+                // presumably replaces a "latest timestamp" placeholder with the current
+                // time and leaves explicit timestamps alone; confirm against the KeyValue API
+                kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
+                byte[] qualifier = kv.getQualifier();
+                NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>());
+                // a value already stored at the same timestamp is overwritten
+                qualifierData.put(kv.getTimestamp(), kv.getValue());
+            }
+        }
+    }
+
+    /**
+     * Get-or-insert helper: returns the value mapped to key, and if there is none
+     * (or it is null) stores newObject under key and returns that instead.
+     *
+     * @param map
+     *          map to read from and, if needed, insert into
+     * @param key
+     *          key to look up
+     * @param newObject
+     *          value to store when the key has no value yet
+     * @return the existing value, or newObject if it had to be inserted
+     */
+    private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject){
+        V existing = map.get(key);
+        if (existing != null) {
+            return existing;
+        }
+        map.put(key, newObject);
+        return newObject;
+    }
+
+    /**
+     * Applies each Put in list order via {@link #put(Put)}.
+     *
+     * @param puts the Puts to apply
+     */
+    @Override
+    public void put(List<Put> puts) throws IOException {
+        for (Put put : puts)
+            put(put);
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Put put) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Stub: this mock performs neither the check nor the put and always
+     * returns false. The table contents and the put log are left untouched.
+     *
+     * @param row       ignored
+     * @param family    ignored
+     * @param qualifier ignored
+     * @param compareOp ignored
+     * @param value     ignored
+     * @param put       ignored
+     * @return always false
+     * @throws IOException never thrown by this implementation
+     */
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
+        return false;
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public void delete(Delete delete) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public void delete(List<Delete> list) throws IOException {
+        throw new UnsupportedOperationException();
+
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Delete delete) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Stub: this mock performs neither the check nor the delete and always
+     * returns false. The table contents are left untouched.
+     *
+     * @param row       ignored
+     * @param family    ignored
+     * @param qualifier ignored
+     * @param compareOp ignored
+     * @param value     ignored
+     * @param delete    ignored
+     * @return always false
+     * @throws IOException never thrown by this implementation
+     */
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
+        return false;
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public void mutateRow(RowMutations rowMutations) throws IOException {
+        throw new UnsupportedOperationException();
+
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public Result append(Append append) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public Result increment(Increment increment) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, Durability durability) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Not implemented by this mock; always throws UnsupportedOperationException.
+     *
+     * @param bytes  ignored
+     * @param bytes1 ignored
+     * @param bytes2 ignored
+     * @param l      ignored
+     * @param b      ignored
+     * @deprecated
+     */
+    @Override
+    public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, boolean b) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** @return the flag last stored by one of the setAutoFlush methods (initially true) */
+    @Override
+    public boolean isAutoFlush() {
+        return autoflush;
+    }
+
+    /** Does nothing in this mock. */
+    @Override
+    public void flushCommits() throws IOException {
+
+    }
+
+    /** Does nothing in this mock; the stored data is kept. */
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public CoprocessorRpcChannel coprocessorService(byte[] bytes) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call) throws ServiceException, Throwable {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public <T extends Service, R> void coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call, Batch.Callback<R> callback) throws ServiceException, Throwable {
+        throw new UnsupportedOperationException();
+    }
+
+    boolean autoflush = true;
+
+    /**
+     * Stores the auto-flush flag; it is only reported back by isAutoFlush().
+     *
+     * @param b the new auto-flush flag
+     * @deprecated
+     */
+    @Override
+    public void setAutoFlush(boolean b) {
+        autoflush = b;
+    }
+
+    /**
+     * Stores the auto-flush flag. The second argument is ignored.
+     *
+     * @param b  the new auto-flush flag
+     * @param b1 ignored
+     */
+    @Override
+    public void setAutoFlush(boolean b, boolean b1) {
+        autoflush = b;
+    }
+
+    /**
+     * Stores the auto-flush flag; it is only reported back by isAutoFlush().
+     *
+     * @param b the new auto-flush flag
+     */
+    @Override
+    public void setAutoFlushTo(boolean b) {
+        autoflush = b;
+    }
+
+    long writeBufferSize = 0;
+    /** @return the value last passed to setWriteBufferSize (initially 0) */
+    @Override
+    public long getWriteBufferSize() {
+        return writeBufferSize;
+    }
+
+    /**
+     * Stores the write buffer size; it is only reported back by getWriteBufferSize().
+     *
+     * @param l the new write buffer size
+     */
+    @Override
+    public void setWriteBufferSize(long l) throws IOException {
+        writeBufferSize = l;
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r) throws ServiceException, Throwable {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not implemented by this mock; always throws UnsupportedOperationException. */
+    @Override
+    public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r, Batch.Callback<R> callback) throws ServiceException, Throwable {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Stub: this mock performs neither the check nor the mutations and always
+     * returns false. The table contents are left untouched.
+     *
+     * @param row       ignored
+     * @param family    ignored
+     * @param qualifier ignored
+     * @param compareOp ignored
+     * @param value     ignored
+     * @param mutation  ignored
+     * @return always false
+     * @throws IOException never thrown by this implementation
+     */
+    @Override
+    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/pom.xml b/metron-streaming/Metron-Topologies/pom.xml
index c36c325..9a7d8df 100644
--- a/metron-streaming/Metron-Topologies/pom.xml
+++ b/metron-streaming/Metron-Topologies/pom.xml
@@ -42,6 +42,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>Metron-Testing</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>Metron-Common</artifactId>
             <version>${project.parent.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java
index 0aba22f..3337855 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java
@@ -21,9 +21,14 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
 import org.apache.metron.integration.util.UnitTestHelper;
 import org.apache.metron.integration.util.integration.ComponentRunner;
 import org.apache.metron.integration.util.integration.Processor;
@@ -33,18 +38,16 @@ import org.apache.metron.integration.util.integration.components.FluxTopologyCom
 import org.apache.metron.integration.util.mock.MockHTable;
 import org.apache.metron.integration.util.threatintel.ThreatIntelHelper;
 import org.apache.metron.parsing.parsers.PcapParser;
+import org.apache.metron.reference.lookup.LookupKV;
 import org.apache.metron.test.converters.HexStringConverter;
-import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
 import org.apache.metron.threatintel.ThreatIntelResults;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
+import java.io.*;
 import java.text.SimpleDateFormat;
 import java.util.*;
 
@@ -53,6 +56,14 @@ public class PcapIntegrationTest {
     private String topologiesDir = "src/main/resources/Metron_Configs/topologies";
     private String targetDir = "target";
 
+    public static class Provider implements TableProvider, Serializable{
+
+        MockHTable.Provider  provider = new MockHTable.Provider();
+        @Override
+        public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+            return provider.getTable(config, tableName);
+        }
+    }
 
     @Test
     public void testTopology() throws Exception {
@@ -107,7 +118,7 @@ public class PcapIntegrationTest {
             setProperty("bolt.hbase.write.buffer.size.in.bytes", "2000000");
             setProperty("bolt.hbase.durability", "SKIP_WAL");
             setProperty("bolt.hbase.partitioner.region.info.refresh.interval.mins","60");
-            setProperty("hbase.provider.impl","" + MockHTable.Provider.class.getName());
+            setProperty("hbase.provider.impl","" + Provider.class.getName());
             setProperty("threat.intel.tracker.table", trackerHBaseTable);
             setProperty("threat.intel.tracker.cf", cf);
             setProperty("threat.intel.ip.table", ipThreatIntelTable);
@@ -121,9 +132,9 @@ public class PcapIntegrationTest {
         //create MockHBaseTables
         final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTable, cf);
         final MockHTable ipTable = (MockHTable)MockHTable.Provider.addToCache(ipThreatIntelTable, cf);
-        ThreatIntelHelper.INSTANCE.load(ipTable, cf, new ArrayList<ThreatIntelResults>(){{
-            add(new ThreatIntelResults(new ThreatIntelKey("10.0.2.3"), new HashMap<String, String>()));
-        }}, 0L);
+        ThreatIntelHelper.INSTANCE.load(ipTable, cf, new ArrayList<LookupKV<ThreatIntelKey, ThreatIntelValue>>(){{
+            add(new LookupKV<>(new ThreatIntelKey("10.0.2.3"), new ThreatIntelValue(new HashMap<String, String>())));
+        }});
         final MockHTable pcapTable = (MockHTable) MockHTable.Provider.addToCache("pcap_test", "t");
         FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
                                                                        .withTopologyLocation(new File(topologiesDir + "/pcap/local.yaml"))

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java
deleted file mode 100644
index 7af2212..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util;
-
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-import org.junit.Assert;
-
-import java.io.File;
-import java.util.Set;
-import java.util.Stack;
-
-public class UnitTestHelper {
-    public static String findDir(String name) {
-        return findDir(new File("."), name);
-    }
-
-    public static String findDir(File startDir, String name) {
-        Stack<File> s = new Stack<File>();
-        s.push(startDir);
-        while(!s.empty()) {
-            File parent = s.pop();
-            if(parent.getName().equalsIgnoreCase(name)) {
-                return parent.getAbsolutePath();
-            }
-            else {
-                File[] children = parent.listFiles();
-                if(children != null) {
-                    for (File child : children) {
-                        s.push(child);
-                    }
-                }
-            }
-        }
-        return null;
-    }
-
-    public static <T> void assertSetEqual(String type, Set<T> expectedPcapIds, Set<T> found) {
-        boolean mismatch = false;
-        for(T f : found) {
-            if(!expectedPcapIds.contains(f)) {
-                mismatch = true;
-                System.out.println("Found " + type + " that I did not expect: " + f);
-            }
-        }
-        for(T expectedId : expectedPcapIds) {
-            if(!found.contains(expectedId)) {
-                mismatch = true;
-                System.out.println("Expected " + type + " that I did not index: " + expectedId);
-            }
-        }
-        Assert.assertFalse(mismatch);
-    }
-
-    public static void verboseLogging() {
-        verboseLogging("%d [%p|%c|%C{1}] %m%n", Level.ALL);
-    }
-    public static void verboseLogging(String pattern, Level level) {
-        ConsoleAppender console = new ConsoleAppender(); //create appender
-        //configure the appender
-        console.setLayout(new PatternLayout(pattern));
-        console.setThreshold(level);
-        console.activateOptions();
-        //add appender to any Logger (here is root)
-        Logger.getRootLogger().addAppender(console);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java
deleted file mode 100644
index 18c06f2..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.integration;
-
-import backtype.storm.utils.Utils;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public class ComponentRunner {
-    public static class Builder {
-        LinkedHashMap<String, InMemoryComponent> components;
-        String[] startupOrder;
-        String[] shutdownOrder;
-        public Builder() {
-            components = new LinkedHashMap<String, InMemoryComponent>();
-        }
-
-        public Builder withComponent(String name, InMemoryComponent component) {
-            components.put(name, component);
-            return this;
-        }
-
-        public Builder withCustomStartupOrder(String[] startupOrder) {
-            this.startupOrder = startupOrder;
-            return this;
-        }
-        public Builder withCustomShutdownOrder(String[] shutdownOrder) {
-            this.shutdownOrder = shutdownOrder;
-            return this;
-        }
-        private static String[] toOrderedList(Map<String, InMemoryComponent> components) {
-            String[] ret = new String[components.size()];
-            int i = 0;
-            for(String component : components.keySet()) {
-                ret[i++] = component;
-            }
-            return ret;
-        }
-        public ComponentRunner build() {
-            if(shutdownOrder == null) {
-                shutdownOrder = toOrderedList(components);
-            }
-            if(startupOrder == null) {
-                startupOrder = toOrderedList(components);
-            }
-            return new ComponentRunner(components, startupOrder, shutdownOrder);
-        }
-
-    }
-
-    LinkedHashMap<String, InMemoryComponent> components;
-    String[] startupOrder;
-    String[] shutdownOrder;
-    public ComponentRunner( LinkedHashMap<String, InMemoryComponent> components
-                          , String[] startupOrder
-                          , String[] shutdownOrder
-                          )
-    {
-        this.components = components;
-        this.startupOrder = startupOrder;
-        this.shutdownOrder = shutdownOrder;
-
-    }
-
-    public <T extends InMemoryComponent> T getComponent(String name, Class<T> clazz) {
-        return clazz.cast(getComponents().get(name));
-    }
-
-    public LinkedHashMap<String, InMemoryComponent> getComponents() {
-        return components;
-    }
-
-    public void start() throws UnableToStartException {
-        for(String componentName : startupOrder) {
-            components.get(componentName).start();
-        }
-    }
-    public void stop() {
-        for(String componentName : shutdownOrder) {
-            components.get(componentName).stop();
-        }
-    }
-
-    public <T> T process(Processor<T> successState) {
-        return process(successState, 5, 30000, 120000);
-    }
-
-    public <T> T process(Processor<T> successState, int numRetries, long timeBetweenAttempts, long maxTimeMs) {
-        int retryCount = 0;
-        long start = System.currentTimeMillis();
-        while(true) {
-            long duration = System.currentTimeMillis() - start;
-            if(duration > maxTimeMs) {
-                throw new RuntimeException("Took too long to complete: " + duration + " > " + maxTimeMs);
-            }
-            ReadinessState state = successState.process(this);
-            if(state == ReadinessState.READY) {
-                return successState.getResult();
-            }
-            else if(state == ReadinessState.NOT_READY) {
-                retryCount++;
-                if(numRetries > 0 && retryCount > numRetries) {
-                    throw new RuntimeException("Too many retries: " + retryCount);
-                }
-            }
-            Utils.sleep(timeBetweenAttempts);
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java
deleted file mode 100644
index b5224a2..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.integration;
-
-public interface InMemoryComponent {
-    public void start() throws UnableToStartException;
-    public void stop();
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java
deleted file mode 100644
index 59f3ece..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.integration;
-
-public interface Processor<T> {
-    ReadinessState process(ComponentRunner runner);
-    T getResult();
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java
deleted file mode 100644
index 59ce69a..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.integration;
-
-public enum ReadinessState {
-    READY, NOT_READY;
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java
deleted file mode 100644
index 4e691f4..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.integration;
-
-public class UnableToStartException extends Exception {
-    public UnableToStartException(String message) {
-        super(message);
-    }
-    public UnableToStartException(String message, Throwable t) {
-        super(message, t);
-    }
-}