You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2018/04/13 17:17:16 UTC

metron git commit: METRON-1347: Indexing Topology should fail tuples without a source.type (cstella via mmiklavc) closes apache/metron#863

Repository: metron
Updated Branches:
  refs/heads/master 53124d97d -> bfe90ef1e


METRON-1347: Indexing Topology should fail tuples without a source.type (cstella via mmiklavc) closes apache/metron#863


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/bfe90ef1
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/bfe90ef1
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/bfe90ef1

Branch: refs/heads/master
Commit: bfe90ef1e579be53a14d9fd0e4dc19fc6a81baf0
Parents: 53124d9
Author: cstella <ce...@gmail.com>
Authored: Fri Apr 13 11:17:00 2018 -0600
Committer: Michael Miklavcic <mi...@gmail.com>
Committed: Fri Apr 13 11:17:00 2018 -0600

----------------------------------------------------------------------
 .../bolt/BulkMessageWriterBoltTest.java         | 25 ++++++++++
 metron-platform/metron-indexing/README.md       |  6 +++
 .../writer/bolt/BulkMessageWriterBolt.java      | 51 ++++++++++++++------
 3 files changed, 68 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/bfe90ef1/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 308638e..dedf5e6 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -118,6 +118,31 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
   private MessageGetStrategy messageGetStrategy;
 
   @Test
+  public void testSensorTypeMissing() throws Exception {
+    BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl")
+            .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
+            .withMessageGetterField("message");
+    bulkMessageWriterBolt.setCuratorFramework(client);
+    bulkMessageWriterBolt.setZKCache(cache);
+    bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
+            new FileInputStream(sampleSensorIndexingConfigPath));
+
+    bulkMessageWriterBolt.declareOutputFields(declarer);
+    verify(declarer, times(1)).declareStream(eq("error"), argThat(
+            new FieldsMatcher("message")));
+    Map stormConf = new HashMap();
+    bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
+    BulkWriterComponent<JSONObject> component = mock(BulkWriterComponent.class);
+    bulkMessageWriterBolt.setWriterComponent(component);
+    verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class));
+    JSONObject message = (JSONObject) new JSONParser().parse(sampleMessageString);
+    message.remove("source.type");
+    when(tuple.getValueByField("message")).thenReturn(message);
+    bulkMessageWriterBolt.execute(tuple);
+    verify(component, times(1)).error(eq("null"), any(), any(), any());
+  }
+
+  @Test
   public void testFlushOnBatchSize() throws Exception {
     BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl")
             .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())

http://git-wip-us.apache.org/repos/asf/metron/blob/bfe90ef1/metron-platform/metron-indexing/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md
index d351d7c..f4a4501 100644
--- a/metron-platform/metron-indexing/README.md
+++ b/metron-platform/metron-indexing/README.md
@@ -32,6 +32,12 @@ Indices are written in batch and the batch size and batch timeout are specified
 [Sensor Indexing Configuration](#sensor-indexing-configuration) via the `batchSize` and `batchTimeout` parameters.
 These configs are variable by sensor type.
 
+## Minimal Assumptions for Message Structure
+
+At minimum, a message must have a `source.type` field.
+Without this field, the message tuple will be failed and not written;
+an appropriate error is reported in the Storm UI and logs.
+
 ## Indexing Architecture
 
 ![Architecture](indexing_arch.png)

http://git-wip-us.apache.org/repos/asf/metron/blob/bfe90ef1/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index 8202604..b5b97d8 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.writer.bolt;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredIndexingBolt;
 import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
@@ -125,6 +126,13 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
     return defaultBatchTimeout;
   }
 
+  public BulkWriterComponent<JSONObject> getWriterComponent() {
+    return writerComponent;
+  }
+
+  public void setWriterComponent(BulkWriterComponent<JSONObject> component) {
+    writerComponent = component;
+  }
   /**
    * This method is called by TopologyBuilder.createTopology() to obtain topology and
    * bolt specific configuration parameters.  We use it primarily to configure how often
@@ -160,9 +168,11 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
     return conf;
   }
 
+
+
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    this.writerComponent = new BulkWriterComponent<>(collector);
+    setWriterComponent(new BulkWriterComponent<>(collector));
     this.collector = collector;
     super.prepare(stormConf, context, collector);
     if (messageGetField != null) {
@@ -185,7 +195,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
         BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor);
         defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
       }
-      writerComponent.setDefaultBatchTimeout(defaultBatchTimeout);
+      getWriterComponent().setDefaultBatchTimeout(defaultBatchTimeout);
       bulkMessageWriter.init(stormConf, context, writerconf);
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -197,7 +207,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
    */
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, Clock clock) {
     prepare(stormConf, context, collector);
-    writerComponent.withClock(clock);
+    getWriterComponent().withClock(clock);
   }
 
   @SuppressWarnings("unchecked")
@@ -208,7 +218,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
         if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
           //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick.
           LOG.debug("Flushing message queues older than their batchTimeouts");
-          writerComponent.flushTimeouts(bulkMessageWriter, configurationTransformation.apply(
+          getWriterComponent().flushTimeouts(bulkMessageWriter, configurationTransformation.apply(
                   new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
                   , messageGetStrategy);
         }
@@ -229,17 +239,30 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
       LOG.trace("Writing enrichment message: {}", message);
       WriterConfiguration writerConfiguration = configurationTransformation.apply(
               new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
-      if(writerConfiguration.isDefault(sensorType)) {
-        //want to warn, but not fail the tuple
-        collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType));
+      if(sensorType == null) {
+        //sensor type somehow ended up being null.  We want to error this message directly.
+        getWriterComponent().error("null"
+                             , new Exception("Sensor type is not specified for message "
+                                            + message.toJSONString()
+                                            )
+                             , ImmutableList.of(tuple)
+                             , messageGetStrategy
+                             );
+      }
+      else {
+        if (writerConfiguration.isDefault(sensorType)) {
+          //want to warn, but not fail the tuple
+          collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType));
+        }
+
+        getWriterComponent().write(sensorType
+                , tuple
+                , message
+                , bulkMessageWriter
+                , writerConfiguration
+                , messageGetStrategy
+        );
       }
-      writerComponent.write(sensorType
-                           , tuple
-                           , message
-                           , bulkMessageWriter
-                           , writerConfiguration
-                           , messageGetStrategy
-                           );
     }
     catch(Exception e) {
       throw new RuntimeException("This should have been caught in the writerComponent.  If you see this, file a JIRA", e);