You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/06/16 17:54:33 UTC
incubator-metron git commit: METRON-223: Invalid and Erroneous
messages should go to kafka for further analysis in the Parser Topology. This
closes apache/incubator-metron#151
Repository: incubator-metron
Updated Branches:
refs/heads/master bdbf33a9d -> 916432c96
METRON-223: Invalid and Erroneous messages should go to kafka for further analysis in the Parser Topology. This closes apache/incubator-metron#151
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/916432c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/916432c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/916432c9
Branch: refs/heads/master
Commit: 916432c96626750c9a35caef346789778af3818b
Parents: bdbf33a
Author: cstella <ce...@gmail.com>
Authored: Thu Jun 16 13:54:11 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Thu Jun 16 13:54:11 2016 -0400
----------------------------------------------------------------------
.../roles/metron-kafka/defaults/main.yml | 2 +
.../org/apache/metron/common/Constants.java | 2 +
.../common/configuration/FieldValidator.java | 3 +-
.../configuration/SensorParserConfig.java | 31 ++-
.../transformation/FieldTransformation.java | 3 +-
.../field/validation/FieldValidation.java | 3 +-
.../apache/metron/common/utils/ErrorUtils.java | 88 ++++++--
.../apache/metron/common/utils/StringUtils.java | 36 +++
.../common/writer/BulkWriterComponent.java | 19 +-
.../components/ConfigUploadComponent.java | 24 +-
.../components/KafkaWithZKComponent.java | 12 +-
.../org/apache/metron/parsers/GrokParser.java | 2 +-
.../apache/metron/parsers/bolt/ParserBolt.java | 58 ++---
.../apache/metron/parsers/bolt/WriterBolt.java | 93 ++++++++
.../metron/parsers/bolt/WriterHandler.java | 90 ++++++++
.../apache/metron/parsers/csv/CSVParser.java | 7 +-
.../parsers/topology/ParserTopologyBuilder.java | 89 ++++++--
.../parsers/topology/ParserTopologyCLI.java | 47 +++-
.../metron/parsers/writer/KafkaWriter.java | 31 ++-
.../metron/parsers/bolt/ParserBoltTest.java | 19 +-
.../metron/parsers/bolt/WriterBoltTest.java | 223 +++++++++++++++++++
.../metron/parsers/csv/CSVParserTest.java | 8 +
.../components/ParserTopologyComponent.java | 8 +-
.../integration/WriterBoltIntegrationTest.java | 205 +++++++++++++++++
24 files changed, 974 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-deployment/roles/metron-kafka/defaults/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron-kafka/defaults/main.yml b/metron-deployment/roles/metron-kafka/defaults/main.yml
index 4e83a67..d108d0f 100644
--- a/metron-deployment/roles/metron-kafka/defaults/main.yml
+++ b/metron-deployment/roles/metron-kafka/defaults/main.yml
@@ -20,5 +20,7 @@ topics_to_create:
- { topic: "pcap", num_partitions: 1, replication_factor: 1, retention_gb: 10 }
- { topic: "bro", num_partitions: 1, replication_factor: 1, retention_gb: 10 }
- { topic: "yaf", num_partitions: 1, replication_factor: 1, retention_gb: 10 }
+ - { topic: "parser_invalid", num_partitions: 1, replication_factor: 1, retention_gb: 10 }
+ - { topic: "parser_error", num_partitions: 1, replication_factor: 1, retention_gb: 10 }
- { topic: "snort", num_partitions: 1, replication_factor: 1, retention_gb: 10 }
- { topic: "enrichments", num_partitions: 1, replication_factor: 1, retention_gb: 10 }
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index 1b0695f..1175e8b 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -27,6 +27,8 @@ public class Constants {
public static final long DEFAULT_CONFIGURED_BOLT_TIMEOUT = 5000;
public static final String SENSOR_TYPE = "source.type";
public static final String ENRICHMENT_TOPIC = "enrichments";
+ public static final String DEFAULT_PARSER_ERROR_TOPIC = "parser_error";
+ public static final String DEFAULT_PARSER_INVALID_TOPIC = "parser_invalid";
public static final String ERROR_STREAM = "error";
public static final String INVALID_STREAM = "invalid";
public static final String SIMPLE_HBASE_ENRICHMENT = "hbaseEnrichment";
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
index 46b4b74..80d770a 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldValidator.java
@@ -23,12 +23,13 @@ import org.apache.metron.common.field.validation.FieldValidation;
import org.apache.metron.common.field.validation.FieldValidations;
import org.json.simple.JSONObject;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class FieldValidator {
+public class FieldValidator implements Serializable {
public enum Config {
FIELD_VALIDATIONS("fieldValidations")
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index 82b407f..2d9a9e0 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -22,17 +22,36 @@ import com.google.common.collect.ImmutableList;
import org.apache.metron.common.utils.JSONUtils;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class SensorParserConfig {
+public class SensorParserConfig implements Serializable {
private String parserClassName;
private String filterClassName;
private String sensorTopic;
private String writerClassName;
+ private String errorWriterClassName;
+ private String invalidWriterClassName;
+
+ public String getErrorWriterClassName() {
+ return errorWriterClassName;
+ }
+
+ public void setErrorWriterClassName(String errorWriterClassName) {
+ this.errorWriterClassName = errorWriterClassName;
+ }
+
+ public String getInvalidWriterClassName() {
+ return invalidWriterClassName;
+ }
+
+ public void setInvalidWriterClassName(String invalidWriterClassName) {
+ this.invalidWriterClassName = invalidWriterClassName;
+ }
public String getWriterClassName() {
return writerClassName;
@@ -107,7 +126,8 @@ public class SensorParserConfig {
", filterClassName='" + filterClassName + '\'' +
", sensorTopic='" + sensorTopic + '\'' +
", writerClassName='" + writerClassName + '\'' +
- ", parserConfig=" + parserConfig +
+ ", errorWriterClassName='" + errorWriterClassName + '\'' +
+ ", invalidWriterClassName='" + invalidWriterClassName + '\'' +
", parserConfig=" + parserConfig +
", fieldTransformations=" + fieldTransformations +
'}';
@@ -128,7 +148,9 @@ public class SensorParserConfig {
return false;
if (getWriterClassName() != null ? !getWriterClassName().equals(that.getWriterClassName()) : that.getWriterClassName() != null)
return false;
- if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null)
+ if (getErrorWriterClassName() != null ? !getErrorWriterClassName().equals(that.getErrorWriterClassName()) : that.getErrorWriterClassName() != null)
+ return false;
+ if (getInvalidWriterClassName() != null ? !getInvalidWriterClassName().equals(that.getInvalidWriterClassName()) : that.getInvalidWriterClassName() != null)
return false;
if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null)
return false;
@@ -142,7 +164,8 @@ public class SensorParserConfig {
result = 31 * result + (getFilterClassName() != null ? getFilterClassName().hashCode() : 0);
result = 31 * result + (getSensorTopic() != null ? getSensorTopic().hashCode() : 0);
result = 31 * result + (getWriterClassName() != null ? getWriterClassName().hashCode() : 0);
- result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
+ result = 31 * result + (getErrorWriterClassName() != null ? getErrorWriterClassName().hashCode() : 0);
+ result = 31 * result + (getInvalidWriterClassName() != null ? getInvalidWriterClassName().hashCode() : 0);
result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
result = 31 * result + (getFieldTransformations() != null ? getFieldTransformations().hashCode() : 0);
return result;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
index b983815..e308dd3 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
@@ -18,10 +18,11 @@
package org.apache.metron.common.field.transformation;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
-public interface FieldTransformation {
+public interface FieldTransformation extends Serializable {
Map<String, Object> map( Map<String, Object> input
, List<String> outputField
, Map<String, Object> fieldMappingConfig
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/validation/FieldValidation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/validation/FieldValidation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/validation/FieldValidation.java
index 442f2d6..c0d4c40 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/validation/FieldValidation.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/validation/FieldValidation.java
@@ -18,9 +18,10 @@
package org.apache.metron.common.field.validation;
+import java.io.Serializable;
import java.util.Map;
-public interface FieldValidation {
+public interface FieldValidation extends Serializable {
boolean isValid( Map<String, Object> input
, Map<String, Object> validationConfig
, Map<String, Object> globalConfig
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
index 9fffe2a..92e0ab9 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ErrorUtils.java
@@ -17,9 +17,19 @@
*/
package org.apache.metron.common.utils;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
import backtype.storm.task.OutputCollector;
import backtype.storm.tuple.Values;
+import org.apache.commons.beanutils.Converter;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.metron.common.Constants;
import org.json.simple.JSONObject;
@@ -92,37 +102,73 @@ public class ErrorUtils {
}
}
- @SuppressWarnings("unchecked") // JSONObject extends HashMap w/o type parameters
- public static JSONObject generateErrorMessage(String message, Throwable t) {
+ @SuppressWarnings("unchecked")
+ public static JSONObject generateErrorMessage(String message, Throwable t)
+ {
+ return generateErrorMessage(message, t, Optional.empty(), Optional.empty());
+ }
+ public static JSONObject generateErrorMessage(String message
+ , Throwable t
+ , Optional<String> sensorType
+ , Optional<Object> rawMessage
+ )
+ {
JSONObject error_message = new JSONObject();
-
+
/*
* Save full stack trace in object.
*/
- String stackTrace = ExceptionUtils.getStackTrace(t);
-
- String exception = t.toString();
-
+ String stackTrace = ExceptionUtils.getStackTrace(t);
+
+ String exception = t.toString();
+
+
error_message.put("time", System.currentTimeMillis());
try {
error_message.put("hostname", InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException ex) {
- LOGGER.info("Unable to resolve hostname while generating error message", ex);
+
}
-
- error_message.put("message", message);
- error_message.put(Constants.SENSOR_TYPE, "error");
- error_message.put("exception", exception);
- error_message.put("stack", stackTrace);
-
- return error_message;
- }
+ if(rawMessage.isPresent()) {
+ if(rawMessage.get() instanceof byte[]) {
+ error_message.put("rawMessage", Bytes.toString((byte[])rawMessage.get()));
+ error_message.put("rawMessage_bytes", toByteArrayList((byte[])rawMessage.get()));
+ }
+ else {
+ error_message.put("rawMessage", rawMessage.get());
+ }
+ }
+ error_message.put("message", message);
+ error_message.put(Constants.SENSOR_TYPE, StringUtils.join("_", sensorType, Optional.of("error")));
+ error_message.put("exception", exception);
+ error_message.put("stack", stackTrace);
+
+ return error_message;
+ }
+
+ private static List<Byte> toByteArrayList(byte[] list) {
+ List<Byte> ret = new ArrayList<>();
+ for(byte b : list) {
+ ret.add(b);
+ }
+ return ret;
+ }
+
+ public static void handleError(OutputCollector collector, Throwable t, String errorStream) {
+ handleError(collector, t, errorStream, Optional.empty(), Optional.empty());
+ }
+ public static void handleError(OutputCollector collector
+ , Throwable t
+ , String errorStream
+ , Optional<String> sensorType
+ , Optional<Object> rawMessage
+ )
+ {
+ JSONObject error = ErrorUtils.generateErrorMessage(t.getMessage(), t, sensorType, rawMessage);
+ collector.emit(errorStream, new Values(error));
+ collector.reportError(t);
+ }
- public static void handleError(OutputCollector collector, Throwable t, String errorStream) {
- JSONObject error = ErrorUtils.generateErrorMessage(t.getMessage(), t);
- collector.emit(errorStream, new Values(error));
- collector.reportError(t);
- }
public static String generateThreadDump() {
final StringBuilder dump = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/StringUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/StringUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/StringUtils.java
new file mode 100644
index 0000000..9b0d77a
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/StringUtils.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+public class StringUtils {
+ public static String join(String delim, Optional<String>... parts) {
+ return Joiner.on(delim).join(
+ Arrays.asList(parts).stream().filter(
+ part -> part.isPresent()
+ ).map( part -> part.get())
+ .toArray()
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
index 7d66ce1..c0e4f37 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterComponent.java
@@ -60,8 +60,10 @@ public class BulkWriterComponent<MESSAGE_T> {
public void error(Throwable e, Iterable<Tuple> tuples) {
tuples.forEach(t -> collector.ack(t));
- LOG.error("Failing " + Iterables.size(tuples) + " tuples", e);
- ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
+ if(!Iterables.isEmpty(tuples)) {
+ LOG.error("Failing " + Iterables.size(tuples) + " tuples", e);
+ ErrorUtils.handleError(collector, e, Constants.ERROR_STREAM);
+ }
}
protected Collection<Tuple> createTupleCollection() {
@@ -69,6 +71,19 @@ public class BulkWriterComponent<MESSAGE_T> {
}
+ public void errorAll(Throwable e) {
+ for(Map.Entry<String, Collection<Tuple>> kv : sensorTupleMap.entrySet()) {
+ error(e, kv.getValue());
+ sensorTupleMap.remove(kv.getKey());
+ sensorMessageMap.remove(kv.getKey());
+ }
+ }
+
+ public void errorAll(String sensorType, Throwable e) {
+ error(e, Optional.ofNullable(sensorTupleMap.get(sensorType)).orElse(new ArrayList<>()));
+ sensorTupleMap.remove(sensorType);
+ sensorMessageMap.remove(sensorType);
+ }
public void write( String sensorType
, Tuple tuple
, MESSAGE_T message
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
index 865b017..7e0f325 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/ConfigUploadComponent.java
@@ -20,11 +20,13 @@ package org.apache.metron.integration.components;
import org.apache.curator.framework.CuratorFramework;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.integration.InMemoryComponent;
import org.apache.metron.integration.UnableToStartException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
public class ConfigUploadComponent implements InMemoryComponent {
@@ -33,6 +35,7 @@ public class ConfigUploadComponent implements InMemoryComponent {
private String globalConfigPath;
private String parserConfigsPath;
private String enrichmentConfigsPath;
+ private Optional<String> globalConfig = Optional.empty();
private Map<String, SensorParserConfig> parserSensorConfigs = new HashMap<>();
public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) {
this.topologyProperties = topologyProperties;
@@ -58,21 +61,32 @@ public class ConfigUploadComponent implements InMemoryComponent {
return this;
}
+ public ConfigUploadComponent withGlobalConfig(String globalConfig) {
+ this.globalConfig = Optional.ofNullable(globalConfig);
+ return this;
+ }
@Override
public void start() throws UnableToStartException {
try {
- ConfigurationsUtils.uploadConfigsToZookeeper( globalConfigPath
- , parserConfigsPath
- , enrichmentConfigsPath
- , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
- );
+ if(globalConfigPath != null) {
+ ConfigurationsUtils.uploadConfigsToZookeeper(globalConfigPath
+ , parserConfigsPath
+ , enrichmentConfigsPath
+ , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
+ );
+ }
for(Map.Entry<String, SensorParserConfig> kv : parserSensorConfigs.entrySet()) {
ConfigurationsUtils.writeSensorParserConfigToZookeeper( kv.getKey()
, kv.getValue()
, topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
);
}
+ if(globalConfig.isPresent()) {
+ ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig.get().getBytes()
+ , topologyProperties.getProperty(KafkaWithZKComponent.ZOOKEEPER_PROPERTY)
+ );
+ }
} catch (Exception e) {
throw new UnableToStartException(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
index 99f916d..6d2261b 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaWithZKComponent.java
@@ -22,6 +22,7 @@ import com.google.common.base.Function;
import kafka.admin.AdminUtils;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
+import kafka.common.TopicExistsException;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
@@ -215,9 +216,14 @@ public class KafkaWithZKComponent implements InMemoryComponent {
}
public void createTopic(String name, int numPartitions, boolean waitUntilMetadataIsPropagated) throws InterruptedException {
- AdminUtils.createTopic(zkClient, name, numPartitions, 1, new Properties());
- if(waitUntilMetadataIsPropagated) {
- waitUntilMetadataIsPropagated(name, numPartitions);
+ try {
+ AdminUtils.createTopic(zkClient, name, numPartitions, 1, new Properties());
+ if (waitUntilMetadataIsPropagated) {
+ waitUntilMetadataIsPropagated(name, numPartitions);
+ }
+ }
+ catch(TopicExistsException tee) {
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
index da72da8..925c329 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/GrokParser.java
@@ -169,7 +169,7 @@ public class GrokParser implements MessageParser<JSONObject>, Serializable {
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
- throw new RuntimeException("Grok parser Error: " + e.getMessage() + " on " + originalMessage , e);
+ throw new IllegalStateException("Grok parser Error: " + e.getMessage() + " on " + originalMessage , e);
}
return messages;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 46a49fc..aebb8d7 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -43,6 +43,8 @@ import org.apache.metron.parsers.interfaces.MessageFilter;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.common.interfaces.MessageWriter;
import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.*;
@@ -50,38 +52,22 @@ import java.util.function.Function;
public class ParserBolt extends ConfiguredParserBolt implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(ParserBolt.class);
private OutputCollector collector;
private MessageParser<JSONObject> parser;
private MessageFilter<JSONObject> filter = new GenericMessageFilter();
- private transient Function<ParserConfigurations, WriterConfiguration> writerTransformer;
- private BulkMessageWriter<JSONObject> messageWriter;
- private BulkWriterComponent<JSONObject> writerComponent;
- private boolean isBulk = false;
+ private WriterHandler writer;
public ParserBolt( String zookeeperUrl
, String sensorType
, MessageParser<JSONObject> parser
- , MessageWriter<JSONObject> writer
+ , WriterHandler writer
)
{
super(zookeeperUrl, sensorType);
- isBulk = false;
+ this.writer = writer;
this.parser = parser;
- messageWriter = new WriterToBulkWriter<>(writer);
}
- public ParserBolt( String zookeeperUrl
- , String sensorType
- , MessageParser<JSONObject> parser
- , BulkMessageWriter<JSONObject> writer
- )
- {
- super(zookeeperUrl, sensorType);
- isBulk = true;
- this.parser = parser;
- messageWriter = writer;
-
-
- }
public ParserBolt withMessageFilter(MessageFilter<JSONObject> filter) {
this.filter = filter;
@@ -103,23 +89,8 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
}
parser.init();
- if(isBulk) {
- writerTransformer = config -> new ParserWriterConfiguration(config);
- }
- else {
- writerTransformer = config -> new SingleBatchConfigurationFacade(new ParserWriterConfiguration(config));
- }
- try {
- messageWriter.init(stormConf, writerTransformer.apply(getConfigurations()));
- } catch (Exception e) {
- throw new IllegalStateException("Unable to initialize message writer", e);
- }
- this.writerComponent = new BulkWriterComponent<JSONObject>(collector, isBulk, isBulk) {
- @Override
- protected Collection<Tuple> createTupleCollection() {
- return new HashSet<>();
- }
- };
+ writer.init(stormConf, collector, getConfigurations());
+
SensorParserConfig config = getSensorParserConfig();
if(config != null) {
config.init();
@@ -139,7 +110,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
try {
//we want to ack the tuple in the situation where we have are not doing a bulk write
//otherwise we want to defer to the writerComponent who will ack on bulk commit.
- boolean ackTuple = !isBulk;
+ boolean ackTuple = !writer.handleAck();
int numWritten = 0;
if(sensorParserConfig != null) {
List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations();
@@ -152,13 +123,13 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
handler.transformAndUpdate(message, sensorParserConfig.getParserConfig());
}
}
+ numWritten++;
if(!isGloballyValid(message, fieldValidations)) {
message.put(Constants.SENSOR_TYPE, getSensorType()+ ".invalid");
collector.emit(Constants.INVALID_STREAM, new Values(message));
}
else {
- numWritten++;
- writerComponent.write(getSensorType(), tuple, message, messageWriter, writerTransformer.apply(getConfigurations()));
+ writer.write(getSensorType(), tuple, message, getConfigurations());
}
}
}
@@ -170,7 +141,12 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
collector.ack(tuple);
}
} catch (Throwable ex) {
- ErrorUtils.handleError(collector, ex, Constants.ERROR_STREAM);
+ ErrorUtils.handleError( collector
+ , ex
+ , Constants.ERROR_STREAM
+ , Optional.of(getSensorType())
+ , Optional.ofNullable(originalMessage)
+ );
collector.ack(tuple);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
new file mode 100644
index 0000000..380f760
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.utils.ErrorUtils;
+import org.json.simple.JSONObject;
+
+import java.util.Map;
+import java.util.Optional;
+
+public class WriterBolt extends BaseRichBolt {
+ private WriterHandler handler;
+ private ParserConfigurations configuration;
+ private String sensorType;
+ private transient OutputCollector collector;
+ public WriterBolt(WriterHandler handler, ParserConfigurations configuration, String sensorType) {
+ this.handler = handler;
+ this.configuration = configuration;
+ this.sensorType = sensorType;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ handler.init(stormConf, collector, configuration);
+ }
+
+ private JSONObject getMessage(Tuple tuple) {
+ Object ret = tuple.getValueByField("message");
+ if(ret != null) {
+ ret = tuple.getValue(0);
+ }
+ if(ret != null) {
+ return (JSONObject)((JSONObject)ret).clone();
+ }
+ else {
+ return null;
+ }
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ JSONObject message = null;
+ try {
+ message = (JSONObject)((JSONObject) tuple.getValueByField("message")).clone();
+ handler.write(sensorType, tuple, message, configuration);
+ if(!handler.handleAck()) {
+ collector.ack(tuple);
+ }
+ } catch (Throwable e) {
+ ErrorUtils.handleError( collector
+ , e
+ , Constants.ERROR_STREAM
+ , Optional.of(sensorType)
+ , Optional.ofNullable(message)
+ );
+ collector.ack(tuple);
+ }
+ }
+
+ /**
+ * Declare the output schema for all the streams of this topology.
+ *
+ * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
+ */
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
new file mode 100644
index 0000000..ecd1ce8
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.apache.metron.common.interfaces.MessageWriter;
+import org.apache.metron.common.writer.BulkWriterComponent;
+import org.apache.metron.common.writer.WriterToBulkWriter;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.function.Function;
+
+public class WriterHandler implements Serializable {
+ private BulkMessageWriter<JSONObject> messageWriter;
+ private transient BulkWriterComponent<JSONObject> writerComponent;
+ private transient Function<ParserConfigurations, WriterConfiguration> writerTransformer;
+ private boolean isBulk = false;
+ public WriterHandler(MessageWriter<JSONObject> writer) {
+ isBulk = false;
+ messageWriter = new WriterToBulkWriter<>(writer);
+
+ }
+ public WriterHandler(BulkMessageWriter<JSONObject> writer) {
+ isBulk = true;
+ messageWriter = writer;
+ }
+
+
+ public boolean handleAck() {
+ return isBulk;
+ }
+
+ public void init(Map stormConf, OutputCollector collector, ParserConfigurations configurations) {
+ if(isBulk) {
+ writerTransformer = config -> new ParserWriterConfiguration(config);
+ }
+ else {
+ writerTransformer = config -> new SingleBatchConfigurationFacade(new ParserWriterConfiguration(config));
+ }
+ try {
+ messageWriter.init(stormConf, writerTransformer.apply(configurations));
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to initialize message writer", e);
+ }
+ this.writerComponent = new BulkWriterComponent<JSONObject>(collector, isBulk, isBulk) {
+ @Override
+ protected Collection<Tuple> createTupleCollection() {
+ return new HashSet<>();
+ }
+ };
+ }
+
+ public void write( String sensorType
+ , Tuple tuple
+ , JSONObject message
+ , ParserConfigurations configurations
+ ) throws Exception {
+ writerComponent.write(sensorType, tuple, message, messageWriter, writerTransformer.apply(configurations));
+ }
+
+ public void errorAll(String sensorType, Throwable e) {
+ writerComponent.errorAll(sensorType, e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
index a0097ed..19a768e 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/csv/CSVParser.java
@@ -84,9 +84,10 @@ public class CSVParser extends BasicParser {
else {
return Collections.emptyList();
}
- } catch (Exception e) {
- LOG.error("Unable to parse " + new String(rawMessage), e);
- return Collections.emptyList();
+ } catch (Throwable e) {
+ String message = "Unable to parse " + new String(rawMessage) + ": " + e.getMessage();
+ LOG.error(message, e);
+ throw new IllegalStateException(message, e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 4b357be..d83d260 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -19,6 +19,7 @@ package org.apache.metron.parsers.topology;
import backtype.storm.topology.TopologyBuilder;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
@@ -29,12 +30,15 @@ import org.apache.metron.common.spout.kafka.SpoutConfig;
import org.apache.metron.common.utils.ReflectionUtils;
import org.apache.metron.common.writer.AbstractWriter;
import org.apache.metron.parsers.bolt.ParserBolt;
+import org.apache.metron.parsers.bolt.WriterBolt;
+import org.apache.metron.parsers.bolt.WriterHandler;
import org.apache.metron.parsers.interfaces.MessageParser;
import org.apache.metron.parsers.writer.KafkaWriter;
import org.json.simple.JSONObject;
import storm.kafka.KafkaSpout;
import storm.kafka.ZkHosts;
+import java.util.EnumMap;
import java.util.Map;
public class ParserTopologyBuilder {
@@ -46,7 +50,11 @@ public class ParserTopologyBuilder {
int spoutParallelism,
int spoutNumTasks,
int parserParallelism,
- int parserNumTasks
+ int parserNumTasks,
+ int invalidWriterParallelism,
+ int invalidWriterNumTasks,
+ int errorWriterParallelism,
+ int errorWriterNumTasks
) throws Exception {
CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
client.start();
@@ -67,31 +75,68 @@ public class ParserTopologyBuilder {
.setNumTasks(spoutNumTasks);
MessageParser<JSONObject> parser = ReflectionUtils.createInstance(sensorParserConfig.getParserClassName());
parser.configure(sensorParserConfig.getParserConfig());
- ParserBolt parserBolt = null;
- {
- if(sensorParserConfig.getWriterClassName() == null) {
- KafkaWriter writer = new KafkaWriter(brokerUrl);
- writer.configure(sensorType, new ParserWriterConfiguration(configurations));
- parserBolt = new ParserBolt(zookeeperUrl, sensorType, parser, writer);
- }
- else {
- AbstractWriter writer = ReflectionUtils.createInstance(sensorParserConfig.getWriterClassName());
- writer.configure(sensorType, new ParserWriterConfiguration(configurations));
- if(writer instanceof BulkMessageWriter) {
- parserBolt = new ParserBolt(zookeeperUrl, sensorType, parser, (BulkMessageWriter<JSONObject>)writer);
- }
- else if(writer instanceof MessageWriter) {
- parserBolt = new ParserBolt(zookeeperUrl, sensorType, parser, (MessageWriter<JSONObject>)writer);
- }
- else {
- throw new IllegalStateException("Unable to create parser bolt: writer must be a MessageWriter or a BulkMessageWriter");
- }
- }
- }
+
+ ParserBolt parserBolt = new ParserBolt(zookeeperUrl
+ , sensorType
+ , parser
+ ,getHandler( sensorType
+ , configurations
+ , sensorParserConfig.getWriterClassName() == null
+ ? new KafkaWriter(brokerUrl).withTopic(Constants.ENRICHMENT_TOPIC)
+ :ReflectionUtils.createInstance(sensorParserConfig.getWriterClassName())
+ )
+ );
+
+ WriterBolt errorBolt = new WriterBolt(getHandler( sensorType
+ , configurations
+ , sensorParserConfig.getErrorWriterClassName() == null
+ ? new KafkaWriter(brokerUrl).withTopic(Constants.DEFAULT_PARSER_ERROR_TOPIC)
+ .withConfigPrefix("error")
+ :ReflectionUtils.createInstance(sensorParserConfig.getWriterClassName())
+ )
+ , configurations
+ , sensorType
+ );
+ WriterBolt invalidBolt = new WriterBolt(getHandler( sensorType
+ , configurations
+ , sensorParserConfig.getErrorWriterClassName() == null
+ ? new KafkaWriter(brokerUrl).withTopic(Constants.DEFAULT_PARSER_INVALID_TOPIC)
+ .withConfigPrefix("invalid")
+ :ReflectionUtils.createInstance(sensorParserConfig.getWriterClassName())
+ )
+ , configurations
+ , sensorType
+ );
+
builder.setBolt("parserBolt", parserBolt, parserParallelism)
.setNumTasks(parserNumTasks)
.shuffleGrouping("kafkaSpout");
+ if(errorWriterNumTasks > 0) {
+ builder.setBolt("errorMessageWriter", errorBolt, errorWriterParallelism)
+ .setNumTasks(errorWriterNumTasks)
+ .shuffleGrouping("parserBolt", Constants.ERROR_STREAM)
+ ;
+ }
+ if(invalidWriterNumTasks > 0) {
+ builder.setBolt("invalidMessageWriter", invalidBolt, invalidWriterParallelism)
+ .setNumTasks(invalidWriterNumTasks)
+ .shuffleGrouping("parserBolt", Constants.INVALID_STREAM)
+ ;
+ }
return builder;
}
+ private static WriterHandler getHandler(String sensorType, ParserConfigurations configurations, AbstractWriter writer) {
+ writer.configure(sensorType, new ParserWriterConfiguration(configurations));
+ if(writer instanceof BulkMessageWriter) {
+ return new WriterHandler((BulkMessageWriter<JSONObject>)writer);
+ }
+ else if(writer instanceof MessageWriter) {
+ return new WriterHandler((MessageWriter<JSONObject>)writer);
+ }
+ else {
+ throw new IllegalStateException("Unable to create parser bolt: writer must be a MessageWriter or a BulkMessageWriter");
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index eb7f021..0578fa5 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -67,9 +67,23 @@ public class ParserTopologyCLI {
o.setType(Number.class);
return o;
}),
- PARSER_PARALLISM("pp", code -> {
+ PARSER_PARALLELISM("pp", code -> {
Option o = new Option(code, "parser_p", true, "Parser Parallelism Hint");
- o.setArgName("PARSER_PARALLELISM_HINT");
+ o.setArgName("PARALLELISM_HINT");
+ o.setRequired(false);
+ o.setType(Number.class);
+ return o;
+ }),
+ INVALID_WRITER_PARALLELISM("iwp", code -> {
+ Option o = new Option(code, "invalid_writer_p", true, "Invalid Message Writer Parallelism Hint");
+ o.setArgName("PARALLELISM_HINT");
+ o.setRequired(false);
+ o.setType(Number.class);
+ return o;
+ }),
+ ERROR_WRITER_PARALLELISM("ewp", code -> {
+ Option o = new Option(code, "error_writer_p", true, "Error Writer Parallelism Hint");
+ o.setArgName("PARALLELISM_HINT");
o.setRequired(false);
o.setType(Number.class);
return o;
@@ -83,7 +97,21 @@ public class ParserTopologyCLI {
}),
PARSER_NUM_TASKS("pnt", code -> {
Option o = new Option(code, "parser_num_tasks", true, "Parser Num Tasks");
- o.setArgName("PARSER_NUM_TASKS");
+ o.setArgName("NUM_TASKS");
+ o.setRequired(false);
+ o.setType(Number.class);
+ return o;
+ }),
+ INVALID_WRITER_NUM_TASKS("iwnt", code -> {
+ Option o = new Option(code, "invalid_writer_num_tasks", true, "Invalid Writer Num Tasks");
+ o.setArgName("NUM_TASKS");
+ o.setRequired(false);
+ o.setType(Number.class);
+ return o;
+ }),
+ ERROR_WRITER_NUM_TASKS("ewnt", code -> {
+ Option o = new Option(code, "error_writer_num_tasks", true, "Error Writer Num Tasks");
+ o.setArgName("NUM_TASKS");
o.setRequired(false);
o.setType(Number.class);
return o;
@@ -226,8 +254,12 @@ public class ParserTopologyCLI {
String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
int spoutParallelism = Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
int spoutNumTasks = Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
- int parserParallelism = Integer.parseInt(ParserOptions.PARSER_PARALLISM.get(cmd, "1"));
+ int parserParallelism = Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1"));
int parserNumTasks= Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1"));
+ int errorParallelism = Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1"));
+ int errorNumTasks= Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1"));
+ int invalidParallelism = Integer.parseInt(ParserOptions.INVALID_WRITER_PARALLELISM.get(cmd, "1"));
+ int invalidNumTasks= Integer.parseInt(ParserOptions.INVALID_WRITER_NUM_TASKS.get(cmd, "1"));
SpoutConfig.Offset offset = cmd.hasOption("t") ? SpoutConfig.Offset.BEGINNING : SpoutConfig.Offset.WHERE_I_LEFT_OFF;
TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl,
brokerUrl,
@@ -236,7 +268,12 @@ public class ParserTopologyCLI {
spoutParallelism,
spoutNumTasks,
parserParallelism,
- parserNumTasks);
+ parserNumTasks,
+ invalidParallelism,
+ invalidNumTasks,
+ errorParallelism,
+ errorNumTasks
+ );
Config stormConf = ParserOptions.getConfig(cmd);
if (ParserOptions.TEST.has(cmd)) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
index 6262dc1..f8578c8 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/writer/KafkaWriter.java
@@ -25,12 +25,14 @@ import org.apache.metron.common.configuration.Configurations;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.interfaces.MessageWriter;
import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.StringUtils;
import org.apache.metron.common.writer.AbstractWriter;
import org.json.simple.JSONObject;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObject>, Serializable {
public enum Configurations {
@@ -44,11 +46,11 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
Configurations(String key) {
this.key = key;
}
- public Object get(Map<String, Object> config) {
- return config.get(key);
+ public Object get(Optional<String> configPrefix, Map<String, Object> config) {
+ return config.get(StringUtils.join(".", configPrefix, Optional.of(key)));
}
- public <T> T getAndConvert(Map<String, Object> config, Class<T> clazz) {
- Object o = get(config);
+ public <T> T getAndConvert(Optional<String> configPrefix, Map<String, Object> config, Class<T> clazz) {
+ Object o = get(configPrefix, config);
if(o != null) {
return ConversionUtils.convert(o, clazz);
}
@@ -61,11 +63,10 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
private int requiredAcks = 1;
private String kafkaTopic = Constants.ENRICHMENT_TOPIC;
private KafkaProducer kafkaProducer;
+ private String configPrefix = null;
public KafkaWriter() {}
-
-
public KafkaWriter(String brokerUrl) {
this.brokerUrl = brokerUrl;
}
@@ -89,26 +90,34 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
this.kafkaTopic= topic;
return this;
}
+ public KafkaWriter withConfigPrefix(String prefix) {
+ this.configPrefix = prefix;
+ return this;
+ }
+
+ public Optional<String> getConfigPrefix() {
+ return Optional.ofNullable(configPrefix);
+ }
@Override
public void configure(String sensorName, WriterConfiguration configuration) {
Map<String, Object> configMap = configuration.getSensorConfig(sensorName);
- String brokerUrl = Configurations.BROKER.getAndConvert(configMap, String.class);
+ String brokerUrl = Configurations.BROKER.getAndConvert(getConfigPrefix(), configMap, String.class);
if(brokerUrl != null) {
this.brokerUrl = brokerUrl;
}
- String keySerializer = Configurations.KEY_SERIALIZER.getAndConvert(configMap, String.class);
+ String keySerializer = Configurations.KEY_SERIALIZER.getAndConvert(getConfigPrefix(), configMap, String.class);
if(keySerializer != null) {
withKeySerializer(keySerializer);
}
- String valueSerializer = Configurations.VALUE_SERIALIZER.getAndConvert(configMap, String.class);
+ String valueSerializer = Configurations.VALUE_SERIALIZER.getAndConvert(getConfigPrefix(), configMap, String.class);
if(valueSerializer != null) {
withValueSerializer(keySerializer);
}
- Integer requiredAcks = Configurations.REQUIRED_ACKS.getAndConvert(configMap, Integer.class);
+ Integer requiredAcks = Configurations.REQUIRED_ACKS.getAndConvert(getConfigPrefix(), configMap, Integer.class);
if(requiredAcks!= null) {
withRequiredAcks(requiredAcks);
}
- String topic = Configurations.TOPIC.getAndConvert(configMap, String.class);
+ String topic = Configurations.TOPIC.getAndConvert(getConfigPrefix(), configMap, String.class);
if(topic != null) {
withTopic(topic);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 71f8fcf..0b07bc3 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -31,6 +31,7 @@ import org.adrianwalker.multilinestring.Multiline;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.utils.ErrorUtils;
import org.apache.metron.test.bolt.BaseBoltTest;
import org.apache.metron.common.configuration.Configurations;
import org.apache.metron.parsers.interfaces.MessageFilter;
@@ -39,6 +40,7 @@ import org.apache.metron.common.interfaces.MessageWriter;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
@@ -81,10 +83,11 @@ public class ParserBoltTest extends BaseBoltTest {
@Mock
private Tuple t5;
+
@Test
public void testEmpty() throws Exception {
String sensorType = "yaf";
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, writer) {
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
@Override
protected ParserConfigurations defaultConfigurations() {
return new ParserConfigurations() {
@@ -119,8 +122,9 @@ public class ParserBoltTest extends BaseBoltTest {
@Test
public void test() throws Exception {
+
String sensorType = "yaf";
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, writer) {
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
@Override
protected ParserConfigurations defaultConfigurations() {
return new ParserConfigurations() {
@@ -177,7 +181,7 @@ public void testImplicitBatchOfOne() throws Exception {
String sensorType = "yaf";
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
@Override
protected ParserConfigurations defaultConfigurations() {
return new ParserConfigurations() {
@@ -220,7 +224,8 @@ public void testImplicitBatchOfOne() throws Exception {
@Test
public void testFilter() throws Exception {
String sensorType = "yaf";
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
+
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
@Override
protected SensorParserConfig getSensorParserConfig() {
try {
@@ -246,7 +251,7 @@ public void testImplicitBatchOfOne() throws Exception {
String sensorType = "yaf";
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
@Override
protected ParserConfigurations defaultConfigurations() {
return new ParserConfigurations() {
@@ -281,7 +286,7 @@ public void testImplicitBatchOfOne() throws Exception {
String sensorType = "yaf";
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
@Override
protected ParserConfigurations defaultConfigurations() {
return new ParserConfigurations() {
@@ -325,7 +330,7 @@ public void testImplicitBatchOfOne() throws Exception {
public void testBatchOfFiveWithError() throws Exception {
String sensorType = "yaf";
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, batchWriter) {
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
@Override
protected ParserConfigurations defaultConfigurations() {
return new ParserConfigurations() {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
new file mode 100644
index 0000000..fe8c238
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.bolt;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+import org.apache.metron.common.interfaces.BulkMessageWriter;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.test.bolt.BaseBoltTest;
+import org.apache.metron.common.configuration.Configurations;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.common.interfaces.MessageWriter;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+public class WriterBoltTest extends BaseBoltTest{
+ @Mock
+ protected TopologyContext topologyContext;
+
+ @Mock
+ protected OutputCollector outputCollector;
+
+ @Mock
+ private MessageWriter<JSONObject> writer;
+
+ @Mock
+ private BulkMessageWriter<JSONObject> batchWriter;
+
+ private ParserConfigurations getConfigurations(int batchSize) {
+ return new ParserConfigurations() {
+ @Override
+ public SensorParserConfig getSensorParserConfig(String sensorType) {
+ return new SensorParserConfig() {
+ @Override
+ public Map<String, Object> getParserConfig() {
+ return new HashMap<String, Object>() {{
+ put(ParserWriterConfiguration.BATCH_CONF, batchSize);
+ }};
+ }
+ };
+ }
+ };
+ }
+ @Test
+ public void testBatchHappyPath() throws Exception {
+ ParserConfigurations configurations = getConfigurations(5);
+ String sensorType = "test";
+ List<Tuple> tuples = new ArrayList<>();
+ for(int i = 0;i < 5;++i) {
+ Tuple t = mock(Tuple.class);
+ when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
+ tuples.add(t);
+ }
+ WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType);
+ bolt.prepare(new HashMap(), topologyContext, outputCollector);
+ verify(batchWriter, times(1)).init(any(), any());
+ for(int i = 0;i < 4;++i) {
+ Tuple t = tuples.get(i);
+ bolt.execute(t);
+ verify(outputCollector, times(0)).ack(t);
+ verify(batchWriter, times(0)).write(eq(sensorType), any(), any(), any());
+ }
+ bolt.execute(tuples.get(4));
+ for(Tuple t : tuples) {
+ verify(outputCollector, times(1)).ack(t);
+ }
+ verify(batchWriter, times(1)).write(eq(sensorType), any(), any(), any());
+ verify(outputCollector, times(0)).reportError(any());
+ verify(outputCollector, times(0)).fail(any());
+ }
+ @Test
+ public void testNonBatchHappyPath() throws Exception {
+ ParserConfigurations configurations = getConfigurations(1);
+ String sensorType = "test";
+ Tuple t = mock(Tuple.class);
+ when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
+ WriterBolt bolt = new WriterBolt(new WriterHandler(writer), configurations, sensorType);
+ bolt.prepare(new HashMap(), topologyContext, outputCollector);
+ verify(writer, times(1)).init();
+ bolt.execute(t);
+ verify(outputCollector, times(1)).ack(t);
+ verify(writer, times(1)).write(eq(sensorType), any(), any(), any());
+ verify(outputCollector, times(0)).reportError(any());
+ verify(outputCollector, times(0)).fail(any());
+ }
+ @Test
+ public void testNonBatchErrorPath() throws Exception {
+ ParserConfigurations configurations = getConfigurations(1);
+ String sensorType = "test";
+ Tuple t = mock(Tuple.class);
+ when(t.getValueByField(eq("message"))).thenThrow(new IllegalStateException());
+ WriterBolt bolt = new WriterBolt(new WriterHandler(writer), configurations, sensorType);
+ bolt.prepare(new HashMap(), topologyContext, outputCollector);
+ verify(writer, times(1)).init();
+ bolt.execute(t);
+ verify(outputCollector, times(1)).ack(t);
+ verify(writer, times(0)).write(eq(sensorType), any(), any(), any());
+ verify(outputCollector, times(1)).reportError(any());
+ verify(outputCollector, times(0)).fail(any());
+ }
+ @Test
+ public void testNonBatchErrorPathErrorInWrite() throws Exception {
+ ParserConfigurations configurations = getConfigurations(1);
+ String sensorType = "test";
+ Tuple t = mock(Tuple.class);
+ when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
+ WriterBolt bolt = new WriterBolt(new WriterHandler(writer), configurations, sensorType);
+ bolt.prepare(new HashMap(), topologyContext, outputCollector);
+ doThrow(new Exception()).when(writer).write(any(), any(), any(), any());
+ verify(writer, times(1)).init();
+ bolt.execute(t);
+ verify(outputCollector, times(1)).ack(t);
+ verify(writer, times(1)).write(eq(sensorType), any(), any(), any());
+ verify(outputCollector, times(1)).reportError(any());
+ verify(outputCollector, times(0)).fail(any());
+ }
+ @Test
+ public void testBatchErrorPath() throws Exception {
+ ParserConfigurations configurations = getConfigurations(5);
+ String sensorType = "test";
+ List<Tuple> tuples = new ArrayList<>();
+ for(int i = 0;i < 4;++i) {
+ Tuple t = mock(Tuple.class);
+ when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
+ tuples.add(t);
+ }
+ Tuple errorTuple = mock(Tuple.class);
+ Tuple goodTuple = mock(Tuple.class);
+ when(goodTuple.getValueByField(eq("message"))).thenReturn(new JSONObject());
+ when(errorTuple.getValueByField(eq("message"))).thenThrow(new IllegalStateException());
+
+ WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType);
+ bolt.prepare(new HashMap(), topologyContext, outputCollector);
+ verify(batchWriter, times(1)).init(any(), any());
+ for(int i = 0;i < 4;++i) {
+ Tuple t = tuples.get(i);
+ bolt.execute(t);
+ verify(outputCollector, times(0)).ack(t);
+ verify(batchWriter, times(0)).write(eq(sensorType), any(), any(), any());
+ }
+ bolt.execute(errorTuple);
+ for(Tuple t : tuples) {
+ verify(outputCollector, times(0)).ack(t);
+ }
+ bolt.execute(goodTuple);
+ for(Tuple t : tuples) {
+ verify(outputCollector, times(1)).ack(t);
+ }
+ verify(outputCollector, times(1)).ack(goodTuple);
+ verify(batchWriter, times(1)).write(eq(sensorType), any(), any(), any());
+ verify(outputCollector, times(1)).reportError(any());
+ verify(outputCollector, times(0)).fail(any());
+ }
+
+ @Test
+ public void testBatchErrorPathExceptionInWrite() throws Exception {
+ ParserConfigurations configurations = getConfigurations(5);
+ String sensorType = "test";
+ List<Tuple> tuples = new ArrayList<>();
+ for(int i = 0;i < 4;++i) {
+ Tuple t = mock(Tuple.class);
+ when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
+ tuples.add(t);
+ }
+ Tuple goodTuple = mock(Tuple.class);
+ when(goodTuple.getValueByField(eq("message"))).thenReturn(new JSONObject());
+
+ WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType);
+ bolt.prepare(new HashMap(), topologyContext, outputCollector);
+ doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), any());
+ verify(batchWriter, times(1)).init(any(), any());
+ for(int i = 0;i < 4;++i) {
+ Tuple t = tuples.get(i);
+ bolt.execute(t);
+ verify(outputCollector, times(0)).ack(t);
+ verify(batchWriter, times(0)).write(eq(sensorType), any(), any(), any());
+ }
+ bolt.execute(goodTuple);
+ for(Tuple t : tuples) {
+ verify(outputCollector, times(1)).ack(t);
+ }
+ verify(batchWriter, times(1)).write(eq(sensorType), any(), any(), any());
+ verify(outputCollector, times(1)).ack(goodTuple);
+ verify(outputCollector, times(1)).reportError(any());
+ verify(outputCollector, times(0)).fail(any());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/csv/CSVParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/csv/CSVParserTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/csv/CSVParserTest.java
index 5f314fa..e667e54 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/csv/CSVParserTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/csv/CSVParserTest.java
@@ -95,5 +95,13 @@ public class CSVParserTest {
Assert.assertEquals(" bar", o.get("col2"));
Assert.assertEquals(" grok", o.get("col3"));
}
+ {
+ String line = "foo";
+ try {
+ List<JSONObject> results = parser.parse(Bytes.toBytes(line));
+ Assert.fail("Expected exception");
+ }
+ catch(IllegalStateException iae) {}
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 0cc74f5..ad00deb 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -75,6 +75,10 @@ public class ParserTopologyComponent implements InMemoryComponent {
, 1
, 1
, 1
+ , 1
+ , 1
+ , 1
+ , 1
);
Map<String, Object> stormConf = new HashMap<>();
stormConf.put(Config.TOPOLOGY_DEBUG, true);
@@ -87,6 +91,8 @@ public class ParserTopologyComponent implements InMemoryComponent {
@Override
public void stop() {
- stormCluster.shutdown();
+ if(stormCluster != null) {
+ stormCluster.shutdown();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/916432c9/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
new file mode 100644
index 0000000..a6e83f1
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writers.integration;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.TestConstants;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.FieldValidator;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.field.validation.FieldValidation;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.enrichment.converter.EnrichmentConverter;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.integration.*;
+import org.apache.metron.integration.components.ConfigUploadComponent;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
+import org.apache.metron.integration.mock.MockTableProvider;
+import org.apache.metron.integration.utils.TestUtils;
+import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
+import org.apache.metron.test.TestDataType;
+import org.apache.metron.test.mock.MockHTable;
+import org.apache.metron.test.utils.SampleDataUtils;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.*;
+
+public class WriterBoltIntegrationTest extends BaseIntegrationTest {
+ public static class MockValidator implements FieldValidation{
+
+ @Override
+ public boolean isValid(Map<String, Object> input, Map<String, Object> validationConfig, Map<String, Object> globalConfig) {
+ if(input.get("action").equals("invalid")) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void initialize(Map<String, Object> validationConfig, Map<String, Object> globalConfig) {
+ }
+ }
+ /**
+ {
+ "fieldValidations" : [
+ {
+ "validation" : "org.apache.metron.writers.integration.WriterBoltIntegrationTest$MockValidator"
+ }
+ ]
+ }
+ */
+ @Multiline
+ public static String globalConfig;
+
+ /**
+ {
+ "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
+ ,"sensorTopic":"dummy"
+ ,"parserConfig":
+ {
+ "columns" : {
+ "action" : 0
+ ,"dummy" : 1
+ }
+ }
+ }
+ */
+ @Multiline
+ public static String parserConfig;
+
+ @Test
+ public void test() throws UnableToStartException, IOException {
+ final String sensorType = "dummy";
+ final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
+ add(Bytes.toBytes("valid,foo"));
+ add(Bytes.toBytes("invalid,foo"));
+ add(Bytes.toBytes("error"));
+ }};
+ final Properties topologyProperties = new Properties();
+ final KafkaWithZKComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaWithZKComponent.Topic>() {{
+ add(new KafkaWithZKComponent.Topic(sensorType, 1));
+ add(new KafkaWithZKComponent.Topic(Constants.DEFAULT_PARSER_ERROR_TOPIC, 1));
+ add(new KafkaWithZKComponent.Topic(Constants.DEFAULT_PARSER_INVALID_TOPIC, 1));
+ add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+ }});
+ topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
+
+ ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
+ .withTopologyProperties(topologyProperties)
+ .withGlobalConfig(globalConfig)
+ .withParserSensorConfig(sensorType, JSONUtils.INSTANCE.load(parserConfig, SensorParserConfig.class));
+
+ ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
+ .withSensorType(sensorType)
+ .withTopologyProperties(topologyProperties)
+ .withBrokerUrl(kafkaComponent.getBrokerList()).build();
+
+ UnitTestHelper.verboseLogging();
+ ComponentRunner runner = new ComponentRunner.Builder()
+ .withComponent("kafka", kafkaComponent)
+ .withComponent("config", configUploadComponent)
+ .withComponent("storm", parserTopologyComponent)
+ .withMillisecondsBetweenAttempts(5000)
+ .withNumRetries(10)
+ .build();
+ try {
+ runner.start();
+ kafkaComponent.writeMessages(sensorType, inputMessages);
+ Map<String, List<JSONObject>> outputMessages =
+ runner.process(new Processor<Map<String, List<JSONObject>>>() {
+ Map<String, List<JSONObject>> messages = null;
+
+ public ReadinessState process(ComponentRunner runner) {
+ KafkaWithZKComponent kafkaWithZKComponent = runner.getComponent("kafka", KafkaWithZKComponent.class);
+ List<byte[]> outputMessages = kafkaWithZKComponent.readMessages(Constants.ENRICHMENT_TOPIC);
+ List<byte[]> invalid = kafkaWithZKComponent.readMessages(Constants.DEFAULT_PARSER_INVALID_TOPIC);
+ List<byte[]> error = kafkaWithZKComponent.readMessages(Constants.DEFAULT_PARSER_ERROR_TOPIC);
+ if(outputMessages.size() == 1 && invalid.size() == 1 && error.size() == 1) {
+ messages = new HashMap<String, List<JSONObject>>() {{
+ put(Constants.ENRICHMENT_TOPIC, loadMessages(outputMessages));
+ put(Constants.DEFAULT_PARSER_ERROR_TOPIC, loadMessages(error));
+ put(Constants.DEFAULT_PARSER_INVALID_TOPIC, loadMessages(invalid));
+ }};
+ return ReadinessState.READY;
+ }
+ return ReadinessState.NOT_READY;
+ }
+
+ public Map<String, List<JSONObject>> getResult() {
+ return messages;
+ }
+ });
+ Assert.assertEquals(3, outputMessages.size());
+ Assert.assertEquals(1, outputMessages.get(Constants.ENRICHMENT_TOPIC).size());
+ Assert.assertEquals("valid", outputMessages.get(Constants.ENRICHMENT_TOPIC).get(0).get("action"));
+ Assert.assertEquals(1, outputMessages.get(Constants.DEFAULT_PARSER_ERROR_TOPIC).size());
+ Assert.assertEquals("error", outputMessages.get(Constants.DEFAULT_PARSER_ERROR_TOPIC).get(0).get("rawMessage"));
+ Assert.assertTrue(Arrays.equals(listToBytes(outputMessages.get(Constants.DEFAULT_PARSER_ERROR_TOPIC).get(0).get("rawMessage_bytes"))
+ , "error".getBytes()
+ )
+ );
+ Assert.assertEquals(1, outputMessages.get(Constants.DEFAULT_PARSER_INVALID_TOPIC).size());
+ Assert.assertEquals("invalid", outputMessages.get(Constants.DEFAULT_PARSER_INVALID_TOPIC).get(0).get("action"));
+ }
+ finally {
+ if(runner != null) {
+ runner.stop();
+ }
+ }
+ }
+ private static byte[] listToBytes(Object o ){
+ List<Byte> l = (List<Byte>)o;
+ byte[] ret = new byte[l.size()];
+ int i = 0;
+ for(Number b : l) {
+ ret[i++] = b.byteValue();
+ }
+ return ret;
+ }
+ private static List<JSONObject> loadMessages(List<byte[]> outputMessages) {
+ List<JSONObject> tmp = new ArrayList<>();
+ Iterables.addAll(tmp
+ , Iterables.transform(outputMessages
+ , message -> {
+ try {
+ return new JSONObject(JSONUtils.INSTANCE.load(new String(message)
+ , new TypeReference<Map<String, Object>>() {}
+ )
+ );
+ } catch (Exception ex) {
+ throw new IllegalStateException(ex);
+ }
+ }
+ )
+ );
+ return tmp;
+ }
+}