You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/08 17:40:01 UTC
storm git commit: STORM-1379: Removed Redundant Structure
Repository: storm
Updated Branches:
refs/heads/master ce31f4cfb -> 64ff34966
STORM-1379: Removed Redundant Structure
(from patch in JIRA https://issues.apache.org/jira/secure/attachment/12776288/STORM-1379.patch)
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/64ff3496
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/64ff3496
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/64ff3496
Branch: refs/heads/master
Commit: 64ff34966927d4d7c53af3bc4000375655413d81
Parents: ce31f4c
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Jan 8 10:39:03 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Jan 8 10:39:03 2016 -0600
----------------------------------------------------------------------
CHANGELOG.md | 1 +
.../src/jvm/storm/kafka/KafkaSpout.java | 10 ----------
.../src/jvm/storm/kafka/PartitionManager.java | 17 ++++++++---------
3 files changed, 9 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/64ff3496/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1dc812d..6383df4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1379: Removed Redundant Structure
* STORM-706: Clarify examples README for IntelliJ.
* STORM-1396: Added backward compatibility method for File Download
* STORM-695: storm CLI tool reports zero exit code on error scenario
http://git-wip-us.apache.org/repos/asf/storm/blob/64ff3496/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
index 56f170b..8169014 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
@@ -34,16 +34,6 @@ import java.util.*;
// TODO: need to add blacklisting
// TODO: need to make a best effort to not re-emit messages if don't have to
public class KafkaSpout extends BaseRichSpout {
- public static class MessageAndRealOffset {
- public Message msg;
- public long offset;
-
- public MessageAndRealOffset(Message msg, long offset) {
- this.msg = msg;
- this.offset = offset;
- }
- }
-
static enum EmitState {
EMITTED_MORE_LEFT,
EMITTED_END,
http://git-wip-us.apache.org/repos/asf/storm/blob/64ff3496/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 38a09e2..8411989 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -35,7 +35,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.KafkaSpout.EmitState;
-import storm.kafka.KafkaSpout.MessageAndRealOffset;
import storm.kafka.trident.MaxMetric;
import java.util.*;
@@ -54,7 +53,7 @@ public class PartitionManager {
// retryRecords key = Kafka offset, value = retry info for the given message
Long _committedTo;
- LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
+ LinkedList<MessageAndOffset> _waitingToEmit = new LinkedList<MessageAndOffset>();
Partition _partition;
SpoutConfig _spoutConfig;
String _topologyInstanceId;
@@ -137,31 +136,31 @@ public class PartitionManager {
fill();
}
while (true) {
- MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
+ MessageAndOffset toEmit = _waitingToEmit.pollFirst();
if (toEmit == null) {
return EmitState.NO_EMITTED;
}
Iterable<List<Object>> tups;
if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
- tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.msg, _partition, toEmit.offset);
+ tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.message(), _partition, toEmit.offset());
} else {
- tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg, _partition.topic);
+ tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.message(), _partition.topic);
}
if ((tups != null) && tups.iterator().hasNext()) {
if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
for (List<Object> tup : tups) {
- collector.emit(_spoutConfig.outputStreamId, tup, new KafkaMessageId(_partition, toEmit.offset));
+ collector.emit(_spoutConfig.topic, tup, new KafkaMessageId(_partition, toEmit.offset()));
}
} else {
for (List<Object> tup : tups) {
- collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
+ collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset()));
}
}
break;
} else {
- ack(toEmit.offset);
+ ack(toEmit.offset());
}
}
if (!_waitingToEmit.isEmpty()) {
@@ -223,7 +222,7 @@ public class PartitionManager {
if (!_pending.containsKey(cur_offset)) {
_pending.put(cur_offset, System.currentTimeMillis());
}
- _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
+ _waitingToEmit.add(msg);
_emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
this._failedMsgRetryManager.retryStarted(cur_offset);