You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:06:02 UTC
[20/43] incubator-metron git commit: METRON-56 Create unified
enrichment topology (merrimanr via cestella) closes
apache/incubator-metron#33
METRON-56 Create unified enrichment topology (merrimanr via cestella) closes apache/incubator-metron#33
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/9f96399d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/9f96399d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/9f96399d
Branch: refs/heads/Metron_0.1BETA
Commit: 9f96399d9ecb252da13edf7bc44a366740945e85
Parents: 0e1055a
Author: merrimanr <me...@gmail.com>
Authored: Tue Mar 1 13:20:13 2016 -0500
Committer: cstella <ce...@gmail.com>
Committed: Tue Mar 1 13:20:13 2016 -0500
----------------------------------------------------------------------
.../metron/alerts/TelemetryAlertsBolt.java | 4 +-
metron-streaming/Metron-Common/pom.xml | 15 +
.../main/java/org/apache/metron/Constants.java | 27 +
.../metron/bolt/BulkMessageWriterBolt.java | 102 +
.../org/apache/metron/bolt/ConfiguredBolt.java | 91 +
.../java/org/apache/metron/bolt/JoinBolt.java | 39 +-
.../java/org/apache/metron/bolt/SplitBolt.java | 46 +-
.../org/apache/metron/domain/Enrichment.java | 21 +-
.../org/apache/metron/domain/SourceConfig.java | 88 +
.../metron/enrichment/EnrichmentConstants.java | 28 +
.../enrichment/EnrichmentSplitterBolt.java | 129 -
.../java/org/apache/metron/hbase/HBaseBolt.java | 5 +-
.../org/apache/metron/hbase/HTableProvider.java | 3 -
.../metron/helpers/topology/ErrorGenerator.java | 54 -
.../metron/helpers/topology/ErrorUtils.java | 64 +
.../metron/spout/pcap/HDFSWriterCallback.java | 169 ++
.../metron/spout/pcap/HDFSWriterConfig.java | 97 +
.../apache/metron/topology/TopologyUtils.java | 28 +
.../org/apache/metron/utils/ConfigUtils.java | 48 +
.../org/apache/metron/writer/HBaseWriter.java | 88 +
.../org/apache/metron/writer/PcapWriter.java | 52 +
.../writer/interfaces/BulkMessageWriter.java | 30 +
.../metron/writer/interfaces/MessageWriter.java | 27 +
.../src/main/java/storm/kafka/Callback.java | 26 +
.../java/storm/kafka/CallbackCollector.java | 182 ++
.../java/storm/kafka/CallbackKafkaSpout.java | 93 +
.../src/main/java/storm/kafka/EmitContext.java | 146 +
.../resources/config/source/bro-config.json | 13 +
.../resources/config/source/pcap-config.json | 13 +
.../resources/config/source/snort-config.json | 13 +
.../resources/config/source/yaf-config.json | 13 +
.../adapters/host/HostFromJSONListAdapter.java | 7 +-
.../enrichment/bolt/EnrichmentJoinBolt.java | 43 +-
.../enrichment/bolt/EnrichmentSplitterBolt.java | 140 +
.../enrichment/bolt/GenericEnrichmentBolt.java | 67 +-
.../enrichment/bolt/ThreatIntelJoinBolt.java | 41 +
.../bolt/ThreatIntelSplitterBolt.java | 40 +
.../enrichment/utils/EnrichmentUtils.java | 32 +
.../enrichment/utils/ThreatIntelUtils.java | 32 +
.../metron/indexing/AbstractIndexingBolt.java | 7 +-
.../metron/indexing/TelemetryIndexingBolt.java | 23 +-
.../adapters/ESTimedRotatingAdapter.java | 3 +-
.../metron/writer/ElasticSearchWriter.java | 95 +
.../org/apache/metron/writer/HdfsWriter.java | 44 +
metron-streaming/Metron-MessageParsers/pom.xml | 13 +
.../java/org/apache/metron/bolt/ParserBolt.java | 88 +
.../org/apache/metron/bolt/PcapParserBolt.java | 10 +-
.../apache/metron/bolt/TelemetryParserBolt.java | 12 +-
.../org/apache/metron/parser/MessageParser.java | 25 +
.../metron/parsing/parsers/GrokParser.java | 146 +
.../metron/parsing/parsers/PcapParser.java | 23 +-
.../org/apache/metron/writer/KafkaWriter.java | 79 +
.../src/main/resources/patterns/common | 96 +
.../src/main/resources/patterns/yaf | 113 +-
metron-streaming/Metron-Testing/pom.xml | 28 +-
.../util/integration/ComponentRunner.java | 15 +-
.../components/ElasticSearchComponent.java | 8 +-
.../components/KafkaWithZKComponent.java | 228 ++
.../util/integration/util/KafkaUtil.java | 41 +
.../org/apache/metron/utils/KafkaLoader.java | 88 +
.../apache/metron/utils/SourceConfigUtils.java | 95 +
.../Metron_Configs/topologies/asa/local.yaml | 401 ---
.../Metron_Configs/topologies/asa/remote.yaml | 385 +--
.../Metron_Configs/topologies/asa/test.yaml | 82 +
.../Metron_Configs/topologies/bro/local.yaml | 192 --
.../Metron_Configs/topologies/bro/remote.yaml | 176 +-
.../Metron_Configs/topologies/bro/test.yaml | 82 +
.../topologies/enrichment/remote.yaml | 331 +++
.../topologies/enrichment/test.yaml | 314 ++
.../topologies/fireeye/local.yaml | 401 ---
.../topologies/fireeye/remote.yaml | 382 +--
.../Metron_Configs/topologies/fireeye/test.yaml | 79 +
.../Metron_Configs/topologies/ise/local.yaml | 192 --
.../Metron_Configs/topologies/ise/remote.yaml | 177 +-
.../Metron_Configs/topologies/ise/test.yaml | 79 +
.../topologies/lancope/local.yaml | 401 ---
.../topologies/lancope/remote.yaml | 382 +--
.../Metron_Configs/topologies/lancope/test.yaml | 79 +
.../topologies/paloalto/local.yaml | 172 --
.../topologies/paloalto/remote.yaml | 155 +-
.../topologies/paloalto/test.yaml | 79 +
.../Metron_Configs/topologies/pcap/local.yaml | 22 +-
.../Metron_Configs/topologies/pcap/parse.yaml | 70 +
.../Metron_Configs/topologies/pcap/remote.yaml | 2 +-
.../Metron_Configs/topologies/snort/local.yaml | 195 --
.../Metron_Configs/topologies/snort/remote.yaml | 175 +-
.../Metron_Configs/topologies/snort/test.yaml | 79 +
.../topologies/sourcefire/local.yaml | 401 ---
.../topologies/sourcefire/remote.yaml | 382 +--
.../topologies/sourcefire/test.yaml | 79 +
.../Metron_Configs/topologies/yaf/local.yaml | 192 --
.../Metron_Configs/topologies/yaf/remote.yaml | 185 +-
.../Metron_Configs/topologies/yaf/test.yaml | 95 +
.../src/main/resources/SampleIndexed/YafIndexed | 10 +
.../src/main/resources/SampleInput/SnortOutput | 4 +-
.../main/resources/SampleInput/YafExampleOutput | 2695 +-----------------
.../src/main/resources/SampleParsed/SnortParsed | 3 +
.../resources/SampleParsed/YafExampleParsed | 10 +
.../integration/EnrichmentIntegrationTest.java | 195 ++
.../integration/ParserIntegrationTest.java | 155 +
.../integration/SnortIntegrationTest.java | 62 +
.../metron/integration/YafIntegrationTest.java | 62 +
.../integration/pcap/PcapIntegrationTest.java | 279 --
.../metron/integration/util/TestUtils.java | 38 +
pom.xml | 2 +-
105 files changed, 5213 insertions(+), 8156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/TelemetryAlertsBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/TelemetryAlertsBolt.java b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/TelemetryAlertsBolt.java
index fd898e3..663ae40 100644
--- a/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/TelemetryAlertsBolt.java
+++ b/metron-streaming/Metron-Alerts/src/main/java/org/apache/metron/alerts/TelemetryAlertsBolt.java
@@ -34,7 +34,7 @@ import backtype.storm.tuple.Values;
import com.google.common.cache.CacheBuilder;
import org.apache.metron.alerts.interfaces.AlertsAdapter;
-import org.apache.metron.helpers.topology.ErrorGenerator;
+import org.apache.metron.helpers.topology.ErrorUtils;
import org.apache.metron.json.serialization.JSONEncoderHelper;
import org.apache.metron.metrics.MetricReporter;
@@ -245,7 +245,7 @@ public class TelemetryAlertsBolt extends AbstractAlertBolt {
*/
- JSONObject error = ErrorGenerator.generateErrorMessage(
+ JSONObject error = ErrorUtils.generateErrorMessage(
"Alerts problem: " + original_message, e);
_collector.emit("error", new Values(error));
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/pom.xml b/metron-streaming/Metron-Common/pom.xml
index 57a58d7..c4fc5aa 100644
--- a/metron-streaming/Metron-Common/pom.xml
+++ b/metron-streaming/Metron-Common/pom.xml
@@ -161,6 +161,21 @@
<artifactId>json-schema-validator</artifactId>
<version>${global_json_schema_validator_version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>2.7.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>flux-core</artifactId>
+ <version>${global_flux_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-kafka</artifactId>
+ <version>${global_storm_version}</version>
+ </dependency>
</dependencies>
<reporting>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/Constants.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/Constants.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/Constants.java
new file mode 100644
index 0000000..c6eafe9
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/Constants.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron;
+
+/**
+ * String constants shared across the Metron topology code.
+ */
+public class Constants {
+
+  /** Stream id handed to the error helper when a bolt reports a failure. */
+  public static final String ERROR_STREAM = "error";
+
+  /** The literal "enrichments"; presumably a topic name -- TODO confirm against the topology definitions. */
+  public static final String ENRICHMENT_TOPIC = "enrichments";
+
+  /** The literal "source.type"; presumably the message key naming a message's source -- TODO confirm against TopologyUtils. */
+  public static final String SOURCE_TYPE = "source.type";
+
+  /** Base ZooKeeper path from which the other ZooKeeper paths here are derived. */
+  public static final String ZOOKEEPER_ROOT = "/metron";
+
+  /** ZooKeeper path whose child nodes hold the per-source configurations watched by ConfiguredBolt. */
+  public static final String ZOOKEEPER_TOPOLOGY_ROOT = ZOOKEEPER_ROOT + "/topology";
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
new file mode 100644
index 0000000..6d094ee
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.metron.Constants;
+import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.helpers.topology.ErrorUtils;
+import org.apache.metron.topology.TopologyUtils;
+import org.apache.metron.writer.interfaces.BulkMessageWriter;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Bolt that buffers incoming messages per source type and hands each buffer
+ * to a {@link BulkMessageWriter} once it reaches that source's batch size.
+ *
+ * The batch size comes from the {@link SourceConfig} held for the source type
+ * in {@code configurations}; a source with no configuration is written one
+ * message at a time.
+ */
+public class BulkMessageWriterBolt extends ConfiguredBolt {
+
+  // Not referenced anywhere in this class; kept so the class members stay unchanged.
+  int count = 0;
+
+  private static final Logger LOG = LoggerFactory
+          .getLogger(BulkMessageWriterBolt.class);
+  private OutputCollector collector;
+  private BulkMessageWriter<JSONObject> bulkMessageWriter;
+  // Tuples and their messages waiting to be written, keyed by source type.
+  // The two maps are always updated together so their lists stay index-aligned.
+  private Map<String, List<Tuple>> sourceTupleMap = new HashMap<>();
+  private Map<String, List<JSONObject>> sourceMessageMap = new HashMap<>();
+
+
+  /**
+   * @param zookeeperUrl ZooKeeper connect string passed through to {@link ConfiguredBolt}
+   */
+  public BulkMessageWriterBolt(String zookeeperUrl) {
+    super(zookeeperUrl);
+  }
+
+  /**
+   * Sets the writer that receives full batches. Must be called before the bolt is prepared.
+   *
+   * @param bulkMessageWriter writer to delegate to
+   * @return this bolt, for chaining
+   */
+  public BulkMessageWriterBolt withBulkMessageWriter(BulkMessageWriter<JSONObject> bulkMessageWriter) {
+    this.bulkMessageWriter = bulkMessageWriter;
+    return this;
+  }
+
+  /**
+   * Starts the configuration watcher and initializes the writer.
+   *
+   * @throws IllegalStateException if no writer was supplied via {@link #withBulkMessageWriter}
+   */
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    // Fail before super.prepare() opens a ZooKeeper connection that would
+    // otherwise be left running when the missing writer is dereferenced.
+    if (bulkMessageWriter == null) {
+      throw new IllegalStateException("bulkMessageWriter must be specified");
+    }
+    this.collector = collector;
+    super.prepare(stormConf, context, collector);
+    bulkMessageWriter.init();
+  }
+
+  /**
+   * Adds the tuple's message to the pending batch for its source type and,
+   * when the batch is full, writes it. Every tuple of a written batch is
+   * acked on success; on any exception every tuple is failed and the
+   * exception is reported on the error stream. The pending batch is dropped
+   * after a write attempt either way.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void execute(Tuple tuple) {
+    JSONObject message = (JSONObject) tuple.getValueByField("message");
+    String sourceType = TopologyUtils.getSourceType(message);
+    SourceConfig configuration = configurations.get(sourceType);
+    int batchSize = configuration != null ? configuration.getBatchSize() : 1;
+
+    List<Tuple> tupleList = sourceTupleMap.get(sourceType);
+    if (tupleList == null) {
+      tupleList = new ArrayList<>();
+    }
+    tupleList.add(tuple);
+
+    List<JSONObject> messageList = sourceMessageMap.get(sourceType);
+    if (messageList == null) {
+      messageList = new ArrayList<>();
+    }
+    messageList.add(message);
+
+    if (messageList.size() < batchSize) {
+      // Batch not full yet: remember it and wait for more tuples.
+      sourceTupleMap.put(sourceType, tupleList);
+      sourceMessageMap.put(sourceType, messageList);
+      return;
+    }
+
+    try {
+      bulkMessageWriter.write(sourceType, configuration, tupleList, messageList);
+      for (Tuple t : tupleList) {
+        collector.ack(t);
+      }
+    } catch (Exception e) {
+      for (Tuple t : tupleList) {
+        collector.fail(t);
+      }
+      ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
+    }
+    sourceTupleMap.remove(sourceType);
+    sourceMessageMap.remove(sourceType);
+  }
+
+  /**
+   * Declares the error stream, using the same constant that {@link #execute}
+   * passes to the error helper so the two cannot drift apart.
+   */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
new file mode 100644
index 0000000..30c8e23
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/ConfiguredBolt.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichBolt;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.log4j.Logger;
+import org.apache.metron.Constants;
+import org.apache.metron.domain.SourceConfig;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base bolt that keeps an in-memory map of per-source configurations in sync
+ * with the child nodes of {@link Constants#ZOOKEEPER_TOPOLOGY_ROOT}.
+ *
+ * Each child node's data is parsed with {@link SourceConfig#load(byte[])} and
+ * stored under the last segment of the node's path.
+ */
+public abstract class ConfiguredBolt extends BaseRichBolt {
+
+  private static final Logger LOG = Logger.getLogger(ConfiguredBolt.class);
+
+  private String zookeeperUrl;
+
+  // Written from the cache listener callback and read by subclasses, hence the synchronized wrapper.
+  protected Map<String, SourceConfig> configurations = Collections.synchronizedMap(new HashMap<String, SourceConfig>());
+  private CuratorFramework client;
+  private PathChildrenCache cache;
+
+  /**
+   * @param zookeeperUrl ZooKeeper connect string used when the bolt is prepared
+   */
+  public ConfiguredBolt(String zookeeperUrl) {
+    this.zookeeperUrl = zookeeperUrl;
+  }
+
+  /**
+   * Connects to ZooKeeper and starts watching the topology configuration nodes.
+   *
+   * @throws RuntimeException wrapping the cause if the cache cannot be started;
+   *         the ZooKeeper resources opened so far are closed first
+   */
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+    client = CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy);
+    client.start();
+    cache = new PathChildrenCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT, true);
+    PathChildrenCacheListener listener = new PathChildrenCacheListener() {
+      @Override
+      public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+        PathChildrenCacheEvent.Type type = event.getType();
+        // Only additions and updates carry a configuration to (re)load.
+        if (type != PathChildrenCacheEvent.Type.CHILD_ADDED && type != PathChildrenCacheEvent.Type.CHILD_UPDATED) {
+          return;
+        }
+        byte[] data = event.getData().getData();
+        if (data == null) {
+          return;
+        }
+        SourceConfig temp = SourceConfig.load(data);
+        if (temp != null) {
+          // The node name (last path segment) is the key the configuration is stored under.
+          String[] path = event.getData().getPath().split("/");
+          configurations.put(path[path.length - 1], temp);
+        }
+      }
+    };
+    cache.getListenable().addListener(listener);
+    try {
+      cache.start();
+    } catch (Exception e) {
+      // Do not leave the ZooKeeper client running when the cache cannot start.
+      closeZookeeperResources();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Closes the configuration cache and the ZooKeeper client.
+   */
+  @Override
+  public void cleanup() {
+    closeZookeeperResources();
+  }
+
+  /**
+   * Closes the cache, then the client. The client is closed even when closing
+   * the cache fails, and either may be absent if prepare never ran.
+   */
+  private void closeZookeeperResources() {
+    try {
+      try {
+        if (cache != null) {
+          cache.close();
+        }
+      } finally {
+        if (client != null) {
+          client.close();
+        }
+      }
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
index dc84473..dac1c0a 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/JoinBolt.java
@@ -20,14 +20,12 @@ package org.apache.metron.bolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,18 +35,21 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-public abstract class JoinBolt<V> extends BaseRichBolt {
+public abstract class JoinBolt<V> extends ConfiguredBolt {
private static final Logger LOG = LoggerFactory
.getLogger(JoinBolt.class);
protected OutputCollector collector;
- protected ImmutableSet<String> streamIds;
protected transient CacheLoader<String, Map<String, V>> loader;
protected transient LoadingCache<String, Map<String, V>> cache;
protected Long maxCacheSize;
protected Long maxTimeRetain;
+ public JoinBolt(String zookeeperUrl) {
+ super(zookeeperUrl);
+ }
+
public JoinBolt withMaxCacheSize(long maxCacheSize) {
this.maxCacheSize = maxCacheSize;
return this;
@@ -61,6 +62,7 @@ public abstract class JoinBolt<V> extends BaseRichBolt {
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
+ super.prepare(map, topologyContext, outputCollector);
this.collector = outputCollector;
if (this.maxCacheSize == null)
throw new IllegalStateException("maxCacheSize must be specified");
@@ -74,9 +76,6 @@ public abstract class JoinBolt<V> extends BaseRichBolt {
cache = CacheBuilder.newBuilder().maximumSize(maxCacheSize)
.expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
.build(loader);
- Set<String> temp = getStreamIds();
- temp.add("message");
- streamIds = ImmutableSet.copyOf(temp);
prepare(map, topologyContext);
}
@@ -85,26 +84,28 @@ public abstract class JoinBolt<V> extends BaseRichBolt {
public void execute(Tuple tuple) {
String streamId = tuple.getSourceStreamId();
String key = (String) tuple.getValueByField("key");
- V value = (V) tuple.getValueByField("message");
+ V message = (V) tuple.getValueByField("message");
try {
- Map<String, V> streamValueMap = cache.get(key);
- if (streamValueMap.containsKey(streamId)) {
+ Map<String, V> streamMessageMap = cache.get(key);
+ if (streamMessageMap.containsKey(streamId)) {
LOG.warn(String.format("Received key %s twice for " +
"stream %s", key, streamId));
}
- streamValueMap.put(streamId, value);
- Set<String> streamValueKeys = streamValueMap.keySet();
- if (streamValueKeys.size() == streamIds.size() && Sets.symmetricDifference
- (streamValueKeys, streamIds)
+ streamMessageMap.put(streamId, message);
+ Set<String> streamIds = getStreamIds(message);
+ Set<String> streamMessageKeys = streamMessageMap.keySet();
+ if (streamMessageKeys.size() == streamIds.size() && Sets.symmetricDifference
+ (streamMessageKeys, streamIds)
.isEmpty()) {
- collector.emit("message", tuple, new Values(key, joinValues
- (streamValueMap)));
+ collector.emit("message", tuple, new Values(key, joinMessages
+ (streamMessageMap)));
collector.ack(tuple);
cache.invalidate(key);
} else {
- cache.put(key, streamValueMap);
+ cache.put(key, streamMessageMap);
}
} catch (ExecutionException e) {
+ collector.reportError(e);
LOG.error(e.getMessage(), e);
}
}
@@ -116,7 +117,7 @@ public abstract class JoinBolt<V> extends BaseRichBolt {
public abstract void prepare(Map map, TopologyContext topologyContext);
- public abstract Set<String> getStreamIds();
+ public abstract Set<String> getStreamIds(V value);
- public abstract V joinValues(Map<String, V> streamValueMap);
+ public abstract V joinMessages(Map<String, V> streamMessageMap);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
index d3d2cf3..89e13a4 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/SplitBolt.java
@@ -20,34 +20,33 @@ package org.apache.metron.bolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
-import com.google.common.collect.ImmutableSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
public abstract class SplitBolt<T> extends
- BaseRichBolt {
+ ConfiguredBolt {
protected OutputCollector collector;
- private Set<String> streamIds;
+
+ public SplitBolt(String zookeeperUrl) {
+ super(zookeeperUrl);
+ }
@Override
public final void prepare(Map map, TopologyContext topologyContext,
OutputCollector outputCollector) {
+ super.prepare(map, topologyContext, outputCollector);
collector = outputCollector;
- streamIds = ImmutableSet.copyOf(getStreamIds());
prepare(map, topologyContext);
}
@Override
public final void execute(Tuple tuple) {
- emit(tuple, generateMessages(tuple));
+ emit(tuple, generateMessage(tuple));
}
@Override
@@ -60,24 +59,23 @@ public abstract class SplitBolt<T> extends
declareOther(declarer);
}
- public void emit(Tuple tuple, List<T> messages) {
- for(T message: messages) {
- String key = getKey(tuple, message);
- collector.emit("message", tuple, new Values(key, message));
- Map<String, T> streamValueMap = splitMessage(message);
- for (String streamId : streamIds) {
- T streamValue = streamValueMap.get(streamId);
- if (streamValue == null) {
- streamValue = getDefaultValue(streamId);
- }
- collector.emit(streamId, new Values(key, streamValue));
+ public void emit(Tuple tuple, T message) {
+ if (message == null) return;
+ String key = getKey(tuple, message);
+ collector.emit("message", tuple, new Values(key, message));
+ Map<String, T> streamMessageMap = splitMessage(message);
+ for (String streamId : streamMessageMap.keySet()) {
+ T streamMessage = streamMessageMap.get(streamId);
+ if (streamMessage == null) {
+ streamMessage = getDefaultMessage(streamId);
}
- collector.ack(tuple);
+ collector.emit(streamId, new Values(key, streamMessage));
}
- emitOther(tuple, messages);
+ collector.ack(tuple);
+ emitOther(tuple, message);
}
- protected T getDefaultValue(String streamId) {
+ protected T getDefaultMessage(String streamId) {
throw new IllegalArgumentException("Could not find a message for" +
" stream: " + streamId);
}
@@ -88,13 +86,13 @@ public abstract class SplitBolt<T> extends
public abstract String getKey(Tuple tuple, T message);
- public abstract List<T> generateMessages(Tuple tuple);
+ public abstract T generateMessage(Tuple tuple);
public abstract Map<String, T> splitMessage(T message);
public abstract void declareOther(OutputFieldsDeclarer declarer);
- public abstract void emitOther(Tuple tuple, List<T> messages);
+ public abstract void emitOther(Tuple tuple, T message);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
index d75e9a3..7079d5c 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Enrichment.java
@@ -20,28 +20,25 @@ package org.apache.metron.domain;
import org.apache.metron.enrichment.interfaces.EnrichmentAdapter;
import java.io.Serializable;
-import java.util.List;
public class Enrichment<T extends EnrichmentAdapter> implements Serializable {
- private String name;
- private List<String> fields;
+ private String type;
private T adapter;
- public String getName() {
- return name;
- }
+ public Enrichment() {}
- public void setName(String name) {
- this.name = name;
+ public Enrichment(String type, T adapter) {
+ this.type = type;
+ this.adapter = adapter;
}
- public List<String> getFields() {
- return fields;
+ public String getType() {
+ return type;
}
- public void setFields(List<String> fields) {
- this.fields = fields;
+ public void setType(String type) {
+ this.type = type;
}
public T getAdapter() {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java
new file mode 100644
index 0000000..8e1a960
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/SourceConfig.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.domain;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.yaml.snakeyaml.TypeDescription;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Per-source configuration bean, deserialized from JSON with Jackson.
+ */
+public class SourceConfig {
+
+  final static ObjectMapper _mapper = new ObjectMapper();
+
+  // The byte-based loaders hand raw bytes to the JSON parser, which expects a
+  // Unicode encoding; encoding text with the platform default charset would
+  // make parsing depend on the machine the code runs on.
+  private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+  // NOTE(review): presumably the name of the index this source is written to -- confirm against the writers.
+  private String index;
+  // NOTE(review): presumably enrichment type -> message fields to enrich -- confirm against the enrichment bolts.
+  private Map<String, List<String>> enrichmentFieldMap;
+  // NOTE(review): presumably threat intel type -> message fields to check -- confirm against the threat intel bolts.
+  private Map<String, List<String>> threatIntelFieldMap;
+  // Number of messages buffered per source before a bulk write; 0 if never set.
+  private int batchSize;
+
+  public String getIndex() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
+
+  public Map<String, List<String>> getEnrichmentFieldMap() {
+    return enrichmentFieldMap;
+  }
+
+  public void setEnrichmentFieldMap(Map<String, List<String>> enrichmentFieldMap) {
+    this.enrichmentFieldMap = enrichmentFieldMap;
+  }
+
+  public Map<String, List<String>> getThreatIntelFieldMap() {
+    return threatIntelFieldMap;
+  }
+
+  public void setThreatIntelFieldMap(Map<String, List<String>> threatIntelFieldMap) {
+    this.threatIntelFieldMap = threatIntelFieldMap;
+  }
+
+  public int getBatchSize() {
+    return batchSize;
+  }
+
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Parses a configuration from a stream of JSON.
+   *
+   * @param is stream positioned at the start of the JSON document
+   * @return the parsed configuration
+   * @throws IOException if the stream cannot be read or parsed
+   */
+  public static synchronized SourceConfig load(InputStream is) throws IOException {
+    return _mapper.readValue(is, SourceConfig.class);
+  }
+
+  /**
+   * Parses a configuration from the raw bytes of a JSON document.
+   *
+   * @param data encoded JSON
+   * @return the parsed configuration
+   * @throws IOException if the data cannot be parsed
+   */
+  public static synchronized SourceConfig load(byte[] data) throws IOException {
+    return load(new ByteArrayInputStream(data));
+  }
+
+  /**
+   * Parses a configuration from JSON text, encoding it with the given charset first.
+   *
+   * @param s JSON text
+   * @param c charset used to turn the text into bytes
+   * @return the parsed configuration
+   * @throws IOException if the text cannot be parsed
+   */
+  public static synchronized SourceConfig load(String s, Charset c) throws IOException {
+    return load(s.getBytes(c));
+  }
+
+  /**
+   * Parses a configuration from JSON text.
+   *
+   * The text is encoded as UTF-8 rather than with the platform default
+   * charset, so the result is the same on every machine.
+   *
+   * @param s JSON text
+   * @return the parsed configuration
+   * @throws IOException if the text cannot be parsed
+   */
+  public static synchronized SourceConfig load(String s) throws IOException {
+    return load(s, UTF_8);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java
new file mode 100644
index 0000000..4f7be3b
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentConstants.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.enrichment;
+
+/**
+ * Holder for string constants used by the enrichment code.
+ */
+public class EnrichmentConstants {
+
+  /**
+   * The literal key {@code "index.name"}.
+   * NOTE(review): presumably the configuration key that names the target index -- confirm
+   * against callers.
+   */
+  public static final String INDEX_NAME = "index.name";
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java
deleted file mode 100644
index 967970f..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/enrichment/EnrichmentSplitterBolt.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.enrichment;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import com.google.common.base.Splitter;
-import org.apache.metron.bolt.SplitBolt;
-import org.apache.metron.domain.Enrichment;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * Created by cstella on 2/10/16.
- */
-public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
- protected static final Logger LOG = LoggerFactory.getLogger(EnrichmentSplitterBolt.class);
- protected List<Enrichment> enrichments = new ArrayList<>();
- protected String messageFieldName = "message";
- /**
- * @param enrichments A class for sending tuples to enrichment bolt
- * @return Instance of this class
- */
- public EnrichmentSplitterBolt withEnrichments(List<Enrichment> enrichments) {
- this.enrichments = enrichments;
- return this;
- }
- public EnrichmentSplitterBolt withMessageFieldName(String messageFieldName) {
- this.messageFieldName = messageFieldName;
- return this;
- }
- @Override
- public void prepare(Map map, TopologyContext topologyContext) {
-
- }
- @Override
- public String getKey(Tuple tuple, JSONObject message) {
- String key = null;
- try {
- key = tuple.getStringByField("key");
- }
- catch(Throwable t) {
- //swallowing this just in case.
- }
- if(key != null) {
- return key;
- }
- else {
- return UUID.randomUUID().toString();
- }
- }
-
- @Override
- public List<JSONObject> generateMessages(Tuple tuple) {
- return Arrays.asList((JSONObject)tuple.getValueByField(messageFieldName));
- }
-
- @Override
- public Set<String> getStreamIds() {
- Set<String> streamIds = new HashSet<>();
- for(Enrichment enrichment: enrichments) {
- streamIds.add(enrichment.getName());
- }
- return streamIds;
- }
- @SuppressWarnings("unchecked")
- @Override
- public Map<String, JSONObject> splitMessage(JSONObject message) {
-
- Map<String, JSONObject> streamMessageMap = new HashMap<>();
- for (Enrichment enrichment : enrichments) {
- List<String> fields = enrichment.getFields();
- if (fields != null && fields.size() > 0) {
- JSONObject enrichmentObject = new JSONObject();
- for (String field : fields) {
- enrichmentObject.put(field, getField(message,field));
- }
- streamMessageMap.put(enrichment.getName(), enrichmentObject);
- }
- }
- /*if(message != null && enrichments.size() != 1) {
- throw new RuntimeException("JSON: " + message.toJSONString() + " => " + streamMessageMap);
- }*/
- return streamMessageMap;
- }
-
- public Object getField(JSONObject object, String path) {
- Map ret = object;
- for(String node: Splitter.on('/').split(path)) {
- Object o = ret.get(node);
- if(o instanceof Map) {
- ret = (Map) o;
- }
- else {
- return o;
- }
- }
- return ret;
- }
-
- @Override
- public void declareOther(OutputFieldsDeclarer declarer) {
-
- }
-
- @Override
- public void emitOther(Tuple tuple, List<JSONObject> messages) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
index 7aa02c5..6caa016 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HBaseBolt.java
@@ -20,7 +20,6 @@ package org.apache.metron.hbase;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import com.google.common.base.Function;
@@ -40,7 +39,7 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
-import org.apache.metron.helpers.topology.ErrorGenerator;
+import org.apache.metron.helpers.topology.ErrorUtils;
/**
* A Storm bolt for putting data into HBase.
@@ -136,7 +135,7 @@ public class HBaseBolt implements IRichBolt {
this.connector.put(p);
} catch (IOException ex) {
- JSONObject error = ErrorGenerator.generateErrorMessage(
+ JSONObject error = ErrorUtils.generateErrorMessage(
"Alerts problem: " + input.toString(), ex);
collector.emit("error", new Values(error));
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
index 9055837..e454f04 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/hbase/HTableProvider.java
@@ -23,9 +23,6 @@ import org.apache.hadoop.hbase.client.HTableInterface;
import java.io.IOException;
-/**
- * Created by cstella on 2/11/16.
- */
public class HTableProvider implements TableProvider {
@Override
public HTableInterface getTable(Configuration config, String tableName) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorGenerator.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorGenerator.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorGenerator.java
deleted file mode 100644
index 8ec940a..0000000
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorGenerator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.helpers.topology;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.json.simple.JSONObject;
-
-public class ErrorGenerator {
-
- @SuppressWarnings("unchecked")
- public static JSONObject generateErrorMessage(String message, Exception e)
- {
- JSONObject error_message = new JSONObject();
-
- /*
- * Save full stack trace in object.
- */
- String stackTrace = ExceptionUtils.getStackTrace(e);
-
- String exception = e.toString();
-
- error_message.put("time", System.currentTimeMillis());
- try {
- error_message.put("hostname", InetAddress.getLocalHost().getHostName());
- } catch (UnknownHostException ex) {
- // TODO Auto-generated catch block
- ex.printStackTrace();
- }
-
- error_message.put("message", message);
- error_message.put("exception", exception);
- error_message.put("stack", stackTrace);
-
- return error_message;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java
new file mode 100644
index 0000000..b02cbaf
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/helpers/topology/ErrorUtils.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.helpers.topology;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Values;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.metron.Constants;
+import org.json.simple.JSONObject;
+
+public class ErrorUtils {
+
+ @SuppressWarnings("unchecked")
+ public static JSONObject generateErrorMessage(String message, Throwable t)
+ {
+ JSONObject error_message = new JSONObject();
+
+ /*
+ * Save full stack trace in object.
+ */
+ String stackTrace = ExceptionUtils.getStackTrace(t);
+
+ String exception = t.toString();
+
+ error_message.put("time", System.currentTimeMillis());
+ try {
+ error_message.put("hostname", InetAddress.getLocalHost().getHostName());
+ } catch (UnknownHostException ex) {
+ // TODO Auto-generated catch block
+ ex.printStackTrace();
+ }
+
+ error_message.put("message", message);
+ error_message.put(Constants.SOURCE_TYPE, "error");
+ error_message.put("exception", exception);
+ error_message.put("stack", stackTrace);
+
+ return error_message;
+ }
+
+ public static void handleError(OutputCollector collector, Throwable t, String errorStream) {
+ JSONObject error = ErrorUtils.generateErrorMessage(t.getMessage(), t);
+ collector.emit(errorStream, new Values(error));
+ collector.reportError(t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
new file mode 100644
index 0000000..2c430d3
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.spout.pcap;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.log4j.Logger;
+import storm.kafka.Callback;
+import storm.kafka.EmitContext;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Kafka spout {@link Callback} that appends every packet it is handed to a Hadoop
+ * {@link SequenceFile} and rolls over to a new file when the current one has covered too
+ * long a time span or received too many packets.
+ *
+ * <p>Each record's key is the packet timestamp ({@link LongWritable}) and its value is the
+ * packet bytes prefixed with {@link #PCAP_GLOBAL_HEADER} ({@link BytesWritable}).
+ */
+public class HDFSWriterCallback implements Callback {
+  static final long serialVersionUID = 0xDEADBEEFL;
+  private static final Logger LOG = Logger.getLogger(HDFSWriterCallback.class);
+
+  /** The 24 header bytes prepended to every packet before it is written. */
+  public static final byte[] PCAP_GLOBAL_HEADER = new byte[] {
+      (byte) 0xd4, (byte) 0xc3, (byte) 0xb2, (byte) 0xa1, 0x02, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00
+      ,0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00
+  };
+
+  /** Fixed two-element placeholder tuple returned from every {@link #apply} call. */
+  private static final List<Object> RET_TUPLE = ImmutableList.of((Object)Byte.valueOf((byte) 0x00), Byte.valueOf((byte)0x00));
+  private FileSystem fs;
+  private SequenceFile.Writer writer;
+  private HDFSWriterConfig config;
+  /** Timestamp of the first packet of the current file; {@code 0} until the first turnover. */
+  private long batchStartTime;
+  /** Number of packets appended to the current file. */
+  private long numWritten;
+  private EmitContext context;
+
+  public HDFSWriterCallback() {
+  }
+
+  /**
+   * Sets the writer configuration (output path and rollover thresholds).
+   *
+   * @param config the configuration to use
+   * @return this callback, for chaining
+   */
+  public HDFSWriterCallback withConfig(HDFSWriterConfig config) {
+    LOG.info("Configured: " + config);
+    this.config = config;
+    return this;
+  }
+
+  /**
+   * Appends one packet to the current sequence file, rolling over first if needed.
+   *
+   * <p>An {@link IOException} while writing is logged and the packet is dropped.
+   *
+   * @param tuple element 0 is the timestamp ({@link LongWritable}), element 1 the raw
+   *        packet ({@link BytesWritable})
+   * @param context emit context for this tuple (unused here)
+   * @return the constant placeholder tuple
+   */
+  @Override
+  public List<Object> apply(List<Object> tuple, EmitContext context) {
+    LongWritable ts = (LongWritable) tuple.get(0);
+    BytesWritable rawPacket = (BytesWritable) tuple.get(1);
+    try {
+      turnoverIfNecessary(ts.get());
+      // getBytes() exposes the whole backing array, which may be longer than the payload;
+      // only the first getLength() bytes are valid packet data.
+      writer.append(ts, headerize(rawPacket.getBytes(), rawPacket.getLength()));
+      // Count the packet so the packet-count threshold in turnoverIfNecessary can trigger.
+      numWritten++;
+      writer.hflush();
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      //drop? not sure..
+    }
+    return RET_TUPLE;
+  }
+
+  /**
+   * Returns a new {@link BytesWritable} holding {@link #PCAP_GLOBAL_HEADER} followed by the
+   * first {@code length} bytes of {@code packet}.
+   */
+  private static BytesWritable headerize(byte[] packet, int length) {
+    byte[] ret = new byte[PCAP_GLOBAL_HEADER.length + length];
+    System.arraycopy(PCAP_GLOBAL_HEADER, 0, ret, 0, PCAP_GLOBAL_HEADER.length);
+    System.arraycopy(packet, 0, ret, PCAP_GLOBAL_HEADER.length, length);
+    return new BytesWritable(ret);
+  }
+
+  /**
+   * Closes the current file and opens a new one when no file is open yet, when {@code ts}
+   * is more than {@code config.getMaxTimeMS()} past the first packet of the current file,
+   * or when more than {@code config.getNumPackets()} packets have been written to it.
+   *
+   * @param ts timestamp of the packet about to be written
+   * @throws IOException if closing the old writer or creating the new one fails
+   */
+  private synchronized void turnoverIfNecessary(long ts) throws IOException {
+    long duration = ts - batchStartTime;
+    if(batchStartTime == 0L || duration > config.getMaxTimeMS() || numWritten > config.getNumPackets()) {
+      //turnover
+      Path path = getPath(ts);
+      if(writer != null) {
+        writer.close();
+      }
+      writer = SequenceFile.createWriter(new Configuration()
+          , SequenceFile.Writer.file(path)
+          , SequenceFile.Writer.keyClass(LongWritable.class)
+          , SequenceFile.Writer.valueClass(BytesWritable.class)
+      );
+      //reset state
+      LOG.info("Turning over and writing to " + path);
+      batchStartTime = ts;
+      numWritten = 0;
+    }
+  }
+
+  /** Builds {@code <outputPath>/pcap_<ts>_<uuid>}, taking the UUID from the emit context. */
+  private Path getPath(long ts) {
+    String fileName = Joiner.on("_").join("pcap"
+        , "" + ts
+        , context.get(EmitContext.Type.UUID)
+    );
+    return new Path(config.getOutputPath(), fileName);
+  }
+
+  /**
+   * Stores the emit context and obtains the default Hadoop {@link FileSystem}.
+   *
+   * @param context the context later used to name output files
+   * @throws IllegalStateException if the file system cannot be obtained
+   */
+  @Override
+  public void initialize(EmitContext context) {
+    this.context = context;
+    try {
+      fs = FileSystem.get(new Configuration());
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to create filesystem", e);
+    }
+  }
+
+  /**
+   * Closes the sequence-file writer if one is open; does nothing otherwise.
+   *
+   * @throws Exception if the writer cannot be closed
+   */
+  @Override
+  public void close() throws Exception {
+    if(writer != null) {
+      writer.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
new file mode 100644
index 0000000..ccfc884
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.spout.pcap;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class HDFSWriterConfig implements Serializable {
+ static final long serialVersionUID = 0xDEADBEEFL;
+ private long numPackets;
+ private long maxTimeMS;
+ private String outputPath;
+ private String zookeeperQuorum;
+
+ public HDFSWriterConfig withOutputPath(String path) {
+ outputPath = path;
+ return this;
+ }
+
+ public HDFSWriterConfig withNumPackets(long n) {
+ numPackets = n;
+ return this;
+ }
+
+ public HDFSWriterConfig withMaxTimeMS(long t) {
+ maxTimeMS = t;
+ return this;
+ }
+
+ public HDFSWriterConfig withZookeeperQuorum(String zookeeperQuorum) {
+ this.zookeeperQuorum = zookeeperQuorum;
+ return this;
+ }
+
+ public List<String> getZookeeperServers() {
+ List<String> out = new ArrayList<>();
+ if(zookeeperQuorum != null) {
+ for (String hostPort : Splitter.on(',').split(zookeeperQuorum)) {
+ Iterable<String> tokens = Splitter.on(':').split(hostPort);
+ String host = Iterables.getFirst(tokens, null);
+ if(host != null) {
+ out.add(host);
+ }
+ }
+ }
+ return out;
+ }
+
+ public Integer getZookeeperPort() {
+ if(zookeeperQuorum != null) {
+ String hostPort = Iterables.getFirst(Splitter.on(',').split(zookeeperQuorum), null);
+ String portStr = Iterables.getLast(Splitter.on(':').split(hostPort));
+ return Integer.parseInt(portStr);
+ }
+ return null;
+ }
+
+ public String getOutputPath() {
+ return outputPath;
+ }
+
+ public long getNumPackets() {
+ return numPackets;
+ }
+
+ public long getMaxTimeMS() {
+ return maxTimeMS;
+ }
+
+ @Override
+ public String toString() {
+ return "HDFSWriterConfig{" +
+ "numPackets=" + numPackets +
+ ", maxTimeMS=" + maxTimeMS +
+ ", outputPath='" + outputPath + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java
new file mode 100644
index 0000000..581d74f
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/topology/TopologyUtils.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.topology;
+
+import org.apache.metron.Constants;
+import org.json.simple.JSONObject;
+
+public class TopologyUtils {
+
+ public static String getSourceType(JSONObject message) {
+ return (String) message.get(Constants.SOURCE_TYPE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigUtils.java
new file mode 100644
index 0000000..7f5afe9
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/ConfigUtils.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.utils;
+
+import java.lang.reflect.InvocationTargetException;
+
+public class ConfigUtils<T> {
+
+ public static <T> T createInstance(String className, T defaultClass) {
+ T instance;
+ if(className == null || className.length() == 0 || className.charAt(0) == '$') {
+ return defaultClass;
+ }
+ else {
+ try {
+ Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
+ instance = clazz.getConstructor().newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalStateException("Unable to instantiate connector.", e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException("Unable to instantiate connector: illegal access", e);
+ } catch (InvocationTargetException e) {
+ throw new IllegalStateException("Unable to instantiate connector", e);
+ } catch (NoSuchMethodException e) {
+ throw new IllegalStateException("Unable to instantiate connector: no such method", e);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("Unable to instantiate connector: class not found", e);
+ }
+ }
+ return instance;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java
new file mode 100644
index 0000000..b257b24
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/HBaseWriter.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.utils.ConfigUtils;
+import org.apache.metron.writer.interfaces.MessageWriter;
+import org.json.simple.JSONObject;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Base {@link MessageWriter} that stores each message as a single HBase {@link Put}.
+ *
+ * <p>Subclasses supply the row key, the cell timestamp and the cell values; the keys of
+ * the value map are column names of the form {@code family:qualifier}.
+ */
+public abstract class HBaseWriter implements MessageWriter<JSONObject>, Serializable {
+
+  private String tableName;
+  private String connectorImpl;
+  private TableProvider provider;
+  private HTableInterface table;
+
+  /**
+   * @param tableName name of the HBase table written to
+   */
+  public HBaseWriter(String tableName) {
+    this.tableName = tableName;
+  }
+
+  /**
+   * Selects the {@link TableProvider} implementation by class name.
+   *
+   * @param connectorImpl class name handed to {@link ConfigUtils#createInstance}; when it
+   *        is not usable there, an {@link HTableProvider} is used
+   * @return this writer, for chaining
+   */
+  public HBaseWriter withProviderImpl(String connectorImpl) {
+    this.connectorImpl = connectorImpl;
+    return this;
+  }
+
+  /**
+   * Creates the table provider and opens the target table.
+   *
+   * @throws IllegalStateException if the table cannot be opened; failing here avoids a
+   *         later {@code NullPointerException} on the first {@link #write} call
+   */
+  @Override
+  public void init() {
+    final Configuration config = HBaseConfiguration.create();
+    try {
+      provider = ConfigUtils.createInstance(connectorImpl, new HTableProvider());
+      table = provider.getTable(config, tableName);
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to open HBase table '" + tableName + "'", e);
+    }
+  }
+
+  /**
+   * Writes one message as a single {@link Put}.
+   *
+   * <p>A timestamp greater than {@code -1} from {@link #getTimestamp} is applied to the
+   * cell explicitly; otherwise the cell is added without one.
+   *
+   * @throws IllegalArgumentException if a column name is not of the form
+   *         {@code family:qualifier}
+   */
+  @Override
+  public void write(String sourceType, SourceConfig configuration, Tuple tuple, JSONObject message) throws Exception {
+    Put put = new Put(getKey(tuple, message));
+    Map<String, byte[]> values = getValues(tuple, message);
+    for(Map.Entry<String, byte[]> cell: values.entrySet()) {
+      String column = cell.getKey();
+      String[] columnParts = column.split(":");
+      if (columnParts.length < 2) {
+        throw new IllegalArgumentException("Column '" + column + "' is not of the form family:qualifier");
+      }
+      byte[] family = Bytes.toBytes(columnParts[0]);
+      byte[] qualifier = Bytes.toBytes(columnParts[1]);
+      long timestamp = getTimestamp(tuple, message);
+      if (timestamp > -1) {
+        put.addColumn(family, qualifier, timestamp, cell.getValue());
+      } else {
+        put.addColumn(family, qualifier, cell.getValue());
+      }
+    }
+    table.put(put);
+  }
+
+  /**
+   * Closes the table if {@link #init} opened one; does nothing otherwise.
+   */
+  @Override
+  public void close() throws Exception {
+    if (table != null) {
+      table.close();
+    }
+  }
+
+  /** Returns the row key for the given message. */
+  public abstract byte[] getKey(Tuple tuple, JSONObject message);
+
+  /** Returns the cell timestamp, or a value not greater than {@code -1} for none. */
+  public abstract long getTimestamp(Tuple tuple, JSONObject message);
+
+  /** Returns the cell values keyed by {@code family:qualifier} column name. */
+  public abstract Map<String, byte[]> getValues(Tuple tuple, JSONObject message);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/PcapWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/PcapWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/PcapWriter.java
new file mode 100644
index 0000000..b5ab587
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/PcapWriter.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import backtype.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link HBaseWriter} that stores the binary payload of a tuple in a single HBase column.
+ *
+ * <p>The row key is taken from the message field {@code pcap_id}, the cell timestamp from the
+ * message field {@code ts_micro} and the cell value from the first field of the tuple.
+ *
+ * <p>NOTE(review): the field name suggests microseconds, while HBase cell timestamps are
+ * conventionally milliseconds -- confirm this is intended.
+ */
+public class PcapWriter extends HBaseWriter {
+
+  /** Target column, in the {@code family:qualifier} form that {@code HBaseWriter.write} splits on ':'. */
+  private final String column;
+
+  /**
+   * @param tableName HBase table to write to
+   * @param column    target column, used as the single key of the map returned by {@link #getValues}
+   */
+  public PcapWriter(String tableName, String column) {
+    super(tableName);
+    this.column = column;
+  }
+
+  /**
+   * Returns the UTF-8 bytes of the message's {@code pcap_id} field.
+   *
+   * @throws IllegalArgumentException if the message has no {@code pcap_id}
+   */
+  @Override
+  public byte[] getKey(Tuple tuple, JSONObject message) {
+    String key = (String) message.get("pcap_id");
+    if (key == null) {
+      throw new IllegalArgumentException("Message has no pcap_id field");
+    }
+    // Explicit charset: the no-arg getBytes() depends on the platform default, which would
+    // make row keys differ between hosts for non-ASCII ids.
+    return key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Returns the message's {@code ts_micro} field as a long.
+   *
+   * @throws IllegalArgumentException if the field is missing or not numeric
+   */
+  @Override
+  public long getTimestamp(Tuple tuple, JSONObject message) {
+    Object ts = message.get("ts_micro");
+    // Accept any Number rather than only Long, so a message built with an Integer value
+    // works too; anything else is reported with the offending value.
+    if (!(ts instanceof Number)) {
+      throw new IllegalArgumentException("Message has no numeric ts_micro field: " + ts);
+    }
+    return ((Number) ts).longValue();
+  }
+
+  /** Returns a single-entry map from the configured column to the tuple's first (binary) field. */
+  @Override
+  public Map<String, byte[]> getValues(Tuple tuple, JSONObject message) {
+    Map<String, byte[]> values = new HashMap<>();
+    values.put(column, tuple.getBinary(0));
+    return values;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
new file mode 100644
index 0000000..90c0261
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.interfaces;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.domain.SourceConfig;
+
+import java.util.List;
+
+/**
+ * Writer that persists a batch of messages in one call.
+ *
+ * @param <T> type of the messages being written
+ */
+public interface BulkMessageWriter<T> extends AutoCloseable {
+
+ /** Prepares the writer; presumably invoked once before the first write -- TODO confirm with callers. */
+ void init();
+ /**
+ * Writes a batch of messages.
+ *
+ * @param sourceType source type of the messages
+ * @param configuration configuration for that source type
+ * @param tuples tuples the messages came from; presumably parallel to {@code messages} -- TODO confirm
+ * @param messages messages to write
+ * @throws Exception if the batch cannot be written
+ */
+ void write(String sourceType, SourceConfig configuration, List<Tuple> tuples, List<T> messages) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java
new file mode 100644
index 0000000..12de836
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/MessageWriter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.interfaces;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.domain.SourceConfig;
+
+/**
+ * Writer that persists one message per call.
+ *
+ * @param <T> type of the message being written
+ */
+public interface MessageWriter<T> extends AutoCloseable {
+
+ /** Prepares the writer for use; implementations acquire the resources that {@code write} needs here. */
+ void init();
+ /**
+ * Writes a single message.
+ *
+ * @param sourceType source type of the message
+ * @param configuration configuration for that source type
+ * @param tuple tuple the message came from
+ * @param message message to write
+ * @throws Exception if the message cannot be written
+ */
+ void write(String sourceType, SourceConfig configuration, Tuple tuple, T message) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/storm/kafka/Callback.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/storm/kafka/Callback.java b/metron-streaming/Metron-Common/src/main/java/storm/kafka/Callback.java
new file mode 100644
index 0000000..ff05c29
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/storm/kafka/Callback.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Hook applied to tuples on their way out of a spout.
+ */
+public interface Callback extends AutoCloseable, Serializable {
+ /**
+ * Invoked with a tuple that is about to be emitted.
+ *
+ * @param tuple values about to be emitted
+ * @param context details of the emit call (for example stream id, message id, task id)
+ * @return the values to emit in place of {@code tuple}
+ */
+ List<Object> apply(List<Object> tuple, EmitContext context);
+ /**
+ * Invoked with the base context before the callback is used.
+ *
+ * @param context base emit context supplied by the owner of the callback
+ */
+ void initialize(EmitContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackCollector.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackCollector.java b/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackCollector.java
new file mode 100644
index 0000000..485da5a
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackCollector.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import backtype.storm.spout.ISpoutOutputCollector;
+import backtype.storm.spout.SpoutOutputCollector;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A {@link SpoutOutputCollector} that passes every tuple through a {@link Callback} and emits
+ * the callback's result through the wrapped collector.
+ *
+ * <p>Each overload copies the base {@link EmitContext}, annotates the copy with the arguments
+ * of that particular call (stream id, message id, task id), applies the callback exactly once
+ * and forwards to {@code _delegate}.
+ *
+ * <p>Forwarding to {@code _delegate} rather than {@code super} is deliberate: the convenience
+ * overloads of {@code SpoutOutputCollector} are implemented in terms of its other, overridable
+ * overloads, so a {@code super} call would dispatch back into this class and run the callback
+ * again for the same emit.
+ */
+public class CallbackCollector extends SpoutOutputCollector implements Serializable {
+  static final long serialVersionUID = 0xDEADBEEFL;
+  Callback _callback;
+  SpoutOutputCollector _delegate;
+  EmitContext _context;
+
+  /**
+   * @param callback  callback applied to every emitted tuple
+   * @param collector collector that receives the callback's output
+   * @param context   base context; it is cloned for every emit and never modified
+   */
+  public CallbackCollector(Callback callback, SpoutOutputCollector collector, EmitContext context) {
+    super(collector);
+    this._callback = callback;
+    this._delegate = collector;
+    this._context = context;
+  }
+
+  /**
+   * Emits a new tuple to the specified output stream with the given message ID.
+   * When Storm detects that this tuple has been fully processed, or has failed
+   * to be fully processed, the spout will receive an ack or fail callback respectively
+   * with the messageId as long as the messageId was not null. If the messageId was null,
+   * Storm will not track the tuple and no callback will be received. The emitted values must be
+   * immutable.
+   *
+   * @param streamId  stream to emit to
+   * @param tuple     values handed to the callback
+   * @param messageId id used for ack/fail tracking, may be null
+   * @return the list of task ids that this tuple was sent to
+   */
+  @Override
+  public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+    EmitContext context = _context.cloneContext()
+        .with(EmitContext.Type.MESSAGE_ID, messageId)
+        .with(EmitContext.Type.STREAM_ID, streamId);
+    return _delegate.emit(streamId, _callback.apply(tuple, context), messageId);
+  }
+
+  /**
+   * Emits a new tuple to the default output stream with the given message ID.
+   * Tracking behaves as described for {@link #emit(String, List, Object)}.
+   *
+   * @param tuple     values handed to the callback
+   * @param messageId id used for ack/fail tracking, may be null
+   * @return the list of task ids that this tuple was sent to
+   */
+  @Override
+  public List<Integer> emit(List<Object> tuple, Object messageId) {
+    EmitContext context = _context.cloneContext()
+        .with(EmitContext.Type.MESSAGE_ID, messageId);
+    return _delegate.emit(_callback.apply(tuple, context), messageId);
+  }
+
+  /**
+   * Emits a tuple to the default output stream with a null message id. Storm will
+   * not track this message so ack and fail will never be called for this tuple. The
+   * emitted values must be immutable.
+   *
+   * @param tuple values handed to the callback
+   * @return the list of task ids that this tuple was sent to
+   */
+  @Override
+  public List<Integer> emit(List<Object> tuple) {
+    return _delegate.emit(_callback.apply(tuple, _context.cloneContext()));
+  }
+
+  /**
+   * Emits a tuple to the specified output stream with a null message id. Storm will
+   * not track this message so ack and fail will never be called for this tuple. The
+   * emitted values must be immutable.
+   *
+   * @param streamId stream to emit to
+   * @param tuple    values handed to the callback
+   * @return the list of task ids that this tuple was sent to
+   */
+  @Override
+  public List<Integer> emit(String streamId, List<Object> tuple) {
+    EmitContext context = _context.cloneContext()
+        .with(EmitContext.Type.STREAM_ID, streamId);
+    return _delegate.emit(streamId, _callback.apply(tuple, context));
+  }
+
+  /**
+   * Emits a tuple to the specified task on the specified output stream. This output
+   * stream must have been declared as a direct stream, and the specified task must
+   * use a direct grouping on this stream to receive the message. The emitted values must be
+   * immutable.
+   *
+   * @param taskId    task to send the tuple to
+   * @param streamId  stream to emit to
+   * @param tuple     values handed to the callback
+   * @param messageId id used for ack/fail tracking, may be null
+   */
+  @Override
+  public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
+    EmitContext context = _context.cloneContext()
+        .with(EmitContext.Type.STREAM_ID, streamId)
+        .with(EmitContext.Type.MESSAGE_ID, messageId)
+        .with(EmitContext.Type.TASK_ID, Integer.valueOf(taskId));
+    _delegate.emitDirect(taskId, streamId, _callback.apply(tuple, context), messageId);
+  }
+
+  /**
+   * Emits a tuple to the specified task on the default output stream. This output
+   * stream must have been declared as a direct stream, and the specified task must
+   * use a direct grouping on this stream to receive the message. The emitted values must be
+   * immutable.
+   *
+   * @param taskId    task to send the tuple to
+   * @param tuple     values handed to the callback
+   * @param messageId id used for ack/fail tracking, may be null
+   */
+  @Override
+  public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
+    EmitContext context = _context.cloneContext()
+        .with(EmitContext.Type.MESSAGE_ID, messageId)
+        .with(EmitContext.Type.TASK_ID, Integer.valueOf(taskId));
+    _delegate.emitDirect(taskId, _callback.apply(tuple, context), messageId);
+  }
+
+  /**
+   * Emits a tuple to the specified task on the specified output stream. This output
+   * stream must have been declared as a direct stream, and the specified task must
+   * use a direct grouping on this stream to receive the message. The emitted values must be
+   * immutable.
+   *
+   * <p>Because no message id is specified, Storm will not track this message
+   * so ack and fail will never be called for this tuple.</p>
+   *
+   * @param taskId   task to send the tuple to
+   * @param streamId stream to emit to
+   * @param tuple    values handed to the callback
+   */
+  @Override
+  public void emitDirect(int taskId, String streamId, List<Object> tuple) {
+    EmitContext context = _context.cloneContext()
+        .with(EmitContext.Type.STREAM_ID, streamId)
+        .with(EmitContext.Type.TASK_ID, Integer.valueOf(taskId));
+    _delegate.emitDirect(taskId, streamId, _callback.apply(tuple, context));
+  }
+
+  /**
+   * Emits a tuple to the specified task on the default output stream. This output
+   * stream must have been declared as a direct stream, and the specified task must
+   * use a direct grouping on this stream to receive the message. The emitted values must be
+   * immutable.
+   *
+   * <p>Because no message id is specified, Storm will not track this message
+   * so ack and fail will never be called for this tuple.</p>
+   *
+   * @param taskId task to send the tuple to
+   * @param tuple  values handed to the callback
+   */
+  @Override
+  public void emitDirect(int taskId, List<Object> tuple) {
+    EmitContext context = _context.cloneContext()
+        .with(EmitContext.Type.TASK_ID, Integer.valueOf(taskId));
+    _delegate.emitDirect(taskId, _callback.apply(tuple, context));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/9f96399d/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackKafkaSpout.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackKafkaSpout.java b/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackKafkaSpout.java
new file mode 100644
index 0000000..431bdf9
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/storm/kafka/CallbackKafkaSpout.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import backtype.storm.Config;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import storm.kafka.*;
+
+import java.util.*;
+
+/**
+ * A {@link KafkaSpout} whose emitted tuples are routed through a {@link Callback}.
+ *
+ * <p>The callback is instantiated reflectively from its class, initialized with a context
+ * holding the spout configuration and uuid, and then wrapped around the collector Storm hands
+ * to {@link #open}. It is closed together with the spout.
+ */
+public class CallbackKafkaSpout extends KafkaSpout {
+  static final long serialVersionUID = 0xDEADBEEFL;
+  Class<? extends Callback> callbackClazz;
+  Callback _callback;
+  EmitContext _context;
+
+  /**
+   * @param spoutConfig   configuration of the underlying Kafka spout
+   * @param callbackClass fully qualified name of a {@link Callback} implementation
+   * @throws RuntimeException   if the class cannot be found
+   * @throws ClassCastException if the class does not implement {@link Callback}
+   */
+  public CallbackKafkaSpout(SpoutConfig spoutConfig, String callbackClass) {
+    this(spoutConfig, toCallbackClass(callbackClass));
+  }
+
+  /**
+   * @param spoutConf configuration of the underlying Kafka spout
+   * @param callback  {@link Callback} implementation; {@link #createCallback} instantiates it
+   *                  through its no-argument constructor
+   */
+  public CallbackKafkaSpout(SpoutConfig spoutConf, Class<? extends Callback> callback) {
+    super(spoutConf);
+    callbackClazz = callback;
+  }
+
+  /**
+   * Creates the callback and initializes it with the base emit context (spout configuration
+   * and uuid). Called from {@link #open} when no callback exists yet.
+   */
+  public void initialize() {
+    _callback = createCallback(callbackClazz);
+    _context = new EmitContext().with(EmitContext.Type.SPOUT_CONFIG, _spoutConfig)
+                                .with(EmitContext.Type.UUID, _uuid);
+    _callback.initialize(_context);
+  }
+
+  /** Resolves a class name to a {@link Callback} class, rejecting classes of any other type. */
+  private static Class<? extends Callback> toCallbackClass(String callbackClass) {
+    try {
+      // asSubclass replaces an unchecked cast: a class that is not a Callback is rejected
+      // here instead of failing later when the instance is first used.
+      return Class.forName(callbackClass).asSubclass(Callback.class);
+    }
+    catch (ClassNotFoundException e) {
+      throw new RuntimeException(callbackClass + " not found", e);
+    }
+  }
+
+  /**
+   * Instantiates the callback through its no-argument constructor.
+   *
+   * @throws RuntimeException if the class cannot be instantiated or its constructor is not accessible
+   */
+  protected Callback createCallback(Class<? extends Callback> callbackClass) {
+    try {
+      return callbackClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new RuntimeException("Unable to instantiate callback", e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException("Illegal access", e);
+    }
+  }
+
+  /**
+   * Opens the underlying spout with a collector that applies the callback to every emit.
+   * The context given to that collector is a copy of the base context extended with the
+   * open-time configuration and the topology context.
+   */
+  @Override
+  public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+    if(_callback == null) {
+      initialize();
+    }
+    EmitContext emitContext = _context.cloneContext()
+                                      .with(EmitContext.Type.OPEN_CONFIG, conf)
+                                      .with(EmitContext.Type.TOPOLOGY_CONTEXT, context);
+    super.open(conf, context, new CallbackCollector(_callback, collector, emitContext));
+  }
+
+  /**
+   * Closes the underlying spout and then the callback.
+   *
+   * @throws IllegalStateException if the callback fails to close
+   */
+  @Override
+  public void close() {
+    try {
+      super.close();
+    } finally {
+      // In a finally block so the callback is released even when closing the spout throws.
+      if(_callback != null) {
+        try {
+          _callback.close();
+        } catch (Exception e) {
+          throw new IllegalStateException("Unable to close callback", e);
+        }
+      }
+    }
+  }
+}