You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/02/05 06:48:30 UTC

[2/3] storm git commit: STORM-1455: Fix build failure

STORM-1455: Fix build failure


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5eac6855
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5eac6855
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5eac6855

Branch: refs/heads/1.x-branch
Commit: 5eac6855642df52823af9ba13ad12ab5505f9753
Parents: 0f5abf4
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Tue Jan 26 23:29:50 2016 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 5 14:40:15 2016 +0900

----------------------------------------------------------------------
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  |  8 +++----
 .../apache/storm/kafka/PartitionManager.java    |  2 +-
 .../kafka/trident/TridentKafkaEmitter.java      | 23 +++++++++++---------
 3 files changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5eac6855/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index 7a83ae0..d1da446 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -17,17 +17,17 @@
  */
 package org.apache.storm.kafka;
 
+import com.google.common.base.Strings;
+
 import org.apache.storm.Config;
+import org.apache.storm.kafka.PartitionManager.KafkaMessageId;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
-import com.google.common.base.Strings;
-import kafka.message.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.storm.kafka.PartitionManager.KafkaMessageId;
 
 import java.util.*;
 
@@ -100,7 +100,7 @@ public class KafkaSpout extends BaseRichSpout {
                 }
                 _kafkaOffsetMetric.refreshPartitions(latestPartitions);
                 for (PartitionManager pm : pms) {
-                    _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
+                    _kafkaOffsetMetric.setOffsetData(pm.getPartition(), pm.getOffsetData());
                 }
                 return _kafkaOffsetMetric.getValueAndReset();
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/5eac6855/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index bcc4001..dbf70a0 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -320,7 +320,7 @@ public class PartitionManager {
         }
     }
 
-    static class OffsetData {
+    public static class OffsetData {
         public long latestEmittedOffset;
         public long latestCompletedOffset;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/5eac6855/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
index 6eddaf5..9732c8c 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -17,27 +17,28 @@
  */
 package org.apache.storm.kafka.trident;
 
+import com.google.common.collect.ImmutableMap;
+
 import org.apache.storm.Config;
+import org.apache.storm.kafka.*;
 import org.apache.storm.metric.api.CombinedMetric;
 import org.apache.storm.metric.api.MeanReducer;
 import org.apache.storm.metric.api.ReducedMetric;
 import org.apache.storm.task.TopologyContext;
-import com.google.common.collect.ImmutableMap;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.storm.kafka.*;
-import org.apache.storm.kafka.TopicOffsetOutOfRangeException;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
 import org.apache.storm.trident.spout.IPartitionedTridentSpout;
 import org.apache.storm.trident.topology.TransactionAttempt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+
 public class TridentKafkaEmitter {
 
     public static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class);
@@ -65,7 +66,9 @@ public class TridentKafkaEmitter {
     private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
         SimpleConsumer consumer = _connections.register(partition);
         Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta);
-        _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));
+        Long offset = (Long) ret.get("offset");
+        Long endOffset = (Long) ret.get("nextOffset");
+        _kafkaOffsetMetric.setOffsetData(partition, new PartitionManager.OffsetData(endOffset, offset));
         return ret;
     }