You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/08 17:40:01 UTC

storm git commit: STORM-1379: Removed Redundant Structure

Repository: storm
Updated Branches:
  refs/heads/master ce31f4cfb -> 64ff34966


STORM-1379: Removed Redundant Structure

(from patch in JIRA https://issues.apache.org/jira/secure/attachment/12776288/STORM-1379.patch)


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/64ff3496
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/64ff3496
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/64ff3496

Branch: refs/heads/master
Commit: 64ff34966927d4d7c53af3bc4000375655413d81
Parents: ce31f4c
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Jan 8 10:39:03 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Jan 8 10:39:03 2016 -0600

----------------------------------------------------------------------
 CHANGELOG.md                                       |  1 +
 .../src/jvm/storm/kafka/KafkaSpout.java            | 10 ----------
 .../src/jvm/storm/kafka/PartitionManager.java      | 17 ++++++++---------
 3 files changed, 9 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/64ff3496/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1dc812d..6383df4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1379: Removed Redundant Structure
  * STORM-706: Clarify examples README for IntelliJ.
  * STORM-1396: Added backward compatibility method for File Download
  * STORM-695: storm CLI tool reports zero exit code on error scenario

http://git-wip-us.apache.org/repos/asf/storm/blob/64ff3496/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
index 56f170b..8169014 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
@@ -34,16 +34,6 @@ import java.util.*;
 // TODO: need to add blacklisting
 // TODO: need to make a best effort to not re-emit messages if don't have to
 public class KafkaSpout extends BaseRichSpout {
-    public static class MessageAndRealOffset {
-        public Message msg;
-        public long offset;
-
-        public MessageAndRealOffset(Message msg, long offset) {
-            this.msg = msg;
-            this.offset = offset;
-        }
-    }
-
     static enum EmitState {
         EMITTED_MORE_LEFT,
         EMITTED_END,

http://git-wip-us.apache.org/repos/asf/storm/blob/64ff3496/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 38a09e2..8411989 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -35,7 +35,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import storm.kafka.KafkaSpout.EmitState;
-import storm.kafka.KafkaSpout.MessageAndRealOffset;
 import storm.kafka.trident.MaxMetric;
 
 import java.util.*;
@@ -54,7 +53,7 @@ public class PartitionManager {
 
     // retryRecords key = Kafka offset, value = retry info for the given message
     Long _committedTo;
-    LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
+    LinkedList<MessageAndOffset> _waitingToEmit = new LinkedList<MessageAndOffset>();
     Partition _partition;
     SpoutConfig _spoutConfig;
     String _topologyInstanceId;
@@ -137,31 +136,31 @@ public class PartitionManager {
             fill();
         }
         while (true) {
-            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
+            MessageAndOffset toEmit = _waitingToEmit.pollFirst();
             if (toEmit == null) {
                 return EmitState.NO_EMITTED;
             }
 
             Iterable<List<Object>> tups;
             if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
-                tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.msg, _partition, toEmit.offset);
+                tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.message(), _partition, toEmit.offset());
             } else {
-                tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg, _partition.topic);
+                tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.message(), _partition.topic);
             }
             
             if ((tups != null) && tups.iterator().hasNext()) {
                if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
                     for (List<Object> tup : tups) {
-                        collector.emit(_spoutConfig.outputStreamId, tup, new KafkaMessageId(_partition, toEmit.offset));
+                        collector.emit(_spoutConfig.outputStreamId, tup, new KafkaMessageId(_partition, toEmit.offset()));
                     }
                 } else {
                     for (List<Object> tup : tups) {
-                        collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
+                        collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset()));
                     }
                 }
                 break;
             } else {
-                ack(toEmit.offset);
+                ack(toEmit.offset());
             }
         }
         if (!_waitingToEmit.isEmpty()) {
@@ -223,7 +222,7 @@ public class PartitionManager {
                     if (!_pending.containsKey(cur_offset)) {
                         _pending.put(cur_offset, System.currentTimeMillis());
                     }
-                    _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
+                    _waitingToEmit.add(msg);
                     _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
                     if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
                         this._failedMsgRetryManager.retryStarted(cur_offset);