You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:05:48 UTC

[06/43] incubator-metron git commit: METRON-50: Ingest threat intel data from Taxii feeds closes apache/incubator-metron#29

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
deleted file mode 100644
index a7991c0..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.integration.components;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.metron.integration.util.integration.InMemoryComponent;
-import org.apache.metron.integration.util.integration.UnableToStartException;
-import org.elasticsearch.ElasticsearchTimeoutException;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.ElasticsearchClient;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.elasticsearch.search.SearchHit;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class ElasticSearchComponent implements InMemoryComponent {
-
-    public static class Builder{
-        private int httpPort;
-        private File indexDir;
-        private Map<String, String> extraElasticSearchSettings = null;
-        public Builder withHttpPort(int httpPort) {
-            this.httpPort = httpPort;
-            return this;
-        }
-        public Builder withIndexDir(File indexDir) {
-            this.indexDir = indexDir;
-            return this;
-        }
-        public Builder withExtraElasticSearchSettings(Map<String, String> extraElasticSearchSettings) {
-            this.extraElasticSearchSettings = extraElasticSearchSettings;
-            return this;
-        }
-        public ElasticSearchComponent build() {
-            return new ElasticSearchComponent(httpPort, indexDir, extraElasticSearchSettings);
-        }
-    }
-
-    private Client client;
-    private Node node;
-    private int httpPort;
-    private File indexDir;
-    private Map<String, String> extraElasticSearchSettings;
-
-    public ElasticSearchComponent(int httpPort, File indexDir) {
-        this(httpPort, indexDir, null);
-    }
-    public ElasticSearchComponent(int httpPort, File indexDir, Map<String, String> extraElasticSearchSettings) {
-        this.httpPort = httpPort;
-        this.indexDir = indexDir;
-        this.extraElasticSearchSettings = extraElasticSearchSettings;
-    }
-    public Client getClient() {
-        return client;
-    }
-
-    private void cleanDir(File dir) throws IOException {
-        if(dir.exists()) {
-            FileUtils.deleteDirectory(dir);
-        }
-        dir.mkdirs();
-    }
-    public void start() throws UnableToStartException {
-        File logDir= new File(indexDir, "/logs");
-        File dataDir= new File(indexDir, "/data");
-        try {
-            cleanDir(logDir);
-            cleanDir(dataDir);
-
-        } catch (IOException e) {
-            throw new UnableToStartException("Unable to clean log or data directories", e);
-        }
-        ImmutableSettings.Builder immutableSettings = ImmutableSettings.settingsBuilder()
-                .put("node.http.enabled", true)
-                .put("http.port", httpPort)
-                .put("cluster.name", "metron")
-                .put("path.logs",logDir.getAbsolutePath())
-                .put("path.data",dataDir.getAbsolutePath())
-                .put("gateway.type", "none")
-                .put("index.store.type", "memory")
-                .put("index.number_of_shards", 1)
-                .put("node.mode", "network")
-                .put("index.number_of_replicas", 1);
-        if(extraElasticSearchSettings != null) {
-            immutableSettings = immutableSettings.put(extraElasticSearchSettings);
-        }
-        Settings settings = immutableSettings.build();
-
-        node = NodeBuilder.nodeBuilder().settings(settings).node();
-        node.start();
-        settings = ImmutableSettings.settingsBuilder()
-					.put("cluster.name", "metron").build();
-		client = new TransportClient(settings)
-					.addTransportAddress(new InetSocketTransportAddress("localhost",
-							9300));
-
-        waitForCluster(client, ClusterHealthStatus.YELLOW, new TimeValue(60000));
-    }
-
-    public static void waitForCluster(ElasticsearchClient client, ClusterHealthStatus status, TimeValue timeout) throws UnableToStartException {
-        try {
-            ClusterHealthResponse healthResponse =
-                    (ClusterHealthResponse)client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(status).timeout(timeout)).actionGet();
-            if (healthResponse != null && healthResponse.isTimedOut()) {
-                throw new UnableToStartException("cluster state is " + healthResponse.getStatus().name()
-                        + " and not " + status.name()
-                        + ", from here on, everything will fail!");
-            }
-        } catch (ElasticsearchTimeoutException e) {
-            throw new UnableToStartException("timeout, cluster does not respond to health request, cowardly refusing to continue with operations");
-        }
-    }
-
-    public List<Map<String, Object>> getAllIndexedDocs(String index) throws IOException {
-       return getAllIndexedDocs(index, "message");
-    }
-    public List<Map<String, Object>> getAllIndexedDocs(String index, String subMessage) throws IOException {
-        getClient().admin().indices().refresh(new RefreshRequest());
-        SearchResponse response = getClient().prepareSearch(index)
-                .setTypes("pcap_doc")
-                .setSource("message")
-                .setFrom(0)
-                .setSize(1000)
-                .execute().actionGet();
-        List<Map<String, Object>> ret = new ArrayList<Map<String, Object>>();
-        for (SearchHit hit : response.getHits()) {
-            Object o = null;
-            if(subMessage == null) {
-                o = hit.getSource();
-            }
-            else {
-                o = hit.getSource().get(subMessage);
-            }
-            ret.add((Map<String, Object>)(o));
-        }
-        return ret;
-    }
-    public boolean hasIndex(String indexName) {
-        Set<String> indices = getClient().admin()
-                                    .indices()
-                                    .stats(new IndicesStatsRequest())
-                                    .actionGet()
-                                    .getIndices()
-                                    .keySet();
-        return indices.contains(indexName);
-
-    }
-
-    public void stop() {
-        node.stop();
-        node = null;
-        client = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java
deleted file mode 100644
index 2cac4ee..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.integration.components;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.StormTopology;
-import org.apache.metron.integration.util.integration.InMemoryComponent;
-import org.apache.metron.integration.util.integration.UnableToStartException;
-import org.apache.storm.flux.FluxBuilder;
-import org.apache.storm.flux.model.ExecutionContext;
-import org.apache.storm.flux.model.TopologyDef;
-import org.apache.storm.flux.parser.FluxParser;
-import org.apache.thrift7.TException;
-import org.junit.Assert;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Properties;
-
-public class FluxTopologyComponent implements InMemoryComponent {
-    LocalCluster stormCluster;
-    String topologyName;
-    File topologyLocation;
-    Properties topologyProperties;
-
-    public static class Builder {
-        String topologyName;
-        File topologyLocation;
-        Properties topologyProperties;
-        public Builder withTopologyName(String name) {
-            this.topologyName = name;
-            return this;
-        }
-        public Builder withTopologyLocation(File location) {
-            this.topologyLocation = location;
-            return this;
-        }
-        public Builder withTopologyProperties(Properties properties) {
-            this.topologyProperties = properties;
-            return this;
-        }
-
-        public FluxTopologyComponent build() {
-            return new FluxTopologyComponent(topologyName, topologyLocation, topologyProperties);
-        }
-    }
-
-    public FluxTopologyComponent(String topologyName, File topologyLocation, Properties topologyProperties) {
-        this.topologyName = topologyName;
-        this.topologyLocation = topologyLocation;
-        this.topologyProperties = topologyProperties;
-    }
-
-    public LocalCluster getStormCluster() {
-        return stormCluster;
-    }
-
-    public String getTopologyName() {
-        return topologyName;
-    }
-
-    public File getTopologyLocation() {
-        return topologyLocation;
-    }
-
-    public Properties getTopologyProperties() {
-        return topologyProperties;
-    }
-
-    public void start() throws UnableToStartException{
-        try {
-            stormCluster = new LocalCluster();
-        } catch (Exception e) {
-            throw new UnableToStartException("Unable to start flux topology: " + getTopologyLocation(), e);
-        }
-    }
-
-    public void stop() {
-        stormCluster.shutdown();
-    }
-
-    public void submitTopology() throws NoSuchMethodException, IOException, InstantiationException, TException, IllegalAccessException, InvocationTargetException, ClassNotFoundException {
-        startTopology(getTopologyName(), getTopologyLocation(), getTopologyProperties());
-    }
-    private void startTopology(String topologyName, File topologyLoc, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException {
-        TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, properties);
-        Config conf = FluxBuilder.buildConfig(topologyDef);
-        ExecutionContext context = new ExecutionContext(topologyDef, conf);
-        StormTopology topology = FluxBuilder.buildTopology(context);
-        Assert.assertNotNull(topology);
-        topology.validate();
-        stormCluster.submitTopology(topologyName, conf, topology);
-    }
-
-    private static TopologyDef loadYaml(String topologyName, File yamlFile, Properties properties) throws IOException {
-        File tmpFile = File.createTempFile(topologyName, "props");
-        tmpFile.deleteOnExit();
-        FileWriter propWriter = null;
-        try {
-            propWriter = new FileWriter(tmpFile);
-            properties.store(propWriter, topologyName + " properties");
-        }
-        finally {
-            if(propWriter != null) {
-                propWriter.close();
-                return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
-            }
-
-            return null;
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockHTable.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockHTable.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockHTable.java
deleted file mode 100644
index cbc6d7d..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockHTable.java
+++ /dev/null
@@ -1,673 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.mock;
-
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.hbase.TableProvider;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * MockHTable.
- *
- * This implementation is a selected excerpt from https://gist.github.com/agaoglu/613217
- */
-public class MockHTable implements HTableInterface {
-
-    public static class Provider implements TableProvider {
-        private static Map<String, HTableInterface> _cache = new HashMap<>();
-        @Override
-        public HTableInterface getTable(Configuration config, String tableName) throws IOException {
-            return _cache.get(tableName);
-        }
-        public static HTableInterface getFromCache(String tableName) {
-            return _cache.get(tableName);
-        }
-        public static HTableInterface addToCache(String tableName, String... columnFamilies) {
-            MockHTable ret =  new MockHTable(tableName, columnFamilies);
-            _cache.put(tableName, ret);
-            return ret;
-        }
-
-        public static void clear() {
-            _cache.clear();
-        }
-    }
-
-    private final String tableName;
-    private final List<String> columnFamilies = new ArrayList<>();
-    private HColumnDescriptor[] descriptors;
-
-    private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data
-            = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-
-    private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
-        return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
-    }
-
-    private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) {
-        List<KeyValue> ret = new ArrayList<KeyValue>();
-        for (byte[] family : rowdata.keySet())
-            for (byte[] qualifier : rowdata.get(family).keySet()) {
-                int versionsAdded = 0;
-                for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) {
-                    if (versionsAdded++ == maxVersions)
-                        break;
-                    Long timestamp = tsToVal.getKey();
-                    if (timestamp < timestampStart)
-                        continue;
-                    if (timestamp > timestampEnd)
-                        continue;
-                    byte[] value = tsToVal.getValue();
-                    ret.add(new KeyValue(row, family, qualifier, timestamp, value));
-                }
-            }
-        return ret;
-    }
-    public MockHTable(String tableName) {
-        this.tableName = tableName;
-    }
-
-    public MockHTable(String tableName, String... columnFamilies) {
-        this.tableName = tableName;
-        for(String cf : columnFamilies) {
-            addColumnFamily(cf);
-        }
-    }
-
-    public void addColumnFamily(String columnFamily) {
-        this.columnFamilies.add(columnFamily);
-        descriptors = new HColumnDescriptor[columnFamilies.size()];
-        int i = 0;
-        for(String cf : columnFamilies) {
-            descriptors[i++] = new HColumnDescriptor(cf);
-        }
-    }
-
-
-    @Override
-    public byte[] getTableName() {
-        return Bytes.toBytes(tableName);
-    }
-
-    @Override
-    public TableName getName() {
-        return TableName.valueOf(tableName);
-    }
-
-    @Override
-    public Configuration getConfiguration() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public HTableDescriptor getTableDescriptor() throws IOException {
-        HTableDescriptor ret = new HTableDescriptor(tableName);
-        for(HColumnDescriptor c : descriptors) {
-            ret.addFamily(c);
-        }
-        return ret;
-    }
-
-    @Override
-    public boolean exists(Get get) throws IOException {
-        if(get.getFamilyMap() == null || get.getFamilyMap().size() == 0) {
-            return data.containsKey(get.getRow());
-        } else {
-            byte[] row = get.getRow();
-            if(!data.containsKey(row)) {
-                return false;
-            }
-            for(byte[] family : get.getFamilyMap().keySet()) {
-                if(!data.get(row).containsKey(family)) {
-                    return false;
-                } else {
-                    return true;
-                }
-            }
-            return true;
-        }
-    }
-
-    /**
-     * Test for the existence of columns in the table, as specified by the Gets.
-     * <p/>
-     * <p/>
-     * This will return an array of booleans. Each value will be true if the related Get matches
-     * one or more keys, false if not.
-     * <p/>
-     * <p/>
-     * This is a server-side call so it prevents any data from being transferred to
-     * the client.
-     *
-     * @param gets the Gets
-     * @return Array of boolean.  True if the specified Get matches one or more keys, false if not.
-     * @throws IOException e
-     */
-    @Override
-    public boolean[] existsAll(List<Get> gets) throws IOException {
-        boolean[] ret = new boolean[gets.size()];
-        int i = 0;
-        for(boolean b : exists(gets)) {
-           ret[i++] = b;
-        }
-        return ret;
-    }
-
-    @Override
-    public Boolean[] exists(List<Get> list) throws IOException {
-        Boolean[] ret = new Boolean[list.size()];
-        int i = 0;
-        for(Get g : list) {
-           ret[i++] = exists(g);
-        }
-        return ret;
-    }
-
-    @Override
-    public void batch(List<? extends Row> list, Object[] objects) throws IOException, InterruptedException {
-        throw new UnsupportedOperationException();
-
-    }
-
-    /**
-     * @param actions
-     * @deprecated
-     */
-    @Override
-    public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
-        List<Result> results = new ArrayList<Result>();
-        for (Row r : actions) {
-            if (r instanceof Delete) {
-                delete((Delete) r);
-                continue;
-            }
-            if (r instanceof Put) {
-                put((Put) r);
-                continue;
-            }
-            if (r instanceof Get) {
-                results.add(get((Get) r));
-            }
-        }
-        return results.toArray();
-    }
-
-    @Override
-    public <R> void batchCallback(List<? extends Row> list, Object[] objects, Batch.Callback<R> callback) throws IOException, InterruptedException {
-        throw new UnsupportedOperationException();
-
-    }
-
-    /**
-     * @param list
-     * @param callback
-     * @deprecated
-     */
-    @Override
-    public <R> Object[] batchCallback(List<? extends Row> list, Batch.Callback<R> callback) throws IOException, InterruptedException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Result get(Get get) throws IOException {
-        if (!data.containsKey(get.getRow()))
-            return new Result();
-        byte[] row = get.getRow();
-        List<KeyValue> kvs = new ArrayList<KeyValue>();
-        if (!get.hasFamilies()) {
-            kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
-        } else {
-            for (byte[] family : get.getFamilyMap().keySet()){
-                if (data.get(row).get(family) == null)
-                    continue;
-                NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family);
-                if (qualifiers == null || qualifiers.isEmpty())
-                    qualifiers = data.get(row).get(family).navigableKeySet();
-                for (byte[] qualifier : qualifiers){
-                    if (qualifier == null)
-                        qualifier = "".getBytes();
-                    if (!data.get(row).containsKey(family) ||
-                            !data.get(row).get(family).containsKey(qualifier) ||
-                            data.get(row).get(family).get(qualifier).isEmpty())
-                        continue;
-                    Map.Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry();
-                    kvs.add(new KeyValue(row,family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue()));
-                }
-            }
-        }
-        Filter filter = get.getFilter();
-        if (filter != null) {
-            filter.reset();
-            List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
-            for (KeyValue kv : kvs) {
-                if (filter.filterAllRemaining()) {
-                    break;
-                }
-                if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
-                    continue;
-                }
-                if (filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE) {
-                    nkvs.add(kv);
-                }
-                // ignoring next key hint which is a optimization to reduce file system IO
-            }
-            if (filter.hasFilterRow()) {
-                filter.filterRow();
-            }
-            kvs = nkvs;
-        }
-
-        return new Result(kvs);
-    }
-
-    @Override
-    public Result[] get(List<Get> list) throws IOException {
-        Result[] ret = new Result[list.size()];
-        int i = 0;
-        for(Get g : list) {
-            ret[i++] = get(g);
-        }
-        return ret;
-    }
-
-    /**
-     * @param bytes
-     * @param bytes1
-     * @deprecated
-     */
-    @Override
-    public Result getRowOrBefore(byte[] bytes, byte[] bytes1) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public ResultScanner getScanner(Scan scan) throws IOException {
-        final List<Result> ret = new ArrayList<Result>();
-        byte[] st = scan.getStartRow();
-        byte[] sp = scan.getStopRow();
-        Filter filter = scan.getFilter();
-
-        for (byte[] row : data.keySet()){
-            // if row is equal to startRow emit it. When startRow (inclusive) and
-            // stopRow (exclusive) is the same, it should not be excluded which would
-            // happen w/o this control.
-            if (st != null && st.length > 0 &&
-                    Bytes.BYTES_COMPARATOR.compare(st, row) != 0) {
-                // if row is before startRow do not emit, pass to next row
-                if (st != null && st.length > 0 &&
-                        Bytes.BYTES_COMPARATOR.compare(st, row) > 0)
-                    continue;
-                // if row is equal to stopRow or after it do not emit, stop iteration
-                if (sp != null && sp.length > 0 &&
-                        Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0)
-                    break;
-            }
-
-            List<KeyValue> kvs = null;
-            if (!scan.hasFamilies()) {
-                kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions());
-            } else {
-                kvs = new ArrayList<KeyValue>();
-                for (byte[] family : scan.getFamilyMap().keySet()){
-                    if (data.get(row).get(family) == null)
-                        continue;
-                    NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
-                    if (qualifiers == null || qualifiers.isEmpty())
-                        qualifiers = data.get(row).get(family).navigableKeySet();
-                    for (byte[] qualifier : qualifiers){
-                        if (data.get(row).get(family).get(qualifier) == null)
-                            continue;
-                        for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()){
-                            if (timestamp < scan.getTimeRange().getMin())
-                                continue;
-                            if (timestamp > scan.getTimeRange().getMax())
-                                continue;
-                            byte[] value = data.get(row).get(family).get(qualifier).get(timestamp);
-                            kvs.add(new KeyValue(row, family, qualifier, timestamp, value));
-                            if(kvs.size() == scan.getMaxVersions()) {
-                                break;
-                            }
-                        }
-                    }
-                }
-            }
-            if (filter != null) {
-                filter.reset();
-                List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
-                for (KeyValue kv : kvs) {
-                    if (filter.filterAllRemaining()) {
-                        break;
-                    }
-                    if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
-                        continue;
-                    }
-                    Filter.ReturnCode filterResult = filter.filterKeyValue(kv);
-                    if (filterResult == Filter.ReturnCode.INCLUDE) {
-                        nkvs.add(kv);
-                    } else if (filterResult == Filter.ReturnCode.NEXT_ROW) {
-                        break;
-                    }
-                    // ignoring next key hint which is a optimization to reduce file system IO
-                }
-                if (filter.hasFilterRow()) {
-                    filter.filterRow();
-                }
-                kvs = nkvs;
-            }
-            if (!kvs.isEmpty()) {
-                ret.add(new Result(kvs));
-            }
-        }
-
-        return new ResultScanner() {
-            private final Iterator<Result> iterator = ret.iterator();
-            public Iterator<Result> iterator() {
-                return iterator;
-            }
-            public Result[] next(int nbRows) throws IOException {
-                ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
-                for(int i = 0; i < nbRows; i++) {
-                    Result next = next();
-                    if (next != null) {
-                        resultSets.add(next);
-                    } else {
-                        break;
-                    }
-                }
-                return resultSets.toArray(new Result[resultSets.size()]);
-            }
-            public Result next() throws IOException {
-                try {
-                    return iterator().next();
-                } catch (NoSuchElementException e) {
-                    return null;
-                }
-            }
-            public void close() {}
-        };
-    }
-    @Override
-    public ResultScanner getScanner(byte[] family) throws IOException {
-        Scan scan = new Scan();
-        scan.addFamily(family);
-        return getScanner(scan);
-    }
-
-    @Override
-    public ResultScanner getScanner(byte[] family, byte[] qualifier)
-            throws IOException {
-        Scan scan = new Scan();
-        scan.addColumn(family, qualifier);
-        return getScanner(scan);
-    }
-
-    List<Put> putLog = new ArrayList<>();
-
-    public List<Put> getPutLog() {
-        return putLog;
-    }
-
-    @Override
-    public void put(Put put) throws IOException {
-        putLog.add(put);
-        byte[] row = put.getRow();
-        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
-        for (byte[] family : put.getFamilyMap().keySet()){
-            NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
-            for (KeyValue kv : put.getFamilyMap().get(family)){
-                kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
-                byte[] qualifier = kv.getQualifier();
-                NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>());
-                qualifierData.put(kv.getTimestamp(), kv.getValue());
-            }
-        }
-    }
-
-    /**
-     * Helper method to find a key in a map. If key is not found, newObject is
-     * added to map and returned
-     *
-     * @param map
-     *          map to extract value from
-     * @param key
-     *          key to look for
-     * @param newObject
-     *          set key to this if not found
-     * @return found value or newObject if not found
-     */
-    private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject){
-        V data = map.get(key);
-        if (data == null){
-            data = newObject;
-            map.put(key, data);
-        }
-        return data;
-    }
-
-    @Override
-    public void put(List<Put> puts) throws IOException {
-        for (Put put : puts)
-            put(put);
-    }
-
-    @Override
-    public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Put put) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * Atomically checks if a row/family/qualifier value matches the expected
-     * value. If it does, it adds the put.  If the passed value is null, the check
-     * is for the lack of column (ie: non-existance)
-     *
-     * @param row       to check
-     * @param family    column family to check
-     * @param qualifier column qualifier to check
-     * @param compareOp comparison operator to use
-     * @param value     the expected value
-     * @param put       data to put if check succeeds
-     * @return true if the new put was executed, false otherwise
-     * @throws IOException e
-     */
-    @Override
-    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
-        return false;
-    }
-
-    @Override
-    public void delete(Delete delete) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void delete(List<Delete> list) throws IOException {
-        throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Delete delete) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * Atomically checks if a row/family/qualifier value matches the expected
-     * value. If it does, it adds the delete.  If the passed value is null, the
-     * check is for the lack of column (ie: non-existance)
-     *
-     * @param row       to check
-     * @param family    column family to check
-     * @param qualifier column qualifier to check
-     * @param compareOp comparison operator to use
-     * @param value     the expected value
-     * @param delete    data to delete if check succeeds
-     * @return true if the new delete was executed, false otherwise
-     * @throws IOException e
-     */
-    @Override
-    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
-        return false;
-    }
-
-    @Override
-    public void mutateRow(RowMutations rowMutations) throws IOException {
-        throw new UnsupportedOperationException();
-
-    }
-
-    @Override
-    public Result append(Append append) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Result increment(Increment increment) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, Durability durability) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @param bytes
-     * @param bytes1
-     * @param bytes2
-     * @param l
-     * @param b
-     * @deprecated
-     */
-    @Override
-    public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, boolean b) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isAutoFlush() {
-        return autoflush;
-    }
-
-    @Override
-    public void flushCommits() throws IOException {
-
-    }
-
-    @Override
-    public void close() throws IOException {
-
-    }
-
-    @Override
-    public CoprocessorRpcChannel coprocessorService(byte[] bytes) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call) throws ServiceException, Throwable {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <T extends Service, R> void coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call, Batch.Callback<R> callback) throws ServiceException, Throwable {
-        throw new UnsupportedOperationException();
-    }
-
-    boolean autoflush = true;
-
-    /**
-     * @param b
-     * @deprecated
-     */
-    @Override
-    public void setAutoFlush(boolean b) {
-        autoflush = b;
-    }
-
-    @Override
-    public void setAutoFlush(boolean b, boolean b1) {
-        autoflush = b;
-    }
-
-    @Override
-    public void setAutoFlushTo(boolean b) {
-        autoflush = b;
-    }
-
-    long writeBufferSize = 0;
-    @Override
-    public long getWriteBufferSize() {
-        return writeBufferSize;
-    }
-
-    @Override
-    public void setWriteBufferSize(long l) throws IOException {
-        writeBufferSize = l;
-    }
-
-    @Override
-    public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r) throws ServiceException, Throwable {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r, Batch.Callback<R> callback) throws ServiceException, Throwable {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * Atomically checks if a row/family/qualifier value matches the expected value.
-     * If it does, it performs the row mutations.  If the passed value is null, the check
-     * is for the lack of column (ie: non-existence)
-     *
-     * @param row       to check
-     * @param family    column family to check
-     * @param qualifier column qualifier to check
-     * @param compareOp the comparison operator
-     * @param value     the expected value
-     * @param mutation  mutations to perform if check succeeds
-     * @return true if the new put was executed, false otherwise
-     * @throws IOException e
-     */
-    @Override
-    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/threatintel/ThreatIntelHelper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/threatintel/ThreatIntelHelper.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/threatintel/ThreatIntelHelper.java
index 57d7902..1e64362 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/threatintel/ThreatIntelHelper.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/threatintel/ThreatIntelHelper.java
@@ -19,17 +19,21 @@ package org.apache.metron.integration.util.threatintel;
 
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.reference.lookup.LookupKV;
 import org.apache.metron.threatintel.ThreatIntelResults;
-import org.apache.metron.threatintel.hbase.Converter;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
 
 import java.io.IOException;
 
 public enum ThreatIntelHelper {
     INSTANCE;
+    ThreatIntelConverter converter = new ThreatIntelConverter();
 
-    public void load(HTableInterface table, String cf, Iterable<ThreatIntelResults> results, long ts) throws IOException {
-        for(ThreatIntelResults result : results) {
-            Put put = Converter.INSTANCE.toPut(cf, result.getKey(), result.getValue(), ts);
+    public void load(HTableInterface table, String cf, Iterable<LookupKV<ThreatIntelKey, ThreatIntelValue>> results) throws IOException {
+        for(LookupKV<ThreatIntelKey, ThreatIntelValue> result : results) {
+            Put put = converter.toPut(cf, result.getKey(), result.getValue());
             table.put(put);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/pom.xml b/metron-streaming/pom.xml
index 47ab2cb..827c8d0 100644
--- a/metron-streaming/pom.xml
+++ b/metron-streaming/pom.xml
@@ -69,6 +69,7 @@
 		<module>Metron-DataLoads</module>
 		<module>Metron-Topologies</module>
 		<module>Metron-Pcap_Service</module>
+		<module>Metron-Testing</module>
 	</modules>
 	<dependencies>
 		<dependency>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 992f328..a4c773d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,10 +45,12 @@
 						<exclude>**/.*</exclude>
 						<exclude>**/.*/**</exclude>
 						<exclude>**/*.seed</exclude>
+            <exclude>**/*.iml</exclude>
 						<exclude>**/ansible.cfg</exclude>
 						<exclude>site/**</exclude>
 						<exclude>metron-ui/lib/public/**</exclude>
 						<exclude>**/src/main/resources/patterns/**</exclude>
+						<exclude>**/src/test/resources/**</exclude>
 						<exclude>**/src/main/resources/SampleInput/**</exclude>
 						<exclude>**/dependency-reduced-pom.xml</exclude>
 					        <exclude>**/files/opensoc-ui</exclude>