You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2016/12/16 16:47:32 UTC
incubator-metron git commit: METRON-587 Integration tests should use
common processor implementations where possible (ottobackwards) closes
apache/incubator-metron#374
Repository: incubator-metron
Updated Branches:
refs/heads/master ffae676cb -> ec41e7321
METRON-587 Integration tests should use common processor implementations where possible (ottobackwards) closes apache/incubator-metron#374
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/ec41e732
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/ec41e732
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/ec41e732
Branch: refs/heads/master
Commit: ec41e732170ebf80c00a8a9013a390f7ffa48899
Parents: ffae676
Author: ottobackwards <ot...@gmail.com>
Authored: Fri Dec 16 09:56:05 2016 -0500
Committer: Otto Fowler <ot...@apache.org>
Committed: Fri Dec 16 09:56:05 2016 -0500
----------------------------------------------------------------------
.../integration/EnrichmentIntegrationTest.java | 99 ++++++++++--------
.../src/test/resources/log4j.properties | 5 +-
.../metron/integration/BaseIntegrationTest.java | 1 -
.../integration/processors/KafkaMessageSet.java | 44 ++++++++
.../integration/processors/KafkaProcessor.java | 101 +++++++++++++++++++
.../integration/ParserIntegrationTest.java | 64 ++++++------
.../integration/WriterBoltIntegrationTest.java | 59 ++++++-----
7 files changed, 270 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ec41e732/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index b7d6350..f399c81 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -18,10 +18,7 @@
package org.apache.metron.enrichment.integration;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Splitter;
+import com.google.common.base.*;
import com.google.common.collect.Iterables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableInterface;
@@ -38,10 +35,15 @@ import org.apache.metron.enrichment.lookup.LookupKV;
import org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator;
import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions;
import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.enrichment.converter.EnrichmentKey;
+import org.apache.metron.enrichment.converter.EnrichmentValue;
+import org.apache.metron.enrichment.converter.EnrichmentHelper;
import org.apache.metron.integration.*;
import org.apache.metron.integration.components.FluxTopologyComponent;
import org.apache.metron.integration.components.KafkaComponent;
+import org.apache.metron.integration.processors.KafkaMessageSet;
import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.integration.processors.KafkaProcessor;
import org.apache.metron.integration.utils.TestUtils;
import org.apache.metron.test.mock.MockHTable;
import org.junit.Assert;
@@ -51,7 +53,13 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
public class EnrichmentIntegrationTest extends BaseIntegrationTest {
private static final String SRC_IP = "ip_src_addr";
@@ -63,7 +71,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
}};
protected String fluxPath = "../metron-enrichment/src/main/flux/enrichment/test.yaml";
protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed";
-
+ private String sampleIndexedPath = TestConstants.SAMPLE_DATA_INDEXED_PATH + "TestIndexed";
+ private final List<byte[]> inputMessages = getInputMessages(sampleParsedPath);
public static class Provider implements TableProvider, Serializable {
MockHTable.Provider provider = new MockHTable.Provider();
@Override
@@ -72,9 +81,15 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
}
}
+  /**
+   * Reads the sample messages found at {@code path} via {@code TestUtils.readSampleData}.
+   *
+   * <p>The result is used to initialize a field, so a read failure is reported with an
+   * unchecked exception instead of a checked {@link IOException}.
+   *
+   * @param path location of the sample data to load
+   * @return the messages read from {@code path}
+   * @throws IllegalStateException if the sample data cannot be read
+   */
+  private static List<byte[]> getInputMessages(String path){
+    try{
+      return TestUtils.readSampleData(path);
+    }catch(IOException ioe){
+      // Returning null here would hide the real cause and only fail later with a
+      // NullPointerException when the test asks for inputMessages.size().
+      throw new IllegalStateException("Unable to read sample data from " + path, ioe);
+    }
+  }
@Test
public void test() throws Exception {
- final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
final String cf = "cf";
final String trackerHBaseTableName = "tracker";
final String threatIntelTableName = "threat_intel";
@@ -153,7 +168,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
fluxComponent.submitTopology();
kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
- ProcessorResult<List<Map<String, Object>>> result = runner.process(getProcessor(inputMessages));
+ ProcessorResult<List<Map<String, Object>>> result = runner.process(getProcessor());
// We expect failures, so we don't care if result returned failure or not
List<Map<String, Object>> docs = result.getResult();
Assert.assertEquals(inputMessages.size(), docs.size());
@@ -434,39 +449,39 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
return ret;
}
- public Processor<List<Map<String, Object>>> getProcessor(List<byte[]> inputMessages) {
- return new Processor<List<Map<String, Object>>>() {
- List<Map<String, Object>> docs = null;
- List<byte[]> errors = null;
- List<byte[]> invalids = null;
-
- @Override
- public ReadinessState process(ComponentRunner runner) {
- KafkaComponent kafkaComponent = runner.getComponent("kafka", KafkaComponent.class);
- List<byte[]> messages = kafkaComponent.readMessages(Constants.INDEXING_TOPIC);
- errors = kafkaComponent.readMessages(Constants.ENRICHMENT_ERROR_TOPIC);
- if (messages.size() == inputMessages.size()) {
- docs = new ArrayList<>();
- for(byte[] message : messages) {
- try {
- docs.add(JSONUtils.INSTANCE.load(new String(message), new TypeReference<Map<String, Object>>() {}));
- } catch (IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
- return ReadinessState.READY;
- } else {
- return ReadinessState.NOT_READY;
- }
- }
-
- @Override
- public ProcessorResult<List<Map<String, Object>>> getResult()
- {
- ProcessorResult.Builder<List<Map<String,Object>>> builder = new ProcessorResult.Builder();
- return builder.withResult(docs).withProcessErrors(errors).withProcessInvalids(invalids).build();
- }
- };
+  /**
+   * Builds the processor that gathers this test's output from the "kafka" component.
+   *
+   * <p>Messages are read from {@code Constants.INDEXING_TOPIC}, errors from
+   * {@code Constants.ENRICHMENT_ERROR_TOPIC} and invalid messages from
+   * {@code Constants.INVALID_STREAM}. The processor reports ready once the number of
+   * messages read equals the number of input messages; each message is then parsed
+   * from JSON into a map.
+   *
+   * @return the configured processor
+   */
+  @SuppressWarnings("unchecked")
+  private Processor<List<Map<String, Object>>> getProcessor() {
+
+    // The KafkaProcessor builder methods return the raw type, hence the unchecked conversion here.
+    KafkaProcessor<List<Map<String, Object>>> kafkaProcessor = new KafkaProcessor<>().withKafkaComponentName("kafka")
+            .withReadTopic(Constants.INDEXING_TOPIC)
+            .withErrorTopic(Constants.ENRICHMENT_ERROR_TOPIC)
+            .withInvalidTopic(Constants.INVALID_STREAM)
+            .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
+              @Nullable
+              @Override
+              public Boolean apply(@Nullable KafkaMessageSet messageSet) {
+                // this test is written to return 10 errors and 10 messages
+                // we can just check when the messages match here
+                // if they do then we are good
+                return messageSet.getMessages().size() == inputMessages.size();
+              }
+            })
+            .withProvideResult(new Function<KafkaMessageSet , List<Map<String, Object>>>() {
+              @Nullable
+              @Override
+              public List<Map<String, Object>> apply(@Nullable KafkaMessageSet messageSet) {
+                List<Map<String,Object>> docs = new ArrayList<>();
+                for (byte[] message : messageSet.getMessages()) {
+                  try {
+                    // Decode explicitly as UTF-8 so the result does not depend on the
+                    // platform default charset of the machine running the test.
+                    String json = new String(message, java.nio.charset.StandardCharsets.UTF_8);
+                    docs.add(JSONUtils.INSTANCE.load(json, new TypeReference<Map<String, Object>>() {
+                    }));
+                  } catch (IOException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                  }
+                }
+                return docs;
+              }
+            });
+    return kafkaProcessor;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ec41e732/metron-platform/metron-enrichment/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/resources/log4j.properties b/metron-platform/metron-enrichment/src/test/resources/log4j.properties
index ae15e0a..a738cbf 100644
--- a/metron-platform/metron-enrichment/src/test/resources/log4j.properties
+++ b/metron-platform/metron-enrichment/src/test/resources/log4j.properties
@@ -16,4 +16,7 @@ log4j.rootLogger=error,stdout
log4j.threshhold=ERROR
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
\ No newline at end of file
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
+log4j.appender.stdout.filter.1=org.apache.log4j.varia.StringMatchFilter
+log4j.appender.stdout.filter.1.StringToMatch=Error Test Raw Message String
+log4j.appender.stdout.filter.1.AcceptOnMatch=false
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ec41e732/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
index d7a486b..7207d7a 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/BaseIntegrationTest.java
@@ -42,5 +42,4 @@ public abstract class BaseIntegrationTest {
}
});
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ec41e732/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java
new file mode 100644
index 0000000..4227933
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaMessageSet.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.integration.processors;
+
+import java.util.List;
+
+/**
+ * Groups the three lists of raw payloads handled together by a Kafka processor:
+ * regular messages, errors and invalid messages.
+ *
+ * <p>The lists are stored exactly as given: nothing is copied or validated, so the
+ * accessors hand back the same instances (or {@code null}) that were supplied.
+ */
+public class KafkaMessageSet {
+
+  /** Regular messages. */
+  public List<byte[]> messages;
+
+  /** Error messages. */
+  public List<byte[]> errors;
+
+  /** Invalid messages. */
+  public List<byte[]> invalids;
+
+  /**
+   * Creates a message set over the given lists.
+   *
+   * @param messages regular messages
+   * @param errors   error messages
+   * @param invalids invalid messages
+   */
+  public KafkaMessageSet(List<byte[]> messages, List<byte[]> errors, List<byte[]> invalids) {
+    this.messages = messages;
+    this.errors = errors;
+    this.invalids = invalids;
+  }
+
+  /** @return the list of regular messages supplied at construction */
+  public List<byte[]> getMessages() {
+    return this.messages;
+  }
+
+  /** @return the list of error messages supplied at construction */
+  public List<byte[]> getErrors() {
+    return this.errors;
+  }
+
+  /** @return the list of invalid messages supplied at construction */
+  public List<byte[]> getInvalids() {
+    return this.invalids;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ec41e732/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java
new file mode 100644
index 0000000..6fdbbf4
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/processors/KafkaProcessor.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.integration.processors;
+
+import com.google.common.base.Function;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.Processor;
+import org.apache.metron.integration.ProcessorResult;
+import org.apache.metron.integration.ReadinessState;
+import org.apache.metron.integration.components.KafkaComponent;
+
+import java.util.LinkedList;
+import java.util.List;
+/**
+ * A {@link Processor} that reads output from a {@link KafkaComponent} and decides,
+ * through a caller-supplied validation function, when enough output has arrived.
+ *
+ * <p>Configure it with the {@code with...} methods. The component name, the read topic
+ * and both functions are used unconditionally; the error and invalid topics are
+ * optional and are only read when set.
+ *
+ * @param <T> type of the result produced by the provide-result function
+ */
+public class KafkaProcessor<T> implements Processor<T> {
+  private String kafkaComponentName;
+  private String readTopic;
+  private String errorTopic;
+  private String invalidTopic;
+  private Function<KafkaMessageSet, Boolean> validateReadMessages;
+  private Function<KafkaMessageSet,T> provideResult;
+  private final List<byte[]> messages = new LinkedList<>();
+  private final List<byte[]> errors = new LinkedList<>();
+  private final List<byte[]> invalids = new LinkedList<>();
+
+  public KafkaProcessor(){}
+
+  // NOTE: the builder methods deliberately keep their raw return type. Callers start
+  // chains with "new KafkaProcessor<>()", which would infer Object for T and stop
+  // compiling if these methods returned KafkaProcessor<T>.
+
+  /**
+   * @param name name under which the {@link KafkaComponent} is looked up in the runner
+   * @return this processor
+   */
+  public KafkaProcessor withKafkaComponentName(String name){
+    this.kafkaComponentName = name;
+    return this;
+  }
+
+  /**
+   * @param topicName topic the regular messages are read from
+   * @return this processor
+   */
+  public KafkaProcessor withReadTopic(String topicName){
+    this.readTopic = topicName;
+    return this;
+  }
+
+  /**
+   * @param topicName topic the errors are read from; when never set, no errors are read
+   * @return this processor
+   */
+  public KafkaProcessor withErrorTopic(String topicName){
+    this.errorTopic = topicName;
+    return this;
+  }
+
+  /**
+   * @param topicName topic the invalid messages are read from; when never set, none are read
+   * @return this processor
+   */
+  public KafkaProcessor withInvalidTopic(String topicName){
+    this.invalidTopic = topicName;
+    return this;
+  }
+
+  /**
+   * @param validate function deciding whether the messages read so far are complete;
+   *                 a {@code null} result is treated as {@code false}
+   * @return this processor
+   */
+  public KafkaProcessor withValidateReadMessages(Function<KafkaMessageSet, Boolean> validate){
+    this.validateReadMessages = validate;
+    return this;
+  }
+
+  /**
+   * @param provide function turning the accepted messages into the final result
+   * @return this processor
+   */
+  public KafkaProcessor withProvideResult(Function<KafkaMessageSet, T> provide){
+    this.provideResult = provide;
+    return this;
+  }
+
+  /**
+   * Reads the configured topics once and asks the validation function whether the
+   * output is complete. When it is, everything read in this call is added to the
+   * accumulated messages, errors and invalids.
+   *
+   * @param runner runner used to look up the {@link KafkaComponent}
+   * @return {@link ReadinessState#READY} when validation succeeded,
+   *         {@link ReadinessState#NOT_READY} otherwise
+   */
+  @Override
+  public ReadinessState process(ComponentRunner runner){
+    KafkaComponent kafkaComponent = runner.getComponent(kafkaComponentName, KafkaComponent.class);
+    List<byte[]> outputMessages = new LinkedList<>(kafkaComponent.readMessages(readTopic));
+    // Start from empty lists: an unset error/invalid topic must not leave a null behind,
+    // otherwise addAll(null) below throws a NullPointerException once validation passes.
+    List<byte[]> outputErrors = new LinkedList<>();
+    List<byte[]> outputInvalids = new LinkedList<>();
+
+    if (errorTopic != null) {
+      outputErrors.addAll(kafkaComponent.readMessages(errorTopic));
+    }
+    if (invalidTopic != null) {
+      outputInvalids.addAll(kafkaComponent.readMessages(invalidTopic));
+    }
+
+    Boolean validated = validateReadMessages.apply(new KafkaMessageSet(outputMessages,outputErrors,outputInvalids));
+    if (!Boolean.TRUE.equals(validated)) {
+      // null counts as "not validated"
+      return ReadinessState.NOT_READY;
+    }
+    messages.addAll(outputMessages);
+    errors.addAll(outputErrors);
+    invalids.addAll(outputInvalids);
+    return ReadinessState.READY;
+  }
+
+  /**
+   * Builds the result from everything accumulated by successful {@link #process} calls.
+   *
+   * @return a result holding the value produced by the provide-result function together
+   *         with the accumulated errors and invalid messages
+   */
+  @Override
+  public ProcessorResult<T> getResult(){
+    ProcessorResult.Builder<T> builder = new ProcessorResult.Builder<>();
+    T result = provideResult.apply(new KafkaMessageSet(messages,errors,invalids));
+    return builder.withResult(result).withProcessErrors(errors).withProcessInvalids(invalids).build();
+  }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ec41e732/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
index 55dec1c..4ba1c43 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/ParserIntegrationTest.java
@@ -17,32 +17,38 @@
*/
package org.apache.metron.parsers.integration;
+import com.google.common.base.Function;
import junit.framework.Assert;
import org.apache.metron.TestConstants;
import org.apache.metron.common.Constants;
import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
import org.apache.metron.integration.*;
import org.apache.metron.integration.components.KafkaComponent;
+import org.apache.metron.integration.processors.KafkaMessageSet;
import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.integration.processors.KafkaProcessor;
import org.apache.metron.integration.utils.TestUtils;
import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
import org.apache.metron.test.TestDataType;
import org.apache.metron.test.utils.SampleDataUtils;
import org.junit.Test;
+import javax.annotation.Nullable;
import java.util.*;
public abstract class ParserIntegrationTest extends BaseIntegrationTest {
-
+ protected List<byte[]> inputMessages;
@Test
public void test() throws Exception {
final String sensorType = getSensorType();
- final List<byte[]> inputMessages = TestUtils.readSampleData(SampleDataUtils.getSampleDataPath(sensorType, TestDataType.RAW));
+ inputMessages = TestUtils.readSampleData(SampleDataUtils.getSampleDataPath(sensorType, TestDataType.RAW));
final Properties topologyProperties = new Properties();
final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
add(new KafkaComponent.Topic(sensorType, 1));
add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+ add(new KafkaComponent.Topic(Constants.INVALID_STREAM,1));
+ add(new KafkaComponent.Topic(Constants.ERROR_STREAM,1));
}});
topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
@@ -71,36 +77,7 @@ public abstract class ParserIntegrationTest extends BaseIntegrationTest {
runner.start();
try {
kafkaComponent.writeMessages(sensorType, inputMessages);
- ProcessorResult<List<byte[]>> result =
- runner.process(new Processor<List<byte[]>>() {
- List<byte[]> messages = null;
- List<byte[]> errors = null;
- List<byte[]> invalids = null;
-
- @Override
- public ReadinessState process(ComponentRunner runner) {
- KafkaComponent kafkaComponent = runner.getComponent("kafka", KafkaComponent.class);
- List<byte[]> outputMessages = kafkaComponent.readMessages(Constants.ENRICHMENT_TOPIC);
- if (outputMessages.size() == inputMessages.size()) {
- messages = outputMessages;
- return ReadinessState.READY;
- } else {
- errors = kafkaComponent.readMessages(Constants.ERROR_STREAM);
- invalids = kafkaComponent.readMessages(Constants.INVALID_STREAM);
- if(errors.size() > 0 || invalids.size() > 0) {
- messages = outputMessages;
- return ReadinessState.READY;
- }
- return ReadinessState.NOT_READY;
- }
- }
-
- @Override
- public ProcessorResult<List<byte[]>> getResult() {
- ProcessorResult.Builder<List<byte[]>> builder = new ProcessorResult.Builder();
- return builder.withResult(messages).withProcessErrors(errors).withProcessInvalids(invalids).build();
- }
- });
+ ProcessorResult<List<byte[]>> result = runner.process(getProcessor());
List<byte[]> outputMessages = result.getResult();
StringBuffer buffer = new StringBuffer();
if (result.failed()){
@@ -132,6 +109,29 @@ public abstract class ParserIntegrationTest extends BaseIntegrationTest {
}
}
+ @SuppressWarnings("unchecked")
+ private KafkaProcessor<List<byte[]>> getProcessor(){
+
+ return new KafkaProcessor<>()
+ .withKafkaComponentName("kafka")
+ .withReadTopic(Constants.ENRICHMENT_TOPIC)
+ .withErrorTopic(Constants.ERROR_STREAM)
+ .withInvalidTopic(Constants.INVALID_STREAM)
+ .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
+ @Nullable
+ @Override
+ public Boolean apply(@Nullable KafkaMessageSet messageSet) {
+ return (messageSet.getMessages().size() + messageSet.getErrors().size() + messageSet.getInvalids().size()) == inputMessages.size();
+ }
+ })
+ .withProvideResult(new Function<KafkaMessageSet,List<byte[]>>(){
+ @Nullable
+ @Override
+ public List<byte[]> apply(@Nullable KafkaMessageSet messageSet) {
+ return messageSet.getMessages();
+ }
+ });
+ }
abstract String getSensorType();
abstract List<ParserValidation> getValidations();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ec41e732/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index f8988cb..f37b1fc 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -18,6 +18,7 @@
package org.apache.metron.writers.integration;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.hadoop.hbase.util.Bytes;
@@ -29,7 +30,9 @@ import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
import org.apache.metron.integration.*;
import org.apache.metron.integration.components.KafkaComponent;
+import org.apache.metron.integration.processors.KafkaMessageSet;
import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.integration.processors.KafkaProcessor;
import org.apache.metron.parsers.csv.CSVParser;
import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
import org.apache.metron.test.utils.UnitTestHelper;
@@ -37,6 +40,7 @@ import org.json.simple.JSONObject;
import org.junit.Assert;
import org.junit.Test;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.*;
@@ -125,33 +129,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
try {
runner.start();
kafkaComponent.writeMessages(sensorType, inputMessages);
- ProcessorResult<Map<String, List<JSONObject>>> result =
- runner.process(new Processor<Map<String, List<JSONObject>>>() {
- Map<String, List<JSONObject>> messages = null;
-
- @Override
- public ReadinessState process(ComponentRunner runner) {
- KafkaComponent kafkaComponent = runner.getComponent("kafka", KafkaComponent.class);
- List<byte[]> outputMessages = kafkaComponent.readMessages(Constants.ENRICHMENT_TOPIC);
- List<byte[]> invalid = kafkaComponent.readMessages(Constants.DEFAULT_PARSER_INVALID_TOPIC);
- List<byte[]> error = kafkaComponent.readMessages(Constants.DEFAULT_PARSER_ERROR_TOPIC);
- if(outputMessages.size() == 1 && invalid.size() == 1 && error.size() == 1) {
- messages = new HashMap<String, List<JSONObject>>() {{
- put(Constants.ENRICHMENT_TOPIC, loadMessages(outputMessages));
- put(Constants.DEFAULT_PARSER_ERROR_TOPIC, loadMessages(error));
- put(Constants.DEFAULT_PARSER_INVALID_TOPIC, loadMessages(invalid));
- }};
- return ReadinessState.READY;
- }
- return ReadinessState.NOT_READY;
- }
-
- @Override
- public ProcessorResult<Map<String, List<JSONObject>>> getResult() {
- ProcessorResult.Builder<Map<String,List<JSONObject>>> builder = new ProcessorResult.Builder();
- return builder.withResult(messages).build();
- }
- });
+ ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(getProcessor());
Map<String,List<JSONObject>> outputMessages = result.getResult();
Assert.assertEquals(3, outputMessages.size());
Assert.assertEquals(1, outputMessages.get(Constants.ENRICHMENT_TOPIC).size());
@@ -198,4 +176,31 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
);
return tmp;
}
+ @SuppressWarnings("unchecked")
+ private KafkaProcessor<Map<String,List<JSONObject>>> getProcessor(){
+
+ return new KafkaProcessor<>()
+ .withKafkaComponentName("kafka")
+ .withReadTopic(Constants.ENRICHMENT_TOPIC)
+ .withErrorTopic(Constants.DEFAULT_PARSER_ERROR_TOPIC)
+ .withInvalidTopic(Constants.DEFAULT_PARSER_INVALID_TOPIC)
+ .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
+ @Nullable
+ @Override
+ public Boolean apply(@Nullable KafkaMessageSet messageSet) {
+ return (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 1) && (messageSet.getInvalids().size() ==1);
+ }
+ })
+ .withProvideResult(new Function<KafkaMessageSet,Map<String,List<JSONObject>>>(){
+ @Nullable
+ @Override
+ public Map<String,List<JSONObject>> apply(@Nullable KafkaMessageSet messageSet) {
+ return new HashMap<String, List<JSONObject>>() {{
+ put(Constants.ENRICHMENT_TOPIC, loadMessages(messageSet.getMessages()));
+ put(Constants.DEFAULT_PARSER_ERROR_TOPIC, loadMessages(messageSet.getErrors()));
+ put(Constants.DEFAULT_PARSER_INVALID_TOPIC, loadMessages(messageSet.getInvalids()));
+ }};
+ }
+ });
+ }
}