You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/06/13 21:34:25 UTC
incubator-metron git commit: METRON-203 General best practice and bug
fixes (mmiklavcic via cestella) closes apache/incubator-metron#146
Repository: incubator-metron
Updated Branches:
refs/heads/master ee0eb5bbb -> 925469b85
METRON-203 General best practice and bug fixes (mmiklavcic via cestella) closes apache/incubator-metron#146
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/925469b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/925469b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/925469b8
Branch: refs/heads/master
Commit: 925469b859010599484ef9dd828ccc6182f50871
Parents: ee0eb5b
Author: mmiklavc <mi...@gmail.com>
Authored: Mon Jun 13 17:34:15 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Mon Jun 13 17:34:15 2016 -0400
----------------------------------------------------------------------
.../PcapReceiverImplRestEasyTest.java | 10 +-
metron-platform/metron-common/pom.xml | 6 -
.../apache/metron/common/utils/ErrorUtils.java | 95 ++-
.../metron/common/utils/ErrorUtilsTest.java | 65 +++
.../bulk/ElasticsearchDataPrunerRunner.java | 4 +-
.../dataloads/nonbulk/taxii/TaxiiHandler.java | 574 +++++++++----------
.../accesstracker/BloomAccessTracker.java | 5 +-
metron-platform/metron-test-utilities/pom.xml | 12 +-
8 files changed, 440 insertions(+), 331 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/925469b8/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
index 1793b06..bfe2233 100644
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
+++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
@@ -100,7 +100,7 @@ public class PcapReceiverImplRestEasyTest {
Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
- Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+ Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
}
{
boolean includeReverseTraffic = true;
@@ -149,7 +149,7 @@ public class PcapReceiverImplRestEasyTest {
Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
- Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+ Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
}
@Test
@@ -171,7 +171,7 @@ public class PcapReceiverImplRestEasyTest {
Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
- Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+ Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
}
@Test
@@ -194,7 +194,7 @@ public class PcapReceiverImplRestEasyTest {
Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
Assert.assertEquals(0, fixedQueryHandler.beginNS);
Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
- Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+ Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
}
{
String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
@@ -227,7 +227,7 @@ public class PcapReceiverImplRestEasyTest {
Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
Assert.assertEquals(0, fixedQueryHandler.beginNS);
Assert.assertTrue(fixedQueryHandler.endNS > 0);
- Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+ Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
}
{
String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/925469b8/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index 9fa8daf..0a36915 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -198,12 +198,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <version>${global_mockito_version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
<version>2.0.2</version>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/925469b8/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
index a914e15..9fffe2a 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
@@ -17,39 +17,98 @@
*/
package org.apache.metron.common.utils;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Values;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.metron.common.Constants;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Optional;
+import java.util.function.Function;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.tuple.Values;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.metron.common.Constants;
-import org.json.simple.JSONObject;
+import static java.lang.String.format;
public class ErrorUtils {
+ private final static Logger LOGGER = LoggerFactory.getLogger(ErrorUtils.class);
+
+ public enum RuntimeErrors {
+ ILLEGAL_ARG(t -> new IllegalArgumentException(formatReason(t), t.getRight().orElse(null))),
+ ILLEGAL_STATE(t -> new IllegalStateException(formatReason(t), t.getRight().orElse(null)));
+
+ Function<Pair<String, Optional<Throwable>>, RuntimeException> func;
+
+ RuntimeErrors(Function<Pair<String, Optional<Throwable>>, RuntimeException> func) {
+ this.func = func;
+ }
+
+ /**
+ * Throw runtime exception with "reason".
+ *
+ * @param reason Message to include in exception
+ */
+ public void throwRuntime(String reason) {
+ throwRuntime(reason, Optional.empty());
+ }
+
+ /**
+ * Throw runtime exception with format "reason + cause message + cause Throwable"
+ *
+ * @param reason Message to include in exception
+ * @param t Wrapped exception
+ */
+ public void throwRuntime(String reason, Throwable t) {
+ throwRuntime(reason, Optional.of(t));
+ }
+
+ /**
+ * Throw runtime exception with format "reason + cause message + cause Throwable".
+ * If the optional Throwable is empty/null, the exception will only include "reason".
+ *
+ * @param reason Message to include in exception
+ * @param t Optional wrapped exception
+ */
+ public void throwRuntime(String reason, Optional<Throwable> t) {
+ throw func.apply(Pair.of(reason, t));
+ }
+
+ private static String formatReason(Pair<String, Optional<Throwable>> p) {
+ return formatReason(p.getLeft(), p.getRight());
+ }
+
+ private static String formatReason(String reason, Optional<Throwable> t) {
+ if (t.isPresent()) {
+ return format("%s - reason:%s", reason, t.get());
+ } else {
+ return format("%s", reason);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked") // JSONObject extends HashMap w/o type parameters
+ public static JSONObject generateErrorMessage(String message, Throwable t) {
+ JSONObject error_message = new JSONObject();
- @SuppressWarnings("unchecked")
- public static JSONObject generateErrorMessage(String message, Throwable t)
- {
- JSONObject error_message = new JSONObject();
-
/*
- * Save full stack trace in object.
+ * Save full stack trace in object.
*/
String stackTrace = ExceptionUtils.getStackTrace(t);
String exception = t.toString();
- error_message.put("time", System.currentTimeMillis());
- try {
- error_message.put("hostname", InetAddress.getLocalHost().getHostName());
- } catch (UnknownHostException ex) {
- // TODO Auto-generated catch block
- ex.printStackTrace();
- }
+ error_message.put("time", System.currentTimeMillis());
+ try {
+ error_message.put("hostname", InetAddress.getLocalHost().getHostName());
+ } catch (UnknownHostException ex) {
+ LOGGER.info("Unable to resolve hostname while generating error message", ex);
+ }
error_message.put("message", message);
error_message.put(Constants.SENSOR_TYPE, "error");
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/925469b8/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
new file mode 100644
index 0000000..f11a5c9
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/ErrorUtilsTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.utils;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.nullValue;
+
+public class ErrorUtilsTest {
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @Test
+ public void illegal_arg_throws_exception_with_reason() throws Exception {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("illegal arg happened");
+ exception.expectCause(nullValue(Throwable.class));
+ ErrorUtils.RuntimeErrors.ILLEGAL_ARG.throwRuntime("illegal arg happened");
+ }
+
+ @Test
+ public void illegal_arg_throws_exception_with_reason_and_cause() throws Exception {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("illegal arg happened");
+ exception.expectCause(instanceOf(IOException.class));
+ ErrorUtils.RuntimeErrors.ILLEGAL_ARG.throwRuntime("illegal arg happened", new IOException("bad io"));
+ }
+
+ @Test
+ public void illegal_state_throws_exception_with_reason() throws Exception {
+ exception.expect(IllegalStateException.class);
+ exception.expectMessage("illegal state happened");
+ exception.expectCause(nullValue(Throwable.class));
+ ErrorUtils.RuntimeErrors.ILLEGAL_STATE.throwRuntime("illegal state happened");
+ }
+
+ @Test
+ public void illegal_state_throws_exception_with_reason_and_cause() throws Exception {
+ exception.expect(IllegalStateException.class);
+ exception.expectMessage("illegal state happened");
+ exception.expectCause(instanceOf(IOException.class));
+ ErrorUtils.RuntimeErrors.ILLEGAL_STATE.throwRuntime("illegal state happened", new IOException("bad io"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/925469b8/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
index d424004..be8d9bd 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.metron.common.configuration.Configuration;
+import org.apache.metron.common.utils.ErrorUtils;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
@@ -31,7 +32,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -88,6 +88,8 @@ public class ElasticsearchDataPrunerRunner {
String resourceFile = cmd.getOptionValue("c");
configuration = new Configuration(Paths.get(resourceFile));
+ } else {
+ ErrorUtils.RuntimeErrors.ILLEGAL_ARG.throwRuntime("Unable to finish setting up configuration - z or c option is required.");
}
configuration.update();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/925469b8/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java
index be571e1..6037423 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java
@@ -42,6 +42,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.log4j.Logger;
+import org.apache.metron.common.utils.ErrorUtils;
import org.apache.metron.dataloads.extractor.Extractor;
import org.apache.metron.enrichment.converter.EnrichmentConverter;
import org.apache.metron.enrichment.converter.EnrichmentKey;
@@ -52,6 +53,7 @@ import org.mitre.taxii.messages.xml11.*;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
+import javax.xml.XMLConstants;
import javax.xml.bind.JAXBException;
import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.DatatypeFactory;
@@ -61,7 +63,8 @@ import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
-import java.io.*;
+import java.io.IOException;
+import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -69,338 +72,321 @@ import java.text.SimpleDateFormat;
import java.util.*;
public class TaxiiHandler extends TimerTask {
- private static final Logger LOG = Logger.getLogger(TaxiiHandler.class);
+ private static final Logger LOG = Logger.getLogger(TaxiiHandler.class);
- private static ThreadLocal<TaxiiXmlFactory> xmlFactory = new ThreadLocal<TaxiiXmlFactory>() {
- @Override
- protected TaxiiXmlFactory initialValue() {
- return new TaxiiXmlFactory();
- }
- };
- private static ThreadLocal<ObjectFactory> messageFactory = new ThreadLocal<ObjectFactory>() {
- @Override
- protected ObjectFactory initialValue() {
- return new ObjectFactory();
- }
- };
-
- private HttpClient taxiiClient;
- private URL endpoint;
- private Extractor extractor;
- private String hbaseTable;
- private String columnFamily;
- private Map<String, HTableInterface> connectionCache = new HashMap<>();
- private HttpClientContext context;
- private String collection;
- private String subscriptionId;
- private EnrichmentConverter converter = new EnrichmentConverter();
- private Date beginTime;
- private Configuration config;
- private boolean inProgress = false;
- private Set<String> allowedIndicatorTypes;
- public TaxiiHandler( TaxiiConnectionConfig connectionConfig
- , Extractor extractor
- , Configuration config
- ) throws Exception
- {
- LOG.info("Loading configuration: " + connectionConfig);
- this.allowedIndicatorTypes = connectionConfig.getAllowedIndicatorTypes();
- this.extractor = extractor;
- this.collection = connectionConfig.getCollection();
- this.subscriptionId = connectionConfig.getSubscriptionId();
- hbaseTable = connectionConfig.getTable();
- columnFamily = connectionConfig.getColumnFamily();
- this.beginTime = connectionConfig.getBeginTime();
- this.config = config;
- initializeClient(connectionConfig);
- LOG.info("Configured, starting polling " + endpoint + " for " + collection);
+ private static ThreadLocal<TaxiiXmlFactory> xmlFactory = new ThreadLocal<TaxiiXmlFactory>() {
+ @Override
+ protected TaxiiXmlFactory initialValue() {
+ return new TaxiiXmlFactory();
}
+ };
+ private static ThreadLocal<ObjectFactory> messageFactory = new ThreadLocal<ObjectFactory>() {
+ @Override
+ protected ObjectFactory initialValue() {
+ return new ObjectFactory();
+ }
+ };
- protected synchronized HTableInterface getTable(String table) throws IOException {
- HTableInterface ret = connectionCache.get(table);
- if(ret == null) {
- ret = createHTable(table);
- connectionCache.put(table, ret);
- }
- return ret;
+ private HttpClient taxiiClient;
+ private URL endpoint;
+ private Extractor extractor;
+ private String hbaseTable;
+ private String columnFamily;
+ private Map<String, HTableInterface> connectionCache = new HashMap<>();
+ private HttpClientContext context;
+ private String collection;
+ private String subscriptionId;
+ private EnrichmentConverter converter = new EnrichmentConverter();
+ private Date beginTime;
+ private Configuration config;
+ private boolean inProgress = false;
+ private Set<String> allowedIndicatorTypes;
+ public TaxiiHandler( TaxiiConnectionConfig connectionConfig
+ , Extractor extractor
+ , Configuration config
+ ) throws Exception
+ {
+ LOG.info("Loading configuration: " + connectionConfig);
+ this.allowedIndicatorTypes = connectionConfig.getAllowedIndicatorTypes();
+ this.extractor = extractor;
+ this.collection = connectionConfig.getCollection();
+ this.subscriptionId = connectionConfig.getSubscriptionId();
+ hbaseTable = connectionConfig.getTable();
+ columnFamily = connectionConfig.getColumnFamily();
+ this.beginTime = connectionConfig.getBeginTime();
+ this.config = config;
+ initializeClient(connectionConfig);
+ LOG.info("Configured, starting polling " + endpoint + " for " + collection);
+ }
+
+ protected synchronized HTableInterface getTable(String table) throws IOException {
+ HTableInterface ret = connectionCache.get(table);
+ if(ret == null) {
+ ret = createHTable(table);
+ connectionCache.put(table, ret);
}
+ return ret;
+ }
- protected synchronized HTableInterface createHTable(String tableInfo) throws IOException {
- return new HTable(config, tableInfo);
+ protected synchronized HTableInterface createHTable(String tableInfo) throws IOException {
+ return new HTable(config, tableInfo);
+ }
+ /**
+ * The action to be performed by this timer task.
+ */
+ @Override
+ public void run() {
+ if(inProgress) {
+ return;
}
- /**
- * The action to be performed by this timer task.
- */
- @Override
- public void run() {
- if(inProgress) {
- return;
- }
- Date ts = new Date();
- LOG.info("Polling..." + new SimpleDateFormat().format(ts));
+ Date ts = new Date();
+ LOG.info("Polling..." + new SimpleDateFormat().format(ts));
+ try {
+ inProgress = true;
+ // Prepare the message to send.
+ String sessionID = MessageHelper.generateMessageId();
+ PollRequest request = messageFactory.get().createPollRequest()
+ .withMessageId(sessionID)
+ .withCollectionName(collection);
+ if (subscriptionId != null) {
+ request = request.withSubscriptionID(subscriptionId);
+ } else {
+ request = request.withPollParameters(messageFactory.get().createPollParametersType());
+ }
+ if (beginTime != null) {
+ Calendar gc = GregorianCalendar.getInstance();
+ gc.setTime(beginTime);
+ XMLGregorianCalendar gTime = null;
try {
- inProgress = true;
- // Prepare the message to send.
- String sessionID = MessageHelper.generateMessageId();
- PollRequest request = messageFactory.get().createPollRequest()
- .withMessageId(sessionID)
- .withCollectionName(collection);
- if (subscriptionId != null) {
- request = request.withSubscriptionID(subscriptionId);
- } else {
- request = request.withPollParameters(messageFactory.get().createPollParametersType());
- }
- if (beginTime != null) {
- Calendar gc = GregorianCalendar.getInstance();
- gc.setTime(beginTime);
- XMLGregorianCalendar gTime = null;
- try {
- gTime = DatatypeFactory.newInstance().newXMLGregorianCalendar((GregorianCalendar) gc).normalize();
- } catch (DatatypeConfigurationException e) {
- LOG.error("Unable to set the begin time", e);
- }
- gTime.setFractionalSecond(null);
- LOG.info("Begin Time: " + gTime);
- request.setExclusiveBeginTimestamp(gTime);
- }
+ gTime = DatatypeFactory.newInstance().newXMLGregorianCalendar((GregorianCalendar) gc).normalize();
+ } catch (DatatypeConfigurationException e) {
+ ErrorUtils.RuntimeErrors.ILLEGAL_STATE.throwRuntime("Unable to set the begin time due to", e);
+ }
+ gTime.setFractionalSecond(null);
+ LOG.info("Begin Time: " + gTime);
+ request.setExclusiveBeginTimestamp(gTime);
+ }
- try {
- PollResponse response = call(request, PollResponse.class);
- LOG.info("Got Poll Response with " + response.getContentBlocks().size() + " blocks");
- int numProcessed = 0;
- long avgTimeMS = 0;
- long timeStartedBlock = System.currentTimeMillis();
- for (ContentBlock block : response.getContentBlocks()) {
- AnyMixedContentType content = block.getContent();
- for (Object o : content.getContent()) {
- numProcessed++;
- long timeS = System.currentTimeMillis();
- String xml = null;
- if (o instanceof Element) {
- Element element = (Element) o;
- xml = getStringFromDocument(element.getOwnerDocument());
- if(LOG.isDebugEnabled() && Math.random() < 0.01) {
- LOG.debug("Random Stix doc: " + xml);
- }
- for (LookupKV<EnrichmentKey, EnrichmentValue> kv : extractor.extract(xml)) {
- if(allowedIndicatorTypes.isEmpty()
- || allowedIndicatorTypes.contains(kv.getKey().type)
- )
- {
- kv.getValue().getMetadata().put("source_type", "taxii");
- kv.getValue().getMetadata().put("taxii_url", endpoint.toString());
- kv.getValue().getMetadata().put("taxii_collection", collection);
- Put p = converter.toPut(columnFamily, kv.getKey(), kv.getValue());
- HTableInterface table = getTable(hbaseTable);
- table.put(p);
- LOG.info("Found Threat Intel: " + kv.getKey() + " => " + kv.getValue());
- }
- }
- }
- avgTimeMS += System.currentTimeMillis() - timeS;
- }
- if( (numProcessed + 1) % 100 == 0) {
- LOG.info("Processed " + numProcessed + " in " + (System.currentTimeMillis() - timeStartedBlock) + " ms, avg time: " + avgTimeMS / content.getContent().size());
- timeStartedBlock = System.currentTimeMillis();
- avgTimeMS = 0;
- numProcessed = 0;
- }
+ try {
+ PollResponse response = call(request, PollResponse.class);
+ LOG.info("Got Poll Response with " + response.getContentBlocks().size() + " blocks");
+ int numProcessed = 0;
+ long avgTimeMS = 0;
+ long timeStartedBlock = System.currentTimeMillis();
+ for (ContentBlock block : response.getContentBlocks()) {
+ AnyMixedContentType content = block.getContent();
+ for (Object o : content.getContent()) {
+ numProcessed++;
+ long timeS = System.currentTimeMillis();
+ String xml = null;
+ if (o instanceof Element) {
+ Element element = (Element) o;
+ xml = getStringFromDocument(element.getOwnerDocument());
+ if(LOG.isDebugEnabled() && Math.random() < 0.01) {
+ LOG.debug("Random Stix doc: " + xml);
+ }
+ for (LookupKV<EnrichmentKey, EnrichmentValue> kv : extractor.extract(xml)) {
+ if(allowedIndicatorTypes.isEmpty()
+ || allowedIndicatorTypes.contains(kv.getKey().type)
+ )
+ {
+ kv.getValue().getMetadata().put("source_type", "taxii");
+ kv.getValue().getMetadata().put("taxii_url", endpoint.toString());
+ kv.getValue().getMetadata().put("taxii_collection", collection);
+ Put p = converter.toPut(columnFamily, kv.getKey(), kv.getValue());
+ HTableInterface table = getTable(hbaseTable);
+ table.put(p);
+ LOG.info("Found Threat Intel: " + kv.getKey() + " => " + kv.getValue());
}
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- throw new RuntimeException("Unable to make request", e);
+ }
}
+ avgTimeMS += System.currentTimeMillis() - timeS;
+ }
+ if( (numProcessed + 1) % 100 == 0) {
+ LOG.info("Processed " + numProcessed + " in " + (System.currentTimeMillis() - timeStartedBlock) + " ms, avg time: " + avgTimeMS / content.getContent().size());
+ timeStartedBlock = System.currentTimeMillis();
+ avgTimeMS = 0;
+ numProcessed = 0;
+ }
}
- finally {
- inProgress = false;
- beginTime = ts;
- }
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new RuntimeException("Unable to make request", e);
+ }
+ }
+ finally {
+ inProgress = false;
+ beginTime = ts;
}
- public String getStringFromDocument(Document doc)
+ }
+ public String getStringFromDocument(Document doc)
+ {
+ try
{
- try
- {
- DOMSource domSource = new DOMSource(doc);
- StringWriter writer = new StringWriter();
- StreamResult result = new StreamResult(writer);
- TransformerFactory tf = TransformerFactory.newInstance();
- Transformer transformer = tf.newTransformer();
- transformer.transform(domSource, result);
- return writer.toString();
- }
- catch(TransformerException ex)
- {
- ex.printStackTrace();
- return null;
- }
+ DOMSource domSource = new DOMSource(doc);
+ StringWriter writer = new StringWriter();
+ StreamResult result = new StreamResult(writer);
+ TransformerFactory tf = TransformerFactory.newInstance();
+ tf.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
+ Transformer transformer = tf.newTransformer();
+ transformer.transform(domSource, result);
+ return writer.toString();
}
- private <RESPONSE_T> RESPONSE_T call( Object request, Class<RESPONSE_T> responseClazz) throws URISyntaxException, JAXBException, IOException {
- return call(taxiiClient, endpoint.toURI(), request, context, responseClazz);
+ catch(TransformerException ex)
+ {
+ ex.printStackTrace();
+ return null;
}
+ }
+ private <RESPONSE_T> RESPONSE_T call( Object request, Class<RESPONSE_T> responseClazz) throws URISyntaxException, JAXBException, IOException {
+ return call(taxiiClient, endpoint.toURI(), request, context, responseClazz);
+ }
- private void initializeClient(TaxiiConnectionConfig config) throws Exception {
- LOG.info("Initializing client..");
- if(context == null) {
- context = createContext(config.getEndpoint(), config.getUsername(), config.getPassword(), config.getPort());
- }
- URL endpoint = config.getEndpoint();
- if(config.getType() == ConnectionType.DISCOVER) {
- LOG.info("Discovering endpoint");
- endpoint = discoverPollingClient(config.getProxy(), endpoint, config.getUsername(), config.getPassword(), context, collection).pollEndpoint;
- this.endpoint = endpoint;
- LOG.info("Discovered endpoint as " + endpoint);
- }
- taxiiClient = buildClient(config.getProxy(), config.getUsername(), config.getPassword());
+ private void initializeClient(TaxiiConnectionConfig config) throws Exception {
+ LOG.info("Initializing client..");
+ if(context == null) {
+ context = createContext(config.getEndpoint(), config.getUsername(), config.getPassword(), config.getPort());
}
-
- private static class DiscoveryResults {
- URL pollEndpoint;
- URL collectionManagementEndpoint;
- List<String> collections = new ArrayList<>();
+ URL endpoint = config.getEndpoint();
+ if(config.getType() == ConnectionType.DISCOVER) {
+ LOG.info("Discovering endpoint");
+ endpoint = discoverPollingClient(config.getProxy(), endpoint, config.getUsername(), config.getPassword(), context, collection).pollEndpoint;
+ this.endpoint = endpoint;
+ LOG.info("Discovered endpoint as " + endpoint);
}
- private static DiscoveryResults discoverPollingClient(URL proxy, URL endpoint, String username, String password, HttpClientContext context, String defaultCollection) throws Exception {
+ taxiiClient = buildClient(config.getProxy(), config.getUsername(), config.getPassword());
+ }
- DiscoveryResults results = new DiscoveryResults();
- {
- HttpClient discoverClient = buildClient(proxy, username, password);
- String sessionID = MessageHelper.generateMessageId();
- // Prepare the message to send.
- DiscoveryRequest request = messageFactory.get().createDiscoveryRequest()
- .withMessageId(sessionID);
- DiscoveryResponse response = call(discoverClient, endpoint.toURI(), request, context, DiscoveryResponse.class);
- for (ServiceInstanceType serviceInstance : response.getServiceInstances()) {
- if (serviceInstance.isAvailable() && serviceInstance.getServiceType() == ServiceTypeEnum.POLL) {
- results.pollEndpoint = new URL(serviceInstance.getAddress());
- }
- else if(serviceInstance.isAvailable() && serviceInstance.getServiceType() == ServiceTypeEnum.COLLECTION_MANAGEMENT) {
- results.collectionManagementEndpoint= new URL(serviceInstance.getAddress());
- }
- }
- if (results.pollEndpoint == null) {
- throw new RuntimeException("Unable to discover a poll TAXII feed");
- }
+ private static class DiscoveryResults {
+ URL pollEndpoint;
+ URL collectionManagementEndpoint;
+ List<String> collections = new ArrayList<>();
+ }
+ private static DiscoveryResults discoverPollingClient(URL proxy, URL endpoint, String username, String password, HttpClientContext context, String defaultCollection) throws Exception {
+
+ DiscoveryResults results = new DiscoveryResults();
+ {
+ HttpClient discoverClient = buildClient(proxy, username, password);
+ String sessionID = MessageHelper.generateMessageId();
+ // Prepare the message to send.
+ DiscoveryRequest request = messageFactory.get().createDiscoveryRequest()
+ .withMessageId(sessionID);
+ DiscoveryResponse response = call(discoverClient, endpoint.toURI(), request, context, DiscoveryResponse.class);
+ for (ServiceInstanceType serviceInstance : response.getServiceInstances()) {
+ if (serviceInstance.isAvailable() && serviceInstance.getServiceType() == ServiceTypeEnum.POLL) {
+ results.pollEndpoint = new URL(serviceInstance.getAddress());
}
- if(defaultCollection == null)
- //get collections
- {
- HttpClient discoverClient = buildClient(proxy, username, password);
- String sessionID = MessageHelper.generateMessageId();
- CollectionInformationRequest request = messageFactory.get().createCollectionInformationRequest()
- .withMessageId(sessionID);
- CollectionInformationResponse response = call(discoverClient, results.collectionManagementEndpoint.toURI(), request, context, CollectionInformationResponse.class);
- LOG.info("Unable to find the default collection; available collections are:");
- for(CollectionRecordType c : response.getCollections()) {
- LOG.info(c.getCollectionName());
- results.collections.add(c.getCollectionName());
- }
- System.exit(0);
+ else if(serviceInstance.isAvailable() && serviceInstance.getServiceType() == ServiceTypeEnum.COLLECTION_MANAGEMENT) {
+ results.collectionManagementEndpoint= new URL(serviceInstance.getAddress());
}
- return results;
+ }
+ if (results.pollEndpoint == null) {
+ throw new RuntimeException("Unable to discover a poll TAXII feed");
+ }
}
+ if(defaultCollection == null)
+ //get collections
+ {
+ HttpClient discoverClient = buildClient(proxy, username, password);
+ String sessionID = MessageHelper.generateMessageId();
+ CollectionInformationRequest request = messageFactory.get().createCollectionInformationRequest()
+ .withMessageId(sessionID);
+ CollectionInformationResponse response = call(discoverClient, results.collectionManagementEndpoint.toURI(), request, context, CollectionInformationResponse.class);
+ LOG.info("Unable to find the default collection; available collections are:");
+ for(CollectionRecordType c : response.getCollections()) {
+ LOG.info(c.getCollectionName());
+ results.collections.add(c.getCollectionName());
+ }
+ System.exit(0);
+ }
+ return results;
+ }
- private static HttpClientContext createContext(URL endpoint, String username, String password, int port) {
- HttpClientContext context = null;
- HttpHost target = new HttpHost(endpoint.getHost(), port, endpoint.getProtocol());
- if (username != null && password != null) {
+ private static HttpClientContext createContext(URL endpoint, String username, String password, int port) {
+ HttpClientContext context = null;
+ HttpHost target = new HttpHost(endpoint.getHost(), port, endpoint.getProtocol());
+ if (username != null && password != null) {
- CredentialsProvider credsProvider = new BasicCredentialsProvider();
- credsProvider.setCredentials(
- new AuthScope(target.getHostName(), target.getPort()),
- new UsernamePasswordCredentials(username, password));
+ CredentialsProvider credsProvider = new BasicCredentialsProvider();
+ credsProvider.setCredentials(
+ new AuthScope(target.getHostName(), target.getPort()),
+ new UsernamePasswordCredentials(username, password));
- // http://hc.apache.org/httpcomponents-client-ga/tutorial/html/authentication.html
- AuthCache authCache = new BasicAuthCache();
- authCache.put(target, new BasicScheme());
+ // http://hc.apache.org/httpcomponents-client-ga/tutorial/html/authentication.html
+ AuthCache authCache = new BasicAuthCache();
+ authCache.put(target, new BasicScheme());
- // Add AuthCache to the execution context
- context = HttpClientContext.create();
- context.setCredentialsProvider(credsProvider);
- context.setAuthCache(authCache);
- } else {
- context = null;
- }
- return context;
+ // Add AuthCache to the execution context
+ context = HttpClientContext.create();
+ context.setCredentialsProvider(credsProvider);
+ context.setAuthCache(authCache);
+ } else {
+ context = null;
}
+ return context;
+ }
- public static <RESPONSE_T, REQUEST_T> RESPONSE_T call( HttpClient taxiiClient
- , URI endpoint
- , REQUEST_T request
- , HttpClientContext context
- , Class<RESPONSE_T> responseClazz
- ) throws JAXBException, IOException {
- //TaxiiXml taxiiXml = xmlFactory.get().createTaxiiXml();
- //String req = taxiiXml.marshalToString(request, true);
- // Call the service
- Object responseObj = taxiiClient.callTaxiiService(endpoint, request, context);
- LOG.info("Request made : " + request.getClass().getCanonicalName() + " => " + responseObj.getClass().getCanonicalName() + " (expected " + responseClazz.getCanonicalName() + ")");
- //String resp = taxiiXml.marshalToString(responseObj, true);
- try {
- return responseClazz.cast(responseObj);
- }
- catch(ClassCastException cce) {
- TaxiiXml taxiiXml = xmlFactory.get().createTaxiiXml();
- String resp = taxiiXml.marshalToString(responseObj, true);
- String msg = "Didn't return the response we expected: " + responseObj.getClass() + " \n" + resp;
- LOG.error(msg, cce);
- throw new RuntimeException(msg, cce);
- }
+ public static <RESPONSE_T, REQUEST_T> RESPONSE_T call( HttpClient taxiiClient
+ , URI endpoint
+ , REQUEST_T request
+ , HttpClientContext context
+ , Class<RESPONSE_T> responseClazz
+ ) throws JAXBException, IOException {
+ Object responseObj = taxiiClient.callTaxiiService(endpoint, request, context);
+ LOG.info("Request made : " + request.getClass().getCanonicalName() + " => " + responseObj.getClass().getCanonicalName() + " (expected " + responseClazz.getCanonicalName() + ")");
+ try {
+ return responseClazz.cast(responseObj);
}
- private static HttpClient buildClient(URL proxy, String username, String password) throws Exception
- {
- HttpClient client = new HttpClient(); // Start with a default TAXII HTTP client.
+ catch(ClassCastException cce) {
+ TaxiiXml taxiiXml = xmlFactory.get().createTaxiiXml();
+ String resp = taxiiXml.marshalToString(responseObj, true);
+ String msg = "Didn't return the response we expected: " + responseObj.getClass() + " \n" + resp;
+ LOG.error(msg, cce);
+ throw new RuntimeException(msg, cce);
+ }
+ }
+ private static HttpClient buildClient(URL proxy, String username, String password) throws Exception
+ {
+ HttpClient client = new HttpClient(); // Start with a default TAXII HTTP client.
- // Create an Apache HttpClientBuilder to be customized by the command line arguments.
- HttpClientBuilder builder = HttpClientBuilder.create().useSystemProperties();
+ // Create an Apache HttpClientBuilder to be customized by the command line arguments.
+ HttpClientBuilder builder = HttpClientBuilder.create().useSystemProperties();
- // Proxy
- if (proxy != null) {
- HttpHost proxyHost = new HttpHost(proxy.getHost(), proxy.getPort(), proxy.getProtocol());
- builder.setProxy(proxyHost);
- }
+ // Proxy
+ if (proxy != null) {
+ HttpHost proxyHost = new HttpHost(proxy.getHost(), proxy.getPort(), proxy.getProtocol());
+ builder.setProxy(proxyHost);
+ }
- // Basic authentication. User & Password
- if (username != null ^ password != null) {
- throw new Exception("'username' and 'password' arguments are required to appear together.");
- }
+ // Basic authentication. User & Password
+ if (username != null ^ password != null) {
+ throw new Exception("'username' and 'password' arguments are required to appear together.");
+ }
- // from: http://stackoverflow.com/questions/19517538/ignoring-ssl-certificate-in-apache-httpclient-4-3
- SSLContextBuilder ssbldr = new SSLContextBuilder();
- ssbldr.loadTrustMaterial(null, new TrustSelfSignedStrategy());
- SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(ssbldr.build(),SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
+ // from: http://stackoverflow.com/questions/19517538/ignoring-ssl-certificate-in-apache-httpclient-4-3
+ SSLContextBuilder ssbldr = new SSLContextBuilder();
+ ssbldr.loadTrustMaterial(null, new TrustSelfSignedStrategy());
+ SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(ssbldr.build(),SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
- Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
- .register("http", new PlainConnectionSocketFactory())
- .register("https", sslsf)
- .build();
+ Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
+ .register("http", new PlainConnectionSocketFactory())
+ .register("https", sslsf)
+ .build();
- PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registry);
- cm.setMaxTotal(20);//max connection
+ PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registry);
+ cm.setMaxTotal(20);//max connection
- System.setProperty("jsse.enableSNIExtension", "false"); //""
- CloseableHttpClient httpClient = builder
- .setSSLSocketFactory(sslsf)
- .setConnectionManager(cm)
- .build();
+ System.setProperty("jsse.enableSNIExtension", "false"); //""
+ CloseableHttpClient httpClient = builder
+ .setSSLSocketFactory(sslsf)
+ .setConnectionManager(cm)
+ .build();
- client.setHttpclient(httpClient);
- return client;
- }
- public static void main(String... argv) throws Exception {
- URL endpoint = new URL("http://hailataxii.com/taxii-discovery-service");
- String username = "guest";
- String password = "guest";
- TaxiiConnectionConfig config = new TaxiiConnectionConfig();
- config = config.withConnectionType(ConnectionType.DISCOVER)
- .withEndpoint(endpoint)
- .withUsername(username)
- .withCollection("guest.Abuse_ch")
- .withPassword(password);
- //TaxiiHandler handler = new TaxiiHandler(config, null);
- //handler.run();
- //discoverPollingClient(null, endpoint, username, password, context);
- }
+ client.setHttpclient(httpClient);
+ return client;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/925469b8/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/BloomAccessTracker.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/BloomAccessTracker.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/BloomAccessTracker.java
index 763ba59..3c38824 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/BloomAccessTracker.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/lookup/accesstracker/BloomAccessTracker.java
@@ -37,12 +37,15 @@ public class BloomAccessTracker implements AccessTracker {
primitiveSink.putBytes(lookupKey.toBytes());
}
-
@Override
public boolean equals(Object obj) {
return this.getClass().equals(obj.getClass());
}
+ @Override
+ public int hashCode() {
+ return super.hashCode() * 31;
+ }
}
private static Funnel<LookupKey> LOOKUPKEY_FUNNEL = new LookupKeyFunnel();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/925469b8/metron-platform/metron-test-utilities/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-test-utilities/pom.xml b/metron-platform/metron-test-utilities/pom.xml
index a4eac92..59649e6 100644
--- a/metron-platform/metron-test-utilities/pom.xml
+++ b/metron-platform/metron-test-utilities/pom.xml
@@ -89,8 +89,13 @@
<version>2.7.1</version>
</dependency>
<dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
+ <artifactId>mockito-core</artifactId>
<version>${global_mockito_version}</version>
</dependency>
<dependency>
@@ -103,11 +108,6 @@
<artifactId>multiline-string</artifactId>
<version>0.1.2</version>
</dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.12</version>
- </dependency>
</dependencies>
<build>