You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/10/24 15:04:07 UTC
[18/51] [abbrv] metron git commit: METRON-1782 Add Kafka Partition
and Offset to Profiler Debug Logs (nickwallen) closes apache/metron#1202
METRON-1782 Add Kafka Partition and Offset to Profiler Debug Logs (nickwallen) closes apache/metron#1202
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/9c9e2954
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/9c9e2954
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/9c9e2954
Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: 9c9e29547837e648723920329c3b7fea6211f0db
Parents: 1723a0e
Author: nickwallen <ni...@nickallen.org>
Authored: Mon Oct 1 09:07:28 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Mon Oct 1 09:07:28 2018 -0400
----------------------------------------------------------------------
.../src/main/flux/profiler/remote.yaml | 11 +++++++++--
.../metron/profiler/storm/ProfileSplitterBolt.java | 13 ++++++++++++-
.../metron/profiler/storm/ProfileSplitterBoltTest.java | 3 ++-
.../storm/kafka/flux/SimpleStormKafkaBuilder.java | 4 +++-
4 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/9c9e2954/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
index da71b27..e16a782 100644
--- a/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
@@ -77,8 +77,15 @@ components:
className: "java.util.ArrayList"
configMethods:
- name: "add"
- args:
- - "value"
+ args: ["value"]
+ - name: "add"
+ args: ["topic"]
+ - name: "add"
+ args: ["partition"]
+ - name: "add"
+ args: ["offset"]
+ - name: "add"
+ args: ["timestamp"]
- id: "kafkaConfig"
className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
http://git-wip-us.apache.org/repos/asf/metron/blob/9c9e2954/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
index 81179b6..ef58ad9 100644
--- a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
@@ -43,6 +43,12 @@ import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.OFFSET;
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.PARTITION;
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.TIMESTAMP;
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.TOPIC;
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.VALUE;
+
/**
* The Storm bolt responsible for filtering incoming messages and directing
* each to the downstream bolts responsible for building a Profile.
@@ -132,6 +138,11 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
@Override
public void execute(Tuple input) {
try {
+ LOG.debug("Received message; topic={}, partition={}, offset={}, kafkaTimestamp={}",
+ input.contains(TOPIC.getFieldName()) ? input.getStringByField(TOPIC.getFieldName()): "unknown",
+ input.contains(PARTITION.getFieldName()) ? input.getIntegerByField(PARTITION.getFieldName()): "unknown",
+ input.contains(OFFSET.getFieldName()) ? input.getLongByField(OFFSET.getFieldName()): "unknown",
+ input.contains(TIMESTAMP.getFieldName()) ? input.getLongByField(TIMESTAMP.getFieldName()): "unknown");
doExecute(input);
} catch (Throwable t) {
@@ -146,7 +157,7 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
private void doExecute(Tuple input) throws ParseException, UnsupportedEncodingException {
// retrieve the input message
- byte[] data = input.getBinary(0);
+ byte[] data = input.getBinaryByField(VALUE.getFieldName());
if(data == null) {
LOG.debug("Received null message. Nothing to do.");
return;
http://git-wip-us.apache.org/repos/asf/metron/blob/9c9e2954/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
index 93d2ac4..360ef4b 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
@@ -39,6 +39,7 @@ import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.HashMap;
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.VALUE;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
@@ -219,7 +220,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
message = (JSONObject) parser.parse(input);
// ensure the tuple returns the expected json message
- when(tuple.getBinary(0)).thenReturn(input.getBytes());
+ when(tuple.getBinaryByField(VALUE.getFieldName())).thenReturn(input.getBytes());
}
/**
http://git-wip-us.apache.org/repos/asf/metron/blob/9c9e2954/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
index f99e549..0aebff8 100644
--- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
+++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
@@ -57,7 +57,9 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V
KEY("key", record -> record.key()),
VALUE("value", record -> record.value()),
PARTITION("partition", record -> record.partition()),
- TOPIC("topic", record -> record.topic())
+ OFFSET("offset", record -> record.offset()),
+ TOPIC("topic", record -> record.topic()),
+ TIMESTAMP("timestamp", record -> record.timestamp())
;
String fieldName;
Function<ConsumerRecord,Object> recordExtractor;