You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2018/04/27 19:30:19 UTC
[39/50] [abbrv] metron git commit: METRON-1499 Enable Configuration
of Unified Enrichment Topology via Ambari (nickwallen) closes
apache/metron#984
http://git-wip-us.apache.org/repos/asf/metron/blob/82212ba8/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
deleted file mode 100644
index fd7ceff..0000000
--- a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
+++ /dev/null
@@ -1,590 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name: "enrichment"
-config:
- topology.workers: ${enrichment.workers}
- topology.acker.executors: ${enrichment.acker.executors}
- topology.worker.childopts: ${topology.worker.childopts}
- topology.auto-credentials: ${topology.auto-credentials}
- topology.max.spout.pending: ${topology.max.spout.pending}
-
-components:
-
-# Enrichment
- - id: "stellarEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
- configMethods:
- - name: "ofType"
- args:
- - "ENRICHMENT"
-
- # Any kafka props for the producer go here.
- - id: "kafkaWriterProps"
- className: "java.util.HashMap"
- configMethods:
- - name: "put"
- args:
- - "security.protocol"
- - "${kafka.security.protocol}"
-
- - id: "stellarEnrichment"
- className: "org.apache.metron.enrichment.configuration.Enrichment"
- constructorArgs:
- - "stellar"
- - ref: "stellarEnrichmentAdapter"
-
- - id: "geoEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
- - id: "geoEnrichment"
- className: "org.apache.metron.enrichment.configuration.Enrichment"
- constructorArgs:
- - "geo"
- - ref: "geoEnrichmentAdapter"
- - id: "hostEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
- constructorArgs:
- - '${enrichment.host.known_hosts}'
- - id: "hostEnrichment"
- className: "org.apache.metron.enrichment.configuration.Enrichment"
- constructorArgs:
- - "host"
- - ref: "hostEnrichmentAdapter"
-
- - id: "simpleHBaseEnrichmentConfig"
- className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig"
- configMethods:
- - name: "withProviderImpl"
- args:
- - "${hbase.provider.impl}"
- - name: "withHBaseTable"
- args:
- - "${enrichment.simple.hbase.table}"
- - name: "withHBaseCF"
- args:
- - "${enrichment.simple.hbase.cf}"
- - id: "simpleHBaseEnrichmentAdapter"
- className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter"
- configMethods:
- - name: "withConfig"
- args:
- - ref: "simpleHBaseEnrichmentConfig"
- - id: "simpleHBaseEnrichment"
- className: "org.apache.metron.enrichment.configuration.Enrichment"
- constructorArgs:
- - "hbaseEnrichment"
- - ref: "simpleHBaseEnrichmentAdapter"
- - id: "enrichments"
- className: "java.util.ArrayList"
- configMethods:
- - name: "add"
- args:
- - ref: "geoEnrichment"
- - name: "add"
- args:
- - ref: "hostEnrichment"
- - name: "add"
- args:
- - ref: "simpleHBaseEnrichment"
- - name: "add"
- args:
- - ref: "stellarEnrichment"
-
- #enrichment error
- - id: "enrichmentErrorKafkaWriter"
- className: "org.apache.metron.writer.kafka.KafkaWriter"
- configMethods:
- - name: "withTopic"
- args:
- - "${enrichment.error.topic}"
- - name: "withZkQuorum"
- args:
- - "${kafka.zk}"
- - name: "withProducerConfigs"
- args:
- - ref: "kafkaWriterProps"
-
-# Threat Intel
- - id: "stellarThreatIntelAdapter"
- className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
- configMethods:
- - name: "ofType"
- args:
- - "THREAT_INTEL"
- - id: "stellarThreatIntelEnrichment"
- className: "org.apache.metron.enrichment.configuration.Enrichment"
- constructorArgs:
- - "stellar"
- - ref: "stellarThreatIntelAdapter"
- - id: "simpleHBaseThreatIntelConfig"
- className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig"
- configMethods:
- - name: "withProviderImpl"
- args:
- - "${hbase.provider.impl}"
- - name: "withTrackerHBaseTable"
- args:
- - "${threat.intel.tracker.table}"
- - name: "withTrackerHBaseCF"
- args:
- - "${threat.intel.tracker.cf}"
- - name: "withHBaseTable"
- args:
- - "${threat.intel.simple.hbase.table}"
- - name: "withHBaseCF"
- args:
- - "${threat.intel.simple.hbase.cf}"
- - id: "simpleHBaseThreatIntelAdapter"
- className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter"
- configMethods:
- - name: "withConfig"
- args:
- - ref: "simpleHBaseThreatIntelConfig"
- - id: "simpleHBaseThreatIntelEnrichment"
- className: "org.apache.metron.enrichment.configuration.Enrichment"
- constructorArgs:
- - "hbaseThreatIntel"
- - ref: "simpleHBaseThreatIntelAdapter"
-
- - id: "threatIntels"
- className: "java.util.ArrayList"
- configMethods:
- - name: "add"
- args:
- - ref: "simpleHBaseThreatIntelEnrichment"
- - name: "add"
- args:
- - ref: "stellarThreatIntelEnrichment"
-
- #threatintel error
- - id: "threatIntelErrorKafkaWriter"
- className: "org.apache.metron.writer.kafka.KafkaWriter"
- configMethods:
- - name: "withTopic"
- args:
- - "${threat.intel.error.topic}"
- - name: "withZkQuorum"
- args:
- - "${kafka.zk}"
- - name: "withProducerConfigs"
- args:
- - ref: "kafkaWriterProps"
-#indexing
- - id: "kafkaWriter"
- className: "org.apache.metron.writer.kafka.KafkaWriter"
- configMethods:
- - name: "withTopic"
- args:
- - "${enrichment.output.topic}"
- - name: "withZkQuorum"
- args:
- - "${kafka.zk}"
- - name: "withProducerConfigs"
- args:
- - ref: "kafkaWriterProps"
-
-#kafka/zookeeper
- # Any kafka props for the consumer go here.
- - id: "kafkaProps"
- className: "java.util.HashMap"
- configMethods:
- - name: "put"
- args:
- - "value.deserializer"
- - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
- - name: "put"
- args:
- - "key.deserializer"
- - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
- - name: "put"
- args:
- - "group.id"
- - "enrichments"
- - name: "put"
- args:
- - "security.protocol"
- - "${kafka.security.protocol}"
-
-
- # The fields to pull out of the kafka messages
- - id: "fields"
- className: "java.util.ArrayList"
- configMethods:
- - name: "add"
- args:
- - "value"
-
- - id: "kafkaConfig"
- className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
- constructorArgs:
- - ref: "kafkaProps"
- # topic name
- - "${enrichment.input.topic}"
- - "${kafka.zk}"
- - ref: "fields"
- configMethods:
- - name: "setFirstPollOffsetStrategy"
- args:
- - "${kafka.start}"
-
-
-spouts:
- - id: "kafkaSpout"
- className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
- constructorArgs:
- - ref: "kafkaConfig"
- parallelism: ${kafka.spout.parallelism}
-
-bolts:
-# Enrichment Bolts
- - id: "enrichmentSplitBolt"
- className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichments"
- args:
- - ref: "enrichments"
- parallelism: ${enrichment.split.parallelism}
-
- - id: "geoEnrichmentBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "geoEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
-
- - id: "stellarEnrichmentBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "stellarEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
- parallelism: ${enrichment.stellar.parallelism}
-
- - id: "hostEnrichmentBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "hostEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
-
- - id: "simpleHBaseEnrichmentBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "simpleHBaseEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
-
- - id: "enrichmentJoinBolt"
- className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withMaxCacheSize"
- args: [${enrichment.join.cache.size}]
- - name: "withMaxTimeRetain"
- args: [10]
- parallelism: ${enrichment.join.parallelism}
-
- - id: "enrichmentErrorOutputBolt"
- className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withMessageWriter"
- args:
- - ref: "enrichmentErrorKafkaWriter"
-
-
-# Threat Intel Bolts
- - id: "threatIntelSplitBolt"
- className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichments"
- args:
- - ref: "threatIntels"
- - name: "withMessageFieldName"
- args: ["message"]
- parallelism: ${threat.intel.split.parallelism}
-
- - id: "simpleHBaseThreatIntelBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "simpleHBaseThreatIntelEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
- - id: "stellarThreatIntelBolt"
- className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withEnrichment"
- args:
- - ref: "stellarThreatIntelEnrichment"
- - name: "withMaxCacheSize"
- args: [10000]
- - name: "withMaxTimeRetain"
- args: [10]
- parallelism: ${threat.intel.stellar.parallelism}
-
- - id: "threatIntelJoinBolt"
- className: "org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withMaxCacheSize"
- args: [${threat.intel.join.cache.size}]
- - name: "withMaxTimeRetain"
- args: [10]
- parallelism: ${threat.intel.join.parallelism}
-
- - id: "threatIntelErrorOutputBolt"
- className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withMessageWriter"
- args:
- - ref: "threatIntelErrorKafkaWriter"
-
-# Indexing Bolts
- - id: "outputBolt"
- className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
- constructorArgs:
- - "${kafka.zk}"
- configMethods:
- - name: "withMessageWriter"
- args:
- - ref: "kafkaWriter"
- parallelism: ${kafka.writer.parallelism}
-
-
-streams:
-#parser
- - name: "spout -> enrichmentSplit"
- from: "kafkaSpout"
- to: "enrichmentSplitBolt"
- grouping:
- type: LOCAL_OR_SHUFFLE
-
-#enrichment
- - name: "enrichmentSplit -> host"
- from: "enrichmentSplitBolt"
- to: "hostEnrichmentBolt"
- grouping:
- streamId: "host"
- type: FIELDS
- args: ["message"]
-
- - name: "enrichmentSplit -> geo"
- from: "enrichmentSplitBolt"
- to: "geoEnrichmentBolt"
- grouping:
- streamId: "geo"
- type: FIELDS
- args: ["message"]
-
- - name: "enrichmentSplit -> stellar"
- from: "enrichmentSplitBolt"
- to: "stellarEnrichmentBolt"
- grouping:
- streamId: "stellar"
- type: FIELDS
- args: ["message"]
-
-
- - name: "enrichmentSplit -> simpleHBaseEnrichmentBolt"
- from: "enrichmentSplitBolt"
- to: "simpleHBaseEnrichmentBolt"
- grouping:
- streamId: "hbaseEnrichment"
- type: FIELDS
- args: ["message"]
-
- - name: "splitter -> join"
- from: "enrichmentSplitBolt"
- to: "enrichmentJoinBolt"
- grouping:
- streamId: "message"
- type: FIELDS
- args: ["key"]
-
- - name: "geo -> join"
- from: "geoEnrichmentBolt"
- to: "enrichmentJoinBolt"
- grouping:
- streamId: "geo"
- type: FIELDS
- args: ["key"]
-
- - name: "stellar -> join"
- from: "stellarEnrichmentBolt"
- to: "enrichmentJoinBolt"
- grouping:
- streamId: "stellar"
- type: FIELDS
- args: ["key"]
-
- - name: "simpleHBaseEnrichmentBolt -> join"
- from: "simpleHBaseEnrichmentBolt"
- to: "enrichmentJoinBolt"
- grouping:
- streamId: "hbaseEnrichment"
- type: FIELDS
- args: ["key"]
-
- - name: "host -> join"
- from: "hostEnrichmentBolt"
- to: "enrichmentJoinBolt"
- grouping:
- streamId: "host"
- type: FIELDS
- args: ["key"]
-
- # Error output
- - name: "geoEnrichmentBolt -> enrichmentErrorOutputBolt"
- from: "geoEnrichmentBolt"
- to: "enrichmentErrorOutputBolt"
- grouping:
- streamId: "error"
- type: LOCAL_OR_SHUFFLE
-
- - name: "stellarEnrichmentBolt -> enrichmentErrorOutputBolt"
- from: "stellarEnrichmentBolt"
- to: "enrichmentErrorOutputBolt"
- grouping:
- streamId: "error"
- type: LOCAL_OR_SHUFFLE
-
- - name: "hostEnrichmentBolt -> enrichmentErrorOutputBolt"
- from: "hostEnrichmentBolt"
- to: "enrichmentErrorOutputBolt"
- grouping:
- streamId: "error"
- type: LOCAL_OR_SHUFFLE
-
- - name: "simpleHBaseEnrichmentBolt -> enrichmentErrorOutputBolt"
- from: "simpleHBaseEnrichmentBolt"
- to: "enrichmentErrorOutputBolt"
- grouping:
- streamId: "error"
- type: LOCAL_OR_SHUFFLE
-
-#threat intel
- - name: "enrichmentJoin -> threatSplit"
- from: "enrichmentJoinBolt"
- to: "threatIntelSplitBolt"
- grouping:
- streamId: "message"
- type: FIELDS
- args: ["key"]
-
- - name: "threatSplit -> simpleHBaseThreatIntel"
- from: "threatIntelSplitBolt"
- to: "simpleHBaseThreatIntelBolt"
- grouping:
- streamId: "hbaseThreatIntel"
- type: FIELDS
- args: ["message"]
-
- - name: "threatSplit -> stellarThreatIntel"
- from: "threatIntelSplitBolt"
- to: "stellarThreatIntelBolt"
- grouping:
- streamId: "stellar"
- type: FIELDS
- args: ["message"]
-
-
- - name: "simpleHBaseThreatIntel -> join"
- from: "simpleHBaseThreatIntelBolt"
- to: "threatIntelJoinBolt"
- grouping:
- streamId: "hbaseThreatIntel"
- type: FIELDS
- args: ["key"]
-
- - name: "stellarThreatIntel -> join"
- from: "stellarThreatIntelBolt"
- to: "threatIntelJoinBolt"
- grouping:
- streamId: "stellar"
- type: FIELDS
- args: ["key"]
-
- - name: "threatIntelSplit -> threatIntelJoin"
- from: "threatIntelSplitBolt"
- to: "threatIntelJoinBolt"
- grouping:
- streamId: "message"
- type: FIELDS
- args: ["key"]
-#output
- - name: "threatIntelJoin -> output"
- from: "threatIntelJoinBolt"
- to: "outputBolt"
- grouping:
- streamId: "message"
- type: LOCAL_OR_SHUFFLE
-
- # Error output
- - name: "simpleHBaseThreatIntelBolt -> threatIntelErrorOutputBolt"
- from: "simpleHBaseThreatIntelBolt"
- to: "threatIntelErrorOutputBolt"
- grouping:
- streamId: "error"
- type: LOCAL_OR_SHUFFLE
-
- - name: "stellarThreatIntelBolt -> threatIntelErrorOutputBolt"
- from: "stellarThreatIntelBolt"
- to: "threatIntelErrorOutputBolt"
- grouping:
- streamId: "error"
- type: LOCAL_OR_SHUFFLE
-
http://git-wip-us.apache.org/repos/asf/metron/blob/82212ba8/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh b/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
index 6824b87..77c3a77 100755
--- a/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
+++ b/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
@@ -1,5 +1,5 @@
#!/bin/bash
-#
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,4 +19,12 @@
METRON_VERSION=${project.version}
METRON_HOME=/usr/metron/$METRON_VERSION
TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar
-storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/enrichment/remote.yaml --filter $METRON_HOME/config/enrichment.properties
+
+# there are two enrichment topologies. by default, the split-join enrichment topology is executed
+SPLIT_JOIN_ARGS="--remote $METRON_HOME/flux/enrichment/remote-splitjoin.yaml --filter $METRON_HOME/config/enrichment-splitjoin.properties"
+UNIFIED_ARGS="--remote $METRON_HOME/flux/enrichment/remote-unified.yaml --filter $METRON_HOME/config/enrichment-unified.properties"
+
+# by passing in different args, the user can execute an alternative enrichment topology
+ARGS=${@:-$SPLIT_JOIN_ARGS}
+
+storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux $ARGS
http://git-wip-us.apache.org/repos/asf/metron/blob/82212ba8/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index 3c55c95..2e22eab 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -65,16 +65,19 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+/**
+ * Integration test for the 'Split-Join' enrichment topology.
+ */
public class EnrichmentIntegrationTest extends BaseIntegrationTest {
- private static final String ERROR_TOPIC = "enrichment_error";
- private static final String SRC_IP = "ip_src_addr";
- private static final String DST_IP = "ip_dst_addr";
- private static final String MALICIOUS_IP_TYPE = "malicious_ip";
- private static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification";
- private static final Map<String, Object> PLAYFUL_ENRICHMENT = new HashMap<String, Object>() {{
+
+ public static final String ERROR_TOPIC = "enrichment_error";
+ public static final String SRC_IP = "ip_src_addr";
+ public static final String DST_IP = "ip_dst_addr";
+ public static final String MALICIOUS_IP_TYPE = "malicious_ip";
+ public static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification";
+ public static final Map<String, Object> PLAYFUL_ENRICHMENT = new HashMap<String, Object>() {{
put("orientation", "north");
}};
-
public static final String DEFAULT_COUNTRY = "test country";
public static final String DEFAULT_CITY = "test city";
public static final String DEFAULT_POSTAL_CODE = "test postalCode";
@@ -82,15 +85,18 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
public static final String DEFAULT_LONGITUDE = "test longitude";
public static final String DEFAULT_DMACODE= "test dmaCode";
public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE);
+ public static final String cf = "cf";
+ public static final String trackerHBaseTableName = "tracker";
+ public static final String threatIntelTableName = "threat_intel";
+ public static final String enrichmentsTableName = "enrichments";
- protected String templatePath = "../metron-enrichment/src/main/config/enrichment.properties.j2";
protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed";
private final List<byte[]> inputMessages = getInputMessages(sampleParsedPath);
private static File geoHdfsFile;
protected String fluxPath() {
- return "../metron-enrichment/src/main/flux/enrichment/remote.yaml";
+ return "../metron-enrichment/src/main/flux/enrichment/remote-splitjoin.yaml";
}
private static List<byte[]> getInputMessages(String path){
@@ -115,13 +121,22 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
geoHdfsFile = new File(new File(baseDir), "GeoIP2-City-Test.mmdb.gz");
}
- @Test
- public void test() throws Exception {
- final String cf = "cf";
- final String trackerHBaseTableName = "tracker";
- final String threatIntelTableName = "threat_intel";
- final String enrichmentsTableName = "enrichments";
- final Properties topologyProperties = new Properties() {{
+ /**
+ * Returns the path to the topology properties template.
+ *
+ * @return The path to the topology properties template.
+ */
+ public String getTemplatePath() {
+ return "../metron-enrichment/src/main/config/enrichment-splitjoin.properties.j2";
+ }
+
+ /**
+ * Properties for the 'Split-Join' topology.
+ *
+ * @return The topology properties.
+ */
+ public Properties getTopologyProperties() {
+ return new Properties() {{
setProperty("enrichment_workers", "1");
setProperty("enrichment_acker_executors", "0");
setProperty("enrichment_topology_worker_childopts", "");
@@ -142,11 +157,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
"{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"}," +
"{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}," +
"{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]");
-
setProperty("threatintel_hbase_table", threatIntelTableName);
setProperty("threatintel_hbase_cf", cf);
-
-
setProperty("enrichment_kafka_spout_parallelism", "1");
setProperty("enrichment_split_parallelism", "1");
setProperty("enrichment_stellar_parallelism", "1");
@@ -155,8 +167,13 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
setProperty("threat_intel_stellar_parallelism", "1");
setProperty("threat_intel_join_parallelism", "1");
setProperty("kafka_writer_parallelism", "1");
-
}};
+ }
+
+ @Test
+ public void test() throws Exception {
+
+ final Properties topologyProperties = getTopologyProperties();
final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
@@ -196,7 +213,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
.withTopologyLocation(new File(fluxPath()))
.withTopologyName("test")
- .withTemplateLocation(new File(templatePath))
+ .withTemplateLocation(new File(getTemplatePath()))
.withTopologyProperties(topologyProperties)
.build();
@@ -531,7 +548,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
, message -> {
try {
return new HashMap<>(JSONUtils.INSTANCE.load(new String(message)
- , JSONUtils.MAP_SUPPLIER
+ , JSONUtils.MAP_SUPPLIER
)
);
} catch (Exception ex) {
http://git-wip-us.apache.org/repos/asf/metron/blob/82212ba8/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
index 1f06733..5c19b39 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
@@ -17,7 +17,78 @@
*/
package org.apache.metron.enrichment.integration;
+import org.apache.metron.common.Constants;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+
+import java.util.Properties;
+
+/**
+ * Integration test for the 'Unified' enrichment topology.
+ */
public class UnifiedEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
+
+ /**
+ * Returns the path to the topology properties template.
+ *
+ * @return The path to the topology properties template.
+ */
+ public String getTemplatePath() {
+ return "../metron-enrichment/src/main/config/enrichment-unified.properties.j2";
+ }
+
+ /**
+ * Properties for the 'Unified' topology.
+ *
+ * @return The topology properties.
+ */
+ @Override
+ public Properties getTopologyProperties() {
+ return new Properties() {{
+
+ // storm
+ setProperty("enrichment_workers", "1");
+ setProperty("enrichment_acker_executors", "0");
+ setProperty("enrichment_topology_worker_childopts", "");
+ setProperty("topology_auto_credentials", "[]");
+ setProperty("enrichment_topology_max_spout_pending", "500");
+
+ // kafka - zookeeper_quorum, kafka_brokers set elsewhere
+ setProperty("kafka_security_protocol", "PLAINTEXT");
+ setProperty("enrichment_kafka_start", "UNCOMMITTED_EARLIEST");
+ setProperty("enrichment_input_topic", Constants.ENRICHMENT_TOPIC);
+ setProperty("enrichment_output_topic", Constants.INDEXING_TOPIC);
+ setProperty("enrichment_error_topic", ERROR_TOPIC);
+ setProperty("threatintel_error_topic", ERROR_TOPIC);
+
+ // enrichment
+ setProperty("enrichment_hbase_provider_impl", "" + MockHBaseTableProvider.class.getName());
+ setProperty("enrichment_hbase_table", enrichmentsTableName);
+ setProperty("enrichment_hbase_cf", cf);
+ setProperty("enrichment_host_known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"}," +
+ "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"}," +
+ "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}," +
+ "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]");
+
+ // threat intel
+ setProperty("threatintel_hbase_table", threatIntelTableName);
+ setProperty("threatintel_hbase_cf", cf);
+
+ // parallelism
+ setProperty("unified_kafka_spout_parallelism", "1");
+ setProperty("unified_enrichment_parallelism", "1");
+ setProperty("unified_threat_intel_parallelism", "1");
+ setProperty("unified_kafka_writer_parallelism", "1");
+
+ // caches
+ setProperty("unified_enrichment_cache_size", "1000");
+ setProperty("unified_threat_intel_cache_size", "1000");
+
+ // threads
+ setProperty("unified_enrichment_threadpool_size", "1");
+ setProperty("unified_enrichment_threadpool_type", "FIXED");
+ }};
+ }
+
@Override
public String fluxPath() {
return "../metron-enrichment/src/main/flux/enrichment/remote-unified.yaml";