You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/10/24 15:04:36 UTC

[47/51] [abbrv] metron git commit: METRON-1829 Large Error Message Causes Slow Search Performance (merrimanr) closes apache/metron#1239

METRON-1829 Large Error Message Causes Slow Search Performance (merrimanr) closes apache/metron#1239


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/d44a3925
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/d44a3925
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/d44a3925

Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: d44a392567e35022bfb35a787b55aff3447ac60e
Parents: 2531c3e
Author: merrimanr <me...@gmail.com>
Authored: Mon Oct 22 08:43:10 2018 -0500
Committer: rmerriman <me...@gmail.com>
Committed: Mon Oct 22 08:43:10 2018 -0500

----------------------------------------------------------------------
 .../bolt/BulkMessageWriterBoltTest.java         |  2 +-
 metron-platform/metron-writer/pom.xml           |  6 ++
 .../metron/writer/BulkWriterComponent.java      | 33 ++++++-----
 .../writer/bolt/BulkMessageWriterBolt.java      |  2 +-
 .../metron/writer/BulkWriterComponentTest.java  | 61 +++++++++++++++-----
 5 files changed, 76 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 588fc58..083628c 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -207,7 +207,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
     }
     UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.ERROR);
     verify(outputCollector, times(5)).ack(tuple);
-    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any(Values.class));
+    verify(outputCollector, times(5)).emit(eq(Constants.ERROR_STREAM), any(Values.class));
     verify(outputCollector, times(1)).reportError(any(Throwable.class));
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-writer/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/pom.xml b/metron-platform/metron-writer/pom.xml
index e845516..6d08093 100644
--- a/metron-platform/metron-writer/pom.xml
+++ b/metron-platform/metron-writer/pom.xml
@@ -207,6 +207,12 @@
             <artifactId>metron-common</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-test-utilities</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index 7678584..68585c5 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -40,6 +40,7 @@ import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -118,12 +119,18 @@ public class BulkWriterComponent<MESSAGE_T> {
 
   public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, MessageGetStrategy messageGetStrategy) {
     LOG.error(format("Failing %d tuple(s); sensorType=%s", Iterables.size(tuples), sensorType), e);
-    MetronError error = new MetronError()
-            .withSensorType(Collections.singleton(sensorType))
-            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
-            .withThrowable(e);
-    tuples.forEach(t -> error.addRawMessage(messageGetStrategy.get(t)));
-    handleError(tuples, error);
+    tuples.forEach(t -> {
+      MetronError error = new MetronError()
+              .withSensorType(Collections.singleton(sensorType))
+              .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+              .withThrowable(e)
+              .addRawMessage(messageGetStrategy.get(t));
+      collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject()));
+      collector.ack(t);
+    });
+    // there is only one error to report for all of the failed tuples
+    collector.reportError(e);
+
   }
 
   /**
@@ -133,24 +140,24 @@ public class BulkWriterComponent<MESSAGE_T> {
    * <p>Without a valid message, the JSON message cannot be added to the error.
    *
    * @param e The exception that occurred.
-   * @param tuples The tuples to error that may not contain valid messages.
+   * @param tuple The tuple to error that may not contain a valid message.
    */
-  public void error(Throwable e, Iterable<Tuple> tuples) {
-    LOG.error(format("Failing %d tuple(s)", Iterables.size(tuples)), e);
+  public void error(Throwable e, Tuple tuple) {
+    LOG.error("Failing tuple", e);
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.INDEXING_ERROR)
             .withThrowable(e);
-    handleError(tuples, error);
+    handleError(tuple, error);
   }
 
   /**
    * Errors a set of tuples.
    *
-   * @param tuples The tuples to error.
+   * @param tuple The tuple to error.
    * @param error
    */
-  private void handleError(Iterable<Tuple> tuples, MetronError error) {
-    tuples.forEach(t -> collector.ack(t));
+  private void handleError(Tuple tuple, MetronError error) {
+    collector.ack(tuple);
     ErrorUtils.handleError(collector, error);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index 4bb3888..590ab8c 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -311,7 +311,7 @@ public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends Conf
     LOG.debug("Unable to extract message from tuple; expected valid JSON");
     getWriterComponent().error(
             new Exception("Unable to extract message from tuple; expected valid JSON"),
-            ImmutableList.of(tuple)
+            tuple
     );
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/d44a3925/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
index c389854..decf3a5 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -18,10 +18,12 @@
 package org.apache.metron.writer;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.powermock.api.mockito.PowerMockito.verifyStatic;
@@ -36,8 +38,10 @@ import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.test.error.MetronErrorJSONMatcher;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 import org.json.simple.JSONObject;
 import org.junit.Before;
 import org.junit.Rule;
@@ -127,9 +131,12 @@ public class BulkWriterComponentTest {
   @Test
   public void writeShouldProperlyHandleWriterErrors() throws Exception {
     Throwable e = new Exception("test exception");
-    MetronError error = new MetronError()
+    MetronError expectedError1 = new MetronError()
             .withSensorType(Collections.singleton(sensorType))
-            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2));
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1));
+    MetronError expectedError2 = new MetronError()
+            .withSensorType(Collections.singleton(sensorType))
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2));
     BulkWriterResponse response = new BulkWriterResponse();
     response.addAllErrors(e, tupleList);
 
@@ -139,8 +146,14 @@ public class BulkWriterComponentTest {
     bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
     bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
 
-    verifyStatic(times(1));
-    ErrorUtils.handleError(collector, error);
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            new Values(argThat(new MetronErrorJSONMatcher(expectedError1.getJSONObject()))));
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            new Values(argThat(new MetronErrorJSONMatcher(expectedError2.getJSONObject()))));
+    verify(collector, times(1)).ack(tuple1);
+    verify(collector, times(1)).ack(tuple2);
+    verify(collector, times(1)).reportError(e);
+    verifyNoMoreInteractions(collector);
   }
 
   @Test
@@ -161,9 +174,16 @@ public class BulkWriterComponentTest {
   @Test
   public void writeShouldProperlyHandleWriterException() throws Exception {
     Throwable e = new Exception("test exception");
-    MetronError error = new MetronError()
+    MetronError expectedError1 = new MetronError()
+            .withSensorType(Collections.singleton(sensorType))
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+            .withThrowable(e)
+            .withRawMessages(Collections.singletonList(message1));
+    MetronError expectedError2 = new MetronError()
             .withSensorType(Collections.singleton(sensorType))
-            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Arrays.asList(message1, message2));
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+            .withThrowable(e)
+            .withRawMessages(Collections.singletonList(message2));
     BulkWriterResponse response = new BulkWriterResponse();
     response.addAllErrors(e, tupleList);
 
@@ -173,8 +193,14 @@ public class BulkWriterComponentTest {
     bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
     bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
 
-    verifyStatic(times(1));
-    ErrorUtils.handleError(collector, error);
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            new Values(argThat(new MetronErrorJSONMatcher(expectedError1.getJSONObject()))));
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            new Values(argThat(new MetronErrorJSONMatcher(expectedError2.getJSONObject()))));
+    verify(collector, times(1)).ack(tuple1);
+    verify(collector, times(1)).ack(tuple2);
+    verify(collector, times(1)).reportError(e);
+    verifyNoMoreInteractions(collector);
   }
 
   @Test
@@ -182,19 +208,28 @@ public class BulkWriterComponentTest {
     Throwable e = new Exception("test exception");
     MetronError error1 = new MetronError()
             .withSensorType(Collections.singleton("sensor1"))
-            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1));
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+            .withThrowable(e)
+            .withRawMessages(Collections.singletonList(message1));
     MetronError error2 = new MetronError()
             .withSensorType(Collections.singleton("sensor2"))
-            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2));
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+            .withThrowable(e)
+            .withRawMessages(Collections.singletonList(message2));
 
     BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector);
     bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
     bulkWriterComponent.write("sensor2", tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
     bulkWriterComponent.errorAll(e, messageGetStrategy);
 
-    verifyStatic(times(1));
-    ErrorUtils.handleError(collector, error1);
-    ErrorUtils.handleError(collector, error2);
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            new Values(argThat(new MetronErrorJSONMatcher(error1.getJSONObject()))));
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            new Values(argThat(new MetronErrorJSONMatcher(error2.getJSONObject()))));
+    verify(collector, times(1)).ack(tuple1);
+    verify(collector, times(1)).ack(tuple2);
+    verify(collector, times(2)).reportError(e);
+    verifyNoMoreInteractions(collector);
 
     bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
     verify(bulkMessageWriter, times(0)).write(sensorType, configurations, Collections.singletonList(tuple1), Collections.singletonList(message1));