You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/05/15 12:59:11 UTC
[43/50] [abbrv] metron git commit: METRON-1543 Unable to Set Parser Output Topic in Sensor Config (nickwallen) closes apache/metron#1007
METRON-1543 Unable to Set Parser Output Topic in Sensor Config (nickwallen) closes apache/metron#1007
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3bb926df
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3bb926df
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3bb926df
Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: 3bb926df5d253a907bbf8dab4b76b78dd32993ea
Parents: 2b4f0b8
Author: nickwallen <ni...@nickallen.org>
Authored: Wed May 2 15:06:03 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Wed May 2 15:06:03 2018 -0400
----------------------------------------------------------------------
.../org/apache/metron/common/Constants.java | 10 +-
.../configuration/SensorParserConfig.java | 410 +++++++++++--------
.../parsers/topology/ParserTopologyBuilder.java | 139 ++++---
.../parsers/topology/ParserTopologyCLI.java | 147 +++++--
.../components/ParserTopologyComponent.java | 80 ++--
.../parsers/topology/ParserTopologyCLITest.java | 122 ++++--
...pleHbaseEnrichmentWriterIntegrationTest.java | 69 ++--
.../integration/WriterBoltIntegrationTest.java | 109 ++---
.../apache/metron/writer/kafka/KafkaWriter.java | 5 +
9 files changed, 676 insertions(+), 415 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index b939a92..12b541c 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -17,9 +17,7 @@
*/
package org.apache.metron.common;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
public class Constants {
@@ -37,9 +35,17 @@ public class Constants {
public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel";
public static final String GUID = "guid";
+ /**
+ * The key in the global configuration that defines the global parser error topic.
+ *
+ * <p>This value is used only if the error topic is left undefined in a sensor's parser configuration.
+ */
+ public static final String PARSER_ERROR_TOPIC_GLOBALS_KEY = "parser.error.topic";
+
public interface Field {
String getName();
}
+
public enum Fields implements Field {
SRC_ADDR("ip_src_addr")
,SRC_PORT("ip_src_port")
http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index d347481..1dfb045 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -18,6 +18,9 @@
package org.apache.metron.common.configuration;
import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.metron.common.utils.JSONUtils;
import java.io.IOException;
@@ -27,35 +30,171 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+/**
+ * The configuration object that defines a parser for a given sensor. Each
+ * sensor has its own parser configuration.
+ */
public class SensorParserConfig implements Serializable {
+ /**
+ * The class name of the parser.
+ */
private String parserClassName;
+
+ /**
+ * Allows logic to be defined to filter or ignore messages. Messages that have been
+ * filtered will not be parsed.
+ *
+ * This should be a fully qualified name of a class that implements the
+ * org.apache.metron.parsers.interfaces.MessageFilter interface.
+ */
private String filterClassName;
+
+ /**
+ * The input topic containing the sensor telemetry to parse.
+ */
private String sensorTopic;
+
+ /**
+ * The output topic where the parsed telemetry will be written.
+ */
+ private String outputTopic;
+
+ /**
+ * The error topic where errors are written to.
+ */
+ private String errorTopic;
+
+ /**
+ * The fully qualified name of a class used to write messages
+ * to the output topic.
+ *
+ * <p>A sensible default is provided.
+ */
private String writerClassName;
+
+ /**
+ * The fully qualified name of a class used to write messages
+ * to the error topic.
+ *
+ * <p>A sensible default is provided.
+ */
private String errorWriterClassName;
- private String invalidWriterClassName;
+
+ /**
+ * Determines if parser metadata is made available to the parser's field
+ * transformations. If true, the parser field transformations can access
+ * parser metadata values.
+ *
+ * <p>By default, this is false and parser metadata is not available
+ * to the field transformations.
+ */
private Boolean readMetadata = false;
+
+ /**
+ * Determines if parser metadata is automatically merged into the message. If
+ * true, parser metadata values will appear as fields within the message.
+ *
+ * <p>By default, this is false and metadata is not merged.
+ */
private Boolean mergeMetadata = false;
+
+ /**
+ * The number of workers for the topology.
+ *
+ * <p>This property can be overridden on the CLI.
+ */
private Integer numWorkers = null;
+
+ /**
+ * The number of ackers for the topology.
+ *
+ * <p>This property can be overridden on the CLI.
+ */
private Integer numAckers= null;
+
+ /**
+ * The parallelism of the Kafka spout.
+ *
+ * <p>This property can be overridden on the CLI.
+ */
private Integer spoutParallelism = 1;
+
+ /**
+ * The number of tasks for the Kafka spout.
+ *
+ * <p>This property can be overridden on the CLI.
+ */
private Integer spoutNumTasks = 1;
+
+ /**
+ * The parallelism of the parser bolt.
+ *
+ * <p>This property can be overridden on the CLI.
+ */
private Integer parserParallelism = 1;
+
+ /**
+ * The number of tasks for the parser bolt.
+ *
+ * <p>This property can be overridden on the CLI.
+ */
private Integer parserNumTasks = 1;
+
+ /**
+ * The parallelism of the error writer bolt.
+ *
+ * <p>This property can be overridden on the CLI.
+ */
private Integer errorWriterParallelism = 1;
+
+ /**
+ * The number of tasks for the error writer bolt.
+ *
+ * <p>This property can be overridden on the CLI.
+ */
private Integer errorWriterNumTasks = 1;
- private Map<String, Object> cacheConfig = new HashMap<>();
+
+ /**
+ * Configuration properties passed to the Kafka spout.
+ *
+ * <p>This property can be overridden on the CLI.
+ */
private Map<String, Object> spoutConfig = new HashMap<>();
+
+ /**
+ * The Kafka security protocol.
+ *
+ * <p>This property can be overridden on the CLI. This property can also be overridden by the spout config.
+ */
private String securityProtocol = null;
+
+ /**
+ * Configuration properties passed to the storm topology.
+ *
+ * <p>This property can be overridden on the CLI.
+ */
private Map<String, Object> stormConfig = new HashMap<>();
/**
- * Cache config for stellar field transformations.
- * * stellar.cache.maxSize - The maximum number of elements in the cache.
- * * stellar.cache.maxTimeRetain - The maximum amount of time an element is kept in the cache (in minutes).
- * @return
+ * Configuration for the parser.
+ */
+ private Map<String, Object> parserConfig = new HashMap<>();
+
+ /**
+ * The field transformations applied to the parsed messages. These allow fields
+ * of the parsed message to be transformed.
*/
+ private List<FieldTransformer> fieldTransformations = new ArrayList<>();
+
+ /**
+ * Configures the cache that backs stellar field transformations.
+ *
+ * <li>stellar.cache.maxSize - The maximum number of elements in the cache.
+ * <li>stellar.cache.maxTimeRetain - The maximum amount of time an element is kept in the cache (in minutes).
+ */
+ private Map<String, Object> cacheConfig = new HashMap<>();
+
public Map<String, Object> getCacheConfig() {
return cacheConfig;
}
@@ -64,10 +203,6 @@ public class SensorParserConfig implements Serializable {
this.cacheConfig = cacheConfig;
}
- /**
- * Return the number of workers for the topology. This property will be used for the parser unless overridden on the CLI.
- * @return
- */
public Integer getNumWorkers() {
return numWorkers;
}
@@ -76,10 +211,6 @@ public class SensorParserConfig implements Serializable {
this.numWorkers = numWorkers;
}
- /**
- * Return the number of ackers for the topology. This property will be used for the parser unless overridden on the CLI.
- * @return
- */
public Integer getNumAckers() {
return numAckers;
}
@@ -88,10 +219,6 @@ public class SensorParserConfig implements Serializable {
this.numAckers = numAckers;
}
- /**
- * Return the spout parallelism. This property will be used for the parser unless overridden on the CLI.
- * @return
- */
public Integer getSpoutParallelism() {
return spoutParallelism;
}
@@ -100,10 +227,6 @@ public class SensorParserConfig implements Serializable {
this.spoutParallelism = spoutParallelism;
}
- /**
- * Return the spout num tasks. This property will be used for the parser unless overridden on the CLI.
- * @return
- */
public Integer getSpoutNumTasks() {
return spoutNumTasks;
}
@@ -112,10 +235,6 @@ public class SensorParserConfig implements Serializable {
this.spoutNumTasks = spoutNumTasks;
}
- /**
- * Return the parser parallelism. This property will be used for the parser unless overridden on the CLI.
- * @return
- */
public Integer getParserParallelism() {
return parserParallelism;
}
@@ -124,10 +243,6 @@ public class SensorParserConfig implements Serializable {
this.parserParallelism = parserParallelism;
}
- /**
- * Return the parser number of tasks. This property will be used for the parser unless overridden on the CLI.
- * @return
- */
public Integer getParserNumTasks() {
return parserNumTasks;
}
@@ -136,10 +251,6 @@ public class SensorParserConfig implements Serializable {
this.parserNumTasks = parserNumTasks;
}
- /**
- * Return the error writer bolt parallelism. This property will be used for the parser unless overridden on the CLI.
- * @return
- */
public Integer getErrorWriterParallelism() {
return errorWriterParallelism;
}
@@ -148,10 +259,6 @@ public class SensorParserConfig implements Serializable {
this.errorWriterParallelism = errorWriterParallelism;
}
- /**
- * Return the error writer bolt number of tasks. This property will be used for the parser unless overridden on the CLI.
- * @return
- */
public Integer getErrorWriterNumTasks() {
return errorWriterNumTasks;
}
@@ -160,10 +267,6 @@ public class SensorParserConfig implements Serializable {
this.errorWriterNumTasks = errorWriterNumTasks;
}
- /**
- * Return the spout config. This includes kafka properties. This property will be used for the parser unless overridden on the CLI.
- * @return
- */
public Map<String, Object> getSpoutConfig() {
return spoutConfig;
}
@@ -172,11 +275,6 @@ public class SensorParserConfig implements Serializable {
this.spoutConfig = spoutConfig;
}
- /**
- * Return security protocol to use. This property will be used for the parser unless overridden on the CLI.
- * The order of precedence is CLI > spout config > config in the sensor parser config.
- * @return
- */
public String getSecurityProtocol() {
return securityProtocol;
}
@@ -185,10 +283,6 @@ public class SensorParserConfig implements Serializable {
this.securityProtocol = securityProtocol;
}
- /**
- * Return Storm topologyconfig. This property will be used for the parser unless overridden on the CLI.
- * @return
- */
public Map<String, Object> getStormConfig() {
return stormConfig;
}
@@ -197,10 +291,6 @@ public class SensorParserConfig implements Serializable {
this.stormConfig = stormConfig;
}
- /**
- * Return whether or not to merge metadata sent into the message. If true, then metadata become proper fields.
- * @return
- */
public Boolean getMergeMetadata() {
return mergeMetadata;
}
@@ -209,10 +299,6 @@ public class SensorParserConfig implements Serializable {
this.mergeMetadata = mergeMetadata;
}
- /**
- * Return whether or not to read metadata at all.
- * @return
- */
public Boolean getReadMetadata() {
return readMetadata;
}
@@ -229,22 +315,13 @@ public class SensorParserConfig implements Serializable {
this.errorWriterClassName = errorWriterClassName;
}
- public String getInvalidWriterClassName() {
- return invalidWriterClassName;
- }
-
- public void setInvalidWriterClassName(String invalidWriterClassName) {
- this.invalidWriterClassName = invalidWriterClassName;
- }
-
public String getWriterClassName() {
return writerClassName;
}
+
public void setWriterClassName(String classNames) {
this.writerClassName = classNames;
}
- private Map<String, Object> parserConfig = new HashMap<>();
- private List<FieldTransformer> fieldTransformations = new ArrayList<>();
public List<FieldTransformer> getFieldTransformations() {
return fieldTransformations;
@@ -278,6 +355,22 @@ public class SensorParserConfig implements Serializable {
this.sensorTopic = sensorTopic;
}
+ public String getOutputTopic() {
+ return outputTopic;
+ }
+
+ public void setOutputTopic(String outputTopic) {
+ this.outputTopic = outputTopic;
+ }
+
+ public String getErrorTopic() {
+ return errorTopic;
+ }
+
+ public void setErrorTopic(String errorTopic) {
+ this.errorTopic = errorTopic;
+ }
+
public Map<String, Object> getParserConfig() {
return parserConfig;
}
@@ -298,112 +391,103 @@ public class SensorParserConfig implements Serializable {
}
}
-
public String toJSON() throws JsonProcessingException {
return JSONUtils.INSTANCE.toJSON(this, true);
}
@Override
- public String toString() {
- return "SensorParserConfig{" +
- "parserClassName='" + parserClassName + '\'' +
- ", filterClassName='" + filterClassName + '\'' +
- ", sensorTopic='" + sensorTopic + '\'' +
- ", writerClassName='" + writerClassName + '\'' +
- ", errorWriterClassName='" + errorWriterClassName + '\'' +
- ", invalidWriterClassName='" + invalidWriterClassName + '\'' +
- ", readMetadata=" + readMetadata +
- ", mergeMetadata=" + mergeMetadata +
- ", numWorkers=" + numWorkers +
- ", numAckers=" + numAckers +
- ", spoutParallelism=" + spoutParallelism +
- ", spoutNumTasks=" + spoutNumTasks +
- ", parserParallelism=" + parserParallelism +
- ", parserNumTasks=" + parserNumTasks +
- ", errorWriterParallelism=" + errorWriterParallelism +
- ", errorWriterNumTasks=" + errorWriterNumTasks +
- ", spoutConfig=" + spoutConfig +
- ", securityProtocol='" + securityProtocol + '\'' +
- ", stormConfig=" + stormConfig +
- ", parserConfig=" + parserConfig +
- ", fieldTransformations=" + fieldTransformations +
- '}';
- }
-
- @Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- SensorParserConfig that = (SensorParserConfig) o;
+ if (this == o) {
+ return true;
+ }
- if (getParserClassName() != null ? !getParserClassName().equals(that.getParserClassName()) : that.getParserClassName() != null)
- return false;
- if (getFilterClassName() != null ? !getFilterClassName().equals(that.getFilterClassName()) : that.getFilterClassName() != null)
- return false;
- if (getSensorTopic() != null ? !getSensorTopic().equals(that.getSensorTopic()) : that.getSensorTopic() != null)
- return false;
- if (getWriterClassName() != null ? !getWriterClassName().equals(that.getWriterClassName()) : that.getWriterClassName() != null)
- return false;
- if (getErrorWriterClassName() != null ? !getErrorWriterClassName().equals(that.getErrorWriterClassName()) : that.getErrorWriterClassName() != null)
- return false;
- if (getInvalidWriterClassName() != null ? !getInvalidWriterClassName().equals(that.getInvalidWriterClassName()) : that.getInvalidWriterClassName() != null)
- return false;
- if (getReadMetadata() != null ? !getReadMetadata().equals(that.getReadMetadata()) : that.getReadMetadata() != null)
- return false;
- if (getMergeMetadata() != null ? !getMergeMetadata().equals(that.getMergeMetadata()) : that.getMergeMetadata() != null)
+ if (o == null || getClass() != o.getClass()) {
return false;
- if (getNumWorkers() != null ? !getNumWorkers().equals(that.getNumWorkers()) : that.getNumWorkers() != null)
- return false;
- if (getNumAckers() != null ? !getNumAckers().equals(that.getNumAckers()) : that.getNumAckers() != null)
- return false;
- if (getSpoutParallelism() != null ? !getSpoutParallelism().equals(that.getSpoutParallelism()) : that.getSpoutParallelism() != null)
- return false;
- if (getSpoutNumTasks() != null ? !getSpoutNumTasks().equals(that.getSpoutNumTasks()) : that.getSpoutNumTasks() != null)
- return false;
- if (getParserParallelism() != null ? !getParserParallelism().equals(that.getParserParallelism()) : that.getParserParallelism() != null)
- return false;
- if (getParserNumTasks() != null ? !getParserNumTasks().equals(that.getParserNumTasks()) : that.getParserNumTasks() != null)
- return false;
- if (getErrorWriterParallelism() != null ? !getErrorWriterParallelism().equals(that.getErrorWriterParallelism()) : that.getErrorWriterParallelism() != null)
- return false;
- if (getErrorWriterNumTasks() != null ? !getErrorWriterNumTasks().equals(that.getErrorWriterNumTasks()) : that.getErrorWriterNumTasks() != null)
- return false;
- if (getSpoutConfig() != null ? !getSpoutConfig().equals(that.getSpoutConfig()) : that.getSpoutConfig() != null)
- return false;
- if (getSecurityProtocol() != null ? !getSecurityProtocol().equals(that.getSecurityProtocol()) : that.getSecurityProtocol() != null)
- return false;
- if (getStormConfig() != null ? !getStormConfig().equals(that.getStormConfig()) : that.getStormConfig() != null)
- return false;
- if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null)
- return false;
- return getFieldTransformations() != null ? getFieldTransformations().equals(that.getFieldTransformations()) : that.getFieldTransformations() == null;
+ }
+ SensorParserConfig that = (SensorParserConfig) o;
+ return new EqualsBuilder()
+ .append(parserClassName, that.parserClassName)
+ .append(filterClassName, that.filterClassName)
+ .append(sensorTopic, that.sensorTopic)
+ .append(outputTopic, that.outputTopic)
+ .append(errorTopic, that.errorTopic)
+ .append(writerClassName, that.writerClassName)
+ .append(errorWriterClassName, that.errorWriterClassName)
+ .append(readMetadata, that.readMetadata)
+ .append(mergeMetadata, that.mergeMetadata)
+ .append(numWorkers, that.numWorkers)
+ .append(numAckers, that.numAckers)
+ .append(spoutParallelism, that.spoutParallelism)
+ .append(spoutNumTasks, that.spoutNumTasks)
+ .append(parserParallelism, that.parserParallelism)
+ .append(parserNumTasks, that.parserNumTasks)
+ .append(errorWriterParallelism, that.errorWriterParallelism)
+ .append(errorWriterNumTasks, that.errorWriterNumTasks)
+ .append(spoutConfig, that.spoutConfig)
+ .append(securityProtocol, that.securityProtocol)
+ .append(stormConfig, that.stormConfig)
+ .append(cacheConfig, that.cacheConfig)
+ .append(parserConfig, that.parserConfig)
+ .append(fieldTransformations, that.fieldTransformations)
+ .isEquals();
}
@Override
public int hashCode() {
- int result = getParserClassName() != null ? getParserClassName().hashCode() : 0;
- result = 31 * result + (getFilterClassName() != null ? getFilterClassName().hashCode() : 0);
- result = 31 * result + (getSensorTopic() != null ? getSensorTopic().hashCode() : 0);
- result = 31 * result + (getWriterClassName() != null ? getWriterClassName().hashCode() : 0);
- result = 31 * result + (getErrorWriterClassName() != null ? getErrorWriterClassName().hashCode() : 0);
- result = 31 * result + (getInvalidWriterClassName() != null ? getInvalidWriterClassName().hashCode() : 0);
- result = 31 * result + (getReadMetadata() != null ? getReadMetadata().hashCode() : 0);
- result = 31 * result + (getMergeMetadata() != null ? getMergeMetadata().hashCode() : 0);
- result = 31 * result + (getNumWorkers() != null ? getNumWorkers().hashCode() : 0);
- result = 31 * result + (getNumAckers() != null ? getNumAckers().hashCode() : 0);
- result = 31 * result + (getSpoutParallelism() != null ? getSpoutParallelism().hashCode() : 0);
- result = 31 * result + (getSpoutNumTasks() != null ? getSpoutNumTasks().hashCode() : 0);
- result = 31 * result + (getParserParallelism() != null ? getParserParallelism().hashCode() : 0);
- result = 31 * result + (getParserNumTasks() != null ? getParserNumTasks().hashCode() : 0);
- result = 31 * result + (getErrorWriterParallelism() != null ? getErrorWriterParallelism().hashCode() : 0);
- result = 31 * result + (getErrorWriterNumTasks() != null ? getErrorWriterNumTasks().hashCode() : 0);
- result = 31 * result + (getSpoutConfig() != null ? getSpoutConfig().hashCode() : 0);
- result = 31 * result + (getSecurityProtocol() != null ? getSecurityProtocol().hashCode() : 0);
- result = 31 * result + (getStormConfig() != null ? getStormConfig().hashCode() : 0);
- result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
- result = 31 * result + (getFieldTransformations() != null ? getFieldTransformations().hashCode() : 0);
- return result;
+ return new HashCodeBuilder(17, 37)
+ .append(parserClassName)
+ .append(filterClassName)
+ .append(sensorTopic)
+ .append(outputTopic)
+ .append(errorTopic)
+ .append(writerClassName)
+ .append(errorWriterClassName)
+ .append(readMetadata)
+ .append(mergeMetadata)
+ .append(numWorkers)
+ .append(numAckers)
+ .append(spoutParallelism)
+ .append(spoutNumTasks)
+ .append(parserParallelism)
+ .append(parserNumTasks)
+ .append(errorWriterParallelism)
+ .append(errorWriterNumTasks)
+ .append(spoutConfig)
+ .append(securityProtocol)
+ .append(stormConfig)
+ .append(cacheConfig)
+ .append(parserConfig)
+ .append(fieldTransformations)
+ .toHashCode();
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("parserClassName", parserClassName)
+ .append("filterClassName", filterClassName)
+ .append("sensorTopic", sensorTopic)
+ .append("outputTopic", outputTopic)
+ .append("errorTopic", errorTopic)
+ .append("writerClassName", writerClassName)
+ .append("errorWriterClassName", errorWriterClassName)
+ .append("readMetadata", readMetadata)
+ .append("mergeMetadata", mergeMetadata)
+ .append("numWorkers", numWorkers)
+ .append("numAckers", numAckers)
+ .append("spoutParallelism", spoutParallelism)
+ .append("spoutNumTasks", spoutNumTasks)
+ .append("parserParallelism", parserParallelism)
+ .append("parserNumTasks", parserNumTasks)
+ .append("errorWriterParallelism", errorWriterParallelism)
+ .append("errorWriterNumTasks", errorWriterNumTasks)
+ .append("spoutConfig", spoutConfig)
+ .append("securityProtocol", securityProtocol)
+ .append("stormConfig", stormConfig)
+ .append("cacheConfig", cacheConfig)
+ .append("parserConfig", parserConfig)
+ .append("fieldTransformations", fieldTransformations)
+ .toString();
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 2865dd6..cd4ad50 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -84,7 +84,7 @@ public class ParserTopologyBuilder {
* @param errorWriterNumTasksSupplier Supplier for the number of tasks for the bolt that handles errors
* @param kafkaSpoutConfigSupplier Supplier for the configuration options for the kafka spout
* @param securityProtocolSupplier Supplier for the security protocol
- * @param outputTopic The output kafka topic
+ * @param outputTopicSupplier Supplier for the output kafka topic
* @param stormConfigSupplier Supplier for the storm config
* @return A Storm topology that parses telemetry data received from an external sensor
* @throws Exception
@@ -100,7 +100,8 @@ public class ParserTopologyBuilder {
ValueSupplier<Integer> errorWriterNumTasksSupplier,
ValueSupplier<Map> kafkaSpoutConfigSupplier,
ValueSupplier<String> securityProtocolSupplier,
- Optional<String> outputTopic,
+ ValueSupplier<String> outputTopicSupplier,
+ ValueSupplier<String> errorTopicSupplier,
ValueSupplier<Config> stormConfigSupplier
) throws Exception {
@@ -113,24 +114,27 @@ public class ParserTopologyBuilder {
int parserNumTasks = parserNumTasksSupplier.get(parserConfig, Integer.class);
int errorWriterParallelism = errorWriterParallelismSupplier.get(parserConfig, Integer.class);
int errorWriterNumTasks = errorWriterNumTasksSupplier.get(parserConfig, Integer.class);
+ String outputTopic = outputTopicSupplier.get(parserConfig, String.class);
+
Map<String, Object> kafkaSpoutConfig = kafkaSpoutConfigSupplier.get(parserConfig, Map.class);
Optional<String> securityProtocol = Optional.ofNullable(securityProtocolSupplier.get(parserConfig, String.class));
// create the spout
TopologyBuilder builder = new TopologyBuilder();
- KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, securityProtocol, Optional.ofNullable(kafkaSpoutConfig) , parserConfig);
+ KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, securityProtocol, Optional.ofNullable(kafkaSpoutConfig), parserConfig);
builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism)
.setNumTasks(spoutNumTasks);
// create the parser bolt
- ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, outputTopic);
+ ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, Optional.of(outputTopic));
builder.setBolt("parserBolt", parserBolt, parserParallelism)
.setNumTasks(parserNumTasks)
.localOrShuffleGrouping("kafkaSpout");
// create the error bolt, if needed
if (errorWriterNumTasks > 0) {
- WriterBolt errorBolt = createErrorBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig);
+ String errorTopic = errorTopicSupplier.get(parserConfig, String.class);
+ WriterBolt errorBolt = createErrorBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, errorTopic);
builder.setBolt("errorMessageWriter", errorBolt, errorWriterParallelism)
.setNumTasks(errorWriterNumTasks)
.localOrShuffleGrouping("parserBolt", Constants.ERROR_STREAM);
@@ -176,24 +180,35 @@ public class ParserTopologyBuilder {
);
}
- private static KafkaWriter createKafkaWriter( Optional<String> broker
- , String zkQuorum
- , Optional<String> securityProtocol
- )
- {
- KafkaWriter ret = null;
+ /**
+ * Create a Kafka writer.
+ *
+ * @param broker An optional URL to the Kafka brokers.
+ * @param zkQuorum The URL to Zookeeper.
+ * @param securityProtocol An optional security protocol in use.
+ * @return
+ */
+ private static KafkaWriter createKafkaWriter(Optional<String> broker,
+ String zkQuorum,
+ Optional<String> securityProtocol) {
+ KafkaWriter writer = new KafkaWriter();
+
+ // cluster URL; either broker or zookeeper
if(broker.isPresent()) {
- ret = new KafkaWriter(broker.get());
- }
- else {
- ret = new KafkaWriter().withZkQuorum(zkQuorum);
+ writer.withBrokerUrl(broker.get());
+
+ } else {
+ writer.withZkQuorum(zkQuorum);
}
+
+ // security protocol
if(securityProtocol.isPresent()) {
HashMap<String, Object> config = new HashMap<>();
config.put("security.protocol", securityProtocol.get());
- ret.withProducerConfigs(config);
+ writer.withProducerConfigs(config);
}
- return ret;
+
+ return writer;
}
/**
@@ -206,27 +221,31 @@ public class ParserTopologyBuilder {
* @param parserConfig
* @return A Storm bolt that parses input from a sensor
*/
- private static ParserBolt createParserBolt( String zookeeperUrl
- , Optional<String> brokerUrl
- , String sensorType
- , Optional<String> securityProtocol
- , ParserConfigurations configs
- , SensorParserConfig parserConfig
- , Optional<String> outputTopic
- )
- {
+ private static ParserBolt createParserBolt( String zookeeperUrl,
+ Optional<String> brokerUrl,
+ String sensorType,
+ Optional<String> securityProtocol,
+ ParserConfigurations configs,
+ SensorParserConfig parserConfig,
+ Optional<String> outputTopic) {
// create message parser
MessageParser<JSONObject> parser = ReflectionUtils.createInstance(parserConfig.getParserClassName());
parser.configure(parserConfig.getParserConfig());
- // create writer - if not configured uses a sensible default
- AbstractWriter writer = parserConfig.getWriterClassName() == null ?
- createKafkaWriter( brokerUrl
- , zookeeperUrl
- , securityProtocol
- ).withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC)) :
- ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+ // create a writer
+ AbstractWriter writer;
+ if(parserConfig.getWriterClassName() == null) {
+
+ // if not configured, use a sensible default
+ writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
+ .withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC));
+
+ } else {
+ writer = ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+ }
+
+ // configure it
writer.configure(sensorType, new ParserWriterConfiguration(configs));
// create a writer handler
@@ -238,37 +257,47 @@ public class ParserTopologyBuilder {
/**
* Create a bolt that handles error messages.
*
- * @param zookeeperUrl Kafka zookeeper URL
- * @param brokerUrl Kafka Broker URL
- * @param sensorType Type of sensor that is being consumed.
- * @param securityProtocol Security protocol used (if any)
+ * @param zookeeperUrl Kafka zookeeper URL
+ * @param brokerUrl Kafka Broker URL
+ * @param sensorType Type of sensor that is being consumed.
+ * @param securityProtocol Security protocol used (if any)
* @param configs
- * @param parserConfig
+ * @param parserConfig The sensor's parser configuration.
* @return A Storm bolt that handles error messages.
*/
- private static WriterBolt createErrorBolt( String zookeeperUrl
- , Optional<String> brokerUrl
- , String sensorType
- , Optional<String> securityProtocol
- , ParserConfigurations configs
- , SensorParserConfig parserConfig
- )
- {
+ private static WriterBolt createErrorBolt( String zookeeperUrl,
+ Optional<String> brokerUrl,
+ String sensorType,
+ Optional<String> securityProtocol,
+ ParserConfigurations configs,
+ SensorParserConfig parserConfig,
+ String errorTopic) {
+
+ // create a writer
+ AbstractWriter writer;
+ if (parserConfig.getErrorWriterClassName() == null) {
+
+ if(errorTopic == null) {
+ errorTopic = (String) configs.getGlobalConfig().get(Constants.PARSER_ERROR_TOPIC_GLOBALS_KEY);
+ }
+
+ // if not configured, uses a sensible default
+ writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
+ .withTopic(errorTopic)
+ .withConfigPrefix("error");
+
+ } else {
+ writer = ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+ }
- // create writer - if not configured uses a sensible default
- AbstractWriter writer = parserConfig.getErrorWriterClassName() == null ?
- createKafkaWriter( brokerUrl
- , zookeeperUrl
- , securityProtocol
- ).withTopic((String) configs.getGlobalConfig().get("parser.error.topic"))
- .withConfigPrefix("error")
- : ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+ // configure it
writer.configure(sensorType, new ParserWriterConfiguration(configs));
// create a writer handler
WriterHandler writerHandler = createWriterHandler(writer);
- return new WriterBolt(writerHandler, configs, sensorType).withErrorType(Constants.ErrorType.PARSER_ERROR);
+ return new WriterBolt(writerHandler, configs, sensorType)
+ .withErrorType(Constants.ErrorType.PARSER_ERROR);
}
/**
http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 3824212..f60ff44 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -310,34 +310,40 @@ public class ParserTopologyCLI {
String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
/*
- It bears mentioning why we're creating this ValueSupplier indirection here.
- As a separation of responsibilities, the CLI class defines the order of precedence
- for the various topological and structural properties for creating a parser. This is
- desirable because there are now (i.e. integration tests)
- and may be in the future (i.e. a REST service to start parsers without using the CLI)
- other mechanisms to construct parser topologies. It's sensible to split those concerns..
-
- Unfortunately, determining the structural parameters for a parser requires interacting with
- external services (e.g. zookeeper) that are set up well within the ParserTopology class.
- Rather than pulling the infrastructure to interact with those services out and moving it into the
- CLI class and breaking that separation of concerns, we've created a supplier
- indirection where are providing the logic as to how to create precedence in the CLI class
- without owning the responsibility of constructing the infrastructure where the values are
- necessarily supplied.
-
+ * It bears mentioning why we're creating this ValueSupplier indirection here.
+ * As a separation of responsibilities, the CLI class defines the order of precedence
+ * for the various topological and structural properties for creating a parser. This is
+ * desirable because there are now (i.e. integration tests)
+ * and may be in the future (i.e. a REST service to start parsers without using the CLI)
+ * other mechanisms to construct parser topologies. It's sensible to split those concerns.
+ *
+ * Unfortunately, determining the structural parameters for a parser requires interacting with
+ * external services (e.g. zookeeper) that are set up well within the ParserTopology class.
+ * Rather than pulling the infrastructure to interact with those services out and moving it into the
+ * CLI class and breaking that separation of concerns, we've created a supplier
+ * indirection where we are providing the logic as to how to create precedence in the CLI class
+ * without owning the responsibility of constructing the infrastructure where the values are
+ * necessarily supplied.
+ *
*/
+
+ // kafka spout parallelism
ValueSupplier<Integer> spoutParallelism = (parserConfig, clazz) -> {
if(ParserOptions.SPOUT_PARALLELISM.has(cmd)) {
return Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
}
return Optional.ofNullable(parserConfig.getSpoutParallelism()).orElse(1);
};
+
+ // kafka spout number of tasks
ValueSupplier<Integer> spoutNumTasks = (parserConfig, clazz) -> {
if(ParserOptions.SPOUT_NUM_TASKS.has(cmd)) {
return Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
}
return Optional.ofNullable(parserConfig.getSpoutNumTasks()).orElse(1);
};
+
+ // parser bolt parallelism
ValueSupplier<Integer> parserParallelism = (parserConfig, clazz) -> {
if(ParserOptions.PARSER_PARALLELISM.has(cmd)) {
return Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1"));
@@ -345,6 +351,7 @@ public class ParserTopologyCLI {
return Optional.ofNullable(parserConfig.getParserParallelism()).orElse(1);
};
+ // parser bolt number of tasks
ValueSupplier<Integer> parserNumTasks = (parserConfig, clazz) -> {
if(ParserOptions.PARSER_NUM_TASKS.has(cmd)) {
return Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1"));
@@ -352,6 +359,7 @@ public class ParserTopologyCLI {
return Optional.ofNullable(parserConfig.getParserNumTasks()).orElse(1);
};
+ // error bolt parallelism
ValueSupplier<Integer> errorParallelism = (parserConfig, clazz) -> {
if(ParserOptions.ERROR_WRITER_PARALLELISM.has(cmd)) {
return Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1"));
@@ -359,6 +367,7 @@ public class ParserTopologyCLI {
return Optional.ofNullable(parserConfig.getErrorWriterParallelism()).orElse(1);
};
+ // error bolt number of tasks
ValueSupplier<Integer> errorNumTasks = (parserConfig, clazz) -> {
if(ParserOptions.ERROR_WRITER_NUM_TASKS.has(cmd)) {
return Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1"));
@@ -366,6 +375,7 @@ public class ParserTopologyCLI {
return Optional.ofNullable(parserConfig.getErrorWriterNumTasks()).orElse(1);
};
+ // kafka spout config
ValueSupplier<Map> spoutConfig = (parserConfig, clazz) -> {
if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
return readJSONMapFromFile(new File(ParserOptions.SPOUT_CONFIG.get(cmd)));
@@ -373,6 +383,7 @@ public class ParserTopologyCLI {
return Optional.ofNullable(parserConfig.getSpoutConfig()).orElse(new HashMap<>());
};
+ // security protocol
ValueSupplier<String> securityProtocol = (parserConfig, clazz) -> {
Optional<String> sp = Optional.empty();
if (ParserOptions.SECURITY_PROTOCOL.has(cmd)) {
@@ -384,6 +395,7 @@ public class ParserTopologyCLI {
return sp.orElse(Optional.ofNullable(parserConfig.getSecurityProtocol()).orElse(null));
};
+ // storm configuration
ValueSupplier<Config> stormConf = (parserConfig, clazz) -> {
Map<String, Object> c = parserConfig.getStormConfig();
Config finalConfig = new Config();
@@ -399,39 +411,84 @@ public class ParserTopologyCLI {
return ParserOptions.getConfig(cmd, finalConfig).orElse(finalConfig);
};
- Optional<String> outputTopic = ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty();
+ // output topic
+ ValueSupplier<String> outputTopic = (parserConfig, clazz) -> {
+ String topic;
+
+ if(ParserOptions.OUTPUT_TOPIC.has(cmd)) {
+ topic = ParserOptions.OUTPUT_TOPIC.get(cmd);
+
+ } else if(parserConfig.getOutputTopic() != null) {
+ topic = parserConfig.getOutputTopic();
+
+ } else {
+ topic = Constants.ENRICHMENT_TOPIC;
+ }
+
+ return topic;
+ };
+
+ // error topic
+ ValueSupplier<String> errorTopic = (parserConfig, clazz) -> {
+ String topic;
+
+ if(parserConfig.getErrorTopic() != null) {
+ topic = parserConfig.getErrorTopic();
+
+ } else {
+ // topic will be set to the 'parser.error.topic' setting in globals when the error bolt is created
+ topic = null;
+ }
+
+ return topic;
+ };
- return getParserTopology(zookeeperUrl, brokerUrl, sensorType, spoutParallelism, spoutNumTasks, parserParallelism, parserNumTasks, errorParallelism, errorNumTasks, spoutConfig, securityProtocol, stormConf, outputTopic);
+ return getParserTopology(
+ zookeeperUrl,
+ brokerUrl,
+ sensorType,
+ spoutParallelism,
+ spoutNumTasks,
+ parserParallelism,
+ parserNumTasks,
+ errorParallelism,
+ errorNumTasks,
+ spoutConfig,
+ securityProtocol,
+ stormConf,
+ outputTopic,
+ errorTopic);
}
- protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl
- , Optional<String> brokerUrl
- , String sensorType
- , ValueSupplier<Integer> spoutParallelism
- , ValueSupplier<Integer> spoutNumTasks
- , ValueSupplier<Integer> parserParallelism
- , ValueSupplier<Integer> parserNumTasks
- , ValueSupplier<Integer> errorParallelism
- , ValueSupplier<Integer> errorNumTasks
- , ValueSupplier<Map> spoutConfig
- , ValueSupplier<String> securityProtocol
- , ValueSupplier<Config> stormConf
- , Optional<String> outputTopic
- ) throws Exception
- {
- return ParserTopologyBuilder.build(zookeeperUrl,
- brokerUrl,
- sensorType,
- spoutParallelism,
- spoutNumTasks,
- parserParallelism,
- parserNumTasks,
- errorParallelism,
- errorNumTasks,
- spoutConfig,
- securityProtocol,
- outputTopic,
- stormConf
+ protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl,
+ Optional<String> brokerUrl,
+ String sensorType,
+ ValueSupplier<Integer> spoutParallelism,
+ ValueSupplier<Integer> spoutNumTasks,
+ ValueSupplier<Integer> parserParallelism,
+ ValueSupplier<Integer> parserNumTasks,
+ ValueSupplier<Integer> errorParallelism,
+ ValueSupplier<Integer> errorNumTasks,
+ ValueSupplier<Map> spoutConfig,
+ ValueSupplier<String> securityProtocol,
+ ValueSupplier<Config> stormConf,
+ ValueSupplier<String> outputTopic,
+ ValueSupplier<String> errorTopic) throws Exception {
+ return ParserTopologyBuilder.build(
+ zookeeperUrl,
+ brokerUrl,
+ sensorType,
+ spoutParallelism,
+ spoutNumTasks,
+ parserParallelism,
+ parserNumTasks,
+ errorParallelism,
+ errorNumTasks,
+ spoutConfig,
+ securityProtocol,
+ outputTopic,
+ errorTopic,
+ stormConf
);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 63d9e52..7f40684 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -17,14 +17,6 @@
*/
package org.apache.metron.parsers.integration.components;
-import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
-import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
-
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
import org.apache.metron.integration.InMemoryComponent;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.integration.components.ZKServerComponent;
@@ -32,24 +24,37 @@ import org.apache.metron.parsers.topology.ParserTopologyBuilder;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.KillOptions;
-import org.apache.storm.topology.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
+import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
+
public class ParserTopologyComponent implements InMemoryComponent {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private Properties topologyProperties;
private String brokerUrl;
private String sensorType;
private LocalCluster stormCluster;
private String outputTopic;
+ private String errorTopic;
public static class Builder {
+
Properties topologyProperties;
String brokerUrl;
String sensorType;
String outputTopic;
+ String errorTopic;
+
public Builder withTopologyProperties(Properties topologyProperties) {
this.topologyProperties = topologyProperties;
return this;
@@ -68,16 +73,31 @@ public class ParserTopologyComponent implements InMemoryComponent {
return this;
}
+ public Builder withErrorTopic(String topic) {
+ this.errorTopic = topic;
+ return this;
+ }
+
public ParserTopologyComponent build() {
- return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic);
+
+ if(sensorType == null) {
+ throw new IllegalArgumentException("The sensor type must be defined.");
+ }
+
+ if(outputTopic == null) {
+ throw new IllegalArgumentException("The output topic must be defined.");
+ }
+
+ return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic, errorTopic);
}
}
- public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic) {
+ public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic, String errorTopic) {
this.topologyProperties = topologyProperties;
this.brokerUrl = brokerUrl;
this.sensorType = sensorType;
this.outputTopic = outputTopic;
+ this.errorTopic = errorTopic;
}
public void updateSensorType(String sensorType) {
@@ -89,24 +109,26 @@ public class ParserTopologyComponent implements InMemoryComponent {
try {
final Map<String, Object> stormConf = new HashMap<>();
stormConf.put(Config.TOPOLOGY_DEBUG, true);
- ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY)
- , Optional.ofNullable(brokerUrl)
- , sensorType
- , (x,y) -> 1
- , (x,y) -> 1
- , (x,y) -> 1
- , (x,y) -> 1
- , (x,y) -> 1
- , (x,y) -> 1
- , (x,y) -> new HashMap<>()
- , (x,y) -> null
- , Optional.ofNullable(outputTopic)
- , (x,y) -> {
- Config c = new Config();
- c.putAll(stormConf);
- return c;
- }
- );
+ ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build (
+ topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY),
+ Optional.ofNullable(brokerUrl),
+ sensorType,
+ (x,y) -> 1,
+ (x,y) -> 1,
+ (x,y) -> 1,
+ (x,y) -> 1,
+ (x,y) -> 1,
+ (x,y) -> 1,
+ (x,y) -> new HashMap<>(),
+ (x,y) -> null,
+ (x,y) -> outputTopic,
+ (x,y) -> errorTopic,
+ (x,y) -> {
+ Config c = new Config();
+ c.putAll(stormConf);
+ return c;
+ }
+ );
stormCluster = new LocalCluster();
stormCluster.submitTopology(sensorType, stormConf, topologyBuilder.getBuilder().createTopology());
http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
index 97dac5a..fcfc93b 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
@@ -217,17 +217,21 @@ public class ParserTopologyCLITest {
private Map<String, Object> spoutConfig;
private String securityProtocol;
private Config stormConf;
-
- public ParserInput( ValueSupplier<Integer> spoutParallelism
- , ValueSupplier<Integer> spoutNumTasks
- , ValueSupplier<Integer> parserParallelism
- , ValueSupplier<Integer> parserNumTasks
- , ValueSupplier<Integer> errorParallelism
- , ValueSupplier<Integer> errorNumTasks
- , ValueSupplier<Map> spoutConfig
- , ValueSupplier<String> securityProtocol
- , ValueSupplier<Config> stormConf
- , SensorParserConfig config
+ private String outputTopic;
+ private String errorTopic;
+
+ public ParserInput(ValueSupplier<Integer> spoutParallelism,
+ ValueSupplier<Integer> spoutNumTasks,
+ ValueSupplier<Integer> parserParallelism,
+ ValueSupplier<Integer> parserNumTasks,
+ ValueSupplier<Integer> errorParallelism,
+ ValueSupplier<Integer> errorNumTasks,
+ ValueSupplier<Map> spoutConfig,
+ ValueSupplier<String> securityProtocol,
+ ValueSupplier<Config> stormConf,
+ ValueSupplier<String> outputTopic,
+ ValueSupplier<String> errorTopic,
+ SensorParserConfig config
)
{
this.spoutParallelism = spoutParallelism.get(config, Integer.class);
@@ -239,6 +243,8 @@ public class ParserTopologyCLITest {
this.spoutConfig = spoutConfig.get(config, Map.class);
this.securityProtocol = securityProtocol.get(config, String.class);
this.stormConf = stormConf.get(config, Config.class);
+ this.outputTopic = outputTopic.get(config, String.class);
+ this.errorTopic = outputTopic.get(config, String.class);
}
public Integer getSpoutParallelism() {
@@ -276,30 +282,43 @@ public class ParserTopologyCLITest {
public Config getStormConf() {
return stormConf;
}
+
+ public String getOutputTopic() {
+ return outputTopic;
+ }
+
+ public String getErrorTopic() {
+ return errorTopic;
+ }
}
+
/**
-{
- "parserClassName": "org.apache.metron.parsers.GrokParser",
- "sensorTopic": "squid",
- "parserConfig": {
- "grokPath": "/patterns/squid",
- "patternLabel": "SQUID_DELIMITED",
- "timestampField": "timestamp"
- },
- "fieldTransformations" : [
- {
- "transformation" : "STELLAR"
- ,"output" : [ "full_hostname", "domain_without_subdomains" ]
- ,"config" : {
- "full_hostname" : "URL_TO_HOST(url)"
- ,"domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"
- }
- }
- ]
-}
+ * {
+ * "parserClassName": "org.apache.metron.parsers.GrokParser",
+ * "sensorTopic": "squid",
+ * "parserConfig": {
+ * "grokPath": "/patterns/squid",
+ * "patternLabel": "SQUID_DELIMITED",
+ * "timestampField": "timestamp"
+ * },
+ * "fieldTransformations" : [
+ * {
+ * "transformation" : "STELLAR",
+ * "output" : [
+ * "full_hostname",
+ * "domain_without_subdomains"
+ * ],
+ * "config" : {
+ * "full_hostname" : "URL_TO_HOST(url)",
+ * "domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"
+ * }
+ * }
+ * ]
+ * }
*/
@Multiline
public static String baseConfig;
+
private static SensorParserConfig getBaseConfig() {
try {
return JSONUtils.INSTANCE.load(baseConfig, SensorParserConfig.class);
@@ -600,18 +619,37 @@ public class ParserTopologyCLITest {
final ParserInput[] parserInput = new ParserInput[]{null};
new ParserTopologyCLI() {
@Override
- protected ParserTopologyBuilder.ParserTopology getParserTopology(String zookeeperUrl, Optional<String> brokerUrl, String sensorType, ValueSupplier<Integer> spoutParallelism, ValueSupplier<Integer> spoutNumTasks, ValueSupplier<Integer> parserParallelism, ValueSupplier<Integer> parserNumTasks, ValueSupplier<Integer> errorParallelism, ValueSupplier<Integer> errorNumTasks, ValueSupplier<Map> spoutConfig, ValueSupplier<String> securityProtocol, ValueSupplier<Config> stormConf, Optional<String> outputTopic) throws Exception {
- parserInput[0] = new ParserInput( spoutParallelism
- , spoutNumTasks
- , parserParallelism
- , parserNumTasks
- , errorParallelism
- , errorNumTasks
- , spoutConfig
- , securityProtocol
- , stormConf
- , config
- );
+ protected ParserTopologyBuilder.ParserTopology getParserTopology(
+ String zookeeperUrl,
+ Optional<String> brokerUrl,
+ String sensorType,
+ ValueSupplier<Integer> spoutParallelism,
+ ValueSupplier<Integer> spoutNumTasks,
+ ValueSupplier<Integer> parserParallelism,
+ ValueSupplier<Integer> parserNumTasks,
+ ValueSupplier<Integer> errorParallelism,
+ ValueSupplier<Integer> errorNumTasks,
+ ValueSupplier<Map> spoutConfig,
+ ValueSupplier<String> securityProtocol,
+ ValueSupplier<Config> stormConf,
+ ValueSupplier<String> outputTopic,
+ ValueSupplier<String> errorTopic) throws Exception {
+
+ parserInput[0] = new ParserInput(
+ spoutParallelism,
+ spoutNumTasks,
+ parserParallelism,
+ parserNumTasks,
+ errorParallelism,
+ errorNumTasks,
+ spoutConfig,
+ securityProtocol,
+ stormConf,
+ outputTopic,
+ errorTopic,
+ config
+ );
+
return null;
}
}.createParserTopology(cmd);
http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
index 4f513be..49d7521 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
@@ -30,9 +30,14 @@ import org.apache.metron.enrichment.converter.EnrichmentKey;
import org.apache.metron.enrichment.converter.EnrichmentValue;
import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
import org.apache.metron.enrichment.lookup.LookupKV;
-import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.integration.*;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.integration.BaseIntegrationTest;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.Processor;
+import org.apache.metron.integration.ProcessorResult;
+import org.apache.metron.integration.ReadinessState;
+import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.integration.components.KafkaComponent;
import org.apache.metron.integration.components.ZKServerComponent;
import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
@@ -40,41 +45,52 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationTest {
/**
- {
- "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
- ,"writerClassName" : "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter"
- ,"sensorTopic":"dummy"
- ,"parserConfig":
- {
- "shew.table" : "dummy"
- ,"shew.cf" : "cf"
- ,"shew.keyColumns" : "col2"
- ,"shew.enrichmentType" : "et"
- ,"shew.hbaseProvider" : "org.apache.metron.hbase.mock.MockHBaseTableProvider"
- ,"columns" : {
- "col1" : 0
- ,"col2" : 1
- ,"col3" : 2
- }
- }
- }
+ * {
+ * "parserClassName": "org.apache.metron.parsers.csv.CSVParser",
+ * "writerClassName": "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter",
+ * "sensorTopic": "dummy",
+ * "outputTopic": "output",
+ * "errorTopic": "error",
+ * "parserConfig": {
+ * "shew.table": "dummy",
+ * "shew.cf": "cf",
+ * "shew.keyColumns": "col2",
+ * "shew.enrichmentType": "et",
+ * "shew.hbaseProvider": "org.apache.metron.hbase.mock.MockHBaseTableProvider",
+ * "columns" : {
+ * "col1": 0,
+ * "col2": 1,
+ * "col3": 2
+ * }
+ * }
+ * }
*/
@Multiline
- public static String parserConfig;
+ public static String parserConfigJSON;
@Test
public void test() throws UnableToStartException, IOException {
final String sensorType = "dummy";
+
+ // the input messages to parse
final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
add(Bytes.toBytes("col11,col12,col13"));
add(Bytes.toBytes("col21,col22,col23"));
add(Bytes.toBytes("col31,col32,col33"));
}};
+
+ // setup external components; kafka, zookeeper
MockHBaseTableProvider.addToCache(sensorType, "cf");
final Properties topologyProperties = new Properties();
final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
@@ -83,17 +99,20 @@ public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationT
}});
topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
+ SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class);
+
ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
.withTopologyProperties(topologyProperties)
.withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
- .withParserSensorConfig(sensorType, JSONUtils.INSTANCE.load(parserConfig, SensorParserConfig.class));
+ .withParserSensorConfig(sensorType, parserConfig);
ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
.withSensorType(sensorType)
.withTopologyProperties(topologyProperties)
- .withBrokerUrl(kafkaComponent.getBrokerList()).build();
+ .withBrokerUrl(kafkaComponent.getBrokerList())
+ .withOutputTopic(parserConfig.getOutputTopic())
+ .build();
- //UnitTestHelper.verboseLogging();
ComponentRunner runner = new ComponentRunner.Builder()
.withComponent("zk", zkServerComponent)
.withComponent("kafka", kafkaComponent)
http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index 0cfaae3..cde08bc 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -45,7 +45,6 @@ import java.io.IOException;
import java.util.*;
public class WriterBoltIntegrationTest extends BaseIntegrationTest {
- private static final String ERROR_TOPIC = "parser_error";
public static class MockValidator implements FieldValidation{
@@ -62,48 +61,53 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
}
}
/**
- {
- "fieldValidations" : [
- {
- "validation" : "org.apache.metron.writers.integration.WriterBoltIntegrationTest$MockValidator"
- }
- ],
- "parser.error.topic":"parser_error"
- }
- */
+ * {
+ * "fieldValidations" : [
+ * { "validation" : "org.apache.metron.writers.integration.WriterBoltIntegrationTest$MockValidator" }
+ * ]
+ * }
+ */
@Multiline
public static String globalConfig;
/**
- {
- "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
- ,"sensorTopic":"dummy"
- ,"parserConfig":
- {
- "columns" : {
- "action" : 0
- ,"dummy" : 1
- }
- }
- }
+ * {
+ * "parserClassName" : "org.apache.metron.parsers.csv.CSVParser",
+ * "sensorTopic": "dummy",
+ * "outputTopic": "output",
+ * "errorTopic": "parser_error",
+ * "parserConfig": {
+ * "columns" : {
+ * "action" : 0,
+ * "dummy" : 1
+ * }
+ * }
+ * }
*/
@Multiline
- public static String parserConfig;
+ public static String parserConfigJSON;
@Test
public void test() throws UnableToStartException, IOException, ParseException {
+
UnitTestHelper.setLog4jLevel(CSVParser.class, org.apache.log4j.Level.FATAL);
final String sensorType = "dummy";
+
+ SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class);
+
+ // the input messages to parse
final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
add(Bytes.toBytes("valid,foo"));
add(Bytes.toBytes("invalid,foo"));
add(Bytes.toBytes("error"));
}};
+
+ // setup external components; zookeeper, kafka
final Properties topologyProperties = new Properties();
final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
add(new KafkaComponent.Topic(sensorType, 1));
- add(new KafkaComponent.Topic(ERROR_TOPIC, 1));
+ add(new KafkaComponent.Topic(parserConfig.getErrorTopic(), 1));
add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
}});
topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
@@ -111,14 +115,16 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
.withTopologyProperties(topologyProperties)
.withGlobalConfig(globalConfig)
- .withParserSensorConfig(sensorType, JSONUtils.INSTANCE.load(parserConfig, SensorParserConfig.class));
+ .withParserSensorConfig(sensorType, parserConfig);
ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
.withSensorType(sensorType)
.withTopologyProperties(topologyProperties)
- .withBrokerUrl(kafkaComponent.getBrokerList()).build();
+ .withBrokerUrl(kafkaComponent.getBrokerList())
+ .withErrorTopic(parserConfig.getErrorTopic())
+ .withOutputTopic(parserConfig.getOutputTopic())
+ .build();
- //UnitTestHelper.verboseLogging();
ComponentRunner runner = new ComponentRunner.Builder()
.withComponent("zk", zkServerComponent)
.withComponent("kafka", kafkaComponent)
@@ -131,48 +137,42 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
try {
runner.start();
kafkaComponent.writeMessages(sensorType, inputMessages);
- ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(getProcessor());
+ ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(
+ getProcessor(parserConfig.getOutputTopic(), parserConfig.getErrorTopic()));
+
+ // validate the output messages
Map<String,List<JSONObject>> outputMessages = result.getResult();
Assert.assertEquals(2, outputMessages.size());
Assert.assertEquals(1, outputMessages.get(Constants.ENRICHMENT_TOPIC).size());
Assert.assertEquals("valid", outputMessages.get(Constants.ENRICHMENT_TOPIC).get(0).get("action"));
- Assert.assertEquals(2, outputMessages.get(ERROR_TOPIC).size());
- JSONObject invalidMessage = outputMessages.get(ERROR_TOPIC).get(0);
+ Assert.assertEquals(2, outputMessages.get(parserConfig.getErrorTopic()).size());
+
+ // validate an error message
+ JSONObject invalidMessage = outputMessages.get(parserConfig.getErrorTopic()).get(0);
Assert.assertEquals(Constants.ErrorType.PARSER_INVALID.getType(), invalidMessage.get(Constants.ErrorFields.ERROR_TYPE.getName()));
JSONObject rawMessage = JSONUtils.INSTANCE.load((String) invalidMessage.get(Constants.ErrorFields.RAW_MESSAGE.getName()), JSONObject.class);
Assert.assertEquals("foo", rawMessage.get("dummy"));
Assert.assertEquals("invalid", rawMessage.get("action"));
- JSONObject errorMessage = outputMessages.get(ERROR_TOPIC).get(1);
+
+ // validate the next error message
+ JSONObject errorMessage = outputMessages.get(parserConfig.getErrorTopic()).get(1);
Assert.assertEquals(Constants.ErrorType.PARSER_ERROR.getType(), errorMessage.get(Constants.ErrorFields.ERROR_TYPE.getName()));
Assert.assertEquals("error", errorMessage.get(Constants.ErrorFields.RAW_MESSAGE.getName()));
- // It's unclear if we need a rawMessageBytes field so commenting out for now
- //Assert.assertTrue(Arrays.equals(listToBytes(errorMessage.get(Constants.ErrorFields.RAW_MESSAGE_BYTES.getName())), "error".getBytes()));
- }
- finally {
+
+ } finally {
if(runner != null) {
runner.stop();
}
}
}
- private static byte[] listToBytes(Object o ){
- List<Byte> l = (List<Byte>)o;
- byte[] ret = new byte[l.size()];
- int i = 0;
- for(Number b : l) {
- ret[i++] = b.byteValue();
- }
- return ret;
- }
+
private static List<JSONObject> loadMessages(List<byte[]> outputMessages) {
List<JSONObject> tmp = new ArrayList<>();
- Iterables.addAll(tmp
- , Iterables.transform(outputMessages
- , message -> {
+ Iterables.addAll(tmp,
+ Iterables.transform(outputMessages,
+ message -> {
try {
- return new JSONObject(JSONUtils.INSTANCE.load(new String(message)
- ,JSONUtils.MAP_SUPPLIER
- )
- );
+ return new JSONObject(JSONUtils.INSTANCE.load(new String(message), JSONUtils.MAP_SUPPLIER));
} catch (Exception ex) {
throw new IllegalStateException(ex);
}
@@ -181,13 +181,14 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
);
return tmp;
}
+
@SuppressWarnings("unchecked")
- private KafkaProcessor<Map<String,List<JSONObject>>> getProcessor(){
+ private KafkaProcessor<Map<String,List<JSONObject>>> getProcessor(String outputTopic, String errorTopic){
return new KafkaProcessor<>()
.withKafkaComponentName("kafka")
- .withReadTopic(Constants.ENRICHMENT_TOPIC)
- .withErrorTopic(ERROR_TOPIC)
+ .withReadTopic(outputTopic)
+ .withErrorTopic(errorTopic)
.withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
@Nullable
@Override
@@ -201,7 +202,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
public Map<String,List<JSONObject>> apply(@Nullable KafkaMessageSet messageSet) {
return new HashMap<String, List<JSONObject>>() {{
put(Constants.ENRICHMENT_TOPIC, loadMessages(messageSet.getMessages()));
- put(ERROR_TOPIC, loadMessages(messageSet.getErrors()));
+ put(errorTopic, loadMessages(messageSet.getErrors()));
}};
}
});
http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
index f73e0f4..c4e3998 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -76,6 +76,11 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
this.brokerUrl = brokerUrl;
}
+ public KafkaWriter withBrokerUrl(String brokerUrl) {
+ this.brokerUrl = brokerUrl;
+ return this;
+ }
+
public KafkaWriter withZkQuorum(String zkQuorum) {
this.zkQuorum = zkQuorum;
return this;