You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/10/24 15:04:36 UTC
[47/51] [abbrv] metron git commit: METRON-1829 Large Error Message Causes Slow Search Performance (merrimanr) closes apache/metron#1239
METRON-1829 Large Error Message Causes Slow Search Performance (merrimanr) closes apache/metron#1239
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/d44a3925
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/d44a3925
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/d44a3925
Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: d44a392567e35022bfb35a787b55aff3447ac60e
Parents: 2531c3e
Author: merrimanr <me...@gmail.com>
Authored: Mon Oct 22 08:43:10 2018 -0500
Committer: rmerriman <me...@gmail.com>
Committed: Mon Oct 22 08:43:10 2018 -0500
----------------------------------------------------------------------
.../bolt/BulkMessageWriterBoltTest.java | 2 +-
metron-platform/metron-writer/pom.xml | 6 ++
.../metron/writer/BulkWriterComponent.java | 33 ++++++-----
.../writer/bolt/BulkMessageWriterBolt.java | 2 +-
.../metron/writer/BulkWriterComponentTest.java | 61 +++++++++++++++-----
5 files changed, 76 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 588fc58..083628c 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -207,7 +207,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
}
UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.ERROR);
verify(outputCollector, times(5)).ack(tuple);
- verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any(Values.class));
+ verify(outputCollector, times(5)).emit(eq(Constants.ERROR_STREAM), any(Values.class));
verify(outputCollector, times(1)).reportError(any(Throwable.class));
}
http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-writer/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/pom.xml b/metron-platform/metron-writer/pom.xml
index e845516..6d08093 100644
--- a/metron-platform/metron-writer/pom.xml
+++ b/metron-platform/metron-writer/pom.xml
@@ -207,6 +207,12 @@
<artifactId>metron-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-test-utilities</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index 7678584..68585c5 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -40,6 +40,7 @@ import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkWriterResponse;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,12 +119,18 @@ public class BulkWriterComponent<MESSAGE_T> {
public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, MessageGetStrategy messageGetStrategy) {
LOG.error(format("Failing %d tuple(s); sensorType=%s", Iterables.size(tuples), sensorType), e);
- MetronError error = new MetronError()
- .withSensorType(Collections.singleton(sensorType))
- .withErrorType(Constants.ErrorType.INDEXING_ERROR)
- .withThrowable(e);
- tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
- handleError(tuples, error);
+ tuples.forEach(t -> {
+ MetronError error = new MetronError()
+ .withSensorType(Collections.singleton(sensorType))
+ .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+ .withThrowable(e)
+ .addRawMessage(messageGetStrategy.get(t));
+ collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject()));
+ collector.ack(t);
+ });
+ // there is only one error to report for all of the failed tuples
+ collector.reportError(e);
+
}
/**
@@ -133,24 +140,24 @@ public class BulkWriterComponent<MESSAGE_T> {
* <p>Without a valid message, the JSON message cannot be added to the error.
*
* @param e The exception that occurred.
- * @param tuples The tuples to error that may not contain valid messages.
+ * @param tuple The tuple to error that may not contain a valid message.
*/
- public void error(Throwable e, Iterable<Tuple> tuples) {
- LOG.error(format("Failing %d tuple(s)", Iterables.size(tuples)), e);
+ public void error(Throwable e, Tuple tuple) {
+ LOG.error("Failing tuple", e);
MetronError error = new MetronError()
.withErrorType(Constants.ErrorType.INDEXING_ERROR)
.withThrowable(e);
- handleError(tuples, error);
+ handleError(tuple, error);
}
/**
* Errors a set of tuples.
*
- * @param tuples The tuples to error.
+ * @param tuple The tuple to error.
* @param error
*/
- private void handleError(Iterable<Tuple> tuples, MetronError error) {
- tuples.forEach(t -> collector.ack(t));
+ private void handleError(Tuple tuple, MetronError error) {
+ collector.ack(tuple);
ErrorUtils.handleError(collector, error);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index 4bb3888..590ab8c 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -311,7 +311,7 @@ public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends Conf
LOG.debug("Unable to extract message from tuple; expected valid JSON");
getWriterComponent().error(
new Exception("Unable to extract message from tuple; expected valid JSON"),
- ImmutableList.of(tuple)
+ tuple
);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
index c389854..decf3a5 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -18,10 +18,12 @@
package org.apache.metron.writer;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.verifyStatic;
@@ -36,8 +38,10 @@ import org.apache.metron.common.message.MessageGetStrategy;
import org.apache.metron.common.utils.ErrorUtils;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.test.error.MetronErrorJSONMatcher;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
import org.json.simple.JSONObject;
import org.junit.Before;
import org.junit.Rule;
@@ -127,9 +131,12 @@ public class BulkWriterComponentTest {
@Test
public void writeShouldProperlyHandleWriterErrors() throws Exception {
Throwable e = new Exception("test exception");
- MetronError error = new MetronError()
+ MetronError expectedError1 = new MetronError()
.withSensorType(Collections.singleton(sensorType))
- .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2));
+ .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1));
+ MetronError expectedError2 = new MetronError()
+ .withSensorType(Collections.singleton(sensorType))
+ .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2));
BulkWriterResponse response = new BulkWriterResponse();
response.addAllErrors(e, tupleList);
@@ -139,8 +146,14 @@ public class BulkWriterComponentTest {
bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
- verifyStatic(times(1));
- ErrorUtils.handleError(collector, error);
+ verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+ new Values(argThat(new MetronErrorJSONMatcher(expectedError1.getJSONObject()))));
+ verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+ new Values(argThat(new MetronErrorJSONMatcher(expectedError2.getJSONObject()))));
+ verify(collector, times(1)).ack(tuple1);
+ verify(collector, times(1)).ack(tuple2);
+ verify(collector, times(1)).reportError(e);
+ verifyNoMoreInteractions(collector);
}
@Test
@@ -161,9 +174,16 @@ public class BulkWriterComponentTest {
@Test
public void writeShouldProperlyHandleWriterException() throws Exception {
Throwable e = new Exception("test exception");
- MetronError error = new MetronError()
+ MetronError expectedError1 = new MetronError()
+ .withSensorType(Collections.singleton(sensorType))
+ .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+ .withThrowable(e)
+ .withRawMessages(Collections.singletonList(message1));
+ MetronError expectedError2 = new MetronError()
.withSensorType(Collections.singleton(sensorType))
- .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2));
+ .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+ .withThrowable(e)
+ .withRawMessages(Collections.singletonList(message2));
BulkWriterResponse response = new BulkWriterResponse();
response.addAllErrors(e, tupleList);
@@ -173,8 +193,14 @@ public class BulkWriterComponentTest {
bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
- verifyStatic(times(1));
- ErrorUtils.handleError(collector, error);
+ verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+ new Values(argThat(new MetronErrorJSONMatcher(expectedError1.getJSONObject()))));
+ verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+ new Values(argThat(new MetronErrorJSONMatcher(expectedError2.getJSONObject()))));
+ verify(collector, times(1)).ack(tuple1);
+ verify(collector, times(1)).ack(tuple2);
+ verify(collector, times(1)).reportError(e);
+ verifyNoMoreInteractions(collector);
}
@Test
@@ -182,19 +208,28 @@ public class BulkWriterComponentTest {
Throwable e = new Exception("test exception");
MetronError error1 = new MetronError()
.withSensorType(Collections.singleton("sensor1"))
- .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1));
+ .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+ .withThrowable(e)
+ .withRawMessages(Collections.singletonList(message1));
MetronError error2 = new MetronError()
.withSensorType(Collections.singleton("sensor2"))
- .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2));
+ .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+ .withThrowable(e)
+ .withRawMessages(Collections.singletonList(message2));
BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector);
bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
bulkWriterComponent.write("sensor2", tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
bulkWriterComponent.errorAll(e, messageGetStrategy);
- verifyStatic(times(1));
- ErrorUtils.handleError(collector, error1);
- ErrorUtils.handleError(collector, error2);
+ verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+ new Values(argThat(new MetronErrorJSONMatcher(error1.getJSONObject()))));
+ verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+ new Values(argThat(new MetronErrorJSONMatcher(error2.getJSONObject()))));
+ verify(collector, times(1)).ack(tuple1);
+ verify(collector, times(1)).ack(tuple2);
+ verify(collector, times(2)).reportError(e);
+ verifyNoMoreInteractions(collector);
bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
verify(bulkMessageWriter, times(0)).write(sensorType, configurations, Collections.singletonList(tuple1), Collections.singletonList(message1));