You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ma...@apache.org on 2017/06/01 21:41:52 UTC

[33/44] metron git commit: METRON-966: Pcap topology does not commit offsets closes apache/incubator-metron#597

METRON-966: Pcap topology does not commit offsets closes apache/incubator-metron#597


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/356881ad
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/356881ad
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/356881ad

Branch: refs/heads/Metron_0.4.0
Commit: 356881ad22edc7c08d8a8f3812192333210f3c8e
Parents: 64473d4
Author: cstella <ce...@gmail.com>
Authored: Sat May 20 09:59:02 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Sat May 20 09:59:02 2017 -0400

----------------------------------------------------------------------
 .../metron/spout/pcap/KafkaToHDFSSpout.java     | 50 ++++++++++++++++++++
 pom.xml                                         | 18 +------
 2 files changed, 51 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/356881ad/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
index ddfd14a..5b3d6f6 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
@@ -22,9 +22,20 @@ import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
 import org.apache.storm.kafka.Callback;
 import org.apache.storm.kafka.CallbackKafkaSpout;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class KafkaToHDFSSpout extends CallbackKafkaSpout<byte[], byte[]> {
   static final long serialVersionUID = 0xDEADBEEFL;
   HDFSWriterConfig config = null;
+
+  private static ThreadLocal<List<Object>> messagesToBeAcked = new ThreadLocal<List<Object>>() {
+    @Override
+    protected List<Object> initialValue() {
+      return new ArrayList<>();
+    }
+  };
+
   public KafkaToHDFSSpout( SimpleStormKafkaBuilder<byte[], byte[]> spoutConfig
                          , HDFSWriterConfig config
                          )
@@ -40,5 +51,44 @@ public class KafkaToHDFSSpout extends CallbackKafkaSpout<byte[], byte[]> {
     return new HDFSWriterCallback().withConfig(config);
   }
 
+  /**
+   * Clear all the messages that are queued to be acked.
+   */
+  private void clearMessagesToBeAcked() {
+      for (Object messageId : messagesToBeAcked.get()) {
+        super.ack(messageId);
+      }
+      messagesToBeAcked.get().clear();
+  }
 
+  @Override
+  public void nextTuple() {
+    /*
+    This bears some explanation: in a spout-only topology without ackers, the emit inside nextTuple triggers the ack directly.
+    Unfortunately, this prevents the internal bookkeeping of the KafkaSpout from adding the
+     message ID to the offsets to commit, because the spout does not yet consider the message emitted by the time ack is called (because
+     ack is called *within* emit).  The result is that no offsets are ever committed.
+
+     What we have here is a correction.  The ack method only adds the message ID to a queue; at the end of nextTuple,
+     after emit has returned, we ack every queued ID and clear the queue.  The net result is that, for spout-only topologies,
+     acks still happen within nextTuple(), but only once the spout has finished recording the emit.
+     */
+    super.nextTuple();
+    clearMessagesToBeAcked();
+  }
+
+  @Override
+  public void ack(Object messageId) {
+    messagesToBeAcked.get().add(messageId);
+  }
+
+  @Override
+  public void close() {
+    try {
+      clearMessagesToBeAcked();
+    }
+    finally {
+      super.close();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/356881ad/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9f62249..4bbd8ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,22 +39,6 @@
 
     <repositories>
         <repository>
-            <releases>
-                <enabled>true</enabled>
-                <updatePolicy>always</updatePolicy>
-                <checksumPolicy>warn</checksumPolicy>
-            </releases>
-            <snapshots>
-                <enabled>true</enabled>
-                <updatePolicy>never</updatePolicy>
-                <checksumPolicy>warn</checksumPolicy>
-            </snapshots>
-            <id>HDPPrivateReleases</id>
-            <name>HDP Private Releases</name>
-            <url>http://nexus-private.hortonworks.com/nexus/content/groups/public</url>
-            <layout>default</layout>
-        </repository>
-        <repository>
             <id>clojars.org</id>
             <url>http://clojars.org/repo</url>
         </repository>
@@ -131,7 +115,7 @@
             <properties>
                 <hdp_version>2.5.0.0</hdp_version>
                 <build_number>1245</build_number>
-                <global_storm_kafka_version>1.1.0.2.6.1.0-SNAPSHOT</global_storm_kafka_version>
+                <global_storm_kafka_version>1.1.0</global_storm_kafka_version>
                 <global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version>
                 <global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version>
             </properties>