You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2018/04/13 17:17:16 UTC
metron git commit: METRON-1347: Indexing Topology should fail tuples
without a source.type (cstella via mmiklavc) closes apache/metron#863
Repository: metron
Updated Branches:
refs/heads/master 53124d97d -> bfe90ef1e
METRON-1347: Indexing Topology should fail tuples without a source.type (cstella via mmiklavc) closes apache/metron#863
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/bfe90ef1
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/bfe90ef1
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/bfe90ef1
Branch: refs/heads/master
Commit: bfe90ef1e579be53a14d9fd0e4dc19fc6a81baf0
Parents: 53124d9
Author: cstella <ce...@gmail.com>
Authored: Fri Apr 13 11:17:00 2018 -0600
Committer: Michael Miklavcic <mi...@gmail.com>
Committed: Fri Apr 13 11:17:00 2018 -0600
----------------------------------------------------------------------
.../bolt/BulkMessageWriterBoltTest.java | 25 ++++++++++
metron-platform/metron-indexing/README.md | 6 +++
.../writer/bolt/BulkMessageWriterBolt.java | 51 ++++++++++++++------
3 files changed, 68 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/bfe90ef1/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
index 308638e..dedf5e6 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
@@ -118,6 +118,31 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
private MessageGetStrategy messageGetStrategy;
@Test
+ public void testSensorTypeMissing() throws Exception { // a message lacking source.type must be sent to the writer component's error path instead of write()
+ BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl")
+ .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
+ .withMessageGetterField("message");
+ bulkMessageWriterBolt.setCuratorFramework(client);
+ bulkMessageWriterBolt.setZKCache(cache);
+ bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
+ new FileInputStream(sampleSensorIndexingConfigPath));
+
+ bulkMessageWriterBolt.declareOutputFields(declarer);
+ verify(declarer, times(1)).declareStream(eq("error"), argThat(
+ new FieldsMatcher("message")));
+ Map stormConf = new HashMap();
+ bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
+ BulkWriterComponent<JSONObject> component = mock(BulkWriterComponent.class);
+ bulkMessageWriterBolt.setWriterComponent(component); // replaces the component created in prepare() with a mock so the error() call can be verified
+ verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class));
+ JSONObject message = (JSONObject) new JSONParser().parse(sampleMessageString);
+ message.remove("source.type"); // strip the sensor type to simulate a malformed message
+ when(tuple.getValueByField("message")).thenReturn(message);
+ bulkMessageWriterBolt.execute(tuple);
+ verify(component, times(1)).error(eq("null"), any(), any(), any()); // the bolt reports the missing sensor type under the literal key "null"
+ }
+
+ @Test
public void testFlushOnBatchSize() throws Exception {
BulkMessageWriterBolt bulkMessageWriterBolt = new BulkMessageWriterBolt("zookeeperUrl")
.withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
http://git-wip-us.apache.org/repos/asf/metron/blob/bfe90ef1/metron-platform/metron-indexing/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md
index d351d7c..f4a4501 100644
--- a/metron-platform/metron-indexing/README.md
+++ b/metron-platform/metron-indexing/README.md
@@ -32,6 +32,12 @@ Indices are written in batch and the batch size and batch timeout are specified
[Sensor Indexing Configuration](#sensor-indexing-configuration) via the `batchSize` and `batchTimeout` parameters.
These configs are variable by sensor type.
+## Minimal Assumptions for Message Structure
+
+At minimum, a message must have a `source.type` field.
+Without this field, the message tuple will be failed rather than written,
+and an appropriate error will be reported in the Storm UI and logs.
+
## Indexing Architecture
![Architecture](indexing_arch.png)
http://git-wip-us.apache.org/repos/asf/metron/blob/bfe90ef1/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index 8202604..b5b97d8 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -17,6 +17,7 @@
*/
package org.apache.metron.writer.bolt;
+import com.google.common.collect.ImmutableList;
import org.apache.metron.common.Constants;
import org.apache.metron.common.bolt.ConfiguredIndexingBolt;
import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
@@ -125,6 +126,13 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
return defaultBatchTimeout;
}
+ public BulkWriterComponent<JSONObject> getWriterComponent() { // accessor; the bolt now routes all writer-component calls through here
+ return writerComponent;
+ }
+
+ public void setWriterComponent(BulkWriterComponent<JSONObject> component) { // called by prepare(); also lets tests inject a mock after prepare()
+ writerComponent = component;
+ }
/**
* This method is called by TopologyBuilder.createTopology() to obtain topology and
* bolt specific configuration parameters. We use it primarily to configure how often
@@ -160,9 +168,11 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
return conf;
}
+
+
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.writerComponent = new BulkWriterComponent<>(collector);
+ setWriterComponent(new BulkWriterComponent<>(collector));
this.collector = collector;
super.prepare(stormConf, context, collector);
if (messageGetField != null) {
@@ -185,7 +195,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor);
defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
}
- writerComponent.setDefaultBatchTimeout(defaultBatchTimeout);
+ getWriterComponent().setDefaultBatchTimeout(defaultBatchTimeout);
bulkMessageWriter.init(stormConf, context, writerconf);
} catch (Exception e) {
throw new RuntimeException(e);
@@ -197,7 +207,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
*/
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, Clock clock) {
prepare(stormConf, context, collector);
- writerComponent.withClock(clock);
+ getWriterComponent().withClock(clock); // applied after prepare() has created the writer component
}
@SuppressWarnings("unchecked")
@@ -208,7 +218,7 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
//WriterToBulkWriter doesn't allow batching, so no need to flush on Tick.
LOG.debug("Flushing message queues older than their batchTimeouts");
- writerComponent.flushTimeouts(bulkMessageWriter, configurationTransformation.apply(
+ getWriterComponent().flushTimeouts(bulkMessageWriter, configurationTransformation.apply(
new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()))
, messageGetStrategy);
}
@@ -229,17 +239,30 @@ public class BulkMessageWriterBolt extends ConfiguredIndexingBolt {
LOG.trace("Writing enrichment message: {}", message);
WriterConfiguration writerConfiguration = configurationTransformation.apply(
new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
- if(writerConfiguration.isDefault(sensorType)) {
- //want to warn, but not fail the tuple
- collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType));
+ if(sensorType == null) {
+ //sensor type somehow ended up being null. We want to error this message directly.
+ getWriterComponent().error("null"
+ , new Exception("Sensor type is not specified for message "
+ + message.toJSONString()
+ )
+ , ImmutableList.of(tuple)
+ , messageGetStrategy
+ );
+ }
+ else {
+ if (writerConfiguration.isDefault(sensorType)) {
+ //want to warn, but not fail the tuple
+ collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType));
+ }
+
+ getWriterComponent().write(sensorType
+ , tuple
+ , message
+ , bulkMessageWriter
+ , writerConfiguration
+ , messageGetStrategy
+ );
}
- writerComponent.write(sensorType
- , tuple
- , message
- , bulkMessageWriter
- , writerConfiguration
- , messageGetStrategy
- );
}
catch(Exception e) {
throw new RuntimeException("This should have been caught in the writerComponent. If you see this, file a JIRA", e);