You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:05:49 UTC
[07/43] incubator-metron git commit: METRON-50: Ingest threat intel
data from Taxii feeds closes apache/incubator-metron#29
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
index 8a82828..9db6398 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/threatintel/ThreatIntelAdapter.java
@@ -19,10 +19,8 @@ package org.apache.metron.threatintel;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
-import org.apache.metron.reference.lookup.Lookup;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
import org.apache.metron.reference.lookup.accesstracker.BloomAccessTracker;
import org.apache.metron.reference.lookup.accesstracker.PersistentAccessTracker;
import org.apache.metron.threatintel.hbase.ThreatIntelLookup;
@@ -32,7 +30,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
-import java.util.Map;
import java.util.UUID;
public class ThreatIntelAdapter implements EnrichmentAdapter<String>,Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Indexing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/pom.xml b/metron-streaming/Metron-Indexing/pom.xml
index 2c80a8d..e2a1037 100644
--- a/metron-streaming/Metron-Indexing/pom.xml
+++ b/metron-streaming/Metron-Indexing/pom.xml
@@ -24,7 +24,6 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <elastic.search.version>1.3.1</elastic.search.version>
<http.client.version>4.3.4</http.client.version>
<jsonsimple.version>1.1.1</jsonsimple.version>
</properties>
@@ -74,7 +73,7 @@
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
- <version>${elastic.search.version}</version>
+ <version>${global_elasticsearch_version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
@@ -93,11 +92,6 @@
<version>1.9</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>3.8.2</version>
- </dependency>
</dependencies>
<reporting>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/pom.xml b/metron-streaming/Metron-Testing/pom.xml
new file mode 100644
index 0000000..d68d81d
--- /dev/null
+++ b/metron-streaming/Metron-Testing/pom.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Streaming</artifactId>
+ <version>0.1BETA</version>
+ </parent>
+ <artifactId>Metron-Testing</artifactId>
+ <description>Metron Testing Utilities</description>
+ <properties>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${global_junit_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${global_elasticsearch_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-core</artifactId>
+ <version>${global_flux_version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${global_storm_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${global_hbase_guava_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${global_hbase_version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${global_hadoop_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ </build>
+ <reporting>
+ </reporting>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/UnitTestHelper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/UnitTestHelper.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/UnitTestHelper.java
new file mode 100644
index 0000000..7af2212
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/UnitTestHelper.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util;
+
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.junit.Assert;
+
+import java.io.File;
+import java.util.Set;
+import java.util.Stack;
+
+public class UnitTestHelper {
+ public static String findDir(String name) {
+ return findDir(new File("."), name);
+ }
+
+ public static String findDir(File startDir, String name) {
+ Stack<File> s = new Stack<File>();
+ s.push(startDir);
+ while(!s.empty()) {
+ File parent = s.pop();
+ if(parent.getName().equalsIgnoreCase(name)) {
+ return parent.getAbsolutePath();
+ }
+ else {
+ File[] children = parent.listFiles();
+ if(children != null) {
+ for (File child : children) {
+ s.push(child);
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ public static <T> void assertSetEqual(String type, Set<T> expectedPcapIds, Set<T> found) {
+ boolean mismatch = false;
+ for(T f : found) {
+ if(!expectedPcapIds.contains(f)) {
+ mismatch = true;
+ System.out.println("Found " + type + " that I did not expect: " + f);
+ }
+ }
+ for(T expectedId : expectedPcapIds) {
+ if(!found.contains(expectedId)) {
+ mismatch = true;
+ System.out.println("Expected " + type + " that I did not index: " + expectedId);
+ }
+ }
+ Assert.assertFalse(mismatch);
+ }
+
+ public static void verboseLogging() {
+ verboseLogging("%d [%p|%c|%C{1}] %m%n", Level.ALL);
+ }
+ public static void verboseLogging(String pattern, Level level) {
+ ConsoleAppender console = new ConsoleAppender(); //create appender
+ //configure the appender
+ console.setLayout(new PatternLayout(pattern));
+ console.setThreshold(level);
+ console.activateOptions();
+ //add appender to any Logger (here is root)
+ Logger.getRootLogger().addAppender(console);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java
new file mode 100644
index 0000000..3e5e793
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ComponentRunner.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration;
+
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class ComponentRunner {
+ public static class Builder {
+ LinkedHashMap<String, InMemoryComponent> components;
+ String[] startupOrder;
+ String[] shutdownOrder;
+ public Builder() {
+ components = new LinkedHashMap<String, InMemoryComponent>();
+ }
+
+ public Builder withComponent(String name, InMemoryComponent component) {
+ components.put(name, component);
+ return this;
+ }
+
+ public Builder withCustomStartupOrder(String[] startupOrder) {
+ this.startupOrder = startupOrder;
+ return this;
+ }
+ public Builder withCustomShutdownOrder(String[] shutdownOrder) {
+ this.shutdownOrder = shutdownOrder;
+ return this;
+ }
+ private static String[] toOrderedList(Map<String, InMemoryComponent> components) {
+ String[] ret = new String[components.size()];
+ int i = 0;
+ for(String component : components.keySet()) {
+ ret[i++] = component;
+ }
+ return ret;
+ }
+ public ComponentRunner build() {
+ if(shutdownOrder == null) {
+ shutdownOrder = toOrderedList(components);
+ }
+ if(startupOrder == null) {
+ startupOrder = toOrderedList(components);
+ }
+ return new ComponentRunner(components, startupOrder, shutdownOrder);
+ }
+
+ }
+
+ LinkedHashMap<String, InMemoryComponent> components;
+ String[] startupOrder;
+ String[] shutdownOrder;
+ public ComponentRunner( LinkedHashMap<String, InMemoryComponent> components
+ , String[] startupOrder
+ , String[] shutdownOrder
+ )
+ {
+ this.components = components;
+ this.startupOrder = startupOrder;
+ this.shutdownOrder = shutdownOrder;
+
+ }
+
+ public <T extends InMemoryComponent> T getComponent(String name, Class<T> clazz) {
+ return clazz.cast(getComponents().get(name));
+ }
+
+ public LinkedHashMap<String, InMemoryComponent> getComponents() {
+ return components;
+ }
+
+ public void start() throws UnableToStartException {
+ for(String componentName : startupOrder) {
+ components.get(componentName).start();
+ }
+ }
+ public void stop() {
+ for(String componentName : shutdownOrder) {
+ components.get(componentName).stop();
+ }
+ }
+
+ public <T> T process(Processor<T> successState) {
+ return process(successState, 5, 30000, 120000);
+ }
+
+ public <T> T process(Processor<T> successState, int numRetries, long timeBetweenAttempts, long maxTimeMs) {
+ int retryCount = 0;
+ long start = System.currentTimeMillis();
+ while(true) {
+ long duration = System.currentTimeMillis() - start;
+ if(duration > maxTimeMs) {
+ throw new RuntimeException("Took too long to complete: " + duration + " > " + maxTimeMs);
+ }
+ ReadinessState state = successState.process(this);
+ if(state == ReadinessState.READY) {
+ return successState.getResult();
+ }
+ else if(state == ReadinessState.NOT_READY) {
+ retryCount++;
+ if(numRetries > 0 && retryCount > numRetries) {
+ throw new RuntimeException("Too many retries: " + retryCount);
+ }
+ }
+ try {
+ Thread.sleep(timeBetweenAttempts);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Unable to sleep", e);
+ }
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/InMemoryComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/InMemoryComponent.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/InMemoryComponent.java
new file mode 100644
index 0000000..b5224a2
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/InMemoryComponent.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration;
+
+public interface InMemoryComponent {
+ public void start() throws UnableToStartException;
+ public void stop();
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/Processor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/Processor.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/Processor.java
new file mode 100644
index 0000000..59f3ece
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/Processor.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration;
+
+public interface Processor<T> {
+ ReadinessState process(ComponentRunner runner);
+ T getResult();
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ReadinessState.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ReadinessState.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ReadinessState.java
new file mode 100644
index 0000000..59ce69a
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/ReadinessState.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration;
+
+public enum ReadinessState {
+ READY, NOT_READY;
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/UnableToStartException.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/UnableToStartException.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/UnableToStartException.java
new file mode 100644
index 0000000..4e691f4
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/UnableToStartException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration;
+
+public class UnableToStartException extends Exception {
+ public UnableToStartException(String message) {
+ super(message);
+ }
+ public UnableToStartException(String message, Throwable t) {
+ super(message, t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
new file mode 100644
index 0000000..a7991c0
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration.components;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.integration.util.integration.InMemoryComponent;
+import org.apache.metron.integration.util.integration.UnableToStartException;
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.ElasticsearchClient;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeBuilder;
+import org.elasticsearch.search.SearchHit;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ElasticSearchComponent implements InMemoryComponent {
+
+ public static class Builder{
+ private int httpPort;
+ private File indexDir;
+ private Map<String, String> extraElasticSearchSettings = null;
+ public Builder withHttpPort(int httpPort) {
+ this.httpPort = httpPort;
+ return this;
+ }
+ public Builder withIndexDir(File indexDir) {
+ this.indexDir = indexDir;
+ return this;
+ }
+ public Builder withExtraElasticSearchSettings(Map<String, String> extraElasticSearchSettings) {
+ this.extraElasticSearchSettings = extraElasticSearchSettings;
+ return this;
+ }
+ public ElasticSearchComponent build() {
+ return new ElasticSearchComponent(httpPort, indexDir, extraElasticSearchSettings);
+ }
+ }
+
+ private Client client;
+ private Node node;
+ private int httpPort;
+ private File indexDir;
+ private Map<String, String> extraElasticSearchSettings;
+
+ public ElasticSearchComponent(int httpPort, File indexDir) {
+ this(httpPort, indexDir, null);
+ }
+ public ElasticSearchComponent(int httpPort, File indexDir, Map<String, String> extraElasticSearchSettings) {
+ this.httpPort = httpPort;
+ this.indexDir = indexDir;
+ this.extraElasticSearchSettings = extraElasticSearchSettings;
+ }
+ public Client getClient() {
+ return client;
+ }
+
+ private void cleanDir(File dir) throws IOException {
+ if(dir.exists()) {
+ FileUtils.deleteDirectory(dir);
+ }
+ dir.mkdirs();
+ }
+ public void start() throws UnableToStartException {
+ File logDir= new File(indexDir, "/logs");
+ File dataDir= new File(indexDir, "/data");
+ try {
+ cleanDir(logDir);
+ cleanDir(dataDir);
+
+ } catch (IOException e) {
+ throw new UnableToStartException("Unable to clean log or data directories", e);
+ }
+ ImmutableSettings.Builder immutableSettings = ImmutableSettings.settingsBuilder()
+ .put("node.http.enabled", true)
+ .put("http.port", httpPort)
+ .put("cluster.name", "metron")
+ .put("path.logs",logDir.getAbsolutePath())
+ .put("path.data",dataDir.getAbsolutePath())
+ .put("gateway.type", "none")
+ .put("index.store.type", "memory")
+ .put("index.number_of_shards", 1)
+ .put("node.mode", "network")
+ .put("index.number_of_replicas", 1);
+ if(extraElasticSearchSettings != null) {
+ immutableSettings = immutableSettings.put(extraElasticSearchSettings);
+ }
+ Settings settings = immutableSettings.build();
+
+ node = NodeBuilder.nodeBuilder().settings(settings).node();
+ node.start();
+ settings = ImmutableSettings.settingsBuilder()
+ .put("cluster.name", "metron").build();
+ client = new TransportClient(settings)
+ .addTransportAddress(new InetSocketTransportAddress("localhost",
+ 9300));
+
+ waitForCluster(client, ClusterHealthStatus.YELLOW, new TimeValue(60000));
+ }
+
+ public static void waitForCluster(ElasticsearchClient client, ClusterHealthStatus status, TimeValue timeout) throws UnableToStartException {
+ try {
+ ClusterHealthResponse healthResponse =
+ (ClusterHealthResponse)client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(status).timeout(timeout)).actionGet();
+ if (healthResponse != null && healthResponse.isTimedOut()) {
+ throw new UnableToStartException("cluster state is " + healthResponse.getStatus().name()
+ + " and not " + status.name()
+ + ", from here on, everything will fail!");
+ }
+ } catch (ElasticsearchTimeoutException e) {
+ throw new UnableToStartException("timeout, cluster does not respond to health request, cowardly refusing to continue with operations");
+ }
+ }
+
+ public List<Map<String, Object>> getAllIndexedDocs(String index) throws IOException {
+ return getAllIndexedDocs(index, "message");
+ }
+ public List<Map<String, Object>> getAllIndexedDocs(String index, String subMessage) throws IOException {
+ getClient().admin().indices().refresh(new RefreshRequest());
+ SearchResponse response = getClient().prepareSearch(index)
+ .setTypes("pcap_doc")
+ .setSource("message")
+ .setFrom(0)
+ .setSize(1000)
+ .execute().actionGet();
+ List<Map<String, Object>> ret = new ArrayList<Map<String, Object>>();
+ for (SearchHit hit : response.getHits()) {
+ Object o = null;
+ if(subMessage == null) {
+ o = hit.getSource();
+ }
+ else {
+ o = hit.getSource().get(subMessage);
+ }
+ ret.add((Map<String, Object>)(o));
+ }
+ return ret;
+ }
+ public boolean hasIndex(String indexName) {
+ Set<String> indices = getClient().admin()
+ .indices()
+ .stats(new IndicesStatsRequest())
+ .actionGet()
+ .getIndices()
+ .keySet();
+ return indices.contains(indexName);
+
+ }
+
+ public void stop() {
+ node.stop();
+ node = null;
+ client = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java
new file mode 100644
index 0000000..2cac4ee
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.integration.components;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import org.apache.metron.integration.util.integration.InMemoryComponent;
+import org.apache.metron.integration.util.integration.UnableToStartException;
+import org.apache.storm.flux.FluxBuilder;
+import org.apache.storm.flux.model.ExecutionContext;
+import org.apache.storm.flux.model.TopologyDef;
+import org.apache.storm.flux.parser.FluxParser;
+import org.apache.thrift7.TException;
+import org.junit.Assert;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Properties;
+
+public class FluxTopologyComponent implements InMemoryComponent {
+ LocalCluster stormCluster;
+ String topologyName;
+ File topologyLocation;
+ Properties topologyProperties;
+
+ public static class Builder {
+ String topologyName;
+ File topologyLocation;
+ Properties topologyProperties;
+ public Builder withTopologyName(String name) {
+ this.topologyName = name;
+ return this;
+ }
+ public Builder withTopologyLocation(File location) {
+ this.topologyLocation = location;
+ return this;
+ }
+ public Builder withTopologyProperties(Properties properties) {
+ this.topologyProperties = properties;
+ return this;
+ }
+
+ public FluxTopologyComponent build() {
+ return new FluxTopologyComponent(topologyName, topologyLocation, topologyProperties);
+ }
+ }
+
+ public FluxTopologyComponent(String topologyName, File topologyLocation, Properties topologyProperties) {
+ this.topologyName = topologyName;
+ this.topologyLocation = topologyLocation;
+ this.topologyProperties = topologyProperties;
+ }
+
+ public LocalCluster getStormCluster() {
+ return stormCluster;
+ }
+
+ public String getTopologyName() {
+ return topologyName;
+ }
+
+ public File getTopologyLocation() {
+ return topologyLocation;
+ }
+
+ public Properties getTopologyProperties() {
+ return topologyProperties;
+ }
+
+ public void start() throws UnableToStartException{
+ try {
+ stormCluster = new LocalCluster();
+ } catch (Exception e) {
+ throw new UnableToStartException("Unable to start flux topology: " + getTopologyLocation(), e);
+ }
+ }
+
+ public void stop() {
+ stormCluster.shutdown();
+ }
+
+ public void submitTopology() throws NoSuchMethodException, IOException, InstantiationException, TException, IllegalAccessException, InvocationTargetException, ClassNotFoundException {
+ startTopology(getTopologyName(), getTopologyLocation(), getTopologyProperties());
+ }
+ private void startTopology(String topologyName, File topologyLoc, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException {
+ TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, properties);
+ Config conf = FluxBuilder.buildConfig(topologyDef);
+ ExecutionContext context = new ExecutionContext(topologyDef, conf);
+ StormTopology topology = FluxBuilder.buildTopology(context);
+ Assert.assertNotNull(topology);
+ topology.validate();
+ stormCluster.submitTopology(topologyName, conf, topology);
+ }
+
+ private static TopologyDef loadYaml(String topologyName, File yamlFile, Properties properties) throws IOException {
+ File tmpFile = File.createTempFile(topologyName, "props");
+ tmpFile.deleteOnExit();
+ FileWriter propWriter = null;
+ try {
+ propWriter = new FileWriter(tmpFile);
+ properties.store(propWriter, topologyName + " properties");
+ }
+ finally {
+ if(propWriter != null) {
+ propWriter.close();
+ return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
+ }
+
+ return null;
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/mock/MockHTable.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/mock/MockHTable.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/mock/MockHTable.java
new file mode 100644
index 0000000..76558d5
--- /dev/null
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/util/mock/MockHTable.java
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.util.mock;
+
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+
+/**
+ * MockHTable.
+ *
+ * This implementation is a selected excerpt from https://gist.github.com/agaoglu/613217
+ */
+public class MockHTable implements HTableInterface {
+
+ public static class Provider implements Serializable {
+ private static Map<String, HTableInterface> _cache = new HashMap<>();
+ public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+ return _cache.get(tableName);
+ }
+ public static HTableInterface getFromCache(String tableName) {
+ return _cache.get(tableName);
+ }
+ public static HTableInterface addToCache(String tableName, String... columnFamilies) {
+ MockHTable ret = new MockHTable(tableName, columnFamilies);
+ _cache.put(tableName, ret);
+ return ret;
+ }
+
+ public static void clear() {
+ _cache.clear();
+ }
+ }
+
+ private final String tableName;
+ private final List<String> columnFamilies = new ArrayList<>();
+ private HColumnDescriptor[] descriptors;
+
+ private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data
+ = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+ private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
+ return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
+ }
+
+ private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) {
+ List<KeyValue> ret = new ArrayList<KeyValue>();
+ for (byte[] family : rowdata.keySet())
+ for (byte[] qualifier : rowdata.get(family).keySet()) {
+ int versionsAdded = 0;
+ for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) {
+ if (versionsAdded++ == maxVersions)
+ break;
+ Long timestamp = tsToVal.getKey();
+ if (timestamp < timestampStart)
+ continue;
+ if (timestamp > timestampEnd)
+ continue;
+ byte[] value = tsToVal.getValue();
+ ret.add(new KeyValue(row, family, qualifier, timestamp, value));
+ }
+ }
+ return ret;
+ }
+ public MockHTable(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public MockHTable(String tableName, String... columnFamilies) {
+ this.tableName = tableName;
+ for(String cf : columnFamilies) {
+ addColumnFamily(cf);
+ }
+ }
+
+ public void addColumnFamily(String columnFamily) {
+ this.columnFamilies.add(columnFamily);
+ descriptors = new HColumnDescriptor[columnFamilies.size()];
+ int i = 0;
+ for(String cf : columnFamilies) {
+ descriptors[i++] = new HColumnDescriptor(cf);
+ }
+ }
+
+
+ @Override
+ public byte[] getTableName() {
+ return Bytes.toBytes(tableName);
+ }
+
+ @Override
+ public TableName getName() {
+ return TableName.valueOf(tableName);
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public HTableDescriptor getTableDescriptor() throws IOException {
+ HTableDescriptor ret = new HTableDescriptor(tableName);
+ for(HColumnDescriptor c : descriptors) {
+ ret.addFamily(c);
+ }
+ return ret;
+ }
+
+ @Override
+ public boolean exists(Get get) throws IOException {
+ if(get.getFamilyMap() == null || get.getFamilyMap().size() == 0) {
+ return data.containsKey(get.getRow());
+ } else {
+ byte[] row = get.getRow();
+ if(!data.containsKey(row)) {
+ return false;
+ }
+ for(byte[] family : get.getFamilyMap().keySet()) {
+ if(!data.get(row).containsKey(family)) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+ return true;
+ }
+ }
+
+ /**
+ * Test for the existence of columns in the table, as specified by the Gets.
+ * <p/>
+ * <p/>
+ * This will return an array of booleans. Each value will be true if the related Get matches
+ * one or more keys, false if not.
+ * <p/>
+ * <p/>
+ * This is a server-side call so it prevents any data from being transferred to
+ * the client.
+ *
+ * @param gets the Gets
+ * @return Array of boolean. True if the specified Get matches one or more keys, false if not.
+ * @throws IOException e
+ */
+ @Override
+ public boolean[] existsAll(List<Get> gets) throws IOException {
+ boolean[] ret = new boolean[gets.size()];
+ int i = 0;
+ for(boolean b : exists(gets)) {
+ ret[i++] = b;
+ }
+ return ret;
+ }
+
+ @Override
+ public Boolean[] exists(List<Get> list) throws IOException {
+ Boolean[] ret = new Boolean[list.size()];
+ int i = 0;
+ for(Get g : list) {
+ ret[i++] = exists(g);
+ }
+ return ret;
+ }
+
+ @Override
+ public void batch(List<? extends Row> list, Object[] objects) throws IOException, InterruptedException {
+ throw new UnsupportedOperationException();
+
+ }
+
+ /**
+ * @param actions
+ * @deprecated
+ */
+ @Override
+ public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
+ List<Result> results = new ArrayList<Result>();
+ for (Row r : actions) {
+ if (r instanceof Delete) {
+ delete((Delete) r);
+ continue;
+ }
+ if (r instanceof Put) {
+ put((Put) r);
+ continue;
+ }
+ if (r instanceof Get) {
+ results.add(get((Get) r));
+ }
+ }
+ return results.toArray();
+ }
+
+ @Override
+ public <R> void batchCallback(List<? extends Row> list, Object[] objects, Batch.Callback<R> callback) throws IOException, InterruptedException {
+ throw new UnsupportedOperationException();
+
+ }
+
+ /**
+ * @param list
+ * @param callback
+ * @deprecated
+ */
+ @Override
+ public <R> Object[] batchCallback(List<? extends Row> list, Batch.Callback<R> callback) throws IOException, InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Result get(Get get) throws IOException {
+ if (!data.containsKey(get.getRow()))
+ return new Result();
+ byte[] row = get.getRow();
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ if (!get.hasFamilies()) {
+ kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
+ } else {
+ for (byte[] family : get.getFamilyMap().keySet()){
+ if (data.get(row).get(family) == null)
+ continue;
+ NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family);
+ if (qualifiers == null || qualifiers.isEmpty())
+ qualifiers = data.get(row).get(family).navigableKeySet();
+ for (byte[] qualifier : qualifiers){
+ if (qualifier == null)
+ qualifier = "".getBytes();
+ if (!data.get(row).containsKey(family) ||
+ !data.get(row).get(family).containsKey(qualifier) ||
+ data.get(row).get(family).get(qualifier).isEmpty())
+ continue;
+ Map.Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry();
+ kvs.add(new KeyValue(row,family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue()));
+ }
+ }
+ }
+ Filter filter = get.getFilter();
+ if (filter != null) {
+ filter.reset();
+ List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
+ for (KeyValue kv : kvs) {
+ if (filter.filterAllRemaining()) {
+ break;
+ }
+ if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
+ continue;
+ }
+ if (filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE) {
+ nkvs.add(kv);
+ }
+ // ignoring next key hint which is a optimization to reduce file system IO
+ }
+ if (filter.hasFilterRow()) {
+ filter.filterRow();
+ }
+ kvs = nkvs;
+ }
+
+ return new Result(kvs);
+ }
+
+ @Override
+ public Result[] get(List<Get> list) throws IOException {
+ Result[] ret = new Result[list.size()];
+ int i = 0;
+ for(Get g : list) {
+ ret[i++] = get(g);
+ }
+ return ret;
+ }
+
+ /**
+ * @param bytes
+ * @param bytes1
+ * @deprecated
+ */
+ @Override
+ public Result getRowOrBefore(byte[] bytes, byte[] bytes1) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResultScanner getScanner(Scan scan) throws IOException {
+ final List<Result> ret = new ArrayList<Result>();
+ byte[] st = scan.getStartRow();
+ byte[] sp = scan.getStopRow();
+ Filter filter = scan.getFilter();
+
+ for (byte[] row : data.keySet()){
+ // if row is equal to startRow emit it. When startRow (inclusive) and
+ // stopRow (exclusive) is the same, it should not be excluded which would
+ // happen w/o this control.
+ if (st != null && st.length > 0 &&
+ Bytes.BYTES_COMPARATOR.compare(st, row) != 0) {
+ // if row is before startRow do not emit, pass to next row
+ if (st != null && st.length > 0 &&
+ Bytes.BYTES_COMPARATOR.compare(st, row) > 0)
+ continue;
+ // if row is equal to stopRow or after it do not emit, stop iteration
+ if (sp != null && sp.length > 0 &&
+ Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0)
+ break;
+ }
+
+ List<KeyValue> kvs = null;
+ if (!scan.hasFamilies()) {
+ kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions());
+ } else {
+ kvs = new ArrayList<KeyValue>();
+ for (byte[] family : scan.getFamilyMap().keySet()){
+ if (data.get(row).get(family) == null)
+ continue;
+ NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
+ if (qualifiers == null || qualifiers.isEmpty())
+ qualifiers = data.get(row).get(family).navigableKeySet();
+ for (byte[] qualifier : qualifiers){
+ if (data.get(row).get(family).get(qualifier) == null)
+ continue;
+ for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()){
+ if (timestamp < scan.getTimeRange().getMin())
+ continue;
+ if (timestamp > scan.getTimeRange().getMax())
+ continue;
+ byte[] value = data.get(row).get(family).get(qualifier).get(timestamp);
+ kvs.add(new KeyValue(row, family, qualifier, timestamp, value));
+ if(kvs.size() == scan.getMaxVersions()) {
+ break;
+ }
+ }
+ }
+ }
+ }
+ if (filter != null) {
+ filter.reset();
+ List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
+ for (KeyValue kv : kvs) {
+ if (filter.filterAllRemaining()) {
+ break;
+ }
+ if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
+ continue;
+ }
+ Filter.ReturnCode filterResult = filter.filterKeyValue(kv);
+ if (filterResult == Filter.ReturnCode.INCLUDE) {
+ nkvs.add(kv);
+ } else if (filterResult == Filter.ReturnCode.NEXT_ROW) {
+ break;
+ }
+ // ignoring next key hint which is a optimization to reduce file system IO
+ }
+ if (filter.hasFilterRow()) {
+ filter.filterRow();
+ }
+ kvs = nkvs;
+ }
+ if (!kvs.isEmpty()) {
+ ret.add(new Result(kvs));
+ }
+ }
+
+ return new ResultScanner() {
+ private final Iterator<Result> iterator = ret.iterator();
+ public Iterator<Result> iterator() {
+ return iterator;
+ }
+ public Result[] next(int nbRows) throws IOException {
+ ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
+ for(int i = 0; i < nbRows; i++) {
+ Result next = next();
+ if (next != null) {
+ resultSets.add(next);
+ } else {
+ break;
+ }
+ }
+ return resultSets.toArray(new Result[resultSets.size()]);
+ }
+ public Result next() throws IOException {
+ try {
+ return iterator().next();
+ } catch (NoSuchElementException e) {
+ return null;
+ }
+ }
+ public void close() {}
+ };
+ }
+ @Override
+ public ResultScanner getScanner(byte[] family) throws IOException {
+ Scan scan = new Scan();
+ scan.addFamily(family);
+ return getScanner(scan);
+ }
+
+ @Override
+ public ResultScanner getScanner(byte[] family, byte[] qualifier)
+ throws IOException {
+ Scan scan = new Scan();
+ scan.addColumn(family, qualifier);
+ return getScanner(scan);
+ }
+
+ List<Put> putLog = new ArrayList<>();
+
+ public List<Put> getPutLog() {
+ return putLog;
+ }
+
+ @Override
+ public void put(Put put) throws IOException {
+ putLog.add(put);
+ byte[] row = put.getRow();
+ NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
+ for (byte[] family : put.getFamilyMap().keySet()){
+ NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
+ for (KeyValue kv : put.getFamilyMap().get(family)){
+ kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
+ byte[] qualifier = kv.getQualifier();
+ NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>());
+ qualifierData.put(kv.getTimestamp(), kv.getValue());
+ }
+ }
+ }
+
+ /**
+ * Helper method to find a key in a map. If key is not found, newObject is
+ * added to map and returned
+ *
+ * @param map
+ * map to extract value from
+ * @param key
+ * key to look for
+ * @param newObject
+ * set key to this if not found
+ * @return found value or newObject if not found
+ */
+ private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject){
+ V data = map.get(key);
+ if (data == null){
+ data = newObject;
+ map.put(key, data);
+ }
+ return data;
+ }
+
+ @Override
+ public void put(List<Put> puts) throws IOException {
+ for (Put put : puts)
+ put(put);
+ }
+
+ @Override
+ public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Put put) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Atomically checks if a row/family/qualifier value matches the expected
+ * value. If it does, it adds the put. If the passed value is null, the check
+ * is for the lack of column (ie: non-existance)
+ *
+ * @param row to check
+ * @param family column family to check
+ * @param qualifier column qualifier to check
+ * @param compareOp comparison operator to use
+ * @param value the expected value
+ * @param put data to put if check succeeds
+ * @return true if the new put was executed, false otherwise
+ * @throws IOException e
+ */
+ @Override
+ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void delete(Delete delete) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void delete(List<Delete> list) throws IOException {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Delete delete) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Atomically checks if a row/family/qualifier value matches the expected
+ * value. If it does, it adds the delete. If the passed value is null, the
+ * check is for the lack of column (ie: non-existance)
+ *
+ * @param row to check
+ * @param family column family to check
+ * @param qualifier column qualifier to check
+ * @param compareOp comparison operator to use
+ * @param value the expected value
+ * @param delete data to delete if check succeeds
+ * @return true if the new delete was executed, false otherwise
+ * @throws IOException e
+ */
+ @Override
+ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void mutateRow(RowMutations rowMutations) throws IOException {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public Result append(Append append) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Result increment(Increment increment) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, Durability durability) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @param bytes
+ * @param bytes1
+ * @param bytes2
+ * @param l
+ * @param b
+ * @deprecated
+ */
+ @Override
+ public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, boolean b) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isAutoFlush() {
+ return autoflush;
+ }
+
+ @Override
+ public void flushCommits() throws IOException {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public CoprocessorRpcChannel coprocessorService(byte[] bytes) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call) throws ServiceException, Throwable {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T extends Service, R> void coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call, Batch.Callback<R> callback) throws ServiceException, Throwable {
+ throw new UnsupportedOperationException();
+ }
+
+ boolean autoflush = true;
+
+ /**
+ * @param b
+ * @deprecated
+ */
+ @Override
+ public void setAutoFlush(boolean b) {
+ autoflush = b;
+ }
+
+ @Override
+ public void setAutoFlush(boolean b, boolean b1) {
+ autoflush = b;
+ }
+
+ @Override
+ public void setAutoFlushTo(boolean b) {
+ autoflush = b;
+ }
+
+ long writeBufferSize = 0;
+ @Override
+ public long getWriteBufferSize() {
+ return writeBufferSize;
+ }
+
+ @Override
+ public void setWriteBufferSize(long l) throws IOException {
+ writeBufferSize = l;
+ }
+
+ @Override
+ public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r) throws ServiceException, Throwable {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r, Batch.Callback<R> callback) throws ServiceException, Throwable {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Atomically checks if a row/family/qualifier value matches the expected value.
+ * If it does, it performs the row mutations. If the passed value is null, the check
+ * is for the lack of column (ie: non-existence)
+ *
+ * @param row to check
+ * @param family column family to check
+ * @param qualifier column qualifier to check
+ * @param compareOp the comparison operator
+ * @param value the expected value
+ * @param mutation mutations to perform if check succeeds
+ * @return true if the new put was executed, false otherwise
+ * @throws IOException e
+ */
+ @Override
+ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/pom.xml b/metron-streaming/Metron-Topologies/pom.xml
index c36c325..9a7d8df 100644
--- a/metron-streaming/Metron-Topologies/pom.xml
+++ b/metron-streaming/Metron-Topologies/pom.xml
@@ -42,6 +42,12 @@
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
+ <artifactId>Metron-Testing</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
<artifactId>Metron-Common</artifactId>
<version>${project.parent.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java
index 0aba22f..3337855 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/pcap/PcapIntegrationTest.java
@@ -21,9 +21,14 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
import org.apache.metron.integration.util.UnitTestHelper;
import org.apache.metron.integration.util.integration.ComponentRunner;
import org.apache.metron.integration.util.integration.Processor;
@@ -33,18 +38,16 @@ import org.apache.metron.integration.util.integration.components.FluxTopologyCom
import org.apache.metron.integration.util.mock.MockHTable;
import org.apache.metron.integration.util.threatintel.ThreatIntelHelper;
import org.apache.metron.parsing.parsers.PcapParser;
+import org.apache.metron.reference.lookup.LookupKV;
import org.apache.metron.test.converters.HexStringConverter;
-import org.apache.metron.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
import org.apache.metron.threatintel.ThreatIntelResults;
import org.json.simple.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
+import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;
@@ -53,6 +56,14 @@ public class PcapIntegrationTest {
private String topologiesDir = "src/main/resources/Metron_Configs/topologies";
private String targetDir = "target";
+ public static class Provider implements TableProvider, Serializable{
+
+ MockHTable.Provider provider = new MockHTable.Provider();
+ @Override
+ public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+ return provider.getTable(config, tableName);
+ }
+ }
@Test
public void testTopology() throws Exception {
@@ -107,7 +118,7 @@ public class PcapIntegrationTest {
setProperty("bolt.hbase.write.buffer.size.in.bytes", "2000000");
setProperty("bolt.hbase.durability", "SKIP_WAL");
setProperty("bolt.hbase.partitioner.region.info.refresh.interval.mins","60");
- setProperty("hbase.provider.impl","" + MockHTable.Provider.class.getName());
+ setProperty("hbase.provider.impl","" + Provider.class.getName());
setProperty("threat.intel.tracker.table", trackerHBaseTable);
setProperty("threat.intel.tracker.cf", cf);
setProperty("threat.intel.ip.table", ipThreatIntelTable);
@@ -121,9 +132,9 @@ public class PcapIntegrationTest {
//create MockHBaseTables
final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTable, cf);
final MockHTable ipTable = (MockHTable)MockHTable.Provider.addToCache(ipThreatIntelTable, cf);
- ThreatIntelHelper.INSTANCE.load(ipTable, cf, new ArrayList<ThreatIntelResults>(){{
- add(new ThreatIntelResults(new ThreatIntelKey("10.0.2.3"), new HashMap<String, String>()));
- }}, 0L);
+ ThreatIntelHelper.INSTANCE.load(ipTable, cf, new ArrayList<LookupKV<ThreatIntelKey, ThreatIntelValue>>(){{
+ add(new LookupKV<>(new ThreatIntelKey("10.0.2.3"), new ThreatIntelValue(new HashMap<String, String>())));
+ }});
final MockHTable pcapTable = (MockHTable) MockHTable.Provider.addToCache("pcap_test", "t");
FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
.withTopologyLocation(new File(topologiesDir + "/pcap/local.yaml"))
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java
deleted file mode 100644
index 7af2212..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/UnitTestHelper.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util;
-
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-import org.junit.Assert;
-
-import java.io.File;
-import java.util.Set;
-import java.util.Stack;
-
-public class UnitTestHelper {
- public static String findDir(String name) {
- return findDir(new File("."), name);
- }
-
- public static String findDir(File startDir, String name) {
- Stack<File> s = new Stack<File>();
- s.push(startDir);
- while(!s.empty()) {
- File parent = s.pop();
- if(parent.getName().equalsIgnoreCase(name)) {
- return parent.getAbsolutePath();
- }
- else {
- File[] children = parent.listFiles();
- if(children != null) {
- for (File child : children) {
- s.push(child);
- }
- }
- }
- }
- return null;
- }
-
- public static <T> void assertSetEqual(String type, Set<T> expectedPcapIds, Set<T> found) {
- boolean mismatch = false;
- for(T f : found) {
- if(!expectedPcapIds.contains(f)) {
- mismatch = true;
- System.out.println("Found " + type + " that I did not expect: " + f);
- }
- }
- for(T expectedId : expectedPcapIds) {
- if(!found.contains(expectedId)) {
- mismatch = true;
- System.out.println("Expected " + type + " that I did not index: " + expectedId);
- }
- }
- Assert.assertFalse(mismatch);
- }
-
- public static void verboseLogging() {
- verboseLogging("%d [%p|%c|%C{1}] %m%n", Level.ALL);
- }
- public static void verboseLogging(String pattern, Level level) {
- ConsoleAppender console = new ConsoleAppender(); //create appender
- //configure the appender
- console.setLayout(new PatternLayout(pattern));
- console.setThreshold(level);
- console.activateOptions();
- //add appender to any Logger (here is root)
- Logger.getRootLogger().addAppender(console);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java
deleted file mode 100644
index 18c06f2..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ComponentRunner.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.integration;
-
-import backtype.storm.utils.Utils;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public class ComponentRunner {
- public static class Builder {
- LinkedHashMap<String, InMemoryComponent> components;
- String[] startupOrder;
- String[] shutdownOrder;
- public Builder() {
- components = new LinkedHashMap<String, InMemoryComponent>();
- }
-
- public Builder withComponent(String name, InMemoryComponent component) {
- components.put(name, component);
- return this;
- }
-
- public Builder withCustomStartupOrder(String[] startupOrder) {
- this.startupOrder = startupOrder;
- return this;
- }
- public Builder withCustomShutdownOrder(String[] shutdownOrder) {
- this.shutdownOrder = shutdownOrder;
- return this;
- }
- private static String[] toOrderedList(Map<String, InMemoryComponent> components) {
- String[] ret = new String[components.size()];
- int i = 0;
- for(String component : components.keySet()) {
- ret[i++] = component;
- }
- return ret;
- }
- public ComponentRunner build() {
- if(shutdownOrder == null) {
- shutdownOrder = toOrderedList(components);
- }
- if(startupOrder == null) {
- startupOrder = toOrderedList(components);
- }
- return new ComponentRunner(components, startupOrder, shutdownOrder);
- }
-
- }
-
- LinkedHashMap<String, InMemoryComponent> components;
- String[] startupOrder;
- String[] shutdownOrder;
- public ComponentRunner( LinkedHashMap<String, InMemoryComponent> components
- , String[] startupOrder
- , String[] shutdownOrder
- )
- {
- this.components = components;
- this.startupOrder = startupOrder;
- this.shutdownOrder = shutdownOrder;
-
- }
-
- public <T extends InMemoryComponent> T getComponent(String name, Class<T> clazz) {
- return clazz.cast(getComponents().get(name));
- }
-
- public LinkedHashMap<String, InMemoryComponent> getComponents() {
- return components;
- }
-
- public void start() throws UnableToStartException {
- for(String componentName : startupOrder) {
- components.get(componentName).start();
- }
- }
- public void stop() {
- for(String componentName : shutdownOrder) {
- components.get(componentName).stop();
- }
- }
-
- public <T> T process(Processor<T> successState) {
- return process(successState, 5, 30000, 120000);
- }
-
- public <T> T process(Processor<T> successState, int numRetries, long timeBetweenAttempts, long maxTimeMs) {
- int retryCount = 0;
- long start = System.currentTimeMillis();
- while(true) {
- long duration = System.currentTimeMillis() - start;
- if(duration > maxTimeMs) {
- throw new RuntimeException("Took too long to complete: " + duration + " > " + maxTimeMs);
- }
- ReadinessState state = successState.process(this);
- if(state == ReadinessState.READY) {
- return successState.getResult();
- }
- else if(state == ReadinessState.NOT_READY) {
- retryCount++;
- if(numRetries > 0 && retryCount > numRetries) {
- throw new RuntimeException("Too many retries: " + retryCount);
- }
- }
- Utils.sleep(timeBetweenAttempts);
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java
deleted file mode 100644
index b5224a2..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/InMemoryComponent.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.integration;
-
-public interface InMemoryComponent {
- public void start() throws UnableToStartException;
- public void stop();
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java
deleted file mode 100644
index 59f3ece..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/Processor.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.integration;
-
-public interface Processor<T> {
- ReadinessState process(ComponentRunner runner);
- T getResult();
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java
deleted file mode 100644
index 59ce69a..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/ReadinessState.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.integration;
-
-public enum ReadinessState {
- READY, NOT_READY;
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java
deleted file mode 100644
index 4e691f4..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/UnableToStartException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.integration;
-
-public class UnableToStartException extends Exception {
- public UnableToStartException(String message) {
- super(message);
- }
- public UnableToStartException(String message, Throwable t) {
- super(message, t);
- }
-}