You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/07/20 13:14:25 UTC
[2/5] incubator-metron git commit: METRON-154: Decouple enrichment
and indexing closes apache/incubator-metron#192
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
index ecd1ce8..e7ad755 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
@@ -26,8 +26,8 @@ import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFac
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.interfaces.BulkMessageWriter;
import org.apache.metron.common.interfaces.MessageWriter;
-import org.apache.metron.common.writer.BulkWriterComponent;
-import org.apache.metron.common.writer.WriterToBulkWriter;
+import org.apache.metron.writer.BulkWriterComponent;
+import org.apache.metron.writer.WriterToBulkWriter;
import org.json.simple.JSONObject;
import java.io.Serializable;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index d9004d1..000b7ff 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -29,12 +29,12 @@ import org.apache.metron.common.interfaces.MessageWriter;
import org.apache.metron.common.spout.kafka.SpoutConfig;
import org.apache.metron.common.spout.kafka.SpoutConfigOptions;
import org.apache.metron.common.utils.ReflectionUtils;
-import org.apache.metron.common.writer.AbstractWriter;
import org.apache.metron.parsers.bolt.ParserBolt;
import org.apache.metron.parsers.bolt.WriterBolt;
import org.apache.metron.parsers.bolt.WriterHandler;
import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.metron.parsers.writer.KafkaWriter;
+import org.apache.metron.writer.AbstractWriter;
+import org.apache.metron.writer.kafka.KafkaWriter;
import org.json.simple.JSONObject;
import storm.kafka.KafkaSpout;
import storm.kafka.ZkHosts;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
deleted file mode 100644
index 6090491..0000000
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.parsers.writer;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.interfaces.MessageWriter;
-import org.apache.metron.common.utils.ConversionUtils;
-import org.apache.metron.common.utils.StringUtils;
-import org.apache.metron.common.writer.AbstractWriter;
-import org.json.simple.JSONObject;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObject>, Serializable {
- public enum Configurations {
- BROKER("kafka.brokerUrl")
- ,KEY_SERIALIZER("kafka.keySerializer")
- ,VALUE_SERIALIZER("kafka.valueSerializer")
- ,REQUIRED_ACKS("kafka.requiredAcks")
- ,TOPIC("kafka.topic")
- ,PRODUCER_CONFIGS("kafka.producerConfigs");
- ;
- String key;
- Configurations(String key) {
- this.key = key;
- }
- public Object get(Optional<String> configPrefix, Map<String, Object> config) {
- return config.get(StringUtils.join(".", configPrefix, Optional.of(key)));
- }
- public <T> T getAndConvert(Optional<String> configPrefix, Map<String, Object> config, Class<T> clazz) {
- Object o = get(configPrefix, config);
- if(o != null) {
- return ConversionUtils.convert(o, clazz);
- }
- return null;
- }
- }
- private String brokerUrl;
- private String keySerializer = "org.apache.kafka.common.serialization.StringSerializer";
- private String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
- private int requiredAcks = 1;
- private String kafkaTopic = Constants.ENRICHMENT_TOPIC;
- private KafkaProducer kafkaProducer;
- private String configPrefix = null;
- private Map<String, Object> producerConfigs = new HashMap<>();
-
- public KafkaWriter() {}
-
- public KafkaWriter(String brokerUrl) {
- this.brokerUrl = brokerUrl;
- }
-
- public KafkaWriter withKeySerializer(String keySerializer) {
- this.keySerializer = keySerializer;
- return this;
- }
-
- public KafkaWriter withValueSerializer(String valueSerializer) {
- this.valueSerializer = valueSerializer;
- return this;
- }
-
- public KafkaWriter withRequiredAcks(Integer requiredAcks) {
- this.requiredAcks = requiredAcks;
- return this;
- }
-
- public KafkaWriter withTopic(String topic) {
- this.kafkaTopic= topic;
- return this;
- }
- public KafkaWriter withConfigPrefix(String prefix) {
- this.configPrefix = prefix;
- return this;
- }
-
- public KafkaWriter withProducerConfigs(Map<String, Object> extraConfigs) {
- this.producerConfigs = extraConfigs;
- return this;
- }
-
- public Optional<String> getConfigPrefix() {
- return Optional.ofNullable(configPrefix);
- }
-
- @Override
- public void configure(String sensorName, WriterConfiguration configuration) {
- Map<String, Object> configMap = configuration.getSensorConfig(sensorName);
- String brokerUrl = Configurations.BROKER.getAndConvert(getConfigPrefix(), configMap, String.class);
- if(brokerUrl != null) {
- this.brokerUrl = brokerUrl;
- }
- String keySerializer = Configurations.KEY_SERIALIZER.getAndConvert(getConfigPrefix(), configMap, String.class);
- if(keySerializer != null) {
- withKeySerializer(keySerializer);
- }
- String valueSerializer = Configurations.VALUE_SERIALIZER.getAndConvert(getConfigPrefix(), configMap, String.class);
- if(valueSerializer != null) {
- withValueSerializer(keySerializer);
- }
- Integer requiredAcks = Configurations.REQUIRED_ACKS.getAndConvert(getConfigPrefix(), configMap, Integer.class);
- if(requiredAcks!= null) {
- withRequiredAcks(requiredAcks);
- }
- String topic = Configurations.TOPIC.getAndConvert(getConfigPrefix(), configMap, String.class);
- if(topic != null) {
- withTopic(topic);
- }
- Map<String, Object> producerConfigs = (Map)Configurations.PRODUCER_CONFIGS.get(getConfigPrefix(), configMap);
- if(producerConfigs != null) {
- withProducerConfigs(producerConfigs);
- }
- }
-
- public Map<String, Object> createProducerConfigs() {
- Map<String, Object> producerConfig = new HashMap<>();
- producerConfig.put("bootstrap.servers", brokerUrl);
- producerConfig.put("key.serializer", keySerializer);
- producerConfig.put("value.serializer", valueSerializer);
- producerConfig.put("request.required.acks", requiredAcks);
- producerConfig.putAll(producerConfigs == null?new HashMap<>():producerConfigs);
- return producerConfig;
- }
-
- @Override
- public void init() {
-
- this.kafkaProducer = new KafkaProducer<>(createProducerConfigs());
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void write(String sourceType, WriterConfiguration configurations, Tuple tuple, JSONObject message) throws Exception {
- kafkaProducer.send(new ProducerRecord<String, String>(kafkaTopic, message.toJSONString()));
- }
-
- @Override
- public void close() throws Exception {
- kafkaProducer.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
index 116e262..b3d09d0 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
@@ -18,25 +18,21 @@
package org.apache.metron.parsers.integration;
import junit.framework.Assert;
-import org.apache.commons.io.FilenameUtils;
import org.apache.metron.TestConstants;
import org.apache.metron.common.Constants;
+import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
import org.apache.metron.integration.BaseIntegrationTest;
import org.apache.metron.integration.ComponentRunner;
import org.apache.metron.integration.Processor;
import org.apache.metron.integration.ReadinessState;
-import org.apache.metron.integration.components.ConfigUploadComponent;
import org.apache.metron.integration.components.KafkaWithZKComponent;
import org.apache.metron.integration.utils.TestUtils;
import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
-import org.apache.metron.parsers.integration.validation.SampleDataValidation;
import org.apache.metron.test.TestDataType;
import org.apache.metron.test.utils.SampleDataUtils;
import org.apache.metron.test.utils.UnitTestHelper;
-import org.junit.Before;
import org.junit.Test;
-import java.io.File;
import java.util.*;
public abstract class ParserIntegrationTest extends BaseIntegrationTest {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/writer/KafkaWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/writer/KafkaWriterTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/writer/KafkaWriterTest.java
deleted file mode 100644
index 57dc3e2..0000000
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/writer/KafkaWriterTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.parsers.writer;
-
-import com.google.common.collect.ImmutableMap;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.test.TestingServer;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.configuration.ParserConfigurations;
-import org.apache.metron.common.configuration.SensorParserConfig;
-import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class KafkaWriterTest {
-
-
- public static final String SENSOR_TYPE = "test";
- public WriterConfiguration createConfiguration(final Map<String, Object> parserConfig) {
- ParserConfigurations configurations = new ParserConfigurations();
- configurations.updateSensorParserConfig( SENSOR_TYPE
- , new SensorParserConfig() {{
- setParserConfig(parserConfig);
- }}
- );
- return new ParserWriterConfiguration(configurations);
- }
-
- @Test
- public void testHappyPathGlobalConfig() throws Exception {
- KafkaWriter writer = new KafkaWriter();
- WriterConfiguration configuration = createConfiguration(
- new HashMap<String, Object>() {{
- put("kafka.brokerUrl" , "localhost:6667");
- put("kafka.topic" , SENSOR_TYPE);
- put("kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
- }}
- );
-
- writer.configure(SENSOR_TYPE, configuration);
- Map<String, Object> producerConfigs = writer.createProducerConfigs();
- Assert.assertEquals(producerConfigs.get("bootstrap.servers"), "localhost:6667");
- Assert.assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
- Assert.assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
- Assert.assertEquals(producerConfigs.get("request.required.acks"), 1);
- Assert.assertEquals(producerConfigs.get("key1"), 1);
- Assert.assertEquals(producerConfigs.get("key2"), "value2");
- }
-
- @Test
- public void testHappyPathGlobalConfigWithPrefix() throws Exception {
- KafkaWriter writer = new KafkaWriter();
- writer.withConfigPrefix("prefix");
- WriterConfiguration configuration = createConfiguration(
- new HashMap<String, Object>() {{
- put("prefix.kafka.brokerUrl" , "localhost:6667");
- put("prefix.kafka.topic" , SENSOR_TYPE);
- put("prefix.kafka.producerConfigs" , ImmutableMap.of("key1", 1, "key2", "value2"));
- }}
- );
-
- writer.configure(SENSOR_TYPE, configuration);
- Map<String, Object> producerConfigs = writer.createProducerConfigs();
- Assert.assertEquals(producerConfigs.get("bootstrap.servers"), "localhost:6667");
- Assert.assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
- Assert.assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
- Assert.assertEquals(producerConfigs.get("request.required.acks"), 1);
- Assert.assertEquals(producerConfigs.get("key1"), 1);
- Assert.assertEquals(producerConfigs.get("key2"), "value2");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
index cea635f..a21d45b 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
@@ -20,17 +20,16 @@ package org.apache.metron.writers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.enrichment.converter.EnrichmentConverter;
import org.apache.metron.enrichment.converter.EnrichmentKey;
import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.integration.mock.MockTableProvider;
import org.apache.metron.enrichment.lookup.LookupKV;
-import org.apache.metron.integration.mock.MockTableProvider;
import org.apache.metron.test.mock.MockHTable;
-import org.apache.metron.writer.hbase.SimpleHbaseEnrichmentWriter;
+import org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter;
import org.json.simple.JSONObject;
import org.junit.Assert;
import org.junit.Before;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
index f6436b9..2f359ef 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
@@ -20,31 +20,25 @@ package org.apache.metron.writers.integration;
import com.google.common.collect.ImmutableList;
import org.adrianwalker.multilinestring.Multiline;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.TestConstants;
-import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.enrichment.converter.EnrichmentConverter;
import org.apache.metron.enrichment.converter.EnrichmentKey;
import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
+import org.apache.metron.enrichment.integration.mock.MockTableProvider;
import org.apache.metron.enrichment.lookup.LookupKV;
import org.apache.metron.integration.*;
-import org.apache.metron.integration.components.ConfigUploadComponent;
import org.apache.metron.integration.components.KafkaWithZKComponent;
-import org.apache.metron.integration.mock.MockTableProvider;
-import org.apache.metron.integration.utils.TestUtils;
import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
-import org.apache.metron.test.TestDataType;
import org.apache.metron.test.mock.MockHTable;
-import org.apache.metron.test.utils.SampleDataUtils;
import org.apache.metron.test.utils.UnitTestHelper;
import org.junit.Assert;
import org.junit.Test;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.*;
@@ -53,7 +47,7 @@ public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationT
/**
{
"parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
- ,"writerClassName" : "org.apache.metron.writer.hbase.SimpleHbaseEnrichmentWriter"
+ ,"writerClassName" : "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter"
,"sensorTopic":"dummy"
,"parserConfig":
{
@@ -61,7 +55,7 @@ public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationT
,"shew.cf" : "cf"
,"shew.keyColumns" : "col2"
,"shew.enrichmentType" : "et"
- ,"shew.hbaseProvider" : "org.apache.metron.integration.mock.MockTableProvider"
+ ,"shew.hbaseProvider" : "org.apache.metron.enrichment.integration.mock.MockTableProvider"
,"columns" : {
"col1" : 0
,"col2" : 1
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index a6e83f1..afdcf04 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -18,37 +18,22 @@
package org.apache.metron.writers.integration;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.adrianwalker.multilinestring.Multiline;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.TestConstants;
import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.FieldValidator;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.field.validation.FieldValidation;
import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.enrichment.converter.EnrichmentConverter;
-import org.apache.metron.enrichment.converter.EnrichmentKey;
-import org.apache.metron.enrichment.converter.EnrichmentValue;
-import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
import org.apache.metron.integration.*;
-import org.apache.metron.integration.components.ConfigUploadComponent;
import org.apache.metron.integration.components.KafkaWithZKComponent;
-import org.apache.metron.integration.mock.MockTableProvider;
-import org.apache.metron.integration.utils.TestUtils;
import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
-import org.apache.metron.test.TestDataType;
-import org.apache.metron.test.mock.MockHTable;
-import org.apache.metron.test.utils.SampleDataUtils;
import org.apache.metron.test.utils.UnitTestHelper;
import org.json.simple.JSONObject;
import org.junit.Assert;
import org.junit.Test;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.*;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-solr/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/pom.xml b/metron-platform/metron-solr/pom.xml
index 54b50f4..3aceda0 100644
--- a/metron-platform/metron-solr/pom.xml
+++ b/metron-platform/metron-solr/pom.xml
@@ -32,16 +32,16 @@
<version>${global_hbase_guava_version}</version>
</dependency>
<dependency>
- <groupId>org.apache.metron</groupId>
- <artifactId>metron-enrichment</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>${global_solr_version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-enrichment</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-test-framework</artifactId>
<version>${global_solr_version}</version>
@@ -98,6 +98,10 @@
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
+ <exclusion>
+ <artifactId>commons-httpclient</artifactId>
+ <groupId>commons-httpclient</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -122,8 +126,8 @@
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
@@ -133,6 +137,37 @@
<version>${global_mockito_version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-indexing</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-enrichment</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-indexing</artifactId>
+ <version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
@@ -201,6 +236,7 @@
<goal>shade</goal>
</goals>
<configuration>
+
<artifactSet>
<excludes>
<exclude>storm:storm-core:*</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-solr/src/main/config/solr.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/config/solr.properties b/metron-platform/metron-solr/src/main/config/solr.properties
index cdfe25a..8489d8e 100644
--- a/metron-platform/metron-solr/src/main/config/solr.properties
+++ b/metron-platform/metron-solr/src/main/config/solr.properties
@@ -14,37 +14,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+##### Storm #####
+indexing.workers=1
+indexing.executors=0
##### Kafka #####
kafka.zk=node1:2181
kafka.broker=node1:6667
-spout.kafka.topic.asa=asa
-spout.kafka.topic.bro=bro
-spout.kafka.topic.fireeye=fireeye
-spout.kafka.topic.ise=ise
-spout.kafka.topic.lancope=lancope
-spout.kafka.topic.paloalto=paloalto
-spout.kafka.topic.pcap=pcap
-spout.kafka.topic.snort=snort
-spout.kafka.topic.yaf=yaf
+kafka.start=WHERE_I_LEFT_OFF
##### Indexing #####
+index.input.topic=indexing
+index.error.topic=indexing_error
writer.class.name=org.apache.metron.solr.writer.SolrWriter
-##### ElasticSearch #####
-
-es.ip=10.22.0.214
-es.port=9300
-es.clustername=elasticsearch
-
-##### MySQL #####
-
-mysql.ip=10.22.0.214
-mysql.port=3306
-mysql.username=root
-mysql.password=hadoop123
-
##### Metrics #####
#reporters
@@ -63,23 +47,6 @@ org.apache.metron.metrics.TelemetryParserBolt.emits=true
org.apache.metron.metrics.TelemetryParserBolt.fails=true
-#GenericEnrichmentBolt
-org.apache.metron.metrics.GenericEnrichmentBolt.acks=true
-org.apache.metron.metrics.GenericEnrichmentBolt.emits=true
-org.apache.metron.metrics.GenericEnrichmentBolt.fails=true
-
-
-#TelemetryIndexingBolt
-org.apache.metron.metrics.TelemetryIndexingBolt.acks=true
-org.apache.metron.metrics.TelemetryIndexingBolt.emits=true
-org.apache.metron.metrics.TelemetryIndexingBolt.fails=true
-
-##### Host Enrichment #####
-
-org.apache.metron.enrichment.host.known_hosts=[{"ip":"10.1.128.236", "local":"YES", "type":"webserver", "asset_value" : "important"},\
-{"ip":"10.1.128.237", "local":"UNKNOWN", "type":"unknown", "asset_value" : "important"},\
-{"ip":"10.60.10.254", "local":"YES", "type":"printer", "asset_value" : "important"}]
-
##### HDFS #####
bolt.hdfs.batch.size=5000
@@ -91,19 +58,3 @@ bolt.hdfs.finished.file.path=/paloalto/rotated
bolt.hdfs.compression.codec.class=org.apache.hadoop.io.compress.SnappyCodec
index.hdfs.output=/tmp/metron/enriched
-##### HBase #####
-bolt.hbase.table.name=pcap
-bolt.hbase.table.fields=t:value
-bolt.hbase.table.key.tuple.field.name=key
-bolt.hbase.table.timestamp.tuple.field.name=timestamp
-bolt.hbase.enable.batching=false
-bolt.hbase.write.buffer.size.in.bytes=2000000
-bolt.hbase.durability=SKIP_WAL
-bolt.hbase.partitioner.region.info.refresh.interval.mins=60
-
-##### Threat Intel #####
-
-threat.intel.tracker.table=
-threat.intel.tracker.cf=
-threat.intel.ip.table=
-threat.intel.ip.cf=
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh b/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh
index 14628d2..7a98fc7 100755
--- a/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh
+++ b/metron-platform/metron-solr/src/main/scripts/start_solr_topology.sh
@@ -19,4 +19,4 @@
METRON_VERSION=${project.version}
METRON_HOME=/usr/metron/$METRON_VERSION
TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION.jar
-storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/enrichment/remote.yaml --filter $METRON_HOME/config/solr.properties
+storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/remote.yaml --filter $METRON_HOME/config/solr.properties
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrEnrichmentIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrEnrichmentIntegrationTest.java
deleted file mode 100644
index 31e8d49..0000000
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrEnrichmentIntegrationTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.solr.integration;
-
-import com.google.common.base.Function;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.interfaces.FieldNameConverter;
-import org.apache.metron.integration.EnrichmentIntegrationTest;
-import org.apache.metron.integration.ComponentRunner;
-import org.apache.metron.integration.InMemoryComponent;
-import org.apache.metron.integration.Processor;
-import org.apache.metron.integration.ReadinessState;
-import org.apache.metron.integration.components.KafkaWithZKComponent;
-import org.apache.metron.solr.integration.components.SolrComponent;
-import org.apache.metron.integration.utils.SampleUtil;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
-import org.apache.metron.common.utils.JSONUtils;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class SolrEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
-
- private String collection = "metron";
- private FieldNameConverter fieldNameConverter = fieldName -> fieldName;
- @Override
- public FieldNameConverter getFieldNameConverter() {
- return fieldNameConverter;
- }
-
- @Override
- public InMemoryComponent getSearchComponent(final Properties topologyProperties) throws Exception {
- SolrComponent solrComponent = new SolrComponent.Builder()
- .addCollection(collection, "../metron-solr/src/test/resources/solr/conf")
- .withPostStartCallback(new Function<SolrComponent, Void>() {
- @Nullable
- @Override
- public Void apply(@Nullable SolrComponent solrComponent) {
- topologyProperties.setProperty("solr.zk", solrComponent.getZookeeperUrl());
- try {
- String testZookeeperUrl = topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY);
- Configurations configurations = SampleUtil.getSampleConfigs();
- Map<String, Object> globalConfig = configurations.getGlobalConfig();
- globalConfig.put("solr.zookeeper", solrComponent.getZookeeperUrl());
- ConfigurationsUtils.writeGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), testZookeeperUrl);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
- })
- .build();
- return solrComponent;
- }
-
- @Override
- public Processor<List<Map<String, Object>>> getProcessor(final List<byte[]> inputMessages) {
- return new Processor<List<Map<String, Object>>>() {
- List<Map<String, Object>> docs = null;
- public ReadinessState process(ComponentRunner runner) {
- SolrComponent solrComponent = runner.getComponent("search", SolrComponent.class);
- if (solrComponent.hasCollection(collection)) {
- List<Map<String, Object>> docsFromDisk;
- try {
- docs = solrComponent.getAllIndexedDocs(collection);
- docsFromDisk = EnrichmentIntegrationTest.readDocsFromDisk(hdfsDir);
- System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
- } catch (IOException e) {
- throw new IllegalStateException("Unable to retrieve indexed documents.", e);
- }
- if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
- return ReadinessState.NOT_READY;
- } else {
- return ReadinessState.READY;
- }
- } else {
- return ReadinessState.NOT_READY;
- }
- }
-
- public List<Map<String, Object>> getResult() {
- return docs;
- }
- };
- }
-
- @Override
- public void setAdditionalProperties(Properties topologyProperties) {
- topologyProperties.setProperty("writer.class.name", "org.apache.metron.solr.writer.SolrWriter");
- }
-
- @Override
- public String cleanField(String field) {
- return field.replaceFirst("_[dfils]$", "");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
new file mode 100644
index 0000000..dd9c559
--- /dev/null
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.solr.integration;
+
+import com.google.common.base.Function;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.common.interfaces.FieldNameConverter;
+import org.apache.metron.enrichment.integration.utils.SampleUtil;
+import org.apache.metron.indexing.integration.IndexingIntegrationTest;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.InMemoryComponent;
+import org.apache.metron.integration.Processor;
+import org.apache.metron.integration.ReadinessState;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
+import org.apache.metron.solr.integration.components.SolrComponent;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.utils.JSONUtils;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class SolrIndexingIntegrationTest extends IndexingIntegrationTest {
+
+ private String collection = "metron";
+ private FieldNameConverter fieldNameConverter = fieldName -> fieldName;
+ @Override
+ public FieldNameConverter getFieldNameConverter() {
+ return fieldNameConverter;
+ }
+
+ @Override
+ public InMemoryComponent getSearchComponent(final Properties topologyProperties) throws Exception {
+ SolrComponent solrComponent = new SolrComponent.Builder()
+ .addCollection(collection, "../metron-solr/src/test/resources/solr/conf")
+ .withPostStartCallback(new Function<SolrComponent, Void>() {
+          @Nullable
+          @Override
+          public Void apply(@Nullable SolrComponent solrComponent) {
+            topologyProperties.setProperty("solr.zk", solrComponent.getZookeeperUrl());
+            try {
+              String testZookeeperUrl = topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY);
+              Configurations configurations = SampleUtil.getSampleConfigs();
+              Map<String, Object> globalConfig = configurations.getGlobalConfig();
+              globalConfig.put("solr.zookeeper", solrComponent.getZookeeperUrl());
+              ConfigurationsUtils.writeGlobalConfigToZookeeper(JSONUtils.INSTANCE.toJSON(globalConfig), testZookeeperUrl);
+            } catch (Exception e) {
+              throw new IllegalStateException("Unable to write the global config (solr.zookeeper) to zookeeper", e); // fail fast instead of running without the config
+            }
+            return null;
+          }
+ })
+ .build();
+ return solrComponent;
+ }
+
+ @Override
+ public Processor<List<Map<String, Object>>> getProcessor(final List<byte[]> inputMessages) {
+ return new Processor<List<Map<String, Object>>>() {
+ List<Map<String, Object>> docs = null;
+ public ReadinessState process(ComponentRunner runner) {
+ SolrComponent solrComponent = runner.getComponent("search", SolrComponent.class);
+ if (solrComponent.hasCollection(collection)) {
+ List<Map<String, Object>> docsFromDisk;
+ try {
+ docs = solrComponent.getAllIndexedDocs(collection);
+ docsFromDisk = readDocsFromDisk(hdfsDir);
+ System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to retrieve indexed documents.", e);
+ }
+ if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
+ return ReadinessState.NOT_READY;
+ } else {
+ return ReadinessState.READY;
+ }
+ } else {
+ return ReadinessState.NOT_READY;
+ }
+ }
+
+ public List<Map<String, Object>> getResult() {
+ return docs;
+ }
+ };
+ }
+
+ @Override
+ public void setAdditionalProperties(Properties topologyProperties) {
+ topologyProperties.setProperty("writer.class.name", "org.apache.metron.solr.writer.SolrWriter");
+ }
+
+ @Override
+ public String cleanField(String field) {
+ return field.replaceFirst("_[dfils]$", "");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
index 0993e0d..91c34af 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
@@ -17,11 +17,9 @@
*/
package org.apache.metron.solr.writer;
-import backtype.storm.tuple.Tuple;
-import org.apache.metron.common.configuration.Configurations;
import org.apache.metron.common.configuration.EnrichmentConfigurations;
import org.apache.metron.common.configuration.writer.EnrichmentWriterConfiguration;
-import org.apache.metron.integration.utils.SampleUtil;
+import org.apache.metron.enrichment.integration.utils.SampleUtil;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrInputDocument;
import org.hamcrest.Description;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/pom.xml b/metron-platform/metron-writer/pom.xml
new file mode 100644
index 0000000..d088c4b
--- /dev/null
+++ b/metron-platform/metron-writer/pom.xml
@@ -0,0 +1,231 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-platform</artifactId>
+ <version>0.2.0BETA</version>
+ </parent>
+ <artifactId>metron-writer</artifactId>
+ <name>metron-writer</name>
+ <description>Common components for Metron message writers</description>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <commons.config.version>1.10</commons.config.version>
+ <antlr.version>4.5</antlr.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${global_hbase_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${global_storm_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.9.2</artifactId>
+ <version>${global_kafka_version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hdfs</artifactId>
+ <version>${global_storm_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${global_hadoop_version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ </exclusions>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ </dependencies>
+
+ <reporting>
+ <plugins>
+ <!-- Normally, dependency report takes time, skip it -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <version>2.7</version>
+
+ <configuration>
+ <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>emma-maven-plugin</artifactId>
+ <version>1.0-alpha-3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-pmd-plugin</artifactId>
+ <configuration>
+ <targetJdk>${global_java_version}</targetJdk>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>${global_java_version}</source>
+ <compilerArgument>-Xlint:unchecked</compilerArgument>
+ <target>${global_java_version}</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-maven-plugin</artifactId>
+ <version>${antlr.version}</version>
+ <configuration>
+ <outputDirectory>${basedir}/src/main/java</outputDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>antlr4</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${global_shade_version}</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <artifactSet>
+ <excludes>
+ <exclude>*slf4j*</exclude>
+ </excludes>
+ </artifactSet>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>org.apache.metron.guava</shadedPattern>
+ </relocation>
+ </relocations>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+ <resource>.yaml</resource>
+ </transformer>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass></mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AbstractWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AbstractWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AbstractWriter.java
new file mode 100644
index 0000000..0fa596a
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AbstractWriter.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer;
+
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+
+public abstract class AbstractWriter {
+ public AbstractWriter() {}
+ public abstract void configure(String sensorName, WriterConfiguration configuration);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
new file mode 100644
index 0000000..ba647bd
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class BulkWriterComponent<MESSAGE_T> {
+ public static final Logger LOG = LoggerFactory
+ .getLogger(BulkWriterComponent.class);
+ private Map<String, Collection<Tuple>> sensorTupleMap = new HashMap<>();
+ private Map<String, List<MESSAGE_T>> sensorMessageMap = new HashMap<>();
+ private OutputCollector collector;
+ private boolean handleCommit = true;
+ private boolean handleError = true;
+ public BulkWriterComponent(OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ public BulkWriterComponent(OutputCollector collector, boolean handleCommit, boolean handleError) {
+ this(collector);
+ this.handleCommit = handleCommit;
+ this.handleError = handleError;
+ }
+
+ public void commit(Iterable<Tuple> tuples) {
+ tuples.forEach(t -> collector.ack(t));
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Acking " + Iterables.size(tuples) + " tuples");
+ }
+ }
+
+ public void error(Throwable e, Iterable<Tuple> tuples) {
+ tuples.forEach(t -> collector.ack(t));
+ if(!Iterables.isEmpty(tuples)) {
+ LOG.error("Failing " + Iterables.size(tuples) + " tuples", e);
+ ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
+ }
+ }
+
+ protected Collection<Tuple> createTupleCollection() {
+ return new ArrayList<>();
+ }
+
+
+  public void errorAll(Throwable e) {
+    // Report every pending batch first, then empty both maps in one step: removing
+    // entries while iterating entrySet() throws ConcurrentModificationException.
+    sensorTupleMap.values().forEach(tuples -> error(e, tuples));
+    sensorTupleMap.clear();
+    sensorMessageMap.clear();
+  }
+
+ public void errorAll(String sensorType, Throwable e) {
+ error(e, Optional.ofNullable(sensorTupleMap.get(sensorType)).orElse(new ArrayList<>()));
+ sensorTupleMap.remove(sensorType);
+ sensorMessageMap.remove(sensorType);
+ }
+ public void write( String sensorType
+ , Tuple tuple
+ , MESSAGE_T message
+ , BulkMessageWriter<MESSAGE_T> bulkMessageWriter
+ , WriterConfiguration configurations
+ ) throws Exception
+ {
+ int batchSize = configurations.getBatchSize(sensorType);
+ Collection<Tuple> tupleList = sensorTupleMap.get(sensorType);
+ if (tupleList == null) {
+ tupleList = createTupleCollection();
+ }
+ tupleList.add(tuple);
+ List<MESSAGE_T> messageList = sensorMessageMap.get(sensorType);
+ if (messageList == null) {
+ messageList = new ArrayList<>();
+ }
+ messageList.add(message);
+
+ if (tupleList.size() < batchSize) {
+ sensorTupleMap.put(sensorType, tupleList);
+ sensorMessageMap.put(sensorType, messageList);
+ } else {
+ try {
+ bulkMessageWriter.write(sensorType, configurations, tupleList, messageList);
+ if(handleCommit) {
+ commit(tupleList);
+ }
+
+ } catch (Throwable e) {
+ if(handleError) {
+ error(e, tupleList);
+ }
+ else {
+ throw e;
+ }
+ }
+ finally {
+ sensorTupleMap.remove(sensorType);
+ sensorMessageMap.remove(sensorType);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
new file mode 100644
index 0000000..b9aee7e
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import backtype.storm.tuple.Tuple;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.utils.ConversionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+
+public class NoopWriter extends AbstractWriter implements BulkMessageWriter<Object> {
+
+ public static class RandomLatency implements Function<Void, Void> {
+ private int min;
+ private int max;
+
+ public RandomLatency(int min, int max) {
+ this.min = min;
+ this.max = max;
+ }
+
+ public int getMin() {
+ return min;
+ }
+ public int getMax() {
+ return max;
+ }
+
+    @Override
+    public Void apply(Void aVoid) {
+      try {
+        Thread.sleep(ThreadLocalRandom.current().nextInt(min, max + 1));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the flag so the calling thread still sees the interrupt
+      }
+      return null;
+    }
+ }
+
+ public static class FixedLatency implements Function<Void, Void> {
+ private int latency;
+ public FixedLatency(int latency) {
+ this.latency = latency;
+ }
+ public int getLatency() {
+ return latency;
+ }
+
+    @Override
+    public Void apply(Void aVoid) {
+      try {
+        if(latency > 0) { Thread.sleep(latency); }
+      } catch (InterruptedException e) {
+        // Restore the flag so the calling thread still sees the interrupt.
+        Thread.currentThread().interrupt();
+      }
+      return null;
+    }
+ }
+ Function<Void, Void> sleepFunction = null;
+
+ public NoopWriter withLatency(String sleepConfig) {
+ sleepFunction = getSleepFunction(sleepConfig);
+ return this;
+ }
+
+
+  private Function<Void, Void> getSleepFunction(String sleepConfig) {
+    String usageMessage = "Unexpected: " + sleepConfig + " Expected value: integer for a fixed sleep duration in milliseconds (e.g. 10) " +
+           "or a range of latencies separated by a comma (e.g. \"10, 20\") to sleep a random amount in that range.";
+    try {
+      if (sleepConfig.contains(",")) {
+        // random latency within a range; negative or inverted bounds are rejected here rather than failing later inside write().
+        Iterable<String> it = Splitter.on(',').split(sleepConfig);
+        Integer min = ConversionUtils.convert(Iterables.getFirst(it, "").trim(), Integer.class);
+        Integer max= ConversionUtils.convert(Iterables.getLast(it, "").trim(), Integer.class);
+        if (min != null && max != null && 0 <= min && min <= max) {
+          return new RandomLatency(min, max);
+        }
+      } else {
+        //fixed latency
+        Integer latency = ConversionUtils.convert(sleepConfig.trim(), Integer.class);
+        if(latency != null) {
+          return new FixedLatency(latency);
+        }
+      }
+    }
+    catch(Throwable t) {
+      throw new IllegalArgumentException(usageMessage, t);
+    }
+    throw new IllegalArgumentException(usageMessage);
+  }
+
+ @Override
+ public void configure(String sensorName, WriterConfiguration configuration) {
+ Map<String, Object> config = configuration.getSensorConfig(sensorName);
+ if(config != null) {
+ Object noopLatency = config.get("noopLatency");
+ if(noopLatency != null) {
+ sleepFunction = getSleepFunction(noopLatency.toString());
+ }
+ }
+ }
+
+ @Override
+ public void init(Map stormConf, WriterConfiguration config) throws Exception {
+ }
+
+ @Override
+ public void write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<Object> messages) throws Exception {
+ if(sleepFunction != null) {
+ sleepFunction.apply(null);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
new file mode 100644
index 0000000..3df3e9d
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer;
+
+import backtype.storm.tuple.Tuple;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.interfaces.MessageWriter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class WriterToBulkWriter<MESSAGE_T> implements BulkMessageWriter<MESSAGE_T>, Serializable {
+ MessageWriter<MESSAGE_T> messageWriter;
+
+ public static transient Function<WriterConfiguration, WriterConfiguration> TRANSFORMATION = config -> new SingleBatchConfigurationFacade(config);
+
+ public WriterToBulkWriter(MessageWriter<MESSAGE_T> messageWriter) {
+ this.messageWriter = messageWriter;
+ }
+ @Override
+ public void init(Map stormConf, WriterConfiguration config) throws Exception {
+ messageWriter.init();
+ }
+
+  @Override
+  public void write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<MESSAGE_T> messages) throws Exception {
+    if(messages.size() != 1) { // an empty batch would otherwise hand a null tuple and message to the wrapped writer
+      throw new IllegalStateException("WriterToBulkWriter expects a batch of exactly 1");
+    }
+    messageWriter.write(sensorType, configurations, Iterables.getFirst(tuples, null), Iterables.getFirst(messages, null));
+  }
+
+ @Override
+ public void close() throws Exception {
+ messageWriter.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
new file mode 100644
index 0000000..91e8446
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.bolt.ConfiguredEnrichmentBolt;
+import org.apache.metron.common.configuration.writer.EnrichmentWriterConfiguration;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.MessageWriter;
+import org.apache.metron.common.utils.MessageUtils;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.writer.BulkWriterComponent;
+import org.apache.metron.writer.WriterToBulkWriter;
+import org.apache.metron.writer.message.MessageGetter;
+import org.apache.metron.writer.message.MessageGetters;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.function.Function;
+
+/**
+ * Bolt that pulls a JSON message out of every incoming tuple and hands it, together with the
+ * tuple, to a {@link BulkWriterComponent} backed by the configured {@link BulkMessageWriter}.
+ */
+public class BulkMessageWriterBolt extends ConfiguredEnrichmentBolt {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BulkMessageWriterBolt.class);
+
+  private BulkMessageWriter<JSONObject> bulkMessageWriter;
+  private BulkWriterComponent<JSONObject> writerComponent;
+  private String messageGetterStr = MessageGetters.NAMED.name();
+  private transient MessageGetter messageGetter = null;
+  private transient Function<WriterConfiguration, WriterConfiguration> configurationTransformation;
+
+  /**
+   * @param zookeeperUrl passed unchanged to the superclass constructor
+   */
+  public BulkMessageWriterBolt(String zookeeperUrl) {
+    super(zookeeperUrl);
+  }
+
+  /**
+   * Sets the bulk writer used by this bolt.
+   *
+   * @param bulkMessageWriter the writer that receives the messages
+   * @return this bolt, for chaining
+   */
+  public BulkMessageWriterBolt withBulkMessageWriter(BulkMessageWriter<JSONObject > bulkMessageWriter) {
+    this.bulkMessageWriter = bulkMessageWriter;
+    return this;
+  }
+
+  /**
+   * Sets a single-message writer, wrapped in a {@link WriterToBulkWriter}.
+   *
+   * @param messageWriter the single-message writer to wrap
+   * @return this bolt, for chaining
+   */
+  public BulkMessageWriterBolt withMessageWriter(MessageWriter<JSONObject> messageWriter) {
+    this.bulkMessageWriter = new WriterToBulkWriter<>(messageWriter);
+    return this;
+  }
+
+  /**
+   * Sets the name that is resolved through {@code MessageGetters.valueOf} during prepare.
+   *
+   * @param messageGetter the name of the message getter
+   * @return this bolt, for chaining
+   */
+  public BulkMessageWriterBolt withMessageGetter(String messageGetter) {
+    this.messageGetterStr = messageGetter;
+    return this;
+  }
+
+  /**
+   * Creates the writer component, resolves the message getter, selects the configuration
+   * transformation and initializes the bulk writer. Any exception raised while initializing
+   * the writer is rethrown wrapped in a {@link RuntimeException}.
+   */
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    this.writerComponent = new BulkWriterComponent<>(collector);
+    super.prepare(stormConf, context, collector);
+    messageGetter = MessageGetters.valueOf(messageGetterStr);
+    // Wrapped single-message writers use the transformation supplied by WriterToBulkWriter;
+    // every other writer sees the configuration unchanged.
+    configurationTransformation = (bulkMessageWriter instanceof WriterToBulkWriter)
+        ? WriterToBulkWriter.TRANSFORMATION
+        : config -> config;
+    try {
+      bulkMessageWriter.init(stormConf, currentWriterConfiguration());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Extracts the message and its sensor type from the tuple and passes both to the writer
+   * component along with the current, transformed writer configuration.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void execute(Tuple tuple) {
+    final JSONObject message = messageGetter.getMessage(tuple);
+    final String sensorType = MessageUtils.getSensorType(message);
+    try {
+      writerComponent.write(sensorType, tuple, message, bulkMessageWriter, currentWriterConfiguration());
+    } catch (Exception e) {
+      throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e);
+    }
+  }
+
+  /**
+   * Declares the error stream with a single field named "message".
+   */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream(Constants.ERROR_STREAM, new Fields("message"));
+  }
+
+  /**
+   * Builds a fresh writer configuration from the bolt's current configurations and applies
+   * the selected transformation to it.
+   */
+  private WriterConfiguration currentWriterConfiguration() {
+    return configurationTransformation.apply(new EnrichmentWriterConfiguration(getConfigurations()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
new file mode 100644
index 0000000..01f1245
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.hdfs;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.json.simple.JSONObject;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link BulkMessageWriter} that hands each batch of JSON messages to a {@link SourceHandler}
+ * dedicated to the batch's source type. A handler is created the first time a source type is
+ * seen and reused afterwards.
+ */
+public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
+  List<RotationAction> rotationActions = new ArrayList<>();
+  FileRotationPolicy rotationPolicy = new NoRotationPolicy();
+  SyncPolicy syncPolicy = new CountSyncPolicy(1); // default: CountSyncPolicy built with a count of 1
+  FileNameFormat fileNameFormat;
+  Map<String, SourceHandler> sourceHandlerMap = new HashMap<>();
+  transient Map stormConfig;
+
+  /**
+   * @param fileNameFormat the format wrapped in a {@link SourceFileNameFormat} for each handler
+   * @return this writer, for chaining
+   */
+  public HdfsWriter withFileNameFormat(FileNameFormat fileNameFormat){
+    this.fileNameFormat = fileNameFormat;
+    return this;
+  }
+
+  /**
+   * @param syncPolicy the sync policy passed to each handler
+   * @return this writer, for chaining
+   */
+  public HdfsWriter withSyncPolicy(SyncPolicy syncPolicy){
+    this.syncPolicy = syncPolicy;
+    return this;
+  }
+
+  /**
+   * @param rotationPolicy the rotation policy passed to each handler
+   * @return this writer, for chaining
+   */
+  public HdfsWriter withRotationPolicy(FileRotationPolicy rotationPolicy){
+    this.rotationPolicy = rotationPolicy;
+    return this;
+  }
+
+  /**
+   * @param action a rotation action appended to the list passed to each handler
+   * @return this writer, for chaining
+   */
+  public HdfsWriter addRotationAction(RotationAction action){
+    this.rotationActions.add(action);
+    return this;
+  }
+
+  /**
+   * Remembers the Storm configuration for later handler construction; the writer
+   * configuration is not used here.
+   */
+  @Override
+  public void init(Map stormConfig, WriterConfiguration configurations) {
+    this.stormConfig = stormConfig;
+  }
+
+  /**
+   * Passes the messages to the handler for the given source type. The configuration and the
+   * tuples are not used by this writer.
+   */
+  @Override
+  public void write(String sourceType
+                   , WriterConfiguration configurations
+                   , Iterable<Tuple> tuples
+                   , List<JSONObject> messages
+                   ) throws Exception
+  {
+    getSourceHandler(sourceType).handle(messages);
+  }
+
+  /**
+   * Closes every handler created so far.
+   */
+  @Override
+  public void close() {
+    sourceHandlerMap.values().forEach(handler -> handler.close());
+  }
+
+  /**
+   * Returns the handler registered for the source type, creating and registering one if
+   * none exists yet.
+   */
+  private synchronized SourceHandler getSourceHandler(String sourceType) throws IOException {
+    SourceHandler existing = sourceHandlerMap.get(sourceType);
+    if(existing != null) {
+      return existing;
+    }
+    SourceHandler created = new SourceHandler(rotationActions, rotationPolicy, syncPolicy, new SourceFileNameFormat(sourceType, fileNameFormat), stormConfig);
+    sourceHandlerMap.put(sourceType, created);
+    return created;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
new file mode 100644
index 0000000..1c345b4
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+
+import java.io.IOException;
+
+/**
+ * {@link RotationAction} that moves a file into a per-source subdirectory of a destination
+ * directory. The source name is the name of the file's parent directory.
+ */
+public class SourceAwareMoveAction implements RotationAction{
+  private static final Logger LOG = Logger.getLogger(SourceAwareMoveAction.class);
+  private String destination;
+
+  /**
+   * Sets the directory under which the per-source subdirectories are resolved.
+   *
+   * @param destDir the destination directory
+   * @return this action, for chaining
+   */
+  public SourceAwareMoveAction toDestination(String destDir){
+    destination = destDir;
+    return this;
+  }
+
+  /**
+   * Returns the name of the directory that directly contains the given file.
+   */
+  private static String getSource(Path filePath) {
+    return filePath.getParent().getName();
+  }
+
+  /**
+   * Renames the file to {@code <destination>/<source>/<file name>}.
+   *
+   * @param fileSystem the file system on which the rename is performed
+   * @param filePath the file to move
+   * @throws IOException if the rename itself raises one
+   */
+  @Override
+  public void execute(FileSystem fileSystem, Path filePath) throws IOException {
+    Path destPath = new Path(new Path(destination, getSource(filePath)), filePath.getName());
+    LOG.info("Moving file " + filePath + " to " + destPath);
+    // rename can signal failure through its return value instead of an exception, so log it
+    // rather than dropping it; the action stays best-effort and does not throw here.
+    if(!fileSystem.rename(filePath, destPath)) {
+      LOG.warn("Unable to move file " + filePath + " to " + destPath);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5ffcef8d/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
new file mode 100644
index 0000000..ae0242d
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import backtype.storm.task.TopologyContext;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+
+import java.util.Map;
+
+/**
+ * {@link FileNameFormat} decorator that appends {@code "/"} and a source type to the path
+ * reported by the wrapped format. Preparation and file naming are delegated unchanged.
+ */
+public class SourceFileNameFormat implements FileNameFormat {
+  FileNameFormat delegate;
+  String sourceType;
+
+  /**
+   * @param sourceType the source type appended to the delegate's path
+   * @param delegate the format that supplies the base path and the file names
+   */
+  public SourceFileNameFormat(String sourceType, FileNameFormat delegate) {
+    this.sourceType = sourceType;
+    this.delegate = delegate;
+  }
+
+  /**
+   * Forwards preparation to the wrapped format.
+   */
+  @Override
+  public void prepare(Map map, TopologyContext topologyContext) {
+    delegate.prepare(map, topologyContext);
+  }
+
+  /**
+   * Returns the name produced by the wrapped format for the same two arguments.
+   */
+  @Override
+  public String getName(long l, long l1) {
+    final String name = delegate.getName(l, l1);
+    return name;
+  }
+
+  /**
+   * Returns the wrapped format's path followed by {@code "/"} and the source type.
+   */
+  @Override
+  public String getPath() {
+    final String basePath = delegate.getPath();
+    return basePath + "/" + sourceType;
+  }
+}