You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:05:48 UTC
[06/43] incubator-metron git commit: METRON-50: Ingest threat intel
data from Taxii feeds closes apache/incubator-metron#29
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
deleted file mode 100644
index a7991c0..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/ElasticSearchComponent.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.integration.components;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.metron.integration.util.integration.InMemoryComponent;
-import org.apache.metron.integration.util.integration.UnableToStartException;
-import org.elasticsearch.ElasticsearchTimeoutException;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.ElasticsearchClient;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
-import org.elasticsearch.search.SearchHit;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class ElasticSearchComponent implements InMemoryComponent {
-
- public static class Builder{
- private int httpPort;
- private File indexDir;
- private Map<String, String> extraElasticSearchSettings = null;
- public Builder withHttpPort(int httpPort) {
- this.httpPort = httpPort;
- return this;
- }
- public Builder withIndexDir(File indexDir) {
- this.indexDir = indexDir;
- return this;
- }
- public Builder withExtraElasticSearchSettings(Map<String, String> extraElasticSearchSettings) {
- this.extraElasticSearchSettings = extraElasticSearchSettings;
- return this;
- }
- public ElasticSearchComponent build() {
- return new ElasticSearchComponent(httpPort, indexDir, extraElasticSearchSettings);
- }
- }
-
- private Client client;
- private Node node;
- private int httpPort;
- private File indexDir;
- private Map<String, String> extraElasticSearchSettings;
-
- public ElasticSearchComponent(int httpPort, File indexDir) {
- this(httpPort, indexDir, null);
- }
- public ElasticSearchComponent(int httpPort, File indexDir, Map<String, String> extraElasticSearchSettings) {
- this.httpPort = httpPort;
- this.indexDir = indexDir;
- this.extraElasticSearchSettings = extraElasticSearchSettings;
- }
- public Client getClient() {
- return client;
- }
-
- private void cleanDir(File dir) throws IOException {
- if(dir.exists()) {
- FileUtils.deleteDirectory(dir);
- }
- dir.mkdirs();
- }
- public void start() throws UnableToStartException {
- File logDir= new File(indexDir, "/logs");
- File dataDir= new File(indexDir, "/data");
- try {
- cleanDir(logDir);
- cleanDir(dataDir);
-
- } catch (IOException e) {
- throw new UnableToStartException("Unable to clean log or data directories", e);
- }
- ImmutableSettings.Builder immutableSettings = ImmutableSettings.settingsBuilder()
- .put("node.http.enabled", true)
- .put("http.port", httpPort)
- .put("cluster.name", "metron")
- .put("path.logs",logDir.getAbsolutePath())
- .put("path.data",dataDir.getAbsolutePath())
- .put("gateway.type", "none")
- .put("index.store.type", "memory")
- .put("index.number_of_shards", 1)
- .put("node.mode", "network")
- .put("index.number_of_replicas", 1);
- if(extraElasticSearchSettings != null) {
- immutableSettings = immutableSettings.put(extraElasticSearchSettings);
- }
- Settings settings = immutableSettings.build();
-
- node = NodeBuilder.nodeBuilder().settings(settings).node();
- node.start();
- settings = ImmutableSettings.settingsBuilder()
- .put("cluster.name", "metron").build();
- client = new TransportClient(settings)
- .addTransportAddress(new InetSocketTransportAddress("localhost",
- 9300));
-
- waitForCluster(client, ClusterHealthStatus.YELLOW, new TimeValue(60000));
- }
-
- public static void waitForCluster(ElasticsearchClient client, ClusterHealthStatus status, TimeValue timeout) throws UnableToStartException {
- try {
- ClusterHealthResponse healthResponse =
- (ClusterHealthResponse)client.execute(ClusterHealthAction.INSTANCE, new ClusterHealthRequest().waitForStatus(status).timeout(timeout)).actionGet();
- if (healthResponse != null && healthResponse.isTimedOut()) {
- throw new UnableToStartException("cluster state is " + healthResponse.getStatus().name()
- + " and not " + status.name()
- + ", from here on, everything will fail!");
- }
- } catch (ElasticsearchTimeoutException e) {
- throw new UnableToStartException("timeout, cluster does not respond to health request, cowardly refusing to continue with operations");
- }
- }
-
- public List<Map<String, Object>> getAllIndexedDocs(String index) throws IOException {
- return getAllIndexedDocs(index, "message");
- }
- public List<Map<String, Object>> getAllIndexedDocs(String index, String subMessage) throws IOException {
- getClient().admin().indices().refresh(new RefreshRequest());
- SearchResponse response = getClient().prepareSearch(index)
- .setTypes("pcap_doc")
- .setSource("message")
- .setFrom(0)
- .setSize(1000)
- .execute().actionGet();
- List<Map<String, Object>> ret = new ArrayList<Map<String, Object>>();
- for (SearchHit hit : response.getHits()) {
- Object o = null;
- if(subMessage == null) {
- o = hit.getSource();
- }
- else {
- o = hit.getSource().get(subMessage);
- }
- ret.add((Map<String, Object>)(o));
- }
- return ret;
- }
- public boolean hasIndex(String indexName) {
- Set<String> indices = getClient().admin()
- .indices()
- .stats(new IndicesStatsRequest())
- .actionGet()
- .getIndices()
- .keySet();
- return indices.contains(indexName);
-
- }
-
- public void stop() {
- node.stop();
- node = null;
- client = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java
deleted file mode 100644
index 2cac4ee..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/integration/components/FluxTopologyComponent.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.integration.components;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.StormTopology;
-import org.apache.metron.integration.util.integration.InMemoryComponent;
-import org.apache.metron.integration.util.integration.UnableToStartException;
-import org.apache.storm.flux.FluxBuilder;
-import org.apache.storm.flux.model.ExecutionContext;
-import org.apache.storm.flux.model.TopologyDef;
-import org.apache.storm.flux.parser.FluxParser;
-import org.apache.thrift7.TException;
-import org.junit.Assert;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Properties;
-
-public class FluxTopologyComponent implements InMemoryComponent {
- LocalCluster stormCluster;
- String topologyName;
- File topologyLocation;
- Properties topologyProperties;
-
- public static class Builder {
- String topologyName;
- File topologyLocation;
- Properties topologyProperties;
- public Builder withTopologyName(String name) {
- this.topologyName = name;
- return this;
- }
- public Builder withTopologyLocation(File location) {
- this.topologyLocation = location;
- return this;
- }
- public Builder withTopologyProperties(Properties properties) {
- this.topologyProperties = properties;
- return this;
- }
-
- public FluxTopologyComponent build() {
- return new FluxTopologyComponent(topologyName, topologyLocation, topologyProperties);
- }
- }
-
- public FluxTopologyComponent(String topologyName, File topologyLocation, Properties topologyProperties) {
- this.topologyName = topologyName;
- this.topologyLocation = topologyLocation;
- this.topologyProperties = topologyProperties;
- }
-
- public LocalCluster getStormCluster() {
- return stormCluster;
- }
-
- public String getTopologyName() {
- return topologyName;
- }
-
- public File getTopologyLocation() {
- return topologyLocation;
- }
-
- public Properties getTopologyProperties() {
- return topologyProperties;
- }
-
- public void start() throws UnableToStartException{
- try {
- stormCluster = new LocalCluster();
- } catch (Exception e) {
- throw new UnableToStartException("Unable to start flux topology: " + getTopologyLocation(), e);
- }
- }
-
- public void stop() {
- stormCluster.shutdown();
- }
-
- public void submitTopology() throws NoSuchMethodException, IOException, InstantiationException, TException, IllegalAccessException, InvocationTargetException, ClassNotFoundException {
- startTopology(getTopologyName(), getTopologyLocation(), getTopologyProperties());
- }
- private void startTopology(String topologyName, File topologyLoc, Properties properties) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, TException {
- TopologyDef topologyDef = loadYaml(topologyName, topologyLoc, properties);
- Config conf = FluxBuilder.buildConfig(topologyDef);
- ExecutionContext context = new ExecutionContext(topologyDef, conf);
- StormTopology topology = FluxBuilder.buildTopology(context);
- Assert.assertNotNull(topology);
- topology.validate();
- stormCluster.submitTopology(topologyName, conf, topology);
- }
-
- private static TopologyDef loadYaml(String topologyName, File yamlFile, Properties properties) throws IOException {
- File tmpFile = File.createTempFile(topologyName, "props");
- tmpFile.deleteOnExit();
- FileWriter propWriter = null;
- try {
- propWriter = new FileWriter(tmpFile);
- properties.store(propWriter, topologyName + " properties");
- }
- finally {
- if(propWriter != null) {
- propWriter.close();
- return FluxParser.parseFile(yamlFile.getAbsolutePath(), false, true, tmpFile.getAbsolutePath(), false);
- }
-
- return null;
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockHTable.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockHTable.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockHTable.java
deleted file mode 100644
index cbc6d7d..0000000
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/mock/MockHTable.java
+++ /dev/null
@@ -1,673 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.integration.util.mock;
-
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.hbase.TableProvider;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * MockHTable.
- *
- * This implementation is a selected excerpt from https://gist.github.com/agaoglu/613217
- */
-public class MockHTable implements HTableInterface {
-
- public static class Provider implements TableProvider {
- private static Map<String, HTableInterface> _cache = new HashMap<>();
- @Override
- public HTableInterface getTable(Configuration config, String tableName) throws IOException {
- return _cache.get(tableName);
- }
- public static HTableInterface getFromCache(String tableName) {
- return _cache.get(tableName);
- }
- public static HTableInterface addToCache(String tableName, String... columnFamilies) {
- MockHTable ret = new MockHTable(tableName, columnFamilies);
- _cache.put(tableName, ret);
- return ret;
- }
-
- public static void clear() {
- _cache.clear();
- }
- }
-
- private final String tableName;
- private final List<String> columnFamilies = new ArrayList<>();
- private HColumnDescriptor[] descriptors;
-
- private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data
- = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-
- private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) {
- return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions);
- }
-
- private static List<KeyValue> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) {
- List<KeyValue> ret = new ArrayList<KeyValue>();
- for (byte[] family : rowdata.keySet())
- for (byte[] qualifier : rowdata.get(family).keySet()) {
- int versionsAdded = 0;
- for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) {
- if (versionsAdded++ == maxVersions)
- break;
- Long timestamp = tsToVal.getKey();
- if (timestamp < timestampStart)
- continue;
- if (timestamp > timestampEnd)
- continue;
- byte[] value = tsToVal.getValue();
- ret.add(new KeyValue(row, family, qualifier, timestamp, value));
- }
- }
- return ret;
- }
- public MockHTable(String tableName) {
- this.tableName = tableName;
- }
-
- public MockHTable(String tableName, String... columnFamilies) {
- this.tableName = tableName;
- for(String cf : columnFamilies) {
- addColumnFamily(cf);
- }
- }
-
- public void addColumnFamily(String columnFamily) {
- this.columnFamilies.add(columnFamily);
- descriptors = new HColumnDescriptor[columnFamilies.size()];
- int i = 0;
- for(String cf : columnFamilies) {
- descriptors[i++] = new HColumnDescriptor(cf);
- }
- }
-
-
- @Override
- public byte[] getTableName() {
- return Bytes.toBytes(tableName);
- }
-
- @Override
- public TableName getName() {
- return TableName.valueOf(tableName);
- }
-
- @Override
- public Configuration getConfiguration() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public HTableDescriptor getTableDescriptor() throws IOException {
- HTableDescriptor ret = new HTableDescriptor(tableName);
- for(HColumnDescriptor c : descriptors) {
- ret.addFamily(c);
- }
- return ret;
- }
-
- @Override
- public boolean exists(Get get) throws IOException {
- if(get.getFamilyMap() == null || get.getFamilyMap().size() == 0) {
- return data.containsKey(get.getRow());
- } else {
- byte[] row = get.getRow();
- if(!data.containsKey(row)) {
- return false;
- }
- for(byte[] family : get.getFamilyMap().keySet()) {
- if(!data.get(row).containsKey(family)) {
- return false;
- } else {
- return true;
- }
- }
- return true;
- }
- }
-
- /**
- * Test for the existence of columns in the table, as specified by the Gets.
- * <p/>
- * <p/>
- * This will return an array of booleans. Each value will be true if the related Get matches
- * one or more keys, false if not.
- * <p/>
- * <p/>
- * This is a server-side call so it prevents any data from being transferred to
- * the client.
- *
- * @param gets the Gets
- * @return Array of boolean. True if the specified Get matches one or more keys, false if not.
- * @throws IOException e
- */
- @Override
- public boolean[] existsAll(List<Get> gets) throws IOException {
- boolean[] ret = new boolean[gets.size()];
- int i = 0;
- for(boolean b : exists(gets)) {
- ret[i++] = b;
- }
- return ret;
- }
-
- @Override
- public Boolean[] exists(List<Get> list) throws IOException {
- Boolean[] ret = new Boolean[list.size()];
- int i = 0;
- for(Get g : list) {
- ret[i++] = exists(g);
- }
- return ret;
- }
-
- @Override
- public void batch(List<? extends Row> list, Object[] objects) throws IOException, InterruptedException {
- throw new UnsupportedOperationException();
-
- }
-
- /**
- * @param actions
- * @deprecated
- */
- @Override
- public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
- List<Result> results = new ArrayList<Result>();
- for (Row r : actions) {
- if (r instanceof Delete) {
- delete((Delete) r);
- continue;
- }
- if (r instanceof Put) {
- put((Put) r);
- continue;
- }
- if (r instanceof Get) {
- results.add(get((Get) r));
- }
- }
- return results.toArray();
- }
-
- @Override
- public <R> void batchCallback(List<? extends Row> list, Object[] objects, Batch.Callback<R> callback) throws IOException, InterruptedException {
- throw new UnsupportedOperationException();
-
- }
-
- /**
- * @param list
- * @param callback
- * @deprecated
- */
- @Override
- public <R> Object[] batchCallback(List<? extends Row> list, Batch.Callback<R> callback) throws IOException, InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Result get(Get get) throws IOException {
- if (!data.containsKey(get.getRow()))
- return new Result();
- byte[] row = get.getRow();
- List<KeyValue> kvs = new ArrayList<KeyValue>();
- if (!get.hasFamilies()) {
- kvs = toKeyValue(row, data.get(row), get.getMaxVersions());
- } else {
- for (byte[] family : get.getFamilyMap().keySet()){
- if (data.get(row).get(family) == null)
- continue;
- NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family);
- if (qualifiers == null || qualifiers.isEmpty())
- qualifiers = data.get(row).get(family).navigableKeySet();
- for (byte[] qualifier : qualifiers){
- if (qualifier == null)
- qualifier = "".getBytes();
- if (!data.get(row).containsKey(family) ||
- !data.get(row).get(family).containsKey(qualifier) ||
- data.get(row).get(family).get(qualifier).isEmpty())
- continue;
- Map.Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry();
- kvs.add(new KeyValue(row,family, qualifier, timestampAndValue.getKey(), timestampAndValue.getValue()));
- }
- }
- }
- Filter filter = get.getFilter();
- if (filter != null) {
- filter.reset();
- List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
- for (KeyValue kv : kvs) {
- if (filter.filterAllRemaining()) {
- break;
- }
- if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
- continue;
- }
- if (filter.filterKeyValue(kv) == Filter.ReturnCode.INCLUDE) {
- nkvs.add(kv);
- }
- // ignoring next key hint which is a optimization to reduce file system IO
- }
- if (filter.hasFilterRow()) {
- filter.filterRow();
- }
- kvs = nkvs;
- }
-
- return new Result(kvs);
- }
-
- @Override
- public Result[] get(List<Get> list) throws IOException {
- Result[] ret = new Result[list.size()];
- int i = 0;
- for(Get g : list) {
- ret[i++] = get(g);
- }
- return ret;
- }
-
- /**
- * @param bytes
- * @param bytes1
- * @deprecated
- */
- @Override
- public Result getRowOrBefore(byte[] bytes, byte[] bytes1) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ResultScanner getScanner(Scan scan) throws IOException {
- final List<Result> ret = new ArrayList<Result>();
- byte[] st = scan.getStartRow();
- byte[] sp = scan.getStopRow();
- Filter filter = scan.getFilter();
-
- for (byte[] row : data.keySet()){
- // if row is equal to startRow emit it. When startRow (inclusive) and
- // stopRow (exclusive) is the same, it should not be excluded which would
- // happen w/o this control.
- if (st != null && st.length > 0 &&
- Bytes.BYTES_COMPARATOR.compare(st, row) != 0) {
- // if row is before startRow do not emit, pass to next row
- if (st != null && st.length > 0 &&
- Bytes.BYTES_COMPARATOR.compare(st, row) > 0)
- continue;
- // if row is equal to stopRow or after it do not emit, stop iteration
- if (sp != null && sp.length > 0 &&
- Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0)
- break;
- }
-
- List<KeyValue> kvs = null;
- if (!scan.hasFamilies()) {
- kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), scan.getMaxVersions());
- } else {
- kvs = new ArrayList<KeyValue>();
- for (byte[] family : scan.getFamilyMap().keySet()){
- if (data.get(row).get(family) == null)
- continue;
- NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family);
- if (qualifiers == null || qualifiers.isEmpty())
- qualifiers = data.get(row).get(family).navigableKeySet();
- for (byte[] qualifier : qualifiers){
- if (data.get(row).get(family).get(qualifier) == null)
- continue;
- for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()){
- if (timestamp < scan.getTimeRange().getMin())
- continue;
- if (timestamp > scan.getTimeRange().getMax())
- continue;
- byte[] value = data.get(row).get(family).get(qualifier).get(timestamp);
- kvs.add(new KeyValue(row, family, qualifier, timestamp, value));
- if(kvs.size() == scan.getMaxVersions()) {
- break;
- }
- }
- }
- }
- }
- if (filter != null) {
- filter.reset();
- List<KeyValue> nkvs = new ArrayList<KeyValue>(kvs.size());
- for (KeyValue kv : kvs) {
- if (filter.filterAllRemaining()) {
- break;
- }
- if (filter.filterRowKey(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength())) {
- continue;
- }
- Filter.ReturnCode filterResult = filter.filterKeyValue(kv);
- if (filterResult == Filter.ReturnCode.INCLUDE) {
- nkvs.add(kv);
- } else if (filterResult == Filter.ReturnCode.NEXT_ROW) {
- break;
- }
- // ignoring next key hint which is a optimization to reduce file system IO
- }
- if (filter.hasFilterRow()) {
- filter.filterRow();
- }
- kvs = nkvs;
- }
- if (!kvs.isEmpty()) {
- ret.add(new Result(kvs));
- }
- }
-
- return new ResultScanner() {
- private final Iterator<Result> iterator = ret.iterator();
- public Iterator<Result> iterator() {
- return iterator;
- }
- public Result[] next(int nbRows) throws IOException {
- ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
- for(int i = 0; i < nbRows; i++) {
- Result next = next();
- if (next != null) {
- resultSets.add(next);
- } else {
- break;
- }
- }
- return resultSets.toArray(new Result[resultSets.size()]);
- }
- public Result next() throws IOException {
- try {
- return iterator().next();
- } catch (NoSuchElementException e) {
- return null;
- }
- }
- public void close() {}
- };
- }
- @Override
- public ResultScanner getScanner(byte[] family) throws IOException {
- Scan scan = new Scan();
- scan.addFamily(family);
- return getScanner(scan);
- }
-
- @Override
- public ResultScanner getScanner(byte[] family, byte[] qualifier)
- throws IOException {
- Scan scan = new Scan();
- scan.addColumn(family, qualifier);
- return getScanner(scan);
- }
-
- List<Put> putLog = new ArrayList<>();
-
- public List<Put> getPutLog() {
- return putLog;
- }
-
- @Override
- public void put(Put put) throws IOException {
- putLog.add(put);
- byte[] row = put.getRow();
- NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR));
- for (byte[] family : put.getFamilyMap().keySet()){
- NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR));
- for (KeyValue kv : put.getFamilyMap().get(family)){
- kv.updateLatestStamp(Bytes.toBytes(System.currentTimeMillis()));
- byte[] qualifier = kv.getQualifier();
- NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>());
- qualifierData.put(kv.getTimestamp(), kv.getValue());
- }
- }
- }
-
- /**
- * Helper method to find a key in a map. If key is not found, newObject is
- * added to map and returned
- *
- * @param map
- * map to extract value from
- * @param key
- * key to look for
- * @param newObject
- * set key to this if not found
- * @return found value or newObject if not found
- */
- private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject){
- V data = map.get(key);
- if (data == null){
- data = newObject;
- map.put(key, data);
- }
- return data;
- }
-
- @Override
- public void put(List<Put> puts) throws IOException {
- for (Put put : puts)
- put(put);
- }
-
- @Override
- public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Put put) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the put. If the passed value is null, the check
- * is for the lack of column (ie: non-existance)
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param compareOp comparison operator to use
- * @param value the expected value
- * @param put data to put if check succeeds
- * @return true if the new put was executed, false otherwise
- * @throws IOException e
- */
- @Override
- public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
- return false;
- }
-
- @Override
- public void delete(Delete delete) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void delete(List<Delete> list) throws IOException {
- throw new UnsupportedOperationException();
-
- }
-
- @Override
- public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, byte[] bytes3, Delete delete) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the delete. If the passed value is null, the
- * check is for the lack of column (ie: non-existance)
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param compareOp comparison operator to use
- * @param value the expected value
- * @param delete data to delete if check succeeds
- * @return true if the new delete was executed, false otherwise
- * @throws IOException e
- */
- @Override
- public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
- return false;
- }
-
- @Override
- public void mutateRow(RowMutations rowMutations) throws IOException {
- throw new UnsupportedOperationException();
-
- }
-
- @Override
- public Result append(Append append) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Result increment(Increment increment) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, Durability durability) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- /**
- * @param bytes
- * @param bytes1
- * @param bytes2
- * @param l
- * @param b
- * @deprecated
- */
- @Override
- public long incrementColumnValue(byte[] bytes, byte[] bytes1, byte[] bytes2, long l, boolean b) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isAutoFlush() {
- return autoflush;
- }
-
- @Override
- public void flushCommits() throws IOException {
-
- }
-
- @Override
- public void close() throws IOException {
-
- }
-
- @Override
- public CoprocessorRpcChannel coprocessorService(byte[] bytes) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call) throws ServiceException, Throwable {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <T extends Service, R> void coprocessorService(Class<T> aClass, byte[] bytes, byte[] bytes1, Batch.Call<T, R> call, Batch.Callback<R> callback) throws ServiceException, Throwable {
- throw new UnsupportedOperationException();
- }
-
- boolean autoflush = true;
-
- /**
- * @param b
- * @deprecated
- */
- @Override
- public void setAutoFlush(boolean b) {
- autoflush = b;
- }
-
- @Override
- public void setAutoFlush(boolean b, boolean b1) {
- autoflush = b;
- }
-
- @Override
- public void setAutoFlushTo(boolean b) {
- autoflush = b;
- }
-
- long writeBufferSize = 0;
- @Override
- public long getWriteBufferSize() {
- return writeBufferSize;
- }
-
- @Override
- public void setWriteBufferSize(long l) throws IOException {
- writeBufferSize = l;
- }
-
- @Override
- public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r) throws ServiceException, Throwable {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message message, byte[] bytes, byte[] bytes1, R r, Batch.Callback<R> callback) throws ServiceException, Throwable {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected value.
- * If it does, it performs the row mutations. If the passed value is null, the check
- * is for the lack of column (ie: non-existence)
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param compareOp the comparison operator
- * @param value the expected value
- * @param mutation mutations to perform if check succeeds
- * @return true if the new put was executed, false otherwise
- * @throws IOException e
- */
- @Override
- public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/threatintel/ThreatIntelHelper.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/threatintel/ThreatIntelHelper.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/threatintel/ThreatIntelHelper.java
index 57d7902..1e64362 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/threatintel/ThreatIntelHelper.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/util/threatintel/ThreatIntelHelper.java
@@ -19,17 +19,21 @@ package org.apache.metron.integration.util.threatintel;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelKey;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelValue;
+import org.apache.metron.reference.lookup.LookupKV;
import org.apache.metron.threatintel.ThreatIntelResults;
-import org.apache.metron.threatintel.hbase.Converter;
+import org.apache.metron.hbase.converters.threatintel.ThreatIntelConverter;
import java.io.IOException;
public enum ThreatIntelHelper {
INSTANCE;
+ ThreatIntelConverter converter = new ThreatIntelConverter();
- public void load(HTableInterface table, String cf, Iterable<ThreatIntelResults> results, long ts) throws IOException {
- for(ThreatIntelResults result : results) {
- Put put = Converter.INSTANCE.toPut(cf, result.getKey(), result.getValue(), ts);
+ public void load(HTableInterface table, String cf, Iterable<LookupKV<ThreatIntelKey, ThreatIntelValue>> results) throws IOException {
+ for(LookupKV<ThreatIntelKey, ThreatIntelValue> result : results) {
+ Put put = converter.toPut(cf, result.getKey(), result.getValue());
table.put(put);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/metron-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/pom.xml b/metron-streaming/pom.xml
index 47ab2cb..827c8d0 100644
--- a/metron-streaming/pom.xml
+++ b/metron-streaming/pom.xml
@@ -69,6 +69,7 @@
<module>Metron-DataLoads</module>
<module>Metron-Topologies</module>
<module>Metron-Pcap_Service</module>
+ <module>Metron-Testing</module>
</modules>
<dependencies>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/0e1055aa/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 992f328..a4c773d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,10 +45,12 @@
<exclude>**/.*</exclude>
<exclude>**/.*/**</exclude>
<exclude>**/*.seed</exclude>
+ <exclude>**/*.iml</exclude>
<exclude>**/ansible.cfg</exclude>
<exclude>site/**</exclude>
<exclude>metron-ui/lib/public/**</exclude>
<exclude>**/src/main/resources/patterns/**</exclude>
+ <exclude>**/src/test/resources/**</exclude>
<exclude>**/src/main/resources/SampleInput/**</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
<exclude>**/files/opensoc-ui</exclude>