You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/02/05 06:48:30 UTC
[2/3] storm git commit: STORM-1455: Fix build failure
STORM-1455: Fix build failure
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5eac6855
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5eac6855
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5eac6855
Branch: refs/heads/1.x-branch
Commit: 5eac6855642df52823af9ba13ad12ab5505f9753
Parents: 0f5abf4
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Tue Jan 26 23:29:50 2016 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 5 14:40:15 2016 +0900
----------------------------------------------------------------------
.../jvm/org/apache/storm/kafka/KafkaSpout.java | 8 +++----
.../apache/storm/kafka/PartitionManager.java | 2 +-
.../kafka/trident/TridentKafkaEmitter.java | 23 +++++++++++---------
3 files changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5eac6855/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index 7a83ae0..d1da446 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -17,17 +17,17 @@
*/
package org.apache.storm.kafka;
+import com.google.common.base.Strings;
+
import org.apache.storm.Config;
+import org.apache.storm.kafka.PartitionManager.KafkaMessageId;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
-import com.google.common.base.Strings;
-import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.storm.kafka.PartitionManager.KafkaMessageId;
import java.util.*;
@@ -100,7 +100,7 @@ public class KafkaSpout extends BaseRichSpout {
}
_kafkaOffsetMetric.refreshPartitions(latestPartitions);
for (PartitionManager pm : pms) {
- _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
+ _kafkaOffsetMetric.setOffsetData(pm.getPartition(), pm.getOffsetData());
}
return _kafkaOffsetMetric.getValueAndReset();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/5eac6855/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index bcc4001..dbf70a0 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -320,7 +320,7 @@ public class PartitionManager {
}
}
- static class OffsetData {
+ public static class OffsetData {
public long latestEmittedOffset;
public long latestCompletedOffset;
http://git-wip-us.apache.org/repos/asf/storm/blob/5eac6855/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
index 6eddaf5..9732c8c 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -17,27 +17,28 @@
*/
package org.apache.storm.kafka.trident;
+import com.google.common.collect.ImmutableMap;
+
import org.apache.storm.Config;
+import org.apache.storm.kafka.*;
import org.apache.storm.metric.api.CombinedMetric;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.ReducedMetric;
import org.apache.storm.task.TopologyContext;
-import com.google.common.collect.ImmutableMap;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.storm.kafka.*;
-import org.apache.storm.kafka.TopicOffsetOutOfRangeException;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.apache.storm.trident.spout.IPartitionedTridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+
public class TridentKafkaEmitter {
public static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class);
@@ -65,7 +66,9 @@ public class TridentKafkaEmitter {
private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
SimpleConsumer consumer = _connections.register(partition);
Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta);
- _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));
+ Long offset = (Long) ret.get("offset");
+ Long endOffset = (Long) ret.get("nextOffset");
+ _kafkaOffsetMetric.setOffsetData(partition, new PartitionManager.OffsetData(endOffset, offset));
return ret;
}