You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2018/07/20 15:40:39 UTC
[11/15] metron git commit: METRON-1657 Parser aggregation in storm
(justinleet) closes apache/metron#1099
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 15ce735..06f4cec 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -58,6 +58,7 @@ import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
import org.apache.metron.parsers.BasicParser;
import org.apache.metron.parsers.interfaces.MessageFilter;
import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.topology.ParserComponents;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.test.bolt.BaseBoltTest;
import org.apache.metron.test.error.MetronErrorJSONMatcher;
@@ -185,7 +186,15 @@ public class ParserBoltTest extends BaseBoltTest {
@Test
public void testEmpty() throws Exception {
String sensorType = "yaf";
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
+ Map<String, ParserComponents> parserMap = Collections.singletonMap(
+ sensorType,
+ new ParserComponents(
+ parser,
+ null,
+ new WriterHandler(writer)
+ )
+ );
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
@Override
protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
return ParserBoltTest.createUpdater();
@@ -209,7 +218,7 @@ public class ParserBoltTest extends BaseBoltTest {
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.PARSER_ERROR)
.withThrowable(new NullPointerException())
- .withSensorType(sensorType)
+ .withSensorType(Collections.singleton(sensorType))
.addRawMessage(sampleBinary);
verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
}
@@ -217,7 +226,15 @@ public class ParserBoltTest extends BaseBoltTest {
@Test
public void testInvalid() throws Exception {
String sensorType = "yaf";
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
+ Map<String, ParserComponents> parserMap = Collections.singletonMap(
+ sensorType,
+ new ParserComponents(
+ parser,
+ null,
+ new WriterHandler(writer)
+ )
+ );
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
@Override
protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
return ParserBoltTest.createUpdater();
@@ -243,7 +260,7 @@ public class ParserBoltTest extends BaseBoltTest {
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.PARSER_INVALID)
- .withSensorType(sensorType)
+ .withSensorType(Collections.singleton(sensorType))
.withErrorFields(new HashSet<String>() {{ add("field"); }})
.addRawMessage(new JSONObject(){{
put("field", "invalidValue");
@@ -255,14 +272,20 @@ public class ParserBoltTest extends BaseBoltTest {
@Test
public void test() throws Exception {
-
String sensorType = "yaf";
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
+ Map<String, ParserComponents> parserMap = Collections.singletonMap(
+ sensorType,
+ new ParserComponents(
+ parser,
+ null,
+ new WriterHandler(writer)
+ )
+ );
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
@Override
protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
return ParserBoltTest.createUpdater();
}
-
};
parserBolt.setCuratorFramework(client);
parserBolt.setZKCache(cache);
@@ -290,7 +313,6 @@ public class ParserBoltTest extends BaseBoltTest {
when(parser.validate(eq(messages.get(1)))).thenReturn(true);
when(filter.emitTuple(eq(messages.get(0)), any())).thenReturn(false);
when(filter.emitTuple(eq(messages.get(1)), any())).thenReturn(true);
- parserBolt.withMessageFilter(filter);
parserBolt.execute(tuple);
verify(writer, times(1)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage2));
verify(outputCollector, times(2)).ack(tuple);
@@ -317,21 +339,15 @@ public class ParserBoltTest extends BaseBoltTest {
@Test
public void testFilterSuccess() throws Exception {
String sensorType = "yaf";
-
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
- @Override
- protected SensorParserConfig getSensorParserConfig() {
- try {
- return SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- @Override
- protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- return ParserBoltTest.createUpdater(Optional.of(1));
- }
- };
+ Map<String, ParserComponents> parserMap = Collections.singletonMap(
+ sensorType,
+ new ParserComponents(
+ parser,
+ null,
+ new WriterHandler(batchWriter)
+ )
+ );
+ ParserBolt parserBolt = buildParserBolt(parserMap, sensorParserConfig);
parserBolt.setCuratorFramework(client);
parserBolt.setZKCache(cache);
@@ -358,10 +374,17 @@ public class ParserBoltTest extends BaseBoltTest {
@Test
public void testFilterFailure() throws Exception {
String sensorType = "yaf";
-
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+ Map<String, ParserComponents> parserMap = Collections.singletonMap(
+ sensorType,
+ new ParserComponents(
+ parser,
+ null,
+ new WriterHandler(batchWriter)
+ )
+ );
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
@Override
- protected SensorParserConfig getSensorParserConfig() {
+ protected SensorParserConfig getSensorParserConfig(String sensorType) {
try {
return SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig));
} catch (IOException e) {
@@ -433,21 +456,15 @@ public class ParserBoltTest extends BaseBoltTest {
}
};
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, dummyParser, new WriterHandler(recordingWriter)) {
- @Override
- protected SensorParserConfig getSensorParserConfig() {
- try {
- return SensorParserConfig.fromBytes(Bytes.toBytes(csvWithFieldTransformations));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
- return ParserBoltTest.createUpdater(Optional.of(1));
- }
- };
+ Map<String, ParserComponents> parserMap = Collections.singletonMap(
+ sensorType,
+ new ParserComponents(
+ dummyParser,
+ null,
+ new WriterHandler(recordingWriter)
+ )
+ );
+ ParserBolt parserBolt = buildParserBolt(parserMap, csvWithFieldTransformations);
parserBolt.setCuratorFramework(client);
parserBolt.setZKCache(cache);
@@ -461,10 +478,16 @@ public class ParserBoltTest extends BaseBoltTest {
@Test
public void testDefaultBatchSize() throws Exception {
-
String sensorType = "yaf";
-
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+ Map<String, ParserComponents> parserMap = Collections.singletonMap(
+ sensorType,
+ new ParserComponents(
+ parser,
+ filter,
+ new WriterHandler(batchWriter)
+ )
+ );
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
@Override
protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
// this uses default batch size
@@ -487,7 +510,6 @@ public class ParserBoltTest extends BaseBoltTest {
response.addSuccess(uniqueTuples[i]);
}
when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(new HashSet<>(Arrays.asList(uniqueTuples))), any())).thenReturn(response);
- parserBolt.withMessageFilter(filter);
for (Tuple tuple : uniqueTuples) {
parserBolt.execute(tuple);
}
@@ -498,10 +520,16 @@ public class ParserBoltTest extends BaseBoltTest {
@Test
public void testLessRecordsThanDefaultBatchSize() throws Exception {
-
String sensorType = "yaf";
-
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+ Map<String, ParserComponents> parserMap = Collections.singletonMap(
+ sensorType,
+ new ParserComponents(
+ parser,
+ filter,
+ new WriterHandler(batchWriter)
+ )
+ );
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
@Override
protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
// this uses default batch size
@@ -524,7 +552,6 @@ public class ParserBoltTest extends BaseBoltTest {
uniqueTuples[i] = mock(Tuple.class);
response.addSuccess(uniqueTuples[i]);
}
- parserBolt.withMessageFilter(filter);
for (Tuple tuple : uniqueTuples) {
parserBolt.execute(tuple);
}
@@ -542,10 +569,16 @@ public class ParserBoltTest extends BaseBoltTest {
@Test
public void testBatchOfOne() throws Exception {
-
String sensorType = "yaf";
-
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+ Map<String, ParserComponents> parserMap = Collections.singletonMap(
+ sensorType,
+ new ParserComponents(
+ parser,
+ filter,
+ new WriterHandler(batchWriter)
+ )
+ );
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
@Override
protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
return ParserBoltTest.createUpdater(Optional.of(1));
@@ -563,17 +596,22 @@ public class ParserBoltTest extends BaseBoltTest {
BulkWriterResponse response = new BulkWriterResponse();
response.addSuccess(t1);
when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(Collections.singleton(t1)), any())).thenReturn(response);
- parserBolt.withMessageFilter(filter);
parserBolt.execute(t1);
verify(outputCollector, times(1)).ack(t1);
}
@Test
public void testBatchOfFive() throws Exception {
-
String sensorType = "yaf";
-
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+ Map<String, ParserComponents> parserMap = Collections.singletonMap(
+ sensorType,
+ new ParserComponents(
+ parser,
+ filter,
+ new WriterHandler(batchWriter)
+ )
+ );
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
@Override
protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
return ParserBoltTest.createUpdater(Optional.of(5));
@@ -592,7 +630,6 @@ public class ParserBoltTest extends BaseBoltTest {
BulkWriterResponse response = new BulkWriterResponse();
response.addAllSuccesses(tuples);
when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any())).thenReturn(response);
- parserBolt.withMessageFilter(filter);
writeNonBatch(outputCollector, parserBolt, t1);
writeNonBatch(outputCollector, parserBolt, t2);
writeNonBatch(outputCollector, parserBolt, t3);
@@ -610,9 +647,16 @@ public class ParserBoltTest extends BaseBoltTest {
@Test
public void testBatchOfFiveWithError() throws Exception {
-
String sensorType = "yaf";
- ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+ Map<String, ParserComponents> parserMap = Collections.singletonMap(
+ sensorType,
+ new ParserComponents(
+ parser,
+ filter,
+ new WriterHandler(batchWriter)
+ )
+ );
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
@Override
protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
return ParserBoltTest.createUpdater(Optional.of(5));
@@ -629,7 +673,6 @@ public class ParserBoltTest extends BaseBoltTest {
when(parser.validate(any())).thenReturn(true);
when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
- parserBolt.withMessageFilter(filter);
parserBolt.execute(t1);
parserBolt.execute(t2);
parserBolt.execute(t3);
@@ -654,6 +697,25 @@ public class ParserBoltTest extends BaseBoltTest {
parserBolt.getConfigurations().updateGlobalConfig(globalConfig);
}
+ private ParserBolt buildParserBolt(Map<String, ParserComponents> parserMap,
+ String csvWithFieldTransformations) {
+ return new ParserBolt("zookeeperUrl", parserMap) {
+ @Override
+ protected SensorParserConfig getSensorParserConfig(String sensorType) {
+ try {
+ return SensorParserConfig.fromBytes(Bytes.toBytes(csvWithFieldTransformations));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+ return ParserBoltTest.createUpdater(Optional.of(1));
+ }
+ };
+ }
+
private static void writeNonBatch(OutputCollector collector, ParserBolt bolt, Tuple t) {
bolt.execute(t);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
index a23c368..b04d8f7 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
@@ -18,6 +18,20 @@
package org.apache.metron.parsers.bolt;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.log4j.Level;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.IndexingConfigurations;
@@ -38,20 +52,6 @@ import org.json.simple.JSONObject;
import org.junit.Test;
import org.mockito.Mock;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
public class WriterBoltTest extends BaseBoltTest{
@Mock
protected TopologyContext topologyContext;
@@ -164,7 +164,7 @@ public class WriterBoltTest extends BaseBoltTest{
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.DEFAULT_ERROR)
.withThrowable(new IllegalStateException("Unhandled bulk errors in response: {java.lang.Exception: write error=[tuple]}"))
- .withSensorType(sensorType)
+ .withSensorType(Collections.singleton(sensorType))
.addRawMessage(new JSONObject());
verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
}
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
index ec7c3ab..2cba40a 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
@@ -17,9 +17,18 @@
*/
package org.apache.metron.parsers.integration;
-import com.google.common.collect.ImmutableList;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import org.apache.commons.lang.SerializationUtils;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.FieldValidator;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
@@ -30,29 +39,10 @@ import org.apache.metron.common.writer.MessageWriter;
import org.apache.metron.integration.ProcessorResult;
import org.apache.metron.parsers.bolt.ParserBolt;
import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.task.GeneralTopologyContext;
+import org.apache.metron.parsers.topology.ParserComponents;
import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.MessageId;
import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.TupleImpl;
import org.json.simple.JSONObject;
-import org.mockito.Matchers;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,12 +84,20 @@ public class ParserDriver implements Serializable {
public ShimParserBolt(List<byte[]> output) {
super(null
- , sensorType == null?config.getSensorTopic():sensorType
- , ReflectionUtils.createInstance(config.getParserClassName())
- , new WriterHandler( new CollectingWriter(output))
+ , Collections.singletonMap(
+ sensorType == null ? config.getSensorTopic() : sensorType,
+ new ParserComponents(
+ ReflectionUtils.createInstance(config.getParserClassName()),
+ null,
+ new WriterHandler(new CollectingWriter(output))
+ )
+ )
);
this.output = output;
- getParser().configure(config.getParserConfig());
+ Map<String, ParserComponents> sensorToComponentMap = getSensorToComponentMap();
+ for(Entry<String, ParserComponents> sensorToComponents : sensorToComponentMap.entrySet()) {
+ sensorToComponents.getValue().getMessageParser().configure(config.getParserConfig());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 7f40684..15b53b7 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -17,6 +17,17 @@
*/
package org.apache.metron.parsers.integration.components;
+import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
+import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
import org.apache.metron.integration.InMemoryComponent;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.integration.components.ZKServerComponent;
@@ -27,22 +38,13 @@ import org.apache.storm.generated.KillOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
-import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
-
public class ParserTopologyComponent implements InMemoryComponent {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private Properties topologyProperties;
private String brokerUrl;
- private String sensorType;
+ private List<String> sensorTypes;
private LocalCluster stormCluster;
private String outputTopic;
private String errorTopic;
@@ -51,7 +53,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
Properties topologyProperties;
String brokerUrl;
- String sensorType;
+ List<String> sensorTypes;
String outputTopic;
String errorTopic;
@@ -63,8 +65,8 @@ public class ParserTopologyComponent implements InMemoryComponent {
this.brokerUrl = brokerUrl;
return this;
}
- public Builder withSensorType(String sensorType) {
- this.sensorType = sensorType;
+ public Builder withSensorTypes(List<String> sensorTypes) {
+ this.sensorTypes = sensorTypes;
return this;
}
@@ -80,7 +82,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
public ParserTopologyComponent build() {
- if(sensorType == null) {
+ if(sensorTypes == null || sensorTypes.isEmpty()) {
throw new IllegalArgumentException("The sensor type must be defined.");
}
@@ -88,20 +90,20 @@ public class ParserTopologyComponent implements InMemoryComponent {
throw new IllegalArgumentException("The output topic must be defined.");
}
- return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic, errorTopic);
+ return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorTypes, outputTopic, errorTopic);
}
}
- public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic, String errorTopic) {
+ public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, List<String> sensorTypes, String outputTopic, String errorTopic) {
this.topologyProperties = topologyProperties;
this.brokerUrl = brokerUrl;
- this.sensorType = sensorType;
+ this.sensorTypes = sensorTypes;
this.outputTopic = outputTopic;
this.errorTopic = errorTopic;
}
- public void updateSensorType(String sensorType) {
- this.sensorType = sensorType;
+ public void updateSensorTypes(List<String> sensorTypes) {
+ this.sensorTypes = sensorTypes;
}
@Override
@@ -112,14 +114,14 @@ public class ParserTopologyComponent implements InMemoryComponent {
ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build (
topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY),
Optional.ofNullable(brokerUrl),
- sensorType,
- (x,y) -> 1,
- (x,y) -> 1,
+ sensorTypes,
+ (x,y) -> Collections.nCopies(sensorTypes.size(), 1),
+ (x,y) -> Collections.nCopies(sensorTypes.size(), 1),
(x,y) -> 1,
(x,y) -> 1,
(x,y) -> 1,
(x,y) -> 1,
- (x,y) -> new HashMap<>(),
+ (x,y) -> Collections.nCopies(sensorTypes.size(), new HashMap<>()),
(x,y) -> null,
(x,y) -> outputTopic,
(x,y) -> errorTopic,
@@ -131,9 +133,9 @@ public class ParserTopologyComponent implements InMemoryComponent {
);
stormCluster = new LocalCluster();
- stormCluster.submitTopology(sensorType, stormConf, topologyBuilder.getBuilder().createTopology());
+ stormCluster.submitTopology(getTopologyName(), stormConf, topologyBuilder.getBuilder().createTopology());
} catch (Exception e) {
- throw new UnableToStartException("Unable to start parser topology for sensorType: " + sensorType, e);
+ throw new UnableToStartException("Unable to start parser topology for sensorTypes: " + sensorTypes, e);
}
}
@@ -177,7 +179,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
protected void killTopology() {
KillOptions ko = new KillOptions();
ko.set_wait_secs(0);
- stormCluster.killTopologyWithOpts(sensorType, ko);
+ stormCluster.killTopologyWithOpts(getTopologyName(), ko);
try {
// Actually wait for it to die.
Thread.sleep(2000);
@@ -185,4 +187,8 @@ public class ParserTopologyComponent implements InMemoryComponent {
// Do nothing
}
}
+
+ protected String getTopologyName() {
+ return StringUtils.join(sensorTypes, "__");
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
index fcfc93b..ae459f4 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
@@ -18,31 +18,34 @@
package org.apache.metron.parsers.topology;
-import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.Parser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FileUtils;
import org.apache.log4j.Level;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.parsers.topology.config.ValueSupplier;
import org.apache.metron.test.utils.UnitTestHelper;
import org.apache.storm.Config;
-import com.google.common.collect.ImmutableMap;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.MissingOptionException;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
-import java.lang.ref.Reference;
-import java.util.*;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-
public class ParserTopologyCLITest {
@@ -103,11 +106,11 @@ public class ParserTopologyCLITest {
public void kafkaOffset(boolean longOpt) throws ParseException {
CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
- .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+ .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
.build(longOpt);
Assert.assertEquals("myzk", ParserTopologyCLI.ParserOptions.ZK_QUORUM.get(cli));
Assert.assertEquals("mybroker", ParserTopologyCLI.ParserOptions.BROKER_URL.get(cli));
- Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPE.get(cli));
+ Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPES.get(cli));
}
@Test
public void testCLI_happyPath() throws ParseException {
@@ -127,11 +130,11 @@ public class ParserTopologyCLITest {
public void happyPath(boolean longOpt) throws ParseException {
CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
- .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+ .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
.build(longOpt);
Assert.assertEquals("myzk", ParserTopologyCLI.ParserOptions.ZK_QUORUM.get(cli));
Assert.assertEquals("mybroker", ParserTopologyCLI.ParserOptions.BROKER_URL.get(cli));
- Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPE.get(cli));
+ Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPES.get(cli));
}
@Test
@@ -143,7 +146,7 @@ public class ParserTopologyCLITest {
public void testConfig_noExtra(boolean longOpt) throws ParseException {
CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
- .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+ .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
.with(ParserTopologyCLI.ParserOptions.NUM_WORKERS, "1")
.with(ParserTopologyCLI.ParserOptions.NUM_ACKERS, "2")
.with(ParserTopologyCLI.ParserOptions.NUM_MAX_TASK_PARALLELISM, "3")
@@ -166,7 +169,7 @@ public class ParserTopologyCLITest {
public void testOutputTopic(boolean longOpt) throws ParseException {
CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
- .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+ .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
.with(ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC, "my_topic")
.build(longOpt);
Assert.assertEquals("my_topic", ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC.get(cli));
@@ -193,7 +196,7 @@ public class ParserTopologyCLITest {
FileUtils.write(extraFile, extraConfig);
CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
- .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+ .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
.with(ParserTopologyCLI.ParserOptions.MESSAGE_TIMEOUT, "4")
.with(ParserTopologyCLI.ParserOptions.EXTRA_OPTIONS, extraFile.getAbsolutePath())
.build(longOpt);
@@ -208,50 +211,50 @@ public class ParserTopologyCLITest {
}
private static class ParserInput {
- private Integer spoutParallelism;
- private Integer spoutNumTasks;
+ private List<Integer> spoutParallelism;
+ private List<Integer> spoutNumTasks;
private Integer parserParallelism;
private Integer parserNumTasks;
private Integer errorParallelism;
private Integer errorNumTasks;
- private Map<String, Object> spoutConfig;
+ private List<Map<String, Object>> spoutConfig;
private String securityProtocol;
private Config stormConf;
private String outputTopic;
private String errorTopic;
- public ParserInput(ValueSupplier<Integer> spoutParallelism,
- ValueSupplier<Integer> spoutNumTasks,
+ public ParserInput(ValueSupplier<List> spoutParallelism,
+ ValueSupplier<List> spoutNumTasks,
ValueSupplier<Integer> parserParallelism,
ValueSupplier<Integer> parserNumTasks,
ValueSupplier<Integer> errorParallelism,
ValueSupplier<Integer> errorNumTasks,
- ValueSupplier<Map> spoutConfig,
+ ValueSupplier<List> spoutConfig,
ValueSupplier<String> securityProtocol,
ValueSupplier<Config> stormConf,
ValueSupplier<String> outputTopic,
ValueSupplier<String> errorTopic,
- SensorParserConfig config
+ List<SensorParserConfig> configs
)
{
- this.spoutParallelism = spoutParallelism.get(config, Integer.class);
- this.spoutNumTasks = spoutNumTasks.get(config, Integer.class);
- this.parserParallelism = parserParallelism.get(config, Integer.class);
- this.parserNumTasks = parserNumTasks.get(config, Integer.class);
- this.errorParallelism = errorParallelism.get(config, Integer.class);
- this.errorNumTasks = errorNumTasks.get(config, Integer.class);
- this.spoutConfig = spoutConfig.get(config, Map.class);
- this.securityProtocol = securityProtocol.get(config, String.class);
- this.stormConf = stormConf.get(config, Config.class);
- this.outputTopic = outputTopic.get(config, String.class);
- this.errorTopic = outputTopic.get(config, String.class);
+ this.spoutParallelism = spoutParallelism.get(configs, List.class);
+ this.spoutNumTasks = spoutNumTasks.get(configs, List.class);
+ this.parserParallelism = parserParallelism.get(configs, Integer.class);
+ this.parserNumTasks = parserNumTasks.get(configs, Integer.class);
+ this.errorParallelism = errorParallelism.get(configs, Integer.class);
+ this.errorNumTasks = errorNumTasks.get(configs, Integer.class);
+ this.spoutConfig = spoutConfig.get(configs, List.class);
+ this.securityProtocol = securityProtocol.get(configs, String.class);
+ this.stormConf = stormConf.get(configs, Config.class);
+ this.outputTopic = outputTopic.get(configs, String.class);
+ this.errorTopic = errorTopic.get(configs, String.class);
}
- public Integer getSpoutParallelism() {
+ public List<Integer> getSpoutParallelism() {
return spoutParallelism;
}
- public Integer getSpoutNumTasks() {
+ public List<Integer> getSpoutNumTasks() {
return spoutNumTasks;
}
@@ -271,7 +274,7 @@ public class ParserTopologyCLITest {
return errorNumTasks;
}
- public Map<String, Object> getSpoutConfig() {
+ public List<Map<String, Object>> getSpoutConfig() {
return spoutConfig;
}
@@ -330,43 +333,116 @@ public class ParserTopologyCLITest {
@Test
public void testSpoutParallelism() throws Exception {
testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_PARALLELISM
- , "10"
- , input -> input.getSpoutParallelism().equals(10)
- , () -> {
- SensorParserConfig config = getBaseConfig();
- config.setSpoutParallelism(20);
- return config;
- }
- , input -> input.getSpoutParallelism().equals(20)
- );
+ , "10"
+ , input -> input.getSpoutParallelism().equals(Collections.singletonList(10))
+ , () -> {
+ SensorParserConfig config = getBaseConfig();
+ config.setSpoutParallelism(20);
+ return Collections.singletonList(config);
+ }
+ , input -> input.getSpoutParallelism().equals(Collections.singletonList(20))
+ );
+ }
+
+ @Test
+ public void testSpoutParallelismMultiple() throws Exception {
+ // Each spout uses it's own
+ // Return one per spout.
+ List<Integer> spoutParCli = new ArrayList<>();
+ spoutParCli.add(10);
+ spoutParCli.add(12);
+ List<Integer> spoutParConfig = new ArrayList<>();
+ spoutParConfig.add(20);
+ spoutParConfig.add(30);
+ testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_PARALLELISM
+ , "10,12"
+ , input -> input.getSpoutParallelism().equals(spoutParCli)
+ , () -> {
+ SensorParserConfig config = getBaseConfig();
+ config.setSpoutParallelism(20);
+ SensorParserConfig config2 = getBaseConfig();
+ config2.setSpoutParallelism(30);
+ List<SensorParserConfig> configs = new ArrayList<>();
+ configs.add(config);
+ configs.add(config2);
+ return configs;
+ }
+ , input -> input.getSpoutParallelism().equals(spoutParConfig)
+ );
}
@Test
public void testSpoutNumTasks() throws Exception {
testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_NUM_TASKS
, "10"
- , input -> input.getSpoutNumTasks().equals(10)
+ , input -> input.getSpoutNumTasks().equals(Collections.singletonList(10))
, () -> {
SensorParserConfig config = getBaseConfig();
config.setSpoutNumTasks(20);
- return config;
+ return Collections.singletonList(config);
}
- , input -> input.getSpoutNumTasks().equals(20)
+ , input -> input.getSpoutNumTasks().equals(Collections.singletonList(20))
);
}
@Test
+ public void testSpoutNumTasksMultiple() throws Exception {
+ // Return one per spout.
+ List<Integer> numTasksCli = new ArrayList<>();
+ numTasksCli.add(10);
+ numTasksCli.add(12);
+ List<Integer> numTasksConfig = new ArrayList<>();
+ numTasksConfig.add(20);
+ numTasksConfig.add(30);
+ testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_NUM_TASKS
+ , "10,12"
+ , input -> input.getSpoutNumTasks().equals(numTasksCli)
+ , () -> {
+ SensorParserConfig config = getBaseConfig();
+ config.setSpoutNumTasks(20);
+ SensorParserConfig config2 = getBaseConfig();
+ config2.setSpoutNumTasks(30);
+ List<SensorParserConfig> configs = new ArrayList<>();
+ configs.add(config);
+ configs.add(config2);
+ return configs;
+ }
+ , input -> input.getSpoutNumTasks().equals(numTasksConfig)
+ );
+ }
+
+ @Test
public void testParserParallelism() throws Exception {
testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_PARALLELISM
- , "10"
- , input -> input.getParserParallelism().equals(10)
- , () -> {
- SensorParserConfig config = getBaseConfig();
- config.setParserParallelism(20);
- return config;
- }
- , input -> input.getParserParallelism().equals(20)
- );
+ , "10"
+ , input -> input.getParserParallelism().equals(10)
+ , () -> {
+ SensorParserConfig config = getBaseConfig();
+ config.setParserParallelism(20);
+ return Collections.singletonList(config);
+ }
+ , input -> input.getParserParallelism().equals(20)
+ );
+ }
+
+ @Test
+ public void testParserParallelismMultiple() throws Exception {
+ // Last one wins
+ testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_PARALLELISM
+ , "10"
+ , input -> input.getParserParallelism().equals(10)
+ , () -> {
+ SensorParserConfig config = getBaseConfig();
+ config.setParserParallelism(20);
+ SensorParserConfig config2 = getBaseConfig();
+ config2.setParserParallelism(30);
+ List<SensorParserConfig> configs = new ArrayList<>();
+ configs.add(config);
+ configs.add(config2);
+ return configs;
+ }
+ , input -> input.getParserParallelism().equals(30)
+ );
}
@Test
@@ -377,13 +453,32 @@ public class ParserTopologyCLITest {
, () -> {
SensorParserConfig config = getBaseConfig();
config.setParserNumTasks(20);
- return config;
+ SensorParserConfig config2 = getBaseConfig();
+ config2.setParserNumTasks(30);
+ List<SensorParserConfig> configs = new ArrayList<>();
+ configs.add(config);
+ configs.add(config2);
+ return configs;
}
- , input -> input.getParserNumTasks().equals(20)
+ , input -> input.getParserNumTasks().equals(30)
);
}
@Test
+ public void testParserNumTasksMultiple() throws Exception {
+ testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_NUM_TASKS
+ , "10"
+ , input -> input.getParserNumTasks().equals(10)
+ , () -> {
+ SensorParserConfig config = getBaseConfig();
+ config.setParserNumTasks(20);
+ return Collections.singletonList(config);
+ }
+ , input -> input.getParserNumTasks().equals(20)
+ );
+ }
+
+ @Test
public void testErrorParallelism() throws Exception {
testConfigOption(ParserTopologyCLI.ParserOptions.ERROR_WRITER_PARALLELISM
, "10"
@@ -391,7 +486,7 @@ public class ParserTopologyCLITest {
, () -> {
SensorParserConfig config = getBaseConfig();
config.setErrorWriterParallelism(20);
- return config;
+ return Collections.singletonList(config);
}
, input -> input.getErrorParallelism().equals(20)
);
@@ -405,7 +500,7 @@ public class ParserTopologyCLITest {
, () -> {
SensorParserConfig config = getBaseConfig();
config.setErrorWriterNumTasks(20);
- return config;
+ return Collections.singletonList(config);
}
, input -> input.getErrorNumTasks().equals(20)
);
@@ -419,13 +514,55 @@ public class ParserTopologyCLITest {
, () -> {
SensorParserConfig config = getBaseConfig();
config.setSecurityProtocol("KERBEROS");
- return config;
+ return Collections.singletonList(config);
}
, input -> input.getSecurityProtocol().equals("KERBEROS")
);
}
@Test
+ public void testSecurityProtocol_fromCLIMultipleUniform() throws Exception {
+ testConfigOption(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL
+ , "PLAINTEXT"
+ , input -> input.getSecurityProtocol().equals("PLAINTEXT")
+ , () -> {
+ SensorParserConfig config = getBaseConfig();
+ config.setSecurityProtocol("PLAINTEXT");
+ SensorParserConfig config2 = getBaseConfig();
+ config2.setSecurityProtocol("PLAINTEXT");
+ List<SensorParserConfig> configs = new ArrayList<>();
+ configs.add(config);
+ configs.add(config2);
+ return configs;
+ }
+ , input -> input.getSecurityProtocol().equals("PLAINTEXT")
+ );
+ }
+
+ @Test
+ public void testSecurityProtocol_fromCLIMultipleMixed() throws Exception {
+ // Non plaintext wins
+ testConfigOption(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL
+ , "PLAINTEXT"
+ , input -> input.getSecurityProtocol().equals("PLAINTEXT")
+ , () -> {
+ SensorParserConfig config = getBaseConfig();
+ config.setSecurityProtocol("PLAINTEXT");
+ SensorParserConfig config2 = getBaseConfig();
+ config2.setSecurityProtocol("KERBEROS");
+ SensorParserConfig config3 = getBaseConfig();
+ config3.setSecurityProtocol("PLAINTEXT");
+ List<SensorParserConfig> configs = new ArrayList<>();
+ configs.add(config);
+ configs.add(config2);
+ configs.add(config3);
+ return configs;
+ }
+ , input -> input.getSecurityProtocol().equals("KERBEROS")
+ );
+ }
+
+ @Test
public void testSecurityProtocol_fromSpout() throws Exception {
//Ultimately the order of precedence is CLI > spout config > parser config
File extraConfig = File.createTempFile("spoutConfig", "json");
@@ -444,7 +581,7 @@ public class ParserTopologyCLITest {
, () -> {
SensorParserConfig config = getBaseConfig();
config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK");
- return config;
+ return Collections.singletonList(config);
}
, input -> input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK")
);
@@ -458,7 +595,7 @@ public class ParserTopologyCLITest {
, () -> {
SensorParserConfig config = getBaseConfig();
config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK");
- return config;
+ return Collections.singletonList(config);
}
, input -> input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK")
);
@@ -481,7 +618,7 @@ public class ParserTopologyCLITest {
SensorParserConfig config = getBaseConfig();
config.setNumWorkers(100);
config.setNumAckers(200);
- return config;
+ return Collections.singletonList(config);
}
, input -> {
Config c = input.getStormConf();
@@ -519,7 +656,7 @@ public class ParserTopologyCLITest {
put(Config.TOPOLOGY_ACKER_EXECUTORS, 200);
}}
);
- return config;
+ return Collections.singletonList(config);
}
, input -> {
Config c = input.getStormConf();
@@ -542,22 +679,21 @@ public class ParserTopologyCLITest {
put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, extraConfig.getAbsolutePath());
}};
Predicate<ParserInput> cliOverrideExpected = input -> {
- return input.getSpoutConfig().get("extra_config").equals("from_file");
+ return input.getSpoutConfig().get(0).get("extra_config").equals("from_file");
};
Predicate<ParserInput> configOverrideExpected = input -> {
- return input.getSpoutConfig().get("extra_config").equals("from_zk")
- ;
+ return input.getSpoutConfig().get(0).get("extra_config").equals("from_zk");
};
- Supplier<SensorParserConfig> configSupplier = () -> {
+ Supplier<List<SensorParserConfig>> configSupplier = () -> {
SensorParserConfig config = getBaseConfig();
config.setSpoutConfig(
new HashMap<String, Object>() {{
put("extra_config", "from_zk");
}}
);
- return config;
+ return Collections.singletonList(config);
};
testConfigOption( cliOptions
, cliOverrideExpected
@@ -573,7 +709,7 @@ public class ParserTopologyCLITest {
private void testConfigOption( ParserTopologyCLI.ParserOptions option
, String cliOverride
, Predicate<ParserInput> cliOverrideCondition
- , Supplier<SensorParserConfig> configSupplier
+ , Supplier<List<SensorParserConfig>> configSupplier
, Predicate<ParserInput> configOverrideCondition
) throws Exception {
testConfigOption(
@@ -588,48 +724,48 @@ public class ParserTopologyCLITest {
private void testConfigOption( EnumMap<ParserTopologyCLI.ParserOptions, String> options
, Predicate<ParserInput> cliOverrideCondition
- , Supplier<SensorParserConfig> configSupplier
+ , Supplier<List<SensorParserConfig>> configSupplier
, Predicate<ParserInput> configOverrideCondition
) throws Exception {
//CLI Override
- SensorParserConfig config = configSupplier.get();
+ List<SensorParserConfig> configs = configSupplier.get();
{
CLIBuilder builder = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
- .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor");
+ .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor");
for(Map.Entry<ParserTopologyCLI.ParserOptions, String> entry : options.entrySet()) {
builder.with(entry.getKey(), entry.getValue());
}
CommandLine cmd = builder.build(true);
- ParserInput input = getInput(cmd, config);
+ ParserInput input = getInput(cmd, configs);
Assert.assertTrue(cliOverrideCondition.test(input));
}
// Config Override
{
CLIBuilder builder = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
- .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor");
+ .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor");
CommandLine cmd = builder.build(true);
- ParserInput input = getInput(cmd, config);
+ ParserInput input = getInput(cmd, configs);
Assert.assertTrue(configOverrideCondition.test(input));
}
}
- private static ParserInput getInput(CommandLine cmd, SensorParserConfig config ) throws Exception {
+ private static ParserInput getInput(CommandLine cmd, List<SensorParserConfig> configs ) throws Exception {
final ParserInput[] parserInput = new ParserInput[]{null};
new ParserTopologyCLI() {
@Override
protected ParserTopologyBuilder.ParserTopology getParserTopology(
String zookeeperUrl,
Optional<String> brokerUrl,
- String sensorType,
- ValueSupplier<Integer> spoutParallelism,
- ValueSupplier<Integer> spoutNumTasks,
+ List<String> sensorType,
+ ValueSupplier<List> spoutParallelism,
+ ValueSupplier<List> spoutNumTasks,
ValueSupplier<Integer> parserParallelism,
ValueSupplier<Integer> parserNumTasks,
ValueSupplier<Integer> errorParallelism,
ValueSupplier<Integer> errorNumTasks,
- ValueSupplier<Map> spoutConfig,
+ ValueSupplier<List> spoutConfig,
ValueSupplier<String> securityProtocol,
ValueSupplier<Config> stormConf,
ValueSupplier<String> outputTopic,
@@ -647,7 +783,7 @@ public class ParserTopologyCLITest {
stormConf,
outputTopic,
errorTopic,
- config
+ configs
);
return null;
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
index 49d7521..788df2d 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
@@ -19,6 +19,7 @@
package org.apache.metron.writers.integration;
import com.google.common.collect.ImmutableList;
+import java.util.Collections;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
@@ -107,7 +108,7 @@ public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationT
.withParserSensorConfig(sensorType, parserConfig);
ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
- .withSensorType(sensorType)
+ .withSensorTypes(Collections.singletonList(sensorType))
.withTopologyProperties(topologyProperties)
.withBrokerUrl(kafkaComponent.getBrokerList())
.withOutputTopic(parserConfig.getOutputTopic())
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index 99506de..cecba3d 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -95,6 +96,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
* "sensorTopic": "dummy",
* "outputTopic": "output",
* "errorTopic": "parser_error",
+ * "readMetadata": true,
* "parserConfig": {
* "batchSize" : 1,
* "columns" : {
@@ -148,7 +150,12 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
}};
final Properties topologyProperties = new Properties();
- ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigWithValidation);
+ ComponentRunner runner = setupTopologyComponents(
+ topologyProperties,
+ Collections.singletonList(sensorType),
+ Collections.singletonList(parserConfig),
+ globalConfigWithValidation
+ );
try {
runner.start();
kafkaComponent.writeMessages(sensorType, inputMessages);
@@ -172,7 +179,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
@Test
public void parser_with_global_validations_writes_bad_records_to_error_topic() throws Exception {
- final String sensorType = "dummy";
+ final String sensorType = "dummy";
SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class);
final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
add(Bytes.toBytes("valid,foo"));
@@ -181,7 +188,8 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
}};
final Properties topologyProperties = new Properties();
- ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigWithValidation);
+ ComponentRunner runner = setupTopologyComponents(topologyProperties, Collections.singletonList(sensorType),
+ Collections.singletonList(parserConfig), globalConfigWithValidation);
try {
runner.start();
kafkaComponent.writeMessages(sensorType, inputMessages);
@@ -223,27 +231,31 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
*
* @return runner
*/
- public ComponentRunner setupTopologyComponents(Properties topologyProperties, String sensorType,
- SensorParserConfig parserConfig, String globalConfig) {
+ public ComponentRunner setupTopologyComponents(Properties topologyProperties, List<String> sensorTypes,
+ List<SensorParserConfig> parserConfigs, String globalConfig) {
zkServerComponent = getZKServerComponent(topologyProperties);
- kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
- add(new KafkaComponent.Topic(sensorType, 1));
- add(new KafkaComponent.Topic(parserConfig.getErrorTopic(), 1));
- add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
- }});
+ List<KafkaComponent.Topic> topics = new ArrayList<>();
+ for(String sensorType : sensorTypes) {
+ topics.add(new KafkaComponent.Topic(sensorType, 1));
+ }
+ topics.add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+ kafkaComponent = getKafkaComponent(topologyProperties, topics);
topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
configUploadComponent = new ConfigUploadComponent()
.withTopologyProperties(topologyProperties)
- .withGlobalConfig(globalConfig)
- .withParserSensorConfig(sensorType, parserConfig);
+ .withGlobalConfig(globalConfig);
+
+ for (int i = 0; i < sensorTypes.size(); ++i) {
+ configUploadComponent.withParserSensorConfig(sensorTypes.get(i), parserConfigs.get(i));
+ }
parserTopologyComponent = new ParserTopologyComponent.Builder()
- .withSensorType(sensorType)
+ .withSensorTypes(sensorTypes)
.withTopologyProperties(topologyProperties)
.withBrokerUrl(kafkaComponent.getBrokerList())
- .withErrorTopic(parserConfig.getErrorTopic())
- .withOutputTopic(parserConfig.getOutputTopic())
+ .withErrorTopic(parserConfigs.get(0).getErrorTopic())
+ .withOutputTopic(parserConfigs.get(0).getOutputTopic())
.build();
return new ComponentRunner.Builder()
@@ -325,8 +337,22 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
@Multiline
public static String offsetParserConfigJSON;
+ /**
+ * {
+ * "parserClassName":"org.apache.metron.writers.integration.WriterBoltIntegrationTest$DummyObjectParser",
+ * "sensorTopic":"dummyobjectparser",
+ * "outputTopic": "enrichments",
+ * "errorTopic": "parser_error",
+ * "parserConfig": {
+ * "batchSize" : 1
+ * }
+ * }
+ */
+ @Multiline
+ public static String dummyParserConfigJSON;
+
@Test
- public void commits_kafka_offsets_for_emtpy_objects() throws Exception {
+ public void commits_kafka_offsets_for_empty_objects() throws Exception {
final String sensorType = "emptyobjectparser";
SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(offsetParserConfigJSON, SensorParserConfig.class);
final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
@@ -335,7 +361,11 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
add(Bytes.toBytes("baz"));
}};
final Properties topologyProperties = new Properties();
- ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigEmpty);
+ ComponentRunner runner = setupTopologyComponents(
+ topologyProperties,
+ Collections.singletonList(sensorType),
+ Collections.singletonList(parserConfig),
+ globalConfigEmpty);
try {
runner.start();
kafkaComponent.writeMessages(sensorType, inputMessages);
@@ -356,6 +386,64 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
}
}
+ @Test
+ public void test_multiple_sensors() throws Exception {
+ // Setup first sensor
+ final String emptyObjectSensorType = "emptyobjectparser";
+ SensorParserConfig emptyObjectParserConfig = JSONUtils.INSTANCE.load(offsetParserConfigJSON, SensorParserConfig.class);
+ final List<byte[]> emptyObjectInputMessages = new ArrayList<byte[]>() {{
+ add(Bytes.toBytes("foo"));
+ add(Bytes.toBytes("bar"));
+ add(Bytes.toBytes("baz"));
+ }};
+
+ // Setup second sensor
+ final String dummySensorType = "dummyobjectparser";
+ SensorParserConfig dummyParserConfig = JSONUtils.INSTANCE.load(dummyParserConfigJSON, SensorParserConfig.class);
+ final List<byte[]> dummyInputMessages = new ArrayList<byte[]>() {{
+ add(Bytes.toBytes("dummy_foo"));
+ add(Bytes.toBytes("dummy_bar"));
+ add(Bytes.toBytes("dummy_baz"));
+ }};
+
+ final Properties topologyProperties = new Properties();
+
+ List<String> sensorTypes = new ArrayList<>();
+ sensorTypes.add(emptyObjectSensorType);
+ sensorTypes.add(dummySensorType);
+
+ List<SensorParserConfig> parserConfigs = new ArrayList<>();
+ parserConfigs.add(emptyObjectParserConfig);
+ parserConfigs.add(dummyParserConfig);
+
+ ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorTypes, parserConfigs, globalConfigEmpty);
+ try {
+ runner.start();
+ kafkaComponent.writeMessages(emptyObjectSensorType, emptyObjectInputMessages);
+ kafkaComponent.writeMessages(dummySensorType, dummyInputMessages);
+
+ final List<byte[]> allInputMessages = new ArrayList<>();
+ allInputMessages.addAll(emptyObjectInputMessages);
+ allInputMessages.addAll(dummyInputMessages);
+ Processor allResultsProcessor = new AllResultsProcessor(allInputMessages, Constants.ENRICHMENT_TOPIC);
+ @SuppressWarnings("unchecked")
+ ProcessorResult<Set<JSONObject>> result = runner.process(allResultsProcessor);
+
+ // validate the output messages
+ assertThat(
+ "size should match",
+ result.getResult().size(),
+ equalTo(allInputMessages.size()));
+ for (JSONObject record : result.getResult()) {
+ assertThat("record should have a guid", record.containsKey("guid"), equalTo(true));
+ }
+ } finally {
+ if (runner != null) {
+ runner.stop();
+ }
+ }
+ }
+
/**
* Goal is to check returning an empty JSONObject in our List returned by parse.
*/
@@ -380,6 +468,34 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
}
}
+
+ /**
+ * Goal is to check returning an empty JSONObject in our List returned by parse.
+ */
+ public static class DummyObjectParser implements MessageParser<JSONObject>, Serializable {
+
+ @Override
+ public void init() {
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<JSONObject> parse(byte[] bytes) {
+ JSONObject dummy = new JSONObject();
+ dummy.put("dummy_key", "dummy_value");
+ return ImmutableList.of(dummy);
+ }
+
+ @Override
+ public boolean validate(JSONObject message) {
+ return true;
+ }
+
+ @Override
+ public void configure(Map<String, Object> map) {
+ }
+ }
+
/**
* Verifies all messages in the provided List of input messages appears in the specified
* Kafka output topic
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index e35960f..7678584 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -23,6 +23,7 @@ import static java.lang.String.format;
import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -118,7 +119,7 @@ public class BulkWriterComponent<MESSAGE_T> {
public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, MessageGetStrategy messageGetStrategy) {
LOG.error(format("Failing %d tuple(s); sensorType=%s", Iterables.size(tuples), sensorType), e);
MetronError error = new MetronError()
- .withSensorType(sensorType)
+ .withSensorType(Collections.singleton(sensorType))
.withErrorType(Constants.ErrorType.INDEXING_ERROR)
.withThrowable(e);
tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
index 0264b3d..c389854 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -17,11 +17,22 @@
*/
package org.apache.metron.writer;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.verifyStatic;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.error.MetronError;
import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.metron.common.message.MessageGetters;
import org.apache.metron.common.utils.ErrorUtils;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkWriterResponse;
@@ -38,19 +49,6 @@ import org.mockito.MockitoAnnotations;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.powermock.api.mockito.PowerMockito.verifyStatic;
-
@RunWith(PowerMockRunner.class)
@PrepareForTest({BulkWriterComponent.class, ErrorUtils.class})
public class BulkWriterComponentTest {
@@ -130,7 +128,7 @@ public class BulkWriterComponentTest {
public void writeShouldProperlyHandleWriterErrors() throws Exception {
Throwable e = new Exception("test exception");
MetronError error = new MetronError()
- .withSensorType(sensorType)
+ .withSensorType(Collections.singleton(sensorType))
.withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2));
BulkWriterResponse response = new BulkWriterResponse();
response.addAllErrors(e, tupleList);
@@ -164,7 +162,7 @@ public class BulkWriterComponentTest {
public void writeShouldProperlyHandleWriterException() throws Exception {
Throwable e = new Exception("test exception");
MetronError error = new MetronError()
- .withSensorType(sensorType)
+ .withSensorType(Collections.singleton(sensorType))
.withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2));
BulkWriterResponse response = new BulkWriterResponse();
response.addAllErrors(e, tupleList);
@@ -183,10 +181,10 @@ public class BulkWriterComponentTest {
public void errorAllShouldClearMapsAndHandleErrors() throws Exception {
Throwable e = new Exception("test exception");
MetronError error1 = new MetronError()
- .withSensorType("sensor1")
+ .withSensorType(Collections.singleton("sensor1"))
.withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1));
MetronError error2 = new MetronError()
- .withSensorType("sensor2")
+ .withSensorType(Collections.singleton("sensor2"))
.withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2));
BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector);
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/use-cases/parser_chaining/README.md
----------------------------------------------------------------------
diff --git a/use-cases/parser_chaining/README.md b/use-cases/parser_chaining/README.md
index 26fd333..4055bcd 100644
--- a/use-cases/parser_chaining/README.md
+++ b/use-cases/parser_chaining/README.md
@@ -233,3 +233,17 @@ cat ~/data.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --b
```
You should see indices created for the `cisco-5-304` and `cisco-6-302` data with appropriate fields created for each type.
+
+# Aggregated Parsers with Parser Chaining
+Chained parsers can be run as aggregated parsers. These parsers continue to use the sensor specific Kafka topics, and do not do internal routing to the appropriate sensor.
+
+Instead of creating a topology per sensor, all 3 (`pix-syslog-parser`, `cisco-5-304`, and `cisco-6-302`) can be run in a single aggregated parser. It's also possible to aggregate a subset of these parsers (e.g. run `cisco-6-302` as it's own topology, and aggregate the other 2).
+
+The step to start parsers then becomes
+```
+$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s cisco-6-302,cisco-5-304,pix_syslog_router
+```
+
+The flow through the Storm topology and Kafka topics:
+
+![Aggregated Flow](aggregated_parser_chaining_flow.svg)
\ No newline at end of file