You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/08/29 18:56:19 UTC
metron git commit: METRON-1751 Storm Profiler dies when consuming
null message (nickwallen) closes apache/metron#1176
Repository: metron
Updated Branches:
refs/heads/master 661e23e27 -> d32bd50d4
METRON-1751 Storm Profiler dies when consuming null message (nickwallen) closes apache/metron#1176
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/d32bd50d
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/d32bd50d
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/d32bd50d
Branch: refs/heads/master
Commit: d32bd50d43aae87af9ec12d2daea83b2f4eca342
Parents: 661e23e
Author: nickwallen <ni...@nickallen.org>
Authored: Wed Aug 29 14:55:58 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Wed Aug 29 14:55:58 2018 -0400
----------------------------------------------------------------------
.../profiler/bolt/ProfileSplitterBolt.java | 29 +++++++++++---------
.../profiler/bolt/ProfileSplitterBoltTest.java | 16 +++++++++++
2 files changed, 32 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/d32bd50d/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index f28411f..87f1ba9 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -144,9 +144,9 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
try {
doExecute(input);
- } catch (IllegalArgumentException | ParseException | UnsupportedEncodingException e) {
- LOG.error("Unexpected error", e);
- collector.reportError(e);
+ } catch (Throwable t) {
+ LOG.error("Unexpected error", t);
+ collector.reportError(t);
} finally {
collector.ack(input);
@@ -157,22 +157,25 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
// retrieve the input message
byte[] data = input.getBinary(0);
+ if(data == null) {
+ LOG.debug("Received null message. Nothing to do.");
+ return;
+ }
JSONObject message = (JSONObject) parser.parse(new String(data, "UTF8"));
// ensure there is a valid profiler configuration
ProfilerConfig config = getProfilerConfig();
- if(config != null && config.getProfiles().size() > 0) {
-
- // what time is it?
- Clock clock = clockFactory.createClock(config);
- Optional<Long> timestamp = clock.currentTimeMillis(message);
+ if(config == null || getProfilerConfig().getProfiles().size() == 0) {
+ LOG.debug("No Profiler configuration found. Nothing to do.");
+ return;
+ }
- // route the message. if a message does not contain the timestamp field, it cannot be routed.
- timestamp.ifPresent(ts -> routeMessage(input, message, config, ts));
+ // what time is it?
+ Clock clock = clockFactory.createClock(config);
+ Optional<Long> timestamp = clock.currentTimeMillis(message);
- } else {
- LOG.debug("No Profiler configuration found. Nothing to do.");
- }
+ // route the message. if a message does not contain the timestamp field, it cannot be routed.
+ timestamp.ifPresent(ts -> routeMessage(input, message, config, ts));
}
/**
http://git-wip-us.apache.org/repos/asf/metron/blob/d32bd50d/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
index bf81923..72e2b72 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
@@ -404,6 +404,22 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
.emit(any(Values.class));
}
+ @Test
+ public void testWithNullMessage() throws Exception {
+
+ // ensure the tuple returns null to mimic a null message in kafka
+ when(tuple.getBinary(0)).thenReturn(null);
+
+ ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid);
+ ProfileSplitterBolt bolt = createBolt(config);
+ bolt.execute(tuple);
+
+ // a tuple should NOT be emitted for the downstream profile builder
+ verify(outputCollector, times(0))
+ .emit(any(Values.class));
+
+ }
+
/**
* Creates a ProfilerConfig based on a string containing JSON.
*