You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") was lost in the plain-text conversion.
Posted to commits@metron.apache.org by ma...@apache.org on 2017/06/01 21:41:52 UTC
[33/44] metron git commit: METRON-966: Pcap topology does not commit offsets (closes apache/incubator-metron#597)
METRON-966: Pcap topology does not commit offsets closes apache/incubator-metron#597
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/356881ad
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/356881ad
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/356881ad
Branch: refs/heads/Metron_0.4.0
Commit: 356881ad22edc7c08d8a8f3812192333210f3c8e
Parents: 64473d4
Author: cstella <ce...@gmail.com>
Authored: Sat May 20 09:59:02 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Sat May 20 09:59:02 2017 -0400
----------------------------------------------------------------------
.../metron/spout/pcap/KafkaToHDFSSpout.java | 50 ++++++++++++++++++++
pom.xml | 18 +------
2 files changed, 51 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/356881ad/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
index ddfd14a..5b3d6f6 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java
@@ -22,9 +22,20 @@ import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
import org.apache.storm.kafka.Callback;
import org.apache.storm.kafka.CallbackKafkaSpout;
+import java.util.ArrayList;
+import java.util.List;
+
public class KafkaToHDFSSpout extends CallbackKafkaSpout<byte[], byte[]> {
static final long serialVersionUID = 0xDEADBEEFL;
HDFSWriterConfig config = null;
+
+ private static ThreadLocal<List<Object>> messagesToBeAcked = new ThreadLocal<List<Object>>() {
+ @Override
+ protected List<Object> initialValue() {
+ return new ArrayList<>();
+ }
+ };
+
public KafkaToHDFSSpout( SimpleStormKafkaBuilder<byte[], byte[]> spoutConfig
, HDFSWriterConfig config
)
@@ -40,5 +51,44 @@ public class KafkaToHDFSSpout extends CallbackKafkaSpout<byte[], byte[]> {
return new HDFSWriterCallback().withConfig(config);
}
+ /**
+ * Clear all the messages that are queued to be acked.
+ */
+ private void clearMessagesToBeAcked() {
+ for (Object messageId : messagesToBeAcked.get()) {
+ super.ack(messageId);
+ }
+ messagesToBeAcked.get().clear();
+ }
+ @Override
+ public void nextTuple() {
+ /*
+ This bears some explanation; nextTuple for a spout-only topology sans ackers, will ack as part of the emit method.
+ The unfortunate part about this is that this will prevent the internal bookeeping of the KafkaSpout to keep add the
+ message ID to the offsets to commit. This is because it thinks it is not emitted by the time it gets to ack (because
+ ack is called *within* emit). The result is that no offsets are acked.
+
+ What we have here is a correction. The ack method will add the message ID to a queue to be acked and then at the end
+ of nextTuple, we will clear the cache and ack. The net result is that the contract is adhered to for spout-only topologies,
+ ack happens in nextTuple().
+ */
+ super.nextTuple();
+ clearMessagesToBeAcked();
+ }
+
+ @Override
+ public void ack(Object messageId) {
+ messagesToBeAcked.get().add(messageId);
+ }
+
+ @Override
+ public void close() {
+ try {
+ clearMessagesToBeAcked();
+ }
+ finally {
+ super.close();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/356881ad/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9f62249..4bbd8ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,22 +39,6 @@
<repositories>
<repository>
- <releases>
- <enabled>true</enabled>
- <updatePolicy>always</updatePolicy>
- <checksumPolicy>warn</checksumPolicy>
- </releases>
- <snapshots>
- <enabled>true</enabled>
- <updatePolicy>never</updatePolicy>
- <checksumPolicy>warn</checksumPolicy>
- </snapshots>
- <id>HDPPrivateReleases</id>
- <name>HDP Private Releases</name>
- <url>http://nexus-private.hortonworks.com/nexus/content/groups/public</url>
- <layout>default</layout>
- </repository>
- <repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
@@ -131,7 +115,7 @@
<properties>
<hdp_version>2.5.0.0</hdp_version>
<build_number>1245</build_number>
- <global_storm_kafka_version>1.1.0.2.6.1.0-SNAPSHOT</global_storm_kafka_version>
+ <global_storm_kafka_version>1.1.0</global_storm_kafka_version>
<global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version>
<global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version>
</properties>