You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/10/24 15:04:07 UTC

[18/51] [abbrv] metron git commit: METRON-1782 Add Kafka Partition and Offset to Profiler Debug Logs (nickwallen) closes apache/metron#1202

METRON-1782 Add Kafka Partition and Offset to Profiler Debug Logs (nickwallen) closes apache/metron#1202


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/9c9e2954
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/9c9e2954
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/9c9e2954

Branch: refs/heads/feature/METRON-1090-stellar-assignment
Commit: 9c9e29547837e648723920329c3b7fea6211f0db
Parents: 1723a0e
Author: nickwallen <ni...@nickallen.org>
Authored: Mon Oct 1 09:07:28 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Mon Oct 1 09:07:28 2018 -0400

----------------------------------------------------------------------
 .../src/main/flux/profiler/remote.yaml                 | 11 +++++++++--
 .../metron/profiler/storm/ProfileSplitterBolt.java     | 13 ++++++++++++-
 .../metron/profiler/storm/ProfileSplitterBoltTest.java |  3 ++-
 .../storm/kafka/flux/SimpleStormKafkaBuilder.java      |  4 +++-
 4 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/9c9e2954/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
index da71b27..e16a782 100644
--- a/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
@@ -77,8 +77,15 @@ components:
         className: "java.util.ArrayList"
         configMethods:
             -   name: "add"
-                args:
-                    - "value"
+                args: ["value"]
+            -   name: "add"
+                args: ["topic"]
+            -   name: "add"
+                args: ["partition"]
+            -   name: "add"
+                args: ["offset"]
+            -   name: "add"
+                args: ["timestamp"]
 
     -   id: "kafkaConfig"
         className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"

http://git-wip-us.apache.org/repos/asf/metron/blob/9c9e2954/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
index 81179b6..ef58ad9 100644
--- a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/ProfileSplitterBolt.java
@@ -43,6 +43,12 @@ import java.lang.invoke.MethodHandles;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.OFFSET;
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.PARTITION;
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.TIMESTAMP;
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.TOPIC;
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.VALUE;
+
 /**
  * The Storm bolt responsible for filtering incoming messages and directing
  * each to the downstream bolts responsible for building a Profile.
@@ -132,6 +138,11 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
   @Override
   public void execute(Tuple input) {
     try {
+      LOG.debug("Received message; topic={}, partition={}, offset={}, kafkaTimestamp={}",
+              input.contains(TOPIC.getFieldName())      ? input.getStringByField(TOPIC.getFieldName()):       "unknown",
+              input.contains(PARTITION.getFieldName())  ? input.getIntegerByField(PARTITION.getFieldName()):  "unknown",
+              input.contains(OFFSET.getFieldName())     ? input.getLongByField(OFFSET.getFieldName()):        "unknown",
+              input.contains(TIMESTAMP.getFieldName())  ? input.getLongByField(TIMESTAMP.getFieldName()):     "unknown");
       doExecute(input);
 
     } catch (Throwable t) {
@@ -146,7 +157,7 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
   private void doExecute(Tuple input) throws ParseException, UnsupportedEncodingException {
 
     // retrieve the input message
-    byte[] data = input.getBinary(0);
+    byte[] data = input.getBinaryByField(VALUE.getFieldName());
     if(data == null) {
       LOG.debug("Received null message. Nothing to do.");
       return;

http://git-wip-us.apache.org/repos/asf/metron/blob/9c9e2954/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
index 93d2ac4..360ef4b 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java
@@ -39,6 +39,7 @@ import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.HashMap;
 
+import static org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder.FieldsConfiguration.VALUE;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.times;
@@ -219,7 +220,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
     message = (JSONObject) parser.parse(input);
 
     // ensure the tuple returns the expected json message
-    when(tuple.getBinary(0)).thenReturn(input.getBytes());
+    when(tuple.getBinaryByField(VALUE.getFieldName())).thenReturn(input.getBytes());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/9c9e2954/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
index f99e549..0aebff8 100644
--- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
+++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
@@ -57,7 +57,9 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V
     KEY("key", record -> record.key()),
     VALUE("value", record -> record.value()),
     PARTITION("partition", record -> record.partition()),
-    TOPIC("topic", record -> record.topic())
+    OFFSET("offset", record -> record.offset()),
+    TOPIC("topic", record -> record.topic()),
+    TIMESTAMP("timestamp", record -> record.timestamp())
     ;
     String fieldName;
     Function<ConsumerRecord,Object> recordExtractor;