You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/02/03 02:52:13 UTC

[1/4] storm git commit: STORM-2014: Make KafkaSpout delegate maxRetry check to RetryService

Repository: storm
Updated Branches:
  refs/heads/master e8c390059 -> 38ee403bf


STORM-2014: Make KafkaSpout delegate maxRetry check to RetryService


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0aa0e857
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0aa0e857
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0aa0e857

Branch: refs/heads/master
Commit: 0aa0e8579723a76f53fe7e0ae71dfd1722465c0e
Parents: e3b2f96
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Tue Aug 2 19:17:02 2016 +0200
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Thu Jan 12 21:27:05 2017 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 13 ++++-----
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 23 +--------------
 .../KafkaSpoutRetryExponentialBackoff.java      | 30 ++++++++++++--------
 .../kafka/spout/KafkaSpoutRetryService.java     | 12 ++++++--
 4 files changed, 34 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0aa0e857/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index d405c4d..60ca0b9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -72,7 +72,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
 
     // Bookkeeping
-    private transient int maxRetries;                                   // Max number of times a tuple is retried
     private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  // Strategy to determine the fetch offset of the first realized by the spout upon activation
     private transient KafkaSpoutRetryService retryService;              // Class that has the logic to handle tuple failure
     private transient Timer commitTimer;                                // timer == null for auto commit mode
@@ -105,7 +104,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         // Spout internals
         this.collector = collector;
-        maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
         numUncommittedOffsets = 0;
 
         // Offset management
@@ -381,11 +379,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
             return;
         }
-        if (msgId.numFails() < maxRetries) {
-            emitted.remove(msgId);
-            msgId.incrementNumFails();
-            retryService.schedule(msgId);
-        } else { // limit to max number of retries
+        emitted.remove(msgId);
+        msgId.incrementNumFails();
+        if (!retryService.schedule(msgId)) {
             LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
             ack(msgId);
         }
@@ -523,6 +519,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
 
         /**
+         * An offset is only committed when all records with lower offset have
+         * been acked. This guarantees that all offsets smaller than the
+         * committedOffset have been delivered.
          * @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit.
          */
         public OffsetAndMetadata findNextCommitOffset() {

http://git-wip-us.apache.org/repos/asf/storm/blob/0aa0e857/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 8aa525b..2818c9a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -74,7 +74,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     // Kafka spout configuration
     private final long offsetCommitPeriodMs;
-    private final int maxRetries;
     private final int maxUncommittedOffsets;
     private final FirstPollOffsetStrategy firstPollOffsetStrategy;
     private final KafkaSpoutStreams kafkaSpoutStreams;
@@ -87,7 +86,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         this.valueDeserializer = builder.valueDeserializer;
         this.pollTimeoutMs = builder.pollTimeoutMs;
         this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-        this.maxRetries = builder.maxRetries;
         this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
         this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
         this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
@@ -109,7 +107,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         private SerializableDeserializer<V> valueDeserializer;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
-        private int maxRetries = DEFAULT_MAX_RETRIES;
         private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
         private final KafkaSpoutStreams kafkaSpoutStreams;
         private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
@@ -194,20 +191,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             this.offsetCommitPeriodMs = offsetCommitPeriodMs;
             return this;
         }
-
-        /**
-         * Defines the max number of retrials in case of tuple failure. The default is to retry forever, which means that
-         * no new records are committed until the previous polled records have been acked. This guarantees at once delivery of
-         * all the previously polled records.
-         * By specifying a finite value for maxRetries, the user decides to sacrifice guarantee of delivery for the previous
-         * polled records in favor of processing more records.
-         * @param maxRetries max number of retrials
-         */
-        public Builder<K,V> setMaxRetries(int maxRetries) {
-            this.maxRetries = maxRetries;
-            return this;
-        }
-
+        
         /**
          * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
          * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
@@ -283,10 +267,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
                 null;
     }
 
-    public int getMaxTupleRetries() {
-        return maxRetries;
-    }
-
     public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
         return firstPollOffsetStrategy;
     }
@@ -315,7 +295,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
                 ", valueDeserializer=" + valueDeserializer +
                 ", pollTimeoutMs=" + pollTimeoutMs +
                 ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
-                ", maxRetries=" + maxRetries +
                 ", maxUncommittedOffsets=" + maxUncommittedOffsets +
                 ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
                 ", kafkaSpoutStreams=" + kafkaSpoutStreams +

http://git-wip-us.apache.org/repos/asf/storm/blob/0aa0e857/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index f59367d..2c8d7e4 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -40,25 +40,26 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
     private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
 
-    private TimeInterval initialDelay;
-    private TimeInterval delayPeriod;
-    private TimeInterval maxDelay;
-    private int maxRetries;
+    private final TimeInterval initialDelay;
+    private final TimeInterval delayPeriod;
+    private final TimeInterval maxDelay;
+    private final int maxRetries;
 
-    private Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
-    private Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>();      // Convenience data structure to speedup lookups
+    private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
+    private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>();      // Convenience data structure to speedup lookups
 
     /**
      * Comparator ordering by timestamp 
      */
     private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
+        @Override
         public int compare(RetrySchedule entry1, RetrySchedule entry2) {
             return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
         }
     }
 
     private class RetrySchedule {
-        private KafkaSpoutMessageId msgId;
+        private final KafkaSpoutMessageId msgId;
         private long nextRetryTimeNanos;
 
         public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
@@ -94,9 +95,9 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     }
 
     public static class TimeInterval implements Serializable {
-        private long lengthNanos;
-        private long length;
-        private TimeUnit timeUnit;
+        private final long lengthNanos;
+        private final long length;
+        private final TimeUnit timeUnit;
 
         /**
          * @param length length of the time interval in the units specified by {@link TimeUnit}
@@ -144,7 +145,10 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     /**
      * The time stamp of the next retry is scheduled according to the exponential backoff formula ( geometric progression):
      * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
-     * nextRetry = Min(nextRetry, currentTime + maxDelay)
+     * nextRetry = Min(nextRetry, currentTime + maxDelay).
+     * 
+     * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the previous
+     * polled records in favor of processing more records.
      *
      * @param initialDelay      initial delay of the first retry
      * @param delayPeriod       the time interval that is the ratio of the exponential backoff formula (geometric progression)
@@ -239,9 +243,10 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     }
 
     @Override
-    public void schedule(KafkaSpoutMessageId msgId) {
+    public boolean schedule(KafkaSpoutMessageId msgId) {
         if (msgId.numFails() > maxRetries) {
             LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
+            return false;
         } else {
             if (toRetryMsgs.contains(msgId)) {
                 for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
@@ -257,6 +262,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
             toRetryMsgs.add(msgId);
             LOG.debug("Scheduled. {}", retrySchedule);
             LOG.trace("Current state {}", retrySchedules);
+            return true;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0aa0e857/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index 5aab167..bf17a5a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -29,14 +29,18 @@ import java.util.Set;
  */
 public interface KafkaSpoutRetryService extends Serializable {
     /**
-     * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled.
+     * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or
+     * updates retry time if it has already been scheduled. It may also indicate
+     * that the message should not be retried, in which case the message will not be scheduled.
      * @param msgId message to schedule for retrial
+     * @return true if the message will be retried, false otherwise
      */
-    void schedule(KafkaSpoutMessageId msgId);
+    boolean schedule(KafkaSpoutMessageId msgId);
 
     /**
      * Removes a message from the list of messages scheduled for retrial
      * @param msgId message to remove from retrial
+     * @return true if the message was scheduled for retrial, false otherwise
      */
     boolean remove(KafkaSpoutMessageId msgId);
 
@@ -56,8 +60,9 @@ public interface KafkaSpoutRetryService extends Serializable {
     Set<TopicPartition> retriableTopicPartitions();
 
     /**
-     * Checks if a specific failed {@link KafkaSpoutMessageId} is is ready to be retried,
+     * Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried,
      * i.e is scheduled and has retry time that is less than current time.
+     * @param msgId message to check for readiness
      * @return true if message is ready to be retried, false otherwise
      */
     boolean isReady(KafkaSpoutMessageId msgId);
@@ -65,6 +70,7 @@ public interface KafkaSpoutRetryService extends Serializable {
     /**
      * Checks if a specific failed {@link KafkaSpoutMessageId} is scheduled to be retried.
      * The message may or may not be ready to be retried yet.
+     * @param msgId message to check for scheduling status
      * @return true if the message is scheduled to be retried, regardless of being or not ready to be retried.
      * Returns false is this message is not scheduled for retrial
      */


[4/4] storm git commit: STORM-2014: CHANGELOG

Posted by ka...@apache.org.
STORM-2014: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/38ee403b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/38ee403b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/38ee403b

Branch: refs/heads/master
Commit: 38ee403bf4a537ef276221033901690f099479c0
Parents: 99be172
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Feb 3 11:52:02 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 3 11:52:02 2017 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/38ee403b/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e5a6fba..3739114 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -196,6 +196,7 @@
  * STORM-1769: Added a test to check local nimbus with notifier plugin
 
 ## 1.1.0
+ * STORM-2014: New Kafka spout duplicates checking if failed messages have reached max retries
  * STORM-2324: Fix deployment failure if resources directory is missing in topology jar
  * STORM-1443: [Storm SQL] Support customizing parallelism in StormSQL
  * STORM-2148: [Storm SQL] Trident mode: back to code generate and compile Trident topology


[2/4] storm git commit: Merge branch 'master' of https://github.com/apache/storm into STORM-2014

Posted by ka...@apache.org.
Merge branch 'master' of https://github.com/apache/storm into STORM-2014


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ad2be678
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ad2be678
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ad2be678

Branch: refs/heads/master
Commit: ad2be678831b3b060229fd936e3908110162b7ac
Parents: 0aa0e85 251cb88
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Wed Feb 1 22:28:00 2017 +0100
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Wed Feb 1 22:28:00 2017 +0100

----------------------------------------------------------------------
 CHANGELOG.md                                    |  25 +-
 conf/defaults.yaml                              |   2 +-
 docs/Multilang-protocol.md                      |  12 +-
 docs/storm-kafka-client.md                      | 293 ++++++++---
 docs/storm-sql-reference.md                     |   2 +
 .../TridentKafkaClientWordCountNamedTopics.java |  69 +--
 ...identKafkaClientWordCountWildcardTopics.java |  31 +-
 external/flux/README.md                         |  19 +
 .../java/org/apache/storm/flux/FluxBuilder.java |  17 +-
 .../storm/flux/model/BeanListReference.java     |  37 ++
 .../storm/flux/model/ConfigMethodDef.java       |   5 +
 .../org/apache/storm/flux/model/ObjectDef.java  |   7 +-
 .../java/org/apache/storm/flux/TCKTest.java     |   1 +
 .../org/apache/storm/flux/test/TestBolt.java    |  43 ++
 .../resources/configs/config-methods-test.yaml  |  22 +-
 .../jvm/org/apache/storm/sql/StormSqlImpl.java  |  53 +-
 .../apache/storm/sql/compiler/CompilerUtil.java |   4 +-
 .../sql/compiler/PostOrderRelNodeVisitor.java   | 132 -----
 .../RexNodeToBlockStatementCompiler.java        |   9 +-
 .../sql/compiler/StormSqlTypeFactoryImpl.java   |  51 ++
 .../standalone/PostOrderRelNodeVisitor.java     | 132 +++++
 .../backends/standalone/RelNodeCompiler.java    |   3 -
 .../compiler/backends/trident/PlanCompiler.java |  74 ---
 .../trident/TridentLogicalPlanCompiler.java     | 171 -------
 .../sql/planner/StormRelDataTypeSystem.java     |  37 ++
 .../apache/storm/sql/planner/StormRelUtils.java |  62 +++
 .../planner/UnsupportedOperatorsVisitor.java    |  24 +
 .../storm/sql/planner/rel/StormCalcRelBase.java |  32 ++
 .../sql/planner/rel/StormFilterRelBase.java     |  32 ++
 .../storm/sql/planner/rel/StormJoinRelBase.java |  36 ++
 .../sql/planner/rel/StormProjectRelBase.java    |  35 ++
 .../storm/sql/planner/rel/StormRelNode.java     |  24 +
 .../planner/rel/StormStreamInsertRelBase.java   |  36 ++
 .../sql/planner/rel/StormStreamScanRelBase.java |  34 ++
 .../storm/sql/planner/trident/QueryPlanner.java | 149 ++++++
 .../sql/planner/trident/TridentPlanCreator.java |  90 ++++
 .../planner/trident/TridentStormRuleSets.java   | 110 +++++
 .../sql/planner/trident/rel/TridentCalcRel.java |  96 ++++
 .../planner/trident/rel/TridentFilterRel.java   |  60 +++
 .../trident/rel/TridentLogicalConvention.java   |  67 +++
 .../planner/trident/rel/TridentProjectRel.java  |  70 +++
 .../sql/planner/trident/rel/TridentRel.java     |  26 +
 .../trident/rel/TridentStreamInsertRel.java     |  76 +++
 .../trident/rel/TridentStreamScanRel.java       |  51 ++
 .../trident/rules/TridentAggregateRule.java     |  39 ++
 .../planner/trident/rules/TridentCalcRule.java  |  45 ++
 .../trident/rules/TridentFilterRule.java        |  45 ++
 .../planner/trident/rules/TridentJoinRule.java  |  37 ++
 .../trident/rules/TridentModifyRule.java        |  71 +++
 .../trident/rules/TridentProjectRule.java       |  45 ++
 .../planner/trident/rules/TridentScanRule.java  |  50 ++
 .../test/org/apache/storm/sql/TestStormSql.java |  18 +
 .../storm/sql/compiler/TestCompilerUtils.java   | 260 ----------
 .../storm/sql/compiler/TestExprSemantic.java    |   4 +-
 .../backends/standalone/TestCompilerUtils.java  | 183 +++++++
 .../backends/standalone/TestPlanCompiler.java   |   1 -
 .../standalone/TestRelNodeCompiler.java         |   1 -
 .../backends/trident/TestCompilerUtils.java     | 208 ++++++++
 .../backends/trident/TestPlanCompiler.java      |  50 +-
 .../trident/AbstractTridentProcessor.java       |  12 +-
 .../trident/functions/EvaluationCalc.java       | 106 ++++
 .../trident/functions/EvaluationFilter.java     |   5 +
 .../trident/functions/EvaluationFunction.java   |  21 +-
 external/storm-cassandra/pom.xml                |   2 +-
 .../query/impl/PreparedStatementBinder.java     |  68 +--
 .../query/impl/RoutingKeyGenerator.java         |   7 +-
 external/storm-kafka-client/README.md           | 192 +-------
 .../org/apache/storm/kafka/bolt/KafkaBolt.java  | 223 +++++++++
 .../FieldNameBasedTupleToKafkaMapper.java       |  48 ++
 .../kafka/bolt/mapper/TupleToKafkaMapper.java   |  32 ++
 .../bolt/selector/DefaultTopicSelector.java     |  34 ++
 .../bolt/selector/FieldIndexTopicSelector.java  |  52 ++
 .../bolt/selector/FieldNameTopicSelector.java   |  49 ++
 .../kafka/bolt/selector/KafkaTopicSelector.java |  26 +
 .../kafka/spout/ByTopicRecordTranslator.java    | 146 ++++++
 .../kafka/spout/DefaultRecordTranslator.java    |  42 ++
 .../java/org/apache/storm/kafka/spout/Func.java |  26 +
 .../apache/storm/kafka/spout/KafkaSpout.java    | 182 ++-----
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 493 +++++++++++++------
 .../storm/kafka/spout/KafkaSpoutMessageId.java  |   2 +-
 .../storm/kafka/spout/KafkaSpoutStream.java     | 121 -----
 .../storm/kafka/spout/KafkaSpoutStreams.java    |  38 --
 .../spout/KafkaSpoutStreamsNamedTopics.java     | 165 -------
 .../spout/KafkaSpoutStreamsWildcardTopics.java  |  67 ---
 .../kafka/spout/KafkaSpoutTupleBuilder.java     |  58 ---
 .../kafka/spout/KafkaSpoutTuplesBuilder.java    |  32 --
 .../KafkaSpoutTuplesBuilderNamedTopics.java     |  78 ---
 .../KafkaSpoutTuplesBuilderWildcardTopics.java  |  36 --
 .../apache/storm/kafka/spout/KafkaTuple.java    |  47 ++
 .../spout/ManualPartitionNamedSubscription.java |  78 +++
 .../ManualPartitionPatternSubscription.java     |  76 +++
 .../storm/kafka/spout/ManualPartitioner.java    |  40 ++
 .../storm/kafka/spout/NamedSubscription.java    |  61 +++
 .../storm/kafka/spout/PatternSubscription.java  |  54 ++
 .../storm/kafka/spout/RecordTranslator.java     |  55 +++
 .../spout/RoundRobinManualPartitioner.java      |  50 ++
 .../kafka/spout/SimpleRecordTranslator.java     |  58 +++
 .../apache/storm/kafka/spout/Subscription.java  |  53 ++
 .../kafka/spout/TopicPartitionComparator.java   |  49 ++
 .../storm/kafka/spout/internal/Timer.java       |  74 +++
 .../spout/trident/KafkaTridentSpoutEmitter.java |  60 ++-
 .../spout/trident/KafkaTridentSpoutManager.java |  58 +--
 .../spout/trident/KafkaTridentSpoutOpaque.java  |  12 +-
 .../storm/kafka/trident/TridentKafkaState.java  | 115 +++++
 .../kafka/trident/TridentKafkaStateFactory.java |  63 +++
 .../FieldNameBasedTupleToKafkaMapper.java       |  41 ++
 .../mapper/TridentTupleToKafkaMapper.java       |  28 ++
 .../trident/selector/DefaultTopicSelector.java  |  34 ++
 .../trident/selector/KafkaTopicSelector.java    |  26 +
 .../apache/storm/kafka/bolt/KafkaBoltTest.java  |  91 ++++
 .../spout/ByTopicRecordTranslatorTest.java      |  72 +++
 .../spout/DefaultRecordTranslatorTest.java      |  37 ++
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |  40 ++
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |  35 +-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |   2 +-
 .../SingleTopicKafkaSpoutConfiguration.java     |  61 +--
 .../builders/TopicKeyValueTupleBuilder.java     |  40 --
 .../test/KafkaSpoutTopologyMainNamedTopics.java |  70 +--
 .../KafkaSpoutTopologyMainWildcardTopics.java   |  40 +-
 .../spout/test/TopicTest2TupleBuilder.java      |  40 --
 .../test/TopicsTest0Test1TupleBuilder.java      |  42 --
 .../kafka/DynamicPartitionConnections.java      |   2 +-
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  |   2 +-
 .../jvm/org/apache/storm/kafka/KafkaUtils.java  |   4 +-
 .../apache/storm/kafka/PartitionManager.java    |   2 +-
 .../apache/storm/kafka/StaticCoordinator.java   |   4 +-
 .../org/apache/storm/kafka/bolt/KafkaBolt.java  |   2 +
 .../storm/kafka/DynamicBrokersReaderTest.java   |   6 +-
 .../apache/storm/kafka/TridentKafkaTest.java    |   2 +-
 .../apache/storm/kafka/ZkCoordinatorTest.java   |   2 +-
 .../apache/storm/kafka/bolt/KafkaBoltTest.java  |   4 +-
 .../storm/opentsdb/bolt/OpenTsdbBolt.java       |  12 +-
 .../storm/opentsdb/client/OpenTsdbClient.java   |   2 +-
 .../storm/opentsdb/trident/OpenTsdbState.java   |   8 +-
 .../opentsdb/trident/OpenTsdbStateFactory.java  |   5 +-
 log4j2/cluster.xml                              |   2 +-
 log4j2/worker.xml                               |   2 +-
 .../clj/org/apache/storm/daemon/logviewer.clj   |   4 +-
 .../org/apache/storm/daemon/GrouperFactory.java |   3 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |   3 +-
 .../storm/daemon/supervisor/Supervisor.java     |   2 +-
 .../jvm/org/apache/storm/executor/Executor.java |   2 +-
 .../storm/executor/error/ReportError.java       |   2 +-
 .../metric/ClusterMetricsConsumerExecutor.java  |  31 +-
 .../jvm/org/apache/storm/spout/ShellSpout.java  |  31 +-
 .../src/jvm/org/apache/storm/tuple/Fields.java  |  18 +-
 .../jvm/org/apache/storm/utils/TupleUtils.java  |   7 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |  19 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   | 119 ++++-
 .../clj/org/apache/storm/logviewer_test.clj     |   2 +-
 .../clj/org/apache/storm/messaging_test.clj     |  61 ---
 .../jvm/org/apache/storm/MessagingTest.java     |  68 +++
 .../ClusterMetricsConsumerExecutorTest.java     | 133 +++++
 153 files changed, 5616 insertions(+), 2433 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ad2be678/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/ad2be678/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 2818c9a,db07fda..175bd44
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@@ -100,62 -91,108 +91,106 @@@ public class KafkaSpoutConfig<K, V> imp
          }
          return kafkaProps;
      }
- 
+     
      public static class Builder<K,V> {
          private final Map<String, Object> kafkaProps;
-         private SerializableDeserializer<K> keyDeserializer;
-         private SerializableDeserializer<V> valueDeserializer;
+         private Subscription subscription;
+         private final SerializableDeserializer<K> keyDes;
+         private final Class<? extends Deserializer<K>> keyDesClazz;
+         private final SerializableDeserializer<V> valueDes;
+         private final Class<? extends Deserializer<V>> valueDesClazz;
+         private RecordTranslator<K, V> translator;
          private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
          private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
 -        private int maxRetries = DEFAULT_MAX_RETRIES;
          private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-         private final KafkaSpoutStreams kafkaSpoutStreams;
          private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
-         private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
-         private final KafkaSpoutRetryService retryService;
+         private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
+         private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
+         
+         public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) {
+             this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+         }
+         
+         public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) {
+             this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+         }
+         
+         public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern topics) {
+             this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
+         }
+         
+         public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Subscription subscription) {
+             this(bootstrapServers, keyDes, null, valDes, null, subscription);
+         }
+         
+         public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String ... topics) {
+             this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+         }
+         
+         public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
+             this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+         }
+         
+         public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern topics) {
+             this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
+         }
+         
+         public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Subscription subscription) {
+             this(bootstrapServers, null, keyDes, null, valDes, subscription);
+         }
+         
+         private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+                 SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
+             kafkaProps = new HashMap<>();
+             if (bootstrapServers == null || bootstrapServers.isEmpty()) {
+                 throw new IllegalArgumentException("bootstrap servers cannot be null or empty");
+             }
+             kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+             this.keyDes = keyDes;
+             this.keyDesClazz = keyDesClazz;
+             this.valueDes = valDes;
+             this.valueDesClazz = valDesClazz;
+             this.subscription = subscription;
+             this.translator = new DefaultRecordTranslator<K,V>();
+         }
+ 
+         private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+                 SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
+             this.kafkaProps = new HashMap<>(builder.kafkaProps);
+             this.subscription = builder.subscription;
+             this.pollTimeoutMs = builder.pollTimeoutMs;
+             this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
 -            this.maxRetries = builder.maxRetries;
+             this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+             this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+             //this could result in a lot of class cast exceptions at runtime,
+             // but because some translators will work no matter what the generics
+             // are I thought it best not to force someone to reset the translator
+             // when they change the key/value types.
+             this.translator = (RecordTranslator<K, V>) builder.translator;
+             this.retryService = builder.retryService;
+             this.keyDes = keyDes;
+             this.keyDesClazz = keyDesClazz;
+             this.valueDes = valueDes;
+             this.valueDesClazz = valueDesClazz;
+         }
  
          /**
-          * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/>
-          * This constructor uses by the default the following implementation for {@link KafkaSpoutRetryService}:<p/>
-          * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
-          *           DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
+          * Specifying this key deserializer overrides the property key.deserializer. If you have
+          * set a custom RecordTranslator before calling this it may result in class cast
+          * exceptions at runtime.
           */
-         public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
-                        KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) {
-             this(kafkaProps, kafkaSpoutStreams, tuplesBuilder,
-                     new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
-                             DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)));
+         public <NK> Builder<NK,V> setKey(SerializableDeserializer<NK> keyDeserializer) {
+             return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
          }
- 
-         /***
-          * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
-          * The optional configuration can be specified using the set methods of this builder
-          * @param kafkaProps    properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
-          * @param kafkaSpoutStreams    streams to where the tuples are emitted for each tuple. Multiple topics can emit in the same stream.
-          * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s.
-          * @param retryService  logic that manages the retrial of failed tuples
+         
+         /**
+          * Specify a class that can be instantiated to create a key.deserializer.
+          * This is the same as setting key.deserializer, but overrides it. If you have
+          * set a custom RecordTranslator before calling this it may result in class cast
+          * exceptions at runtime.
           */
-         public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
-                        KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, KafkaSpoutRetryService retryService) {
-             if (kafkaProps == null || kafkaProps.isEmpty()) {
-                 throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps);
-             }
- 
-             if (kafkaSpoutStreams == null)  {
-                 throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit to the same stream");
-             }
- 
-             if (tuplesBuilder == null) {
-                 throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams");
-             }
- 
-             if (retryService == null) {
-                 throw new IllegalArgumentException("Must specify at implementation of retry service");
-             }
- 
-             this.kafkaProps = kafkaProps;
-             this.kafkaSpoutStreams = kafkaSpoutStreams;
-             this.tuplesBuilder = tuplesBuilder;
-             this.retryService = retryService;
+         public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) {
+             return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
          }
  
          /**
@@@ -218,6 -423,41 +408,39 @@@
          }
      }
  
+     // Kafka consumer configuration
+     private final Map<String, Object> kafkaProps;
+     private final Subscription subscription;
+     private final SerializableDeserializer<K> keyDes;
+     private final Class<? extends Deserializer<K>> keyDesClazz;
+     private final SerializableDeserializer<V> valueDes;
+     private final Class<? extends Deserializer<V>> valueDesClazz;
+     private final long pollTimeoutMs;
+ 
+     // Kafka spout configuration
+     private final RecordTranslator<K, V> translator;
+     private final long offsetCommitPeriodMs;
 -    private final int maxRetries;
+     private final int maxUncommittedOffsets;
+     private final FirstPollOffsetStrategy firstPollOffsetStrategy;
+     private final KafkaSpoutRetryService retryService;
+     private final long partitionRefreshPeriodMs;
+ 
+     private KafkaSpoutConfig(Builder<K,V> builder) {
+         this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
+         this.subscription = builder.subscription;
+         this.translator = builder.translator;
+         this.pollTimeoutMs = builder.pollTimeoutMs;
+         this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
 -        this.maxRetries = builder.maxRetries;
+         this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+         this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+         this.retryService = builder.retryService;
+         this.keyDes = builder.keyDes;
+         this.keyDesClazz = builder.keyDesClazz;
+         this.valueDes = builder.valueDes;
+         this.valueDesClazz = builder.valueDesClazz;
+         this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
+     }
+ 
      public Map<String, Object> getKafkaProps() {
          return kafkaProps;
      }
@@@ -244,29 -506,13 +489,9 @@@
      }
  
      public String getConsumerGroupId() {
-         return (String) kafkaProps.get(Consumer.GROUP_ID);
-     }
- 
-     /**
-      * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream},
-      * or null if this stream is associated with a wildcard pattern topic
-      */
-     public List<String> getSubscribedTopics() {
-         return kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics ?
-             new ArrayList<>(((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics()) :
-             null;
-     }
- 
-     /**
-      * @return the wildcard pattern topic associated with this {@link KafkaSpoutStream}, or null
-      * if this stream is associated with a specific named topic
-      */
-     public Pattern getTopicWildcardPattern() {
-         return kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics ?
-                 ((KafkaSpoutStreamsWildcardTopics)kafkaSpoutStreams).getTopicWildcardPattern() :
-                 null;
+         return (String) kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG);
      }
  
 -    public int getMaxTupleRetries() {
 -        return maxRetries;
 -    }
 -
      public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
          return firstPollOffsetStrategy;
      }
@@@ -291,17 -533,16 +512,15 @@@
      public String toString() {
          return "KafkaSpoutConfig{" +
                  "kafkaProps=" + kafkaProps +
-                 ", keyDeserializer=" + keyDeserializer +
-                 ", valueDeserializer=" + valueDeserializer +
+                 ", key=" + getKeyDeserializer() +
+                 ", value=" + getValueDeserializer() +
                  ", pollTimeoutMs=" + pollTimeoutMs +
                  ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
 -                ", maxRetries=" + maxRetries +
                  ", maxUncommittedOffsets=" + maxUncommittedOffsets +
                  ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
-                 ", kafkaSpoutStreams=" + kafkaSpoutStreams +
-                 ", tuplesBuilder=" + tuplesBuilder +
+                 ", subscription=" + subscription +
+                 ", translator=" + translator +
                  ", retryService=" + retryService +
-                 ", topics=" + getSubscribedTopics() +
-                 ", topicWildcardPattern=" + getTopicWildcardPattern() +
                  '}';
      }
  }


[3/4] storm git commit: Merge branch 'STORM-2014' of https://github.com/srdo/storm into STORM-2014-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2014' of https://github.com/srdo/storm into STORM-2014-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/99be1728
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/99be1728
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/99be1728

Branch: refs/heads/master
Commit: 99be17282e73314f30de3d80abcefc2de8f226a8
Parents: e8c3900 ad2be67
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Feb 3 11:41:54 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 3 11:41:54 2017 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 13 ++++-----
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 24 +---------------
 .../KafkaSpoutRetryExponentialBackoff.java      | 30 ++++++++++++--------
 .../kafka/spout/KafkaSpoutRetryService.java     | 12 ++++++--
 4 files changed, 34 insertions(+), 45 deletions(-)
----------------------------------------------------------------------