Posted to commits@metron.apache.org by ot...@apache.org on 2016/12/16 16:47:32 UTC

incubator-metron git commit: METRON-587 Integration tests should use common processor implementations where possible (ottobackwards) closes apache/incubator-metron#374

Repository: incubator-metron
Updated Branches:
  refs/heads/master ffae676cb -> ec41e7321


METRON-587 Integration tests should use common processor implementations where possible (ottobackwards) closes apache/incubator-metron#374


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/ec41e732
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/ec41e732
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/ec41e732

Branch: refs/heads/master
Commit: ec41e732170ebf80c00a8a9013a390f7ffa48899
Parents: ffae676
Author: ottobackwards <ot...@gmail.com>
Authored: Fri Dec 16 09:56:05 2016 -0500
Committer: Otto Fowler <ot...@apache.org>
Committed: Fri Dec 16 09:56:05 2016 -0500

----------------------------------------------------------------------
 .../integration/EnrichmentIntegrationTest.java  |  99 ++++++++++--------
 .../src/test/resources/log4j.properties         |   5 +-
 .../metron/integration/BaseIntegrationTest.java |   1 -
 .../integration/processors/KafkaMessageSet.java |  44 ++++++++
 .../integration/processors/KafkaProcessor.java  | 101 +++++++++++++++++++
 .../integration/ParserIntegrationTest.java      |  64 ++++++------
 .../integration/WriterBoltIntegrationTest.java  |  59 ++++++-----
 7 files changed, 270 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
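At a high level, this change lifts the anonymous Processor implementations that EnrichmentIntegrationTest, ParserIntegrationTest, and WriterBoltIntegrationTest each declared inline into a shared KafkaProcessor, plus a KafkaMessageSet holder for one round of topic reads, under metron-integration-test. A test now only names the Kafka component and its topics, supplies a readiness predicate over a KafkaMessageSet, and supplies a function that maps that set to its result type. A condensed sketch of the resulting pattern, mirroring the ParserIntegrationTest change in the diff below (inputMessages is the test's sample data; imports match those added in that file):

  @SuppressWarnings("unchecked")
  private KafkaProcessor<List<byte[]>> getProcessor() {
    return new KafkaProcessor<>()
            .withKafkaComponentName("kafka")
            .withReadTopic(Constants.ENRICHMENT_TOPIC)
            .withErrorTopic(Constants.ERROR_STREAM)
            .withInvalidTopic(Constants.INVALID_STREAM)
            // ready once every input message has surfaced on one of the three topics
            .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
              @Override
              public Boolean apply(KafkaMessageSet messageSet) {
                return messageSet.getMessages().size()
                        + messageSet.getErrors().size()
                        + messageSet.getInvalids().size() == inputMessages.size();
              }
            })
            // the parser tests only assert on the successfully parsed messages
            .withProvideResult(new Function<KafkaMessageSet, List<byte[]>>() {
              @Override
              public List<byte[]> apply(KafkaMessageSet messageSet) {
                return messageSet.getMessages();
              }
            });
  }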


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ec41e732/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index b7d6350..f399c81 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -18,10 +18,7 @@
 package org.apache.metron.enrichment.integration;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Splitter;
+import com.google.common.base.*;
 import com.google.common.collect.Iterables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -38,10 +35,15 @@ import org.apache.metron.enrichment.lookup.LookupKV;
 import org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator;
 import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions;
 import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.converter.EnrichmentHelper;
 import org.apache.metron.integration.*;
 import org.apache.metron.integration.components.FluxTopologyComponent;
 import org.apache.metron.integration.components.KafkaComponent;
+import org.apache.metron.integration.processors.KafkaMessageSet;
 import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.integration.processors.KafkaProcessor;
 import org.apache.metron.integration.utils.TestUtils;
 import org.apache.metron.test.mock.MockHTable;
 import org.junit.Assert;
@@ -51,7 +53,13 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 
 public class EnrichmentIntegrationTest extends BaseIntegrationTest {
   private static final String SRC_IP = "ip_src_addr";
@@ -63,7 +71,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
   }};
   protected String fluxPath = "../metron-enrichment/src/main/flux/enrichment/test.yaml";
   protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed";
-
+  private String sampleIndexedPath = TestConstants.SAMPLE_DATA_INDEXED_PATH + "TestIndexed";
+  private final List<byte[]> inputMessages = getInputMessages(sampleParsedPath);
   public static class Provider implements TableProvider, Serializable {
     MockHTable.Provider  provider = new MockHTable.Provider();
     @Override
@@ -72,9 +81,15 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
     }
   }
 
+  private static List<byte[]> getInputMessages(String path){
+    try{
+      return TestUtils.readSampleData(path);
+    }catch(IOException ioe){
+      return null;
+    }
+  }
   @Test
   public void test() throws Exception {
-    final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
     final String cf = "cf";
     final String trackerHBaseTableName = "tracker";
     final String threatIntelTableName = "threat_intel";
@@ -153,7 +168,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
       fluxComponent.submitTopology();
 
       kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
-      ProcessorResult<List<Map<String, Object>>> result = runner.process(getProcessor(inputMessages));
+      ProcessorResult<List<Map<String, Object>>> result = runner.process(getProcessor());
       // We expect failures, so we don't care if result returned failure or not
       List<Map<String, Object>> docs = result.getResult();
       Assert.assertEquals(inputMessages.size(), docs.size());
@@ -434,39 +449,39 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
     return ret;
   }
 
-  public Processor<List<Map<String, Object>>> getProcessor(List<byte[]> inputMessages) {
-    return new Processor<List<Map<String, Object>>>() {
-      List<Map<String, Object>> docs = null;
-      List<byte[]> errors = null;
-      List<byte[]> invalids = null;
-
-      @Override
-      public ReadinessState process(ComponentRunner runner) {
-        KafkaComponent kafkaComponent = runner.getComponent("kafka", KafkaComponent.class);
-        List<byte[]> messages = kafkaComponent.readMessages(Constants.INDEXING_TOPIC);
-        errors = kafkaComponent.readMessages(Constants.ENRICHMENT_ERROR_TOPIC);
-        if (messages.size() == inputMessages.size()) {
-          docs = new ArrayList<>();
-          for(byte[] message : messages) {
-            try {
-              docs.add(JSONUtils.INSTANCE.load(new String(message), new TypeReference<Map<String, Object>>() {}));
-            } catch (IOException e) {
-              throw new IllegalStateException(e.getMessage(), e);
-            }
-          }
-          return ReadinessState.READY;
-        } else {
-          return ReadinessState.NOT_READY;
-        }
-      }
-
-      @Override
-      public ProcessorResult<List<Map<String, Object>>> getResult()
-      {
-        ProcessorResult.Builder<List<Map<String,Object>>> builder = new ProcessorResult.Builder();
-        return builder.withResult(docs).withProcessErrors(errors).withProcessInvalids(invalids).build();
-      }
-    };
+  @SuppressWarnings("unchecked")
+  private Processor<List<Map<String, Object>>> getProcessor() {
+
+    KafkaProcessor<List<Map<String, Object>>> kafkaProcessor = new KafkaProcessor<>().withKafkaComponentName("kafka")
+            .withReadTopic(Constants.INDEXING_TOPIC)
+            .withErrorTopic(Constants.ENRICHMENT_ERROR_TOPIC)
+            .withInvalidTopic(Constants.INVALID_STREAM)
+            .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
+              @Nullable
+              @Override
+              public Boolean apply(@Nullable KafkaMessageSet messageSet) {
+                // this test is written to return 10 errors and 10 messages
+                // we can just check when the messages match here
+                // if they do then we are good
+                return messageSet.getMessages().size() == inputMessages.size();
+              }
+            })
+            .withProvideResult(new Function<KafkaMessageSet , List<Map<String, Object>>>() {
+              @Nullable
+              @Override
+              public List<Map<String, Object>> apply(@Nullable KafkaMessageSet messageSet) {
+                List<Map<String,Object>> docs = new ArrayList<>();
+                for (byte[] message : messageSet.getMessages()) {
+                  try {
+                    docs.add(JSONUtils.INSTANCE.load(new String(message), new TypeReference<Map<String, Object>>() {
+                    }));
+                  } catch (IOException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                  }
+                }
+                return docs;
+              }
+            });
+    return kafkaProcessor;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ec41e732/metron-platform/metron-enrichment/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/resources/log4j.properties b/metron-platform/metron-enrichment/src/test/resources/log4j.properties
index ae15e0a..a738cbf 100644
--- a/metron-platform/metron-enrichment/src/test/resources/log4j.properties
+++ b/metron-platform/metron-enrichment/src/test/resources/log4j.properties
@@ -16,4 +16,7 @@ log4j.rootLogger=error,stdout
 log4j.threshhold=ERROR
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
\ No newline at end of file
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
+log4j.appender.stdout.filter.1=org.apache.log4j.varia.StringMatchFilter
+log4j.appender.stdout.filter.1.StringToMatch=Error Test Raw Message String
+log4j.appender.stdout.filter.1.AcceptOnMatch=false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ec41e732/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
index d7a486b..7207d7a 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
@@ -42,5 +42,4 @@ public abstract class BaseIntegrationTest {
                     }
                 });
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ec41e732/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java
new file mode 100644
index 0000000..4227933
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.integration.processors;
+
+import java.util.List;
+
+public class KafkaMessageSet{
+    public List<byte[]> messages;
+    public List<byte[]> errors;
+    public List<byte[]> invalids;
+
+    public KafkaMessageSet(List<byte[]> messages, List<byte[]> errors, List<byte[]> invalids) {
+        this.messages = messages;
+        this.errors = errors;
+        this.invalids = invalids;
+    }
+
+
+    public List<byte[]> getMessages() {
+        return messages;
+    }
+    public List<byte[]> getErrors() {
+        return errors;
+    }
+    public List<byte[]> getInvalids() {
+        return invalids;
+    }
+}
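KafkaMessageSet is deliberately just a value holder: on each polling round, KafkaProcessor fills one with whatever was read from the read, error, and invalid topics and hands it to the test-supplied functions. A hypothetical helper, only to illustrate the contract those functions see (the accounting matches the parser test's readiness check further below):

    // hypothetical convenience method, not part of the commit: true once every
    // input message has landed on one of the three topics
    private static boolean allAccountedFor(KafkaMessageSet messageSet, int expectedCount) {
      return messageSet.getMessages().size()
              + messageSet.getErrors().size()
              + messageSet.getInvalids().size() == expectedCount;
    }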

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ec41e732/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java
new file mode 100644
index 0000000..6fdbbf4
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.processors;
+
+import com.google.common.base.Function;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.Processor;
+import org.apache.metron.integration.ProcessorResult;
+import org.apache.metron.integration.ReadinessState;
+import org.apache.metron.integration.components.KafkaComponent;
+
+import java.util.LinkedList;
+import java.util.List;
+public class KafkaProcessor<T> implements Processor<T> {
+    private String kafkaComponentName;
+    private String readTopic;
+    private String errorTopic;
+    private String invalidTopic;
+    private List<byte[]> messages = new LinkedList<>();
+    private List<byte[]> errors = new LinkedList<>();
+    private List<byte[]> invalids = new LinkedList<>();
+
+    public KafkaProcessor(){}
+    public KafkaProcessor withKafkaComponentName(String name){
+        this.kafkaComponentName = name;
+        return this;
+    }
+    public KafkaProcessor withReadTopic(String topicName){
+        this.readTopic = topicName;
+        return this;
+    }
+    public KafkaProcessor withErrorTopic(String topicName){
+        this.errorTopic = topicName;
+        return this;
+    }
+    public KafkaProcessor withInvalidTopic(String topicName){
+        this.invalidTopic = topicName;
+        return this;
+    }
+    public KafkaProcessor withValidateReadMessages(Function<KafkaMessageSet, Boolean> validate){
+        this.validateReadMessages = validate;
+        return this;
+    }
+    public KafkaProcessor withProvideResult(Function<KafkaMessageSet, T> provide){
+        this.provideResult = provide;
+        return this;
+    }
+
+    private Function<KafkaMessageSet, Boolean> validateReadMessages;
+    private Function<KafkaMessageSet,T> provideResult;
+
+    public ReadinessState process(ComponentRunner runner){
+        KafkaComponent kafkaComponent = runner.getComponent(kafkaComponentName, KafkaComponent.class);
+        LinkedList<byte[]> outputMessages = new LinkedList<>(kafkaComponent.readMessages(readTopic));
+        LinkedList<byte[]> outputErrors = null;
+        LinkedList<byte[]> outputInvalids = null;
+
+        if (errorTopic != null) {
+            outputErrors = new LinkedList<>(kafkaComponent.readMessages(errorTopic));
+        }
+        if (invalidTopic != null) {
+            outputInvalids = new LinkedList<>(kafkaComponent.readMessages(invalidTopic));
+        }
+        Boolean validated = validateReadMessages.apply(new KafkaMessageSet(outputMessages,outputErrors,outputInvalids));
+        if(validated == null){
+            validated = false;
+        }
+        if(validated){
+            messages.addAll(outputMessages);
+            errors.addAll(outputErrors);
+            invalids.addAll(outputInvalids);
+            outputMessages.clear();
+            outputErrors.clear();
+            outputInvalids.clear();
+            return ReadinessState.READY;
+        }
+        return ReadinessState.NOT_READY;
+    }
+    @SuppressWarnings("unchecked")
+    public ProcessorResult<T> getResult(){
+        ProcessorResult.Builder<T> builder = new ProcessorResult.Builder();
+        return builder.withResult(provideResult.apply(new KafkaMessageSet(messages,errors,invalids))).withProcessErrors(errors).withProcessInvalids(invalids).build();
+    }
+}
+
+
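The new processor drops into the existing ComponentRunner loop unchanged: the runner keeps calling process(), which snapshots the configured topics into a KafkaMessageSet and asks the validation function whether the system is ready (a null answer is treated as not ready), and getResult() then maps the accumulated set through the result function. A sketch of the call site as the updated tests below use it (runner and Kafka component setup omitted):

      kafkaComponent.writeMessages(sensorType, inputMessages);
      ProcessorResult<List<byte[]>> result = runner.process(getProcessor());
      // errors and invalids ride along in the ProcessorResult, so the test can check
      // result.failed() without re-reading the topics itself
      List<byte[]> outputMessages = result.getResult();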

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ec41e732/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
index 55dec1c..4ba1c43 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
@@ -17,32 +17,38 @@
  */
 package org.apache.metron.parsers.integration;
 
+import com.google.common.base.Function;
 import junit.framework.Assert;
 import org.apache.metron.TestConstants;
 import org.apache.metron.common.Constants;
 import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
 import org.apache.metron.integration.*;
 import org.apache.metron.integration.components.KafkaComponent;
+import org.apache.metron.integration.processors.KafkaMessageSet;
 import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.integration.processors.KafkaProcessor;
 import org.apache.metron.integration.utils.TestUtils;
 import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
 import org.apache.metron.test.TestDataType;
 import org.apache.metron.test.utils.SampleDataUtils;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
 import java.util.*;
 
 public abstract class ParserIntegrationTest extends BaseIntegrationTest {
-
+  protected List<byte[]> inputMessages;
   @Test
   public void test() throws Exception {
     final String sensorType = getSensorType();
-    final List<byte[]> inputMessages = TestUtils.readSampleData(SampleDataUtils.getSampleDataPath(sensorType, TestDataType.RAW));
+    inputMessages = TestUtils.readSampleData(SampleDataUtils.getSampleDataPath(sensorType, TestDataType.RAW));
 
     final Properties topologyProperties = new Properties();
     final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
       add(new KafkaComponent.Topic(sensorType, 1));
       add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+      add(new KafkaComponent.Topic(Constants.INVALID_STREAM,1));
+      add(new KafkaComponent.Topic(Constants.ERROR_STREAM,1));
     }});
     topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
 
@@ -71,36 +77,7 @@ public abstract class ParserIntegrationTest extends BaseIntegrationTest {
     runner.start();
     try {
       kafkaComponent.writeMessages(sensorType, inputMessages);
-      ProcessorResult<List<byte[]>> result =
-              runner.process(new Processor<List<byte[]>>() {
-                List<byte[]> messages = null;
-                List<byte[]> errors = null;
-                List<byte[]> invalids = null;
-
-                @Override
-                public ReadinessState process(ComponentRunner runner) {
-                  KafkaComponent kafkaComponent = runner.getComponent("kafka", KafkaComponent.class);
-                  List<byte[]> outputMessages = kafkaComponent.readMessages(Constants.ENRICHMENT_TOPIC);
-                  if (outputMessages.size() == inputMessages.size()) {
-                    messages = outputMessages;
-                    return ReadinessState.READY;
-                  } else {
-                    errors = kafkaComponent.readMessages(Constants.ERROR_STREAM);
-                    invalids = kafkaComponent.readMessages(Constants.INVALID_STREAM);
-                    if(errors.size() > 0 || invalids.size() > 0) {
-                      messages = outputMessages;
-                      return ReadinessState.READY;
-                    }
-                    return ReadinessState.NOT_READY;
-                  }
-                }
-
-                @Override
-                public ProcessorResult<List<byte[]>> getResult() {
-                  ProcessorResult.Builder<List<byte[]>> builder = new ProcessorResult.Builder();
-                  return builder.withResult(messages).withProcessErrors(errors).withProcessInvalids(invalids).build();
-                }
-              });
+      ProcessorResult<List<byte[]>> result = runner.process(getProcessor());
       List<byte[]> outputMessages = result.getResult();
       StringBuffer buffer = new StringBuffer();
       if (result.failed()){
@@ -132,6 +109,29 @@ public abstract class ParserIntegrationTest extends BaseIntegrationTest {
     }
   }
 
+  @SuppressWarnings("unchecked")
+  private KafkaProcessor<List<byte[]>> getProcessor(){
+
+    return new KafkaProcessor<>()
+            .withKafkaComponentName("kafka")
+            .withReadTopic(Constants.ENRICHMENT_TOPIC)
+            .withErrorTopic(Constants.ERROR_STREAM)
+            .withInvalidTopic(Constants.INVALID_STREAM)
+            .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
+              @Nullable
+              @Override
+              public Boolean apply(@Nullable KafkaMessageSet messageSet) {
+                return (messageSet.getMessages().size() + messageSet.getErrors().size() + messageSet.getInvalids().size()) == inputMessages.size();
+              }
+            })
+            .withProvideResult(new Function<KafkaMessageSet,List<byte[]>>(){
+              @Nullable
+              @Override
+              public List<byte[]> apply(@Nullable KafkaMessageSet messageSet) {
+                  return messageSet.getMessages();
+              }
+            });
+  }
   abstract String getSensorType();
   abstract List<ParserValidation> getValidations();
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ec41e732/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index f8988cb..f37b1fc 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.writers.integration;
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -29,7 +30,9 @@ import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
 import org.apache.metron.integration.*;
 import org.apache.metron.integration.components.KafkaComponent;
+import org.apache.metron.integration.processors.KafkaMessageSet;
 import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.integration.processors.KafkaProcessor;
 import org.apache.metron.parsers.csv.CSVParser;
 import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
 import org.apache.metron.test.utils.UnitTestHelper;
@@ -37,6 +40,7 @@ import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.*;
 
@@ -125,33 +129,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     try {
       runner.start();
       kafkaComponent.writeMessages(sensorType, inputMessages);
-      ProcessorResult<Map<String, List<JSONObject>>> result =
-              runner.process(new Processor<Map<String, List<JSONObject>>>() {
-                Map<String, List<JSONObject>> messages = null;
-
-                @Override
-                public ReadinessState process(ComponentRunner runner) {
-                  KafkaComponent kafkaComponent = runner.getComponent("kafka", KafkaComponent.class);
-                  List<byte[]> outputMessages = kafkaComponent.readMessages(Constants.ENRICHMENT_TOPIC);
-                  List<byte[]> invalid = kafkaComponent.readMessages(Constants.DEFAULT_PARSER_INVALID_TOPIC);
-                  List<byte[]> error = kafkaComponent.readMessages(Constants.DEFAULT_PARSER_ERROR_TOPIC);
-                  if(outputMessages.size() == 1 && invalid.size() == 1 && error.size() == 1) {
-                    messages = new HashMap<String, List<JSONObject>>() {{
-                      put(Constants.ENRICHMENT_TOPIC, loadMessages(outputMessages));
-                      put(Constants.DEFAULT_PARSER_ERROR_TOPIC, loadMessages(error));
-                      put(Constants.DEFAULT_PARSER_INVALID_TOPIC, loadMessages(invalid));
-                    }};
-                    return ReadinessState.READY;
-                  }
-                  return ReadinessState.NOT_READY;
-                }
-
-                @Override
-                public ProcessorResult<Map<String, List<JSONObject>>> getResult() {
-                  ProcessorResult.Builder<Map<String,List<JSONObject>>> builder = new ProcessorResult.Builder();
-                  return builder.withResult(messages).build();
-                }
-              });
+      ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(getProcessor());
       Map<String,List<JSONObject>> outputMessages = result.getResult();
       Assert.assertEquals(3, outputMessages.size());
       Assert.assertEquals(1, outputMessages.get(Constants.ENRICHMENT_TOPIC).size());
@@ -198,4 +176,31 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     );
     return tmp;
   }
+  @SuppressWarnings("unchecked")
+  private KafkaProcessor<Map<String,List<JSONObject>>> getProcessor(){
+
+    return new KafkaProcessor<>()
+            .withKafkaComponentName("kafka")
+            .withReadTopic(Constants.ENRICHMENT_TOPIC)
+            .withErrorTopic(Constants.DEFAULT_PARSER_ERROR_TOPIC)
+            .withInvalidTopic(Constants.DEFAULT_PARSER_INVALID_TOPIC)
+            .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
+              @Nullable
+              @Override
+              public Boolean apply(@Nullable KafkaMessageSet messageSet) {
+                return (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 1) && (messageSet.getInvalids().size() ==1);
+              }
+            })
+            .withProvideResult(new Function<KafkaMessageSet,Map<String,List<JSONObject>>>(){
+              @Nullable
+              @Override
+              public Map<String,List<JSONObject>> apply(@Nullable KafkaMessageSet messageSet) {
+                return new HashMap<String, List<JSONObject>>() {{
+                  put(Constants.ENRICHMENT_TOPIC, loadMessages(messageSet.getMessages()));
+                  put(Constants.DEFAULT_PARSER_ERROR_TOPIC, loadMessages(messageSet.getErrors()));
+                  put(Constants.DEFAULT_PARSER_INVALID_TOPIC, loadMessages(messageSet.getInvalids()));
+                }};
+              }
+            });
+    }
 }