Posted to commits@metron.apache.org by mm...@apache.org on 2018/07/20 15:40:39 UTC

[11/15] metron git commit: METRON-1657 Parser aggregation in storm (justinleet) closes apache/metron#1099

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 15ce735..06f4cec 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -58,6 +58,7 @@ import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
 import org.apache.metron.parsers.BasicParser;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.parsers.topology.ParserComponents;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.test.bolt.BaseBoltTest;
 import org.apache.metron.test.error.MetronErrorJSONMatcher;
@@ -185,7 +186,15 @@ public class ParserBoltTest extends BaseBoltTest {
   @Test
   public void testEmpty() throws Exception {
     String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            null,
+            new WriterHandler(writer)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         return ParserBoltTest.createUpdater();
@@ -209,7 +218,7 @@ public class ParserBoltTest extends BaseBoltTest {
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.PARSER_ERROR)
             .withThrowable(new NullPointerException())
-            .withSensorType(sensorType)
+            .withSensorType(Collections.singleton(sensorType))
             .addRawMessage(sampleBinary);
     verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
   }
@@ -217,7 +226,15 @@ public class ParserBoltTest extends BaseBoltTest {
   @Test
   public void testInvalid() throws Exception {
     String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            null,
+            new WriterHandler(writer)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         return ParserBoltTest.createUpdater();
@@ -243,7 +260,7 @@ public class ParserBoltTest extends BaseBoltTest {
 
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.PARSER_INVALID)
-            .withSensorType(sensorType)
+            .withSensorType(Collections.singleton(sensorType))
             .withErrorFields(new HashSet<String>() {{ add("field"); }})
             .addRawMessage(new JSONObject(){{
               put("field", "invalidValue");
@@ -255,14 +272,20 @@ public class ParserBoltTest extends BaseBoltTest {
 
   @Test
   public void test() throws Exception {
-
     String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            null,
+            new WriterHandler(writer)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         return ParserBoltTest.createUpdater();
       }
-
     };
     parserBolt.setCuratorFramework(client);
     parserBolt.setZKCache(cache);
@@ -290,7 +313,6 @@ public class ParserBoltTest extends BaseBoltTest {
     when(parser.validate(eq(messages.get(1)))).thenReturn(true);
     when(filter.emitTuple(eq(messages.get(0)), any())).thenReturn(false);
     when(filter.emitTuple(eq(messages.get(1)), any())).thenReturn(true);
-    parserBolt.withMessageFilter(filter);
     parserBolt.execute(tuple);
     verify(writer, times(1)).write(eq(sensorType), any(ParserWriterConfiguration.class), eq(tuple), eq(finalMessage2));
     verify(outputCollector, times(2)).ack(tuple);
@@ -317,21 +339,15 @@ public class ParserBoltTest extends BaseBoltTest {
   @Test
   public void testFilterSuccess() throws Exception {
     String sensorType = "yaf";
-
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
-      @Override
-      protected SensorParserConfig getSensorParserConfig() {
-        try {
-          return SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig));
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-      @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater(Optional.of(1));
-      }
-    };
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            null,
+            new WriterHandler(batchWriter)
+        )
+    );
+    ParserBolt parserBolt = buildParserBolt(parserMap, sensorParserConfig);
 
     parserBolt.setCuratorFramework(client);
     parserBolt.setZKCache(cache);
@@ -358,10 +374,17 @@ public class ParserBoltTest extends BaseBoltTest {
   @Test
   public void testFilterFailure() throws Exception {
     String sensorType = "yaf";
-
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            null,
+            new WriterHandler(batchWriter)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
-      protected SensorParserConfig getSensorParserConfig() {
+      protected SensorParserConfig getSensorParserConfig(String sensorType) {
         try {
           return SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig));
         } catch (IOException e) {
@@ -433,21 +456,15 @@ public class ParserBoltTest extends BaseBoltTest {
 
       }
     };
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, dummyParser, new WriterHandler(recordingWriter)) {
-      @Override
-      protected SensorParserConfig getSensorParserConfig() {
-        try {
-          return SensorParserConfig.fromBytes(Bytes.toBytes(csvWithFieldTransformations));
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      @Override
-      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
-        return ParserBoltTest.createUpdater(Optional.of(1));
-      }
-    };
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            dummyParser,
+            null,
+            new WriterHandler(recordingWriter)
+        )
+    );
+    ParserBolt parserBolt = buildParserBolt(parserMap, csvWithFieldTransformations);
 
     parserBolt.setCuratorFramework(client);
     parserBolt.setZKCache(cache);
@@ -461,10 +478,16 @@ public class ParserBoltTest extends BaseBoltTest {
 
   @Test
   public void testDefaultBatchSize() throws Exception {
-
     String sensorType = "yaf";
-
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            filter,
+            new WriterHandler(batchWriter)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         // this uses default batch size
@@ -487,7 +510,6 @@ public class ParserBoltTest extends BaseBoltTest {
       response.addSuccess(uniqueTuples[i]);
     }
     when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(new HashSet<>(Arrays.asList(uniqueTuples))), any())).thenReturn(response);
-    parserBolt.withMessageFilter(filter);
     for (Tuple tuple : uniqueTuples) {
       parserBolt.execute(tuple);
     }
@@ -498,10 +520,16 @@ public class ParserBoltTest extends BaseBoltTest {
 
   @Test
   public void testLessRecordsThanDefaultBatchSize() throws Exception {
-
     String sensorType = "yaf";
-
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            filter,
+            new WriterHandler(batchWriter)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         // this uses default batch size
@@ -524,7 +552,6 @@ public class ParserBoltTest extends BaseBoltTest {
       uniqueTuples[i] = mock(Tuple.class);
       response.addSuccess(uniqueTuples[i]);
     }
-    parserBolt.withMessageFilter(filter);
     for (Tuple tuple : uniqueTuples) {
       parserBolt.execute(tuple);
     }
@@ -542,10 +569,16 @@ public class ParserBoltTest extends BaseBoltTest {
 
   @Test
   public void testBatchOfOne() throws Exception {
-
     String sensorType = "yaf";
-
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            filter,
+            new WriterHandler(batchWriter)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         return ParserBoltTest.createUpdater(Optional.of(1));
@@ -563,17 +596,22 @@ public class ParserBoltTest extends BaseBoltTest {
     BulkWriterResponse response = new BulkWriterResponse();
     response.addSuccess(t1);
     when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(Collections.singleton(t1)), any())).thenReturn(response);
-    parserBolt.withMessageFilter(filter);
     parserBolt.execute(t1);
     verify(outputCollector, times(1)).ack(t1);
   }
 
   @Test
   public void testBatchOfFive() throws Exception {
-
     String sensorType = "yaf";
-
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            filter,
+            new WriterHandler(batchWriter)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         return ParserBoltTest.createUpdater(Optional.of(5));
@@ -592,7 +630,6 @@ public class ParserBoltTest extends BaseBoltTest {
     BulkWriterResponse response = new BulkWriterResponse();
     response.addAllSuccesses(tuples);
     when(batchWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tuples), any())).thenReturn(response);
-    parserBolt.withMessageFilter(filter);
     writeNonBatch(outputCollector, parserBolt, t1);
     writeNonBatch(outputCollector, parserBolt, t2);
     writeNonBatch(outputCollector, parserBolt, t3);
@@ -610,9 +647,16 @@ public class ParserBoltTest extends BaseBoltTest {
 
   @Test
   public void testBatchOfFiveWithError() throws Exception {
-
     String sensorType = "yaf";
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+    Map<String, ParserComponents> parserMap = Collections.singletonMap(
+        sensorType,
+        new ParserComponents(
+            parser,
+            filter,
+            new WriterHandler(batchWriter)
+        )
+    );
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap) {
       @Override
       protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
         return ParserBoltTest.createUpdater(Optional.of(5));
@@ -629,7 +673,6 @@ public class ParserBoltTest extends BaseBoltTest {
     when(parser.validate(any())).thenReturn(true);
     when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
     when(filter.emitTuple(any(), any(Context.class))).thenReturn(true);
-    parserBolt.withMessageFilter(filter);
     parserBolt.execute(t1);
     parserBolt.execute(t2);
     parserBolt.execute(t3);
@@ -654,6 +697,25 @@ public class ParserBoltTest extends BaseBoltTest {
     parserBolt.getConfigurations().updateGlobalConfig(globalConfig);
   }
 
+  private ParserBolt buildParserBolt(Map<String, ParserComponents> parserMap,
+      String csvWithFieldTransformations) {
+    return new ParserBolt("zookeeperUrl", parserMap) {
+      @Override
+      protected SensorParserConfig getSensorParserConfig(String sensorType) {
+        try {
+          return SensorParserConfig.fromBytes(Bytes.toBytes(csvWithFieldTransformations));
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
+        return ParserBoltTest.createUpdater(Optional.of(1));
+      }
+    };
+  }
+
   private static void writeNonBatch(OutputCollector collector, ParserBolt bolt, Tuple t) {
     bolt.execute(t);
   }

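The net effect of the ParserBoltTest changes is the new aggregated constructor: ParserBolt no longer takes a single (sensorType, parser, writerHandler) triple and no longer uses withMessageFilter(); it is built from a map of sensor type to ParserComponents, where each entry bundles the MessageParser, an optional MessageFilter (null when unused), and the WriterHandler. A minimal sketch of wiring a bolt for two sensors, following the constructor usage in the tests above (the sensor names and the parser, filter, and writer variables are illustrative placeholders):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.metron.parsers.bolt.ParserBolt;
    import org.apache.metron.parsers.bolt.WriterHandler;
    import org.apache.metron.parsers.topology.ParserComponents;

    // One entry per sensor type handled by the aggregated topology.
    Map<String, ParserComponents> parserMap = new HashMap<>();
    // The second ParserComponents argument is the optional MessageFilter; pass null to skip filtering.
    parserMap.put("bro", new ParserComponents(broParser, null, new WriterHandler(broWriter)));
    parserMap.put("yaf", new ParserComponents(yafParser, yafFilter, new WriterHandler(yafWriter)));

    // A single bolt instance now parses and writes for every sensor in the map.
    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserMap);
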
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
index a23c368..b04d8f7 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
@@ -18,6 +18,20 @@
 
 package org.apache.metron.parsers.bolt;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.log4j.Level;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.IndexingConfigurations;
@@ -38,20 +52,6 @@ import org.json.simple.JSONObject;
 import org.junit.Test;
 import org.mockito.Mock;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 public class WriterBoltTest extends BaseBoltTest{
   @Mock
   protected TopologyContext topologyContext;
@@ -164,7 +164,7 @@ public class WriterBoltTest extends BaseBoltTest{
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.DEFAULT_ERROR)
             .withThrowable(new IllegalStateException("Unhandled bulk errors in response: {java.lang.Exception: write error=[tuple]}"))
-            .withSensorType(sensorType)
+            .withSensorType(Collections.singleton(sensorType))
             .addRawMessage(new JSONObject());
     verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
   }

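The matching change in WriterBoltTest is that MetronError.withSensorType now takes a collection of sensor types rather than a single string, since an error raised in an aggregated topology may not be attributable to exactly one sensor. A hedged sketch for the common single-sensor case, mirroring the test above (the error type and sensor name are illustrative, and the MetronError import path is assumed from metron-common):

    import java.util.Collections;
    import org.apache.metron.common.Constants;
    import org.apache.metron.common.error.MetronError;
    import org.json.simple.JSONObject;

    // withSensorType now expects a Set; wrap a single sensor name in Collections.singleton.
    MetronError error = new MetronError()
        .withErrorType(Constants.ErrorType.DEFAULT_ERROR)
        .withSensorType(Collections.singleton("yaf"))
        .addRawMessage(new JSONObject());
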
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
index ec7c3ab..2cba40a 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserDriver.java
@@ -17,9 +17,18 @@
  */
 package org.apache.metron.parsers.integration;
 
-import com.google.common.collect.ImmutableList;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import org.apache.commons.lang.SerializationUtils;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.FieldValidator;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
@@ -30,29 +39,10 @@ import org.apache.metron.common.writer.MessageWriter;
 import org.apache.metron.integration.ProcessorResult;
 import org.apache.metron.parsers.bolt.ParserBolt;
 import org.apache.metron.parsers.bolt.WriterHandler;
-import org.apache.metron.parsers.interfaces.MessageParser;
-import org.apache.storm.generated.GlobalStreamId;
-import org.apache.storm.task.GeneralTopologyContext;
+import org.apache.metron.parsers.topology.ParserComponents;
 import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.MessageId;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.TupleImpl;
 import org.json.simple.JSONObject;
-import org.mockito.Matchers;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,12 +84,20 @@ public class ParserDriver implements Serializable {
 
     public ShimParserBolt(List<byte[]> output) {
       super(null
-           , sensorType == null?config.getSensorTopic():sensorType
-           , ReflectionUtils.createInstance(config.getParserClassName())
-           , new WriterHandler( new CollectingWriter(output))
+          , Collections.singletonMap(
+              sensorType == null ? config.getSensorTopic() : sensorType,
+              new ParserComponents(
+              ReflectionUtils.createInstance(config.getParserClassName()),
+                  null,
+                  new WriterHandler(new CollectingWriter(output))
+              )
+         )
       );
       this.output = output;
-      getParser().configure(config.getParserConfig());
+      Map<String, ParserComponents> sensorToComponentMap = getSensorToComponentMap();
+      for(Entry<String, ParserComponents> sensorToComponents : sensorToComponentMap.entrySet()) {
+        sensorToComponents.getValue().getMessageParser().configure(config.getParserConfig());
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 7f40684..15b53b7 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -17,6 +17,17 @@
  */
 package org.apache.metron.parsers.integration.components;
 
+import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
+import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.integration.components.ZKServerComponent;
@@ -27,22 +38,13 @@ import org.apache.storm.generated.KillOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
-import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
-
 public class ParserTopologyComponent implements InMemoryComponent {
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private Properties topologyProperties;
   private String brokerUrl;
-  private String sensorType;
+  private List<String> sensorTypes;
   private LocalCluster stormCluster;
   private String outputTopic;
   private String errorTopic;
@@ -51,7 +53,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
 
     Properties topologyProperties;
     String brokerUrl;
-    String sensorType;
+    List<String> sensorTypes;
     String outputTopic;
     String errorTopic;
 
@@ -63,8 +65,8 @@ public class ParserTopologyComponent implements InMemoryComponent {
       this.brokerUrl = brokerUrl;
       return this;
     }
-    public Builder withSensorType(String sensorType) {
-      this.sensorType = sensorType;
+    public Builder withSensorTypes(List<String> sensorTypes) {
+      this.sensorTypes = sensorTypes;
       return this;
     }
 
@@ -80,7 +82,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
 
     public ParserTopologyComponent build() {
 
-      if(sensorType == null) {
+      if(sensorTypes == null || sensorTypes.isEmpty()) {
         throw new IllegalArgumentException("The sensor type must be defined.");
       }
 
@@ -88,20 +90,20 @@ public class ParserTopologyComponent implements InMemoryComponent {
         throw new IllegalArgumentException("The output topic must be defined.");
       }
 
-      return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic, errorTopic);
+      return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorTypes, outputTopic, errorTopic);
     }
   }
 
-  public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic, String errorTopic) {
+  public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, List<String> sensorTypes, String outputTopic, String errorTopic) {
     this.topologyProperties = topologyProperties;
     this.brokerUrl = brokerUrl;
-    this.sensorType = sensorType;
+    this.sensorTypes = sensorTypes;
     this.outputTopic = outputTopic;
     this.errorTopic = errorTopic;
   }
 
-  public void updateSensorType(String sensorType) {
-    this.sensorType = sensorType;
+  public void updateSensorTypes(List<String> sensorTypes) {
+    this.sensorTypes = sensorTypes;
   }
 
   @Override
@@ -112,14 +114,14 @@ public class ParserTopologyComponent implements InMemoryComponent {
       ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build (
               topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY),
               Optional.ofNullable(brokerUrl),
-              sensorType,
-              (x,y) -> 1,
-              (x,y) -> 1,
+              sensorTypes,
+              (x,y) -> Collections.nCopies(sensorTypes.size(), 1),
+              (x,y) -> Collections.nCopies(sensorTypes.size(), 1),
               (x,y) -> 1,
               (x,y) -> 1,
               (x,y) -> 1,
               (x,y) -> 1,
-              (x,y) -> new HashMap<>(),
+              (x,y) -> Collections.nCopies(sensorTypes.size(), new HashMap<>()),
               (x,y) -> null,
               (x,y) -> outputTopic,
               (x,y) -> errorTopic,
@@ -131,9 +133,9 @@ public class ParserTopologyComponent implements InMemoryComponent {
       );
 
       stormCluster = new LocalCluster();
-      stormCluster.submitTopology(sensorType, stormConf, topologyBuilder.getBuilder().createTopology());
+      stormCluster.submitTopology(getTopologyName(), stormConf, topologyBuilder.getBuilder().createTopology());
     } catch (Exception e) {
-      throw new UnableToStartException("Unable to start parser topology for sensorType: " + sensorType, e);
+      throw new UnableToStartException("Unable to start parser topology for sensorTypes: " + sensorTypes, e);
     }
   }
 
@@ -177,7 +179,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
   protected void killTopology() {
     KillOptions ko = new KillOptions();
     ko.set_wait_secs(0);
-    stormCluster.killTopologyWithOpts(sensorType, ko);
+    stormCluster.killTopologyWithOpts(getTopologyName(), ko);
     try {
       // Actually wait for it to die.
       Thread.sleep(2000);
@@ -185,4 +187,8 @@ public class ParserTopologyComponent implements InMemoryComponent {
       // Do nothing
     }
   }
+
+  protected String getTopologyName() {
+    return StringUtils.join(sensorTypes, "__");
+  }
 }

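On the integration-test side, ParserTopologyComponent now carries a list of sensor types, and the topology it submits is named by joining those types with "__" (see getTopologyName above). A sketch of building a two-sensor component via the Builder, following the builder methods in this diff and the usage in SimpleHbaseEnrichmentWriterIntegrationTest below (the broker URL, output topic, and properties values are illustrative):

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.metron.parsers.integration.components.ParserTopologyComponent;

    Properties topologyProperties = new Properties();

    // Builds a component that submits one aggregated parser topology, named "bro__yaf", when started.
    ParserTopologyComponent parserTopology = new ParserTopologyComponent.Builder()
        .withSensorTypes(Arrays.asList("bro", "yaf"))
        .withTopologyProperties(topologyProperties)
        .withBrokerUrl("localhost:6667")
        .withOutputTopic("enrichments")
        .build();
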
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
index fcfc93b..ae459f4 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
@@ -18,31 +18,34 @@
 
 package org.apache.metron.parsers.topology;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.Parser;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Level;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.storm.Config;
-import com.google.common.collect.ImmutableMap;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.MissingOptionException;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.ref.Reference;
-import java.util.*;
-import java.util.function.Predicate;
-import java.util.function.Supplier;
-
 public class ParserTopologyCLITest {
 
 
@@ -103,11 +106,11 @@ public class ParserTopologyCLITest {
   public void kafkaOffset(boolean longOpt) throws ParseException {
     CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
                                       .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
-                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
                                       .build(longOpt);
     Assert.assertEquals("myzk", ParserTopologyCLI.ParserOptions.ZK_QUORUM.get(cli));
     Assert.assertEquals("mybroker", ParserTopologyCLI.ParserOptions.BROKER_URL.get(cli));
-    Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPE.get(cli));
+    Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPES.get(cli));
   }
   @Test
   public void testCLI_happyPath() throws ParseException {
@@ -127,11 +130,11 @@ public class ParserTopologyCLITest {
   public void happyPath(boolean longOpt) throws ParseException {
     CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
                                       .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
-                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
                                       .build(longOpt);
     Assert.assertEquals("myzk", ParserTopologyCLI.ParserOptions.ZK_QUORUM.get(cli));
     Assert.assertEquals("mybroker", ParserTopologyCLI.ParserOptions.BROKER_URL.get(cli));
-    Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPE.get(cli));
+    Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPES.get(cli));
   }
 
   @Test
@@ -143,7 +146,7 @@ public class ParserTopologyCLITest {
   public void testConfig_noExtra(boolean longOpt) throws ParseException {
    CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
                                      .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
-                                     .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+                                     .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
                                      .with(ParserTopologyCLI.ParserOptions.NUM_WORKERS, "1")
                                      .with(ParserTopologyCLI.ParserOptions.NUM_ACKERS, "2")
                                      .with(ParserTopologyCLI.ParserOptions.NUM_MAX_TASK_PARALLELISM, "3")
@@ -166,7 +169,7 @@ public class ParserTopologyCLITest {
   public void testOutputTopic(boolean longOpt) throws ParseException {
      CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
                                       .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
-                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
                                       .with(ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC, "my_topic")
                                       .build(longOpt);
     Assert.assertEquals("my_topic", ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC.get(cli));
@@ -193,7 +196,7 @@ public class ParserTopologyCLITest {
       FileUtils.write(extraFile, extraConfig);
       CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
               .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
-              .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor")
+              .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
               .with(ParserTopologyCLI.ParserOptions.MESSAGE_TIMEOUT, "4")
               .with(ParserTopologyCLI.ParserOptions.EXTRA_OPTIONS, extraFile.getAbsolutePath())
               .build(longOpt);
@@ -208,50 +211,50 @@ public class ParserTopologyCLITest {
   }
 
   private static class ParserInput {
-    private Integer spoutParallelism;
-    private Integer spoutNumTasks;
+    private List<Integer> spoutParallelism;
+    private List<Integer> spoutNumTasks;
     private Integer parserParallelism;
     private Integer parserNumTasks;
     private Integer errorParallelism;
     private Integer errorNumTasks;
-    private Map<String, Object> spoutConfig;
+    private List<Map<String, Object>> spoutConfig;
     private String securityProtocol;
     private Config stormConf;
     private String outputTopic;
     private String errorTopic;
 
-    public ParserInput(ValueSupplier<Integer> spoutParallelism,
-                       ValueSupplier<Integer> spoutNumTasks,
+    public ParserInput(ValueSupplier<List> spoutParallelism,
+                       ValueSupplier<List> spoutNumTasks,
                        ValueSupplier<Integer> parserParallelism,
                        ValueSupplier<Integer> parserNumTasks,
                        ValueSupplier<Integer> errorParallelism,
                        ValueSupplier<Integer> errorNumTasks,
-                       ValueSupplier<Map> spoutConfig,
+                       ValueSupplier<List> spoutConfig,
                        ValueSupplier<String> securityProtocol,
                        ValueSupplier<Config> stormConf,
                        ValueSupplier<String> outputTopic,
                        ValueSupplier<String> errorTopic,
-                       SensorParserConfig config
+                       List<SensorParserConfig> configs
                       )
     {
-      this.spoutParallelism = spoutParallelism.get(config, Integer.class);
-      this.spoutNumTasks = spoutNumTasks.get(config, Integer.class);
-      this.parserParallelism = parserParallelism.get(config, Integer.class);
-      this.parserNumTasks = parserNumTasks.get(config, Integer.class);
-      this.errorParallelism = errorParallelism.get(config, Integer.class);
-      this.errorNumTasks = errorNumTasks.get(config, Integer.class);
-      this.spoutConfig = spoutConfig.get(config, Map.class);
-      this.securityProtocol = securityProtocol.get(config, String.class);
-      this.stormConf = stormConf.get(config, Config.class);
-      this.outputTopic = outputTopic.get(config, String.class);
-      this.errorTopic = outputTopic.get(config, String.class);
+      this.spoutParallelism = spoutParallelism.get(configs, List.class);
+      this.spoutNumTasks = spoutNumTasks.get(configs, List.class);
+      this.parserParallelism = parserParallelism.get(configs, Integer.class);
+      this.parserNumTasks = parserNumTasks.get(configs, Integer.class);
+      this.errorParallelism = errorParallelism.get(configs, Integer.class);
+      this.errorNumTasks = errorNumTasks.get(configs, Integer.class);
+      this.spoutConfig = spoutConfig.get(configs, List.class);
+      this.securityProtocol = securityProtocol.get(configs, String.class);
+      this.stormConf = stormConf.get(configs, Config.class);
+      this.outputTopic = outputTopic.get(configs, String.class);
+      this.errorTopic = errorTopic.get(configs, String.class);
     }
 
-    public Integer getSpoutParallelism() {
+    public List<Integer> getSpoutParallelism() {
       return spoutParallelism;
     }
 
-    public Integer getSpoutNumTasks() {
+    public List<Integer> getSpoutNumTasks() {
       return spoutNumTasks;
     }
 
@@ -271,7 +274,7 @@ public class ParserTopologyCLITest {
       return errorNumTasks;
     }
 
-    public Map<String, Object> getSpoutConfig() {
+    public List<Map<String, Object>> getSpoutConfig() {
       return spoutConfig;
     }
 
@@ -330,43 +333,116 @@ public class ParserTopologyCLITest {
   @Test
   public void testSpoutParallelism() throws Exception {
     testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_PARALLELISM
-                    , "10"
-                    , input -> input.getSpoutParallelism().equals(10)
-                    , () -> {
-                      SensorParserConfig config = getBaseConfig();
-                      config.setSpoutParallelism(20);
-                      return config;
-                    }
-                    , input -> input.getSpoutParallelism().equals(20)
-                    );
+        , "10"
+        , input -> input.getSpoutParallelism().equals(Collections.singletonList(10))
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setSpoutParallelism(20);
+          return Collections.singletonList(config);
+        }
+        , input -> input.getSpoutParallelism().equals(Collections.singletonList(20))
+    );
+  }
+
+  @Test
+  public void testSpoutParallelismMultiple() throws Exception {
+    // Each spout uses its own parallelism value.
+    // Return one per spout.
+    List<Integer> spoutParCli = new ArrayList<>();
+    spoutParCli.add(10);
+    spoutParCli.add(12);
+    List<Integer> spoutParConfig = new ArrayList<>();
+    spoutParConfig.add(20);
+    spoutParConfig.add(30);
+    testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_PARALLELISM
+        , "10,12"
+        , input -> input.getSpoutParallelism().equals(spoutParCli)
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setSpoutParallelism(20);
+          SensorParserConfig config2 = getBaseConfig();
+          config2.setSpoutParallelism(30);
+          List<SensorParserConfig> configs = new ArrayList<>();
+          configs.add(config);
+          configs.add(config2);
+          return configs;
+        }
+        , input -> input.getSpoutParallelism().equals(spoutParConfig)
+    );
   }
 
   @Test
   public void testSpoutNumTasks() throws Exception {
     testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_NUM_TASKS
                     , "10"
-                    , input -> input.getSpoutNumTasks().equals(10)
+                    , input -> input.getSpoutNumTasks().equals(Collections.singletonList(10))
                     , () -> {
                       SensorParserConfig config = getBaseConfig();
                       config.setSpoutNumTasks(20);
-                      return config;
+                      return Collections.singletonList(config);
                     }
-                    , input -> input.getSpoutNumTasks().equals(20)
+                    , input -> input.getSpoutNumTasks().equals(Collections.singletonList(20))
                     );
   }
 
   @Test
+  public void testSpoutNumTasksMultiple() throws Exception {
+    // Return one per spout.
+    List<Integer> numTasksCli = new ArrayList<>();
+    numTasksCli.add(10);
+    numTasksCli.add(12);
+    List<Integer> numTasksConfig = new ArrayList<>();
+    numTasksConfig.add(20);
+    numTasksConfig.add(30);
+    testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_NUM_TASKS
+        , "10,12"
+        , input -> input.getSpoutNumTasks().equals(numTasksCli)
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setSpoutNumTasks(20);
+          SensorParserConfig config2 = getBaseConfig();
+          config2.setSpoutNumTasks(30);
+          List<SensorParserConfig> configs = new ArrayList<>();
+          configs.add(config);
+          configs.add(config2);
+          return configs;
+        }
+        , input -> input.getSpoutNumTasks().equals(numTasksConfig)
+    );
+  }
+
+  @Test
   public void testParserParallelism() throws Exception {
     testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_PARALLELISM
-                    , "10"
-                    , input -> input.getParserParallelism().equals(10)
-                    , () -> {
-                      SensorParserConfig config = getBaseConfig();
-                      config.setParserParallelism(20);
-                      return config;
-                    }
-                    , input -> input.getParserParallelism().equals(20)
-                    );
+        , "10"
+        , input -> input.getParserParallelism().equals(10)
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setParserParallelism(20);
+          return Collections.singletonList(config);
+        }
+        , input -> input.getParserParallelism().equals(20)
+    );
+  }
+
+  @Test
+  public void testParserParallelismMultiple() throws Exception {
+    // Last one wins
+    testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_PARALLELISM
+        , "10"
+        , input -> input.getParserParallelism().equals(10)
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setParserParallelism(20);
+          SensorParserConfig config2 = getBaseConfig();
+          config2.setParserParallelism(30);
+          List<SensorParserConfig> configs = new ArrayList<>();
+          configs.add(config);
+          configs.add(config2);
+          return configs;
+        }
+        , input -> input.getParserParallelism().equals(30)
+    );
   }
 
   @Test
@@ -377,13 +453,32 @@ public class ParserTopologyCLITest {
                     , () -> {
                       SensorParserConfig config = getBaseConfig();
                       config.setParserNumTasks(20);
-                      return config;
+                      SensorParserConfig config2 = getBaseConfig();
+                      config2.setParserNumTasks(30);
+                      List<SensorParserConfig> configs = new ArrayList<>();
+                      configs.add(config);
+                      configs.add(config2);
+                      return configs;
                     }
-                    , input -> input.getParserNumTasks().equals(20)
+                    , input -> input.getParserNumTasks().equals(30)
                     );
   }
 
   @Test
+  public void testParserNumTasksMultiple() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_NUM_TASKS
+        , "10"
+        , input -> input.getParserNumTasks().equals(10)
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setParserNumTasks(20);
+          return Collections.singletonList(config);
+        }
+        , input -> input.getParserNumTasks().equals(20)
+    );
+  }
+
+  @Test
   public void testErrorParallelism() throws Exception {
     testConfigOption(ParserTopologyCLI.ParserOptions.ERROR_WRITER_PARALLELISM
                     , "10"
@@ -391,7 +486,7 @@ public class ParserTopologyCLITest {
                     , () -> {
                       SensorParserConfig config = getBaseConfig();
                       config.setErrorWriterParallelism(20);
-                      return config;
+                      return Collections.singletonList(config);
                     }
                     , input -> input.getErrorParallelism().equals(20)
                     );
@@ -405,7 +500,7 @@ public class ParserTopologyCLITest {
                     , () -> {
                       SensorParserConfig config = getBaseConfig();
                       config.setErrorWriterNumTasks(20);
-                      return config;
+                      return Collections.singletonList(config);
                     }
                     , input -> input.getErrorNumTasks().equals(20)
                     );
@@ -419,13 +514,55 @@ public class ParserTopologyCLITest {
                     , () -> {
                       SensorParserConfig config = getBaseConfig();
                       config.setSecurityProtocol("KERBEROS");
-                      return config;
+                      return Collections.singletonList(config);
                     }
                     , input -> input.getSecurityProtocol().equals("KERBEROS")
                     );
   }
 
   @Test
+  public void testSecurityProtocol_fromCLIMultipleUniform() throws Exception {
+    testConfigOption(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL
+        , "PLAINTEXT"
+        , input -> input.getSecurityProtocol().equals("PLAINTEXT")
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setSecurityProtocol("PLAINTEXT");
+          SensorParserConfig config2 = getBaseConfig();
+          config2.setSecurityProtocol("PLAINTEXT");
+          List<SensorParserConfig> configs = new ArrayList<>();
+          configs.add(config);
+          configs.add(config2);
+          return configs;
+        }
+        , input -> input.getSecurityProtocol().equals("PLAINTEXT")
+    );
+  }
+
+  @Test
+  public void testSecurityProtocol_fromCLIMultipleMixed() throws Exception {
+    // Non plaintext wins
+    testConfigOption(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL
+        , "PLAINTEXT"
+        , input -> input.getSecurityProtocol().equals("PLAINTEXT")
+        , () -> {
+          SensorParserConfig config = getBaseConfig();
+          config.setSecurityProtocol("PLAINTEXT");
+          SensorParserConfig config2 = getBaseConfig();
+          config2.setSecurityProtocol("KERBEROS");
+          SensorParserConfig config3 = getBaseConfig();
+          config3.setSecurityProtocol("PLAINTEXT");
+          List<SensorParserConfig> configs = new ArrayList<>();
+          configs.add(config);
+          configs.add(config2);
+          configs.add(config3);
+          return configs;
+        }
+        , input -> input.getSecurityProtocol().equals("KERBEROS")
+    );
+  }
+
+  @Test
   public void testSecurityProtocol_fromSpout() throws Exception {
     //Ultimately the order of precedence is CLI > spout config > parser config
     File extraConfig = File.createTempFile("spoutConfig", "json");
@@ -444,7 +581,7 @@ public class ParserTopologyCLITest {
               , () -> {
                 SensorParserConfig config = getBaseConfig();
                 config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK");
-                return config;
+                return Collections.singletonList(config);
               }
               , input -> input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK")
       );
@@ -458,7 +595,7 @@ public class ParserTopologyCLITest {
               , () -> {
                 SensorParserConfig config = getBaseConfig();
                 config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK");
-                return config;
+                return Collections.singletonList(config);
               }
               , input -> input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK")
       );
@@ -481,7 +618,7 @@ public class ParserTopologyCLITest {
                         SensorParserConfig config = getBaseConfig();
                         config.setNumWorkers(100);
                         config.setNumAckers(200);
-                        return config;
+                        return Collections.singletonList(config);
                               }
                       , input -> {
                           Config c = input.getStormConf();
@@ -519,7 +656,7 @@ public class ParserTopologyCLITest {
                             put(Config.TOPOLOGY_ACKER_EXECUTORS, 200);
                           }}
                                              );
-                        return config;
+                        return Collections.singletonList(config);
                               }
                       , input -> {
                           Config c = input.getStormConf();
@@ -542,22 +679,21 @@ public class ParserTopologyCLITest {
                       put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, extraConfig.getAbsolutePath());
                     }};
     Predicate<ParserInput> cliOverrideExpected = input -> {
-      return input.getSpoutConfig().get("extra_config").equals("from_file");
+      return input.getSpoutConfig().get(0).get("extra_config").equals("from_file");
     };
 
     Predicate<ParserInput> configOverrideExpected = input -> {
-      return input.getSpoutConfig().get("extra_config").equals("from_zk")
-                                  ;
+      return input.getSpoutConfig().get(0).get("extra_config").equals("from_zk");
     };
 
-    Supplier<SensorParserConfig> configSupplier = () -> {
+    Supplier<List<SensorParserConfig>> configSupplier = () -> {
       SensorParserConfig config = getBaseConfig();
       config.setSpoutConfig(
               new HashMap<String, Object>() {{
                 put("extra_config", "from_zk");
               }}
       );
-      return config;
+      return Collections.singletonList(config);
     };
     testConfigOption( cliOptions
                     , cliOverrideExpected
@@ -573,7 +709,7 @@ public class ParserTopologyCLITest {
   private void testConfigOption( ParserTopologyCLI.ParserOptions option
                                , String cliOverride
                                , Predicate<ParserInput> cliOverrideCondition
-                               , Supplier<SensorParserConfig> configSupplier
+                               , Supplier<List<SensorParserConfig>> configSupplier
                                , Predicate<ParserInput> configOverrideCondition
   ) throws Exception {
     testConfigOption(
@@ -588,48 +724,48 @@ public class ParserTopologyCLITest {
 
   private void testConfigOption( EnumMap<ParserTopologyCLI.ParserOptions, String> options
                                , Predicate<ParserInput> cliOverrideCondition
-                               , Supplier<SensorParserConfig> configSupplier
+                               , Supplier<List<SensorParserConfig>> configSupplier
                                , Predicate<ParserInput> configOverrideCondition
   ) throws Exception {
     //CLI Override
-    SensorParserConfig config = configSupplier.get();
+    List<SensorParserConfig> configs = configSupplier.get();
     {
       CLIBuilder builder = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
               .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
-              .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor");
+              .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor");
       for(Map.Entry<ParserTopologyCLI.ParserOptions, String> entry : options.entrySet()) {
         builder.with(entry.getKey(), entry.getValue());
       }
       CommandLine cmd = builder.build(true);
-      ParserInput input = getInput(cmd, config);
+      ParserInput input = getInput(cmd, configs);
       Assert.assertTrue(cliOverrideCondition.test(input));
     }
     // Config Override
     {
       CLIBuilder builder = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
               .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
-              .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor");
+              .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor");
       CommandLine cmd = builder.build(true);
-      ParserInput input = getInput(cmd, config);
+      ParserInput input = getInput(cmd, configs);
       Assert.assertTrue(configOverrideCondition.test(input));
     }
   }
 
-  private static ParserInput getInput(CommandLine cmd, SensorParserConfig config ) throws Exception {
+  private static ParserInput getInput(CommandLine cmd, List<SensorParserConfig> configs ) throws Exception {
     final ParserInput[] parserInput = new ParserInput[]{null};
     new ParserTopologyCLI() {
       @Override
       protected ParserTopologyBuilder.ParserTopology getParserTopology(
               String zookeeperUrl,
               Optional<String> brokerUrl,
-              String sensorType,
-              ValueSupplier<Integer> spoutParallelism,
-              ValueSupplier<Integer> spoutNumTasks,
+              List<String> sensorType,
+              ValueSupplier<List> spoutParallelism,
+              ValueSupplier<List> spoutNumTasks,
               ValueSupplier<Integer> parserParallelism,
               ValueSupplier<Integer> parserNumTasks,
               ValueSupplier<Integer> errorParallelism,
               ValueSupplier<Integer> errorNumTasks,
-              ValueSupplier<Map> spoutConfig,
+              ValueSupplier<List> spoutConfig,
               ValueSupplier<String> securityProtocol,
               ValueSupplier<Config> stormConf,
               ValueSupplier<String> outputTopic,
@@ -647,7 +783,7 @@ public class ParserTopologyCLITest {
                stormConf,
                outputTopic,
                errorTopic,
-               config
+               configs
        );
 
         return null;

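The ParserTopologyCLITest changes spell out how option values are resolved once several sensors share one topology: the spout-level options (spout parallelism, spout num tasks, spout config) become per-spout lists, the parser-level options stay single-valued with the last config winning, and for the security protocol any non-PLAINTEXT value wins over PLAINTEXT. A hedged sketch using the test's own CLIBuilder helper, assuming sensor types are supplied as a comma-separated list (the concrete option values are illustrative):

    // Comma-separated, positional per sensor/spout for the spout-level options;
    // a single value for the parser-level ones.
    CommandLine cli = new CLIBuilder()
        .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
        .with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
        .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "bro,yaf")
        .with(ParserTopologyCLI.ParserOptions.SPOUT_PARALLELISM, "10,12")
        .with(ParserTopologyCLI.ParserOptions.PARSER_PARALLELISM, "24")
        .build(true);
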
http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
index 49d7521..788df2d 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
@@ -19,6 +19,7 @@
 package org.apache.metron.writers.integration;
 
 import com.google.common.collect.ImmutableList;
+import java.util.Collections;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -107,7 +108,7 @@ public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationT
             .withParserSensorConfig(sensorType, parserConfig);
 
     ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
-            .withSensorType(sensorType)
+            .withSensorTypes(Collections.singletonList(sensorType))
             .withTopologyProperties(topologyProperties)
             .withBrokerUrl(kafkaComponent.getBrokerList())
             .withOutputTopic(parserConfig.getOutputTopic())

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index 99506de..cecba3d 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -95,6 +96,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
    *    "sensorTopic": "dummy",
    *    "outputTopic": "output",
    *    "errorTopic": "parser_error",
+   *    "readMetadata": true,
    *    "parserConfig": {
    *        "batchSize" : 1,
    *        "columns" : {
@@ -148,7 +150,12 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     }};
 
     final Properties topologyProperties = new Properties();
-    ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigWithValidation);
+    ComponentRunner runner = setupTopologyComponents(
+        topologyProperties,
+        Collections.singletonList(sensorType),
+        Collections.singletonList(parserConfig),
+        globalConfigWithValidation
+    );
     try {
       runner.start();
       kafkaComponent.writeMessages(sensorType, inputMessages);
@@ -172,7 +179,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
 
   @Test
   public void parser_with_global_validations_writes_bad_records_to_error_topic() throws Exception {
-    final String sensorType = "dummy";
+   final String sensorType = "dummy";
     SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class);
     final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
       add(Bytes.toBytes("valid,foo"));
@@ -181,7 +188,8 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     }};
 
     final Properties topologyProperties = new Properties();
-    ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigWithValidation);
+    ComponentRunner runner = setupTopologyComponents(topologyProperties, Collections.singletonList(sensorType),
+        Collections.singletonList(parserConfig), globalConfigWithValidation);
     try {
       runner.start();
       kafkaComponent.writeMessages(sensorType, inputMessages);
@@ -223,27 +231,31 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
    *
    * @return runner
    */
-  public ComponentRunner setupTopologyComponents(Properties topologyProperties, String sensorType,
-      SensorParserConfig parserConfig, String globalConfig) {
+  public ComponentRunner setupTopologyComponents(Properties topologyProperties, List<String> sensorTypes,
+      List<SensorParserConfig> parserConfigs, String globalConfig) {
     zkServerComponent = getZKServerComponent(topologyProperties);
-    kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
-      add(new KafkaComponent.Topic(sensorType, 1));
-      add(new KafkaComponent.Topic(parserConfig.getErrorTopic(), 1));
-      add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
-    }});
+    List<KafkaComponent.Topic> topics = new ArrayList<>();
+    for(String sensorType : sensorTypes) {
+      topics.add(new KafkaComponent.Topic(sensorType, 1));
+    }
+    topics.add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+    kafkaComponent = getKafkaComponent(topologyProperties, topics);
     topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
 
     configUploadComponent = new ConfigUploadComponent()
         .withTopologyProperties(topologyProperties)
-        .withGlobalConfig(globalConfig)
-        .withParserSensorConfig(sensorType, parserConfig);
+        .withGlobalConfig(globalConfig);
+
+    for (int i = 0; i < sensorTypes.size(); ++i) {
+      configUploadComponent.withParserSensorConfig(sensorTypes.get(i), parserConfigs.get(i));
+    }
 
     parserTopologyComponent = new ParserTopologyComponent.Builder()
-        .withSensorType(sensorType)
+        .withSensorTypes(sensorTypes)
         .withTopologyProperties(topologyProperties)
         .withBrokerUrl(kafkaComponent.getBrokerList())
-        .withErrorTopic(parserConfig.getErrorTopic())
-        .withOutputTopic(parserConfig.getOutputTopic())
+        .withErrorTopic(parserConfigs.get(0).getErrorTopic())
+        .withOutputTopic(parserConfigs.get(0).getOutputTopic())
         .build();
 
     return new ComponentRunner.Builder()
@@ -325,8 +337,22 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
   @Multiline
   public static String offsetParserConfigJSON;
 
+  /**
+   * {
+   *    "parserClassName":"org.apache.metron.writers.integration.WriterBoltIntegrationTest$DummyObjectParser",
+   *    "sensorTopic":"dummyobjectparser",
+   *    "outputTopic": "enrichments",
+   *    "errorTopic": "parser_error",
+   *    "parserConfig": {
+   *        "batchSize" : 1
+   *    }
+   * }
+   */
+  @Multiline
+  public static String dummyParserConfigJSON;
+
   @Test
-  public void commits_kafka_offsets_for_emtpy_objects() throws Exception {
+  public void commits_kafka_offsets_for_empty_objects() throws Exception {
     final String sensorType = "emptyobjectparser";
     SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(offsetParserConfigJSON, SensorParserConfig.class);
     final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
@@ -335,7 +361,11 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
       add(Bytes.toBytes("baz"));
     }};
     final Properties topologyProperties = new Properties();
-    ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigEmpty);
+    ComponentRunner runner = setupTopologyComponents(
+        topologyProperties,
+        Collections.singletonList(sensorType),
+        Collections.singletonList(parserConfig),
+        globalConfigEmpty);
     try {
       runner.start();
       kafkaComponent.writeMessages(sensorType, inputMessages);
@@ -356,6 +386,64 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     }
   }
 
+  @Test
+  public void test_multiple_sensors() throws Exception {
+    // Setup first sensor
+    final String emptyObjectSensorType = "emptyobjectparser";
+    SensorParserConfig emptyObjectParserConfig = JSONUtils.INSTANCE.load(offsetParserConfigJSON, SensorParserConfig.class);
+    final List<byte[]> emptyObjectInputMessages = new ArrayList<byte[]>() {{
+      add(Bytes.toBytes("foo"));
+      add(Bytes.toBytes("bar"));
+      add(Bytes.toBytes("baz"));
+    }};
+
+    // Setup second sensor
+    final String dummySensorType = "dummyobjectparser";
+    SensorParserConfig dummyParserConfig = JSONUtils.INSTANCE.load(dummyParserConfigJSON, SensorParserConfig.class);
+    final List<byte[]> dummyInputMessages = new ArrayList<byte[]>() {{
+      add(Bytes.toBytes("dummy_foo"));
+      add(Bytes.toBytes("dummy_bar"));
+      add(Bytes.toBytes("dummy_baz"));
+    }};
+
+    final Properties topologyProperties = new Properties();
+
+    List<String> sensorTypes = new ArrayList<>();
+    sensorTypes.add(emptyObjectSensorType);
+    sensorTypes.add(dummySensorType);
+
+    List<SensorParserConfig> parserConfigs = new ArrayList<>();
+    parserConfigs.add(emptyObjectParserConfig);
+    parserConfigs.add(dummyParserConfig);
+
+    ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorTypes, parserConfigs, globalConfigEmpty);
+    try {
+      runner.start();
+      kafkaComponent.writeMessages(emptyObjectSensorType, emptyObjectInputMessages);
+      kafkaComponent.writeMessages(dummySensorType, dummyInputMessages);
+
+      final List<byte[]> allInputMessages = new ArrayList<>();
+      allInputMessages.addAll(emptyObjectInputMessages);
+      allInputMessages.addAll(dummyInputMessages);
+      Processor allResultsProcessor = new AllResultsProcessor(allInputMessages, Constants.ENRICHMENT_TOPIC);
+      @SuppressWarnings("unchecked")
+      ProcessorResult<Set<JSONObject>> result = runner.process(allResultsProcessor);
+
+      // validate the output messages
+      assertThat(
+          "size should match",
+          result.getResult().size(),
+          equalTo(allInputMessages.size()));
+      for (JSONObject record : result.getResult()) {
+        assertThat("record should have a guid", record.containsKey("guid"), equalTo(true));
+      }
+    } finally {
+      if (runner != null) {
+        runner.stop();
+      }
+    }
+  }
+
   /**
    * Goal is to check returning an empty JSONObject in our List returned by parse.
    */
@@ -380,6 +468,34 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     }
   }
 
+
+  /**
+   * Returns a single JSONObject with a dummy key/value for each raw message; used as the second sensor in the multi-sensor test.
+   */
+  public static class DummyObjectParser implements MessageParser<JSONObject>, Serializable {
+
+    @Override
+    public void init() {
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public List<JSONObject> parse(byte[] bytes) {
+      JSONObject dummy = new JSONObject();
+      dummy.put("dummy_key", "dummy_value");
+      return ImmutableList.of(dummy);
+    }
+
+    @Override
+    public boolean validate(JSONObject message) {
+      return true;
+    }
+
+    @Override
+    public void configure(Map<String, Object> map) {
+    }
+  }
+
   /**
    * Verifies all messages in the provided List of input messages appears in the specified
    * Kafka output topic

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index e35960f..7678584 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -23,6 +23,7 @@ import static java.lang.String.format;
 import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -118,7 +119,7 @@ public class BulkWriterComponent<MESSAGE_T> {
   public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, MessageGetStrategy messageGetStrategy) {
     LOG.error(format("Failing %d tuple(s); sensorType=%s", Iterables.size(tuples), sensorType), e);
     MetronError error = new MetronError()
-            .withSensorType(sensorType)
+            .withSensorType(Collections.singleton(sensorType))
             .withErrorType(Constants.ErrorType.INDEXING_ERROR)
             .withThrowable(e);
     tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
index 0264b3d..c389854 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -17,11 +17,22 @@
  */
 package org.apache.metron.writer;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.verifyStatic;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.metron.common.message.MessageGetters;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
@@ -38,19 +49,6 @@ import org.mockito.MockitoAnnotations;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.powermock.api.mockito.PowerMockito.verifyStatic;
-
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({BulkWriterComponent.class, ErrorUtils.class})
 public class BulkWriterComponentTest {
@@ -130,7 +128,7 @@ public class BulkWriterComponentTest {
   public void writeShouldProperlyHandleWriterErrors() throws Exception {
     Throwable e = new Exception("test exception");
     MetronError error = new MetronError()
-            .withSensorType(sensorType)
+            .withSensorType(Collections.singleton(sensorType))
             .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2));
     BulkWriterResponse response = new BulkWriterResponse();
     response.addAllErrors(e, tupleList);
@@ -164,7 +162,7 @@ public class BulkWriterComponentTest {
   public void writeShouldProperlyHandleWriterException() throws Exception {
     Throwable e = new Exception("test exception");
     MetronError error = new MetronError()
-            .withSensorType(sensorType)
+            .withSensorType(Collections.singleton(sensorType))
             .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2));
     BulkWriterResponse response = new BulkWriterResponse();
     response.addAllErrors(e, tupleList);
@@ -183,10 +181,10 @@ public class BulkWriterComponentTest {
   public void errorAllShouldClearMapsAndHandleErrors() throws Exception {
     Throwable e = new Exception("test exception");
     MetronError error1 = new MetronError()
-            .withSensorType("sensor1")
+            .withSensorType(Collections.singleton("sensor1"))
             .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1));
     MetronError error2 = new MetronError()
-            .withSensorType("sensor2")
+            .withSensorType(Collections.singleton("sensor2"))
             .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2));
 
     BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector);

http://git-wip-us.apache.org/repos/asf/metron/blob/f4345383/use-cases/parser_chaining/README.md
----------------------------------------------------------------------
diff --git a/use-cases/parser_chaining/README.md b/use-cases/parser_chaining/README.md
index 26fd333..4055bcd 100644
--- a/use-cases/parser_chaining/README.md
+++ b/use-cases/parser_chaining/README.md
@@ -233,3 +233,17 @@ cat ~/data.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --b
 ```
 
 You should see indices created for the `cisco-5-304` and `cisco-6-302` data with appropriate fields created for each type.
+
+# Aggregated Parsers with Parser Chaining
+Chained parsers can be run as aggregated parsers. Each parser in the aggregation continues to read from its own sensor-specific Kafka topic; the aggregated topology does not perform any internal routing between sensors.
+
+Instead of creating a topology per sensor, all three (`pix_syslog_router`, `cisco-5-304`, and `cisco-6-302`) can be run in a single aggregated parser. It is also possible to aggregate a subset of these parsers (e.g. run `cisco-6-302` as its own topology and aggregate the other two; see the example following the start command below).
+
+The command to start the parsers then becomes
+```
+$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s cisco-6-302,cisco-5-304,pix_syslog_router
+```
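+
+For example, to run `cisco-6-302` as its own topology while aggregating the other two, the topologies could be started as follows (a sketch using the same flags and sensor names as above):
+```
+$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s cisco-6-302
+$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s cisco-5-304,pix_syslog_router
+```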
+
+The flow through the Storm topology and Kafka topics is shown below:
+
+![Aggregated Flow](aggregated_parser_chaining_flow.svg)
\ No newline at end of file