You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/02/03 02:52:13 UTC
[1/4] storm git commit: STORM-2014: Make KafkaSpout delegate maxRetry
check to RetryService
Repository: storm
Updated Branches:
refs/heads/master e8c390059 -> 38ee403bf
STORM-2014: Make KafkaSpout delegate maxRetry check to RetryService
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0aa0e857
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0aa0e857
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0aa0e857
Branch: refs/heads/master
Commit: 0aa0e8579723a76f53fe7e0ae71dfd1722465c0e
Parents: e3b2f96
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Tue Aug 2 19:17:02 2016 +0200
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Thu Jan 12 21:27:05 2017 +0100
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 13 ++++-----
.../storm/kafka/spout/KafkaSpoutConfig.java | 23 +--------------
.../KafkaSpoutRetryExponentialBackoff.java | 30 ++++++++++++--------
.../kafka/spout/KafkaSpoutRetryService.java | 12 ++++++--
4 files changed, 34 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0aa0e857/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index d405c4d..60ca0b9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -72,7 +72,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
// Bookkeeping
- private transient int maxRetries; // Max number of times a tuple is retried
private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first realized by the spout upon activation
private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure
private transient Timer commitTimer; // timer == null for auto commit mode
@@ -105,7 +104,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
// Spout internals
this.collector = collector;
- maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
numUncommittedOffsets = 0;
// Offset management
@@ -381,11 +379,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
return;
}
- if (msgId.numFails() < maxRetries) {
- emitted.remove(msgId);
- msgId.incrementNumFails();
- retryService.schedule(msgId);
- } else { // limit to max number of retries
+ emitted.remove(msgId);
+ msgId.incrementNumFails();
+ if (!retryService.schedule(msgId)) {
LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
ack(msgId);
}
@@ -523,6 +519,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
/**
+ * An offset is only committed when all records with lower offset have
+ * been acked. This guarantees that all offsets smaller than the
+ * committedOffset have been delivered.
* @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit.
*/
public OffsetAndMetadata findNextCommitOffset() {
http://git-wip-us.apache.org/repos/asf/storm/blob/0aa0e857/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 8aa525b..2818c9a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -74,7 +74,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
// Kafka spout configuration
private final long offsetCommitPeriodMs;
- private final int maxRetries;
private final int maxUncommittedOffsets;
private final FirstPollOffsetStrategy firstPollOffsetStrategy;
private final KafkaSpoutStreams kafkaSpoutStreams;
@@ -87,7 +86,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.valueDeserializer = builder.valueDeserializer;
this.pollTimeoutMs = builder.pollTimeoutMs;
this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
- this.maxRetries = builder.maxRetries;
this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
@@ -109,7 +107,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
private SerializableDeserializer<V> valueDeserializer;
private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
- private int maxRetries = DEFAULT_MAX_RETRIES;
private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
private final KafkaSpoutStreams kafkaSpoutStreams;
private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
@@ -194,20 +191,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.offsetCommitPeriodMs = offsetCommitPeriodMs;
return this;
}
-
- /**
- * Defines the max number of retrials in case of tuple failure. The default is to retry forever, which means that
- * no new records are committed until the previous polled records have been acked. This guarantees at once delivery of
- * all the previously polled records.
- * By specifying a finite value for maxRetries, the user decides to sacrifice guarantee of delivery for the previous
- * polled records in favor of processing more records.
- * @param maxRetries max number of retrials
- */
- public Builder<K,V> setMaxRetries(int maxRetries) {
- this.maxRetries = maxRetries;
- return this;
- }
-
+
/**
* Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
* Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
@@ -283,10 +267,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
null;
}
- public int getMaxTupleRetries() {
- return maxRetries;
- }
-
public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
return firstPollOffsetStrategy;
}
@@ -315,7 +295,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
", valueDeserializer=" + valueDeserializer +
", pollTimeoutMs=" + pollTimeoutMs +
", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
- ", maxRetries=" + maxRetries +
", maxUncommittedOffsets=" + maxUncommittedOffsets +
", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
", kafkaSpoutStreams=" + kafkaSpoutStreams +
http://git-wip-us.apache.org/repos/asf/storm/blob/0aa0e857/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index f59367d..2c8d7e4 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -40,25 +40,26 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
- private TimeInterval initialDelay;
- private TimeInterval delayPeriod;
- private TimeInterval maxDelay;
- private int maxRetries;
+ private final TimeInterval initialDelay;
+ private final TimeInterval delayPeriod;
+ private final TimeInterval maxDelay;
+ private final int maxRetries;
- private Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
- private Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups
+ private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
+ private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups
/**
* Comparator ordering by timestamp
*/
private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
+ @Override
public int compare(RetrySchedule entry1, RetrySchedule entry2) {
return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
}
}
private class RetrySchedule {
- private KafkaSpoutMessageId msgId;
+ private final KafkaSpoutMessageId msgId;
private long nextRetryTimeNanos;
public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
@@ -94,9 +95,9 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
}
public static class TimeInterval implements Serializable {
- private long lengthNanos;
- private long length;
- private TimeUnit timeUnit;
+ private final long lengthNanos;
+ private final long length;
+ private final TimeUnit timeUnit;
/**
* @param length length of the time interval in the units specified by {@link TimeUnit}
@@ -144,7 +145,10 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
/**
* The time stamp of the next retry is scheduled according to the exponential backoff formula ( geometric progression):
* nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
- * nextRetry = Min(nextRetry, currentTime + maxDelay)
+ * nextRetry = Min(nextRetry, currentTime + maxDelay).
+ *
+ * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the previous
+ * polled records in favor of processing more records.
*
* @param initialDelay initial delay of the first retry
* @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression)
@@ -239,9 +243,10 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
}
@Override
- public void schedule(KafkaSpoutMessageId msgId) {
+ public boolean schedule(KafkaSpoutMessageId msgId) {
if (msgId.numFails() > maxRetries) {
LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
+ return false;
} else {
if (toRetryMsgs.contains(msgId)) {
for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
@@ -257,6 +262,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
toRetryMsgs.add(msgId);
LOG.debug("Scheduled. {}", retrySchedule);
LOG.trace("Current state {}", retrySchedules);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0aa0e857/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index 5aab167..bf17a5a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -29,14 +29,18 @@ import java.util.Set;
*/
public interface KafkaSpoutRetryService extends Serializable {
/**
- * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled.
+ * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or
+ * updates retry time if it has already been scheduled. It may also indicate
+ * that the message should not be retried, in which case the message will not be scheduled.
* @param msgId message to schedule for retrial
+ * @return true if the message will be retried, false otherwise
*/
- void schedule(KafkaSpoutMessageId msgId);
+ boolean schedule(KafkaSpoutMessageId msgId);
/**
* Removes a message from the list of messages scheduled for retrial
* @param msgId message to remove from retrial
+ * @return true if the message was scheduled for retrial, false otherwise
*/
boolean remove(KafkaSpoutMessageId msgId);
@@ -56,8 +60,9 @@ public interface KafkaSpoutRetryService extends Serializable {
Set<TopicPartition> retriableTopicPartitions();
/**
- * Checks if a specific failed {@link KafkaSpoutMessageId} is is ready to be retried,
+ * Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried,
* i.e is scheduled and has retry time that is less than current time.
+ * @param msgId message to check for readiness
* @return true if message is ready to be retried, false otherwise
*/
boolean isReady(KafkaSpoutMessageId msgId);
@@ -65,6 +70,7 @@ public interface KafkaSpoutRetryService extends Serializable {
/**
* Checks if a specific failed {@link KafkaSpoutMessageId} is scheduled to be retried.
* The message may or may not be ready to be retried yet.
+ * @param msgId message to check for scheduling status
* @return true if the message is scheduled to be retried, regardless of being or not ready to be retried.
* Returns false is this message is not scheduled for retrial
*/
[4/4] storm git commit: STORM-2014: CHANGELOG
Posted by ka...@apache.org.
STORM-2014: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/38ee403b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/38ee403b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/38ee403b
Branch: refs/heads/master
Commit: 38ee403bf4a537ef276221033901690f099479c0
Parents: 99be172
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Feb 3 11:52:02 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 3 11:52:02 2017 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/38ee403b/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e5a6fba..3739114 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -196,6 +196,7 @@
* STORM-1769: Added a test to check local nimbus with notifier plugin
## 1.1.0
+ * STORM-2014: New Kafka spout duplicates checking if failed messages have reached max retries
* STORM-2324: Fix deployment failure if resources directory is missing in topology jar
* STORM-1443: [Storm SQL] Support customizing parallelism in StormSQL
* STORM-2148: [Storm SQL] Trident mode: back to code generate and compile Trident topology
[2/4] storm git commit: Merge branch 'master' of
https://github.com/apache/storm into STORM-2014
Posted by ka...@apache.org.
Merge branch 'master' of https://github.com/apache/storm into STORM-2014
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ad2be678
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ad2be678
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ad2be678
Branch: refs/heads/master
Commit: ad2be678831b3b060229fd936e3908110162b7ac
Parents: 0aa0e85 251cb88
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Wed Feb 1 22:28:00 2017 +0100
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Wed Feb 1 22:28:00 2017 +0100
----------------------------------------------------------------------
CHANGELOG.md | 25 +-
conf/defaults.yaml | 2 +-
docs/Multilang-protocol.md | 12 +-
docs/storm-kafka-client.md | 293 ++++++++---
docs/storm-sql-reference.md | 2 +
.../TridentKafkaClientWordCountNamedTopics.java | 69 +--
...identKafkaClientWordCountWildcardTopics.java | 31 +-
external/flux/README.md | 19 +
.../java/org/apache/storm/flux/FluxBuilder.java | 17 +-
.../storm/flux/model/BeanListReference.java | 37 ++
.../storm/flux/model/ConfigMethodDef.java | 5 +
.../org/apache/storm/flux/model/ObjectDef.java | 7 +-
.../java/org/apache/storm/flux/TCKTest.java | 1 +
.../org/apache/storm/flux/test/TestBolt.java | 43 ++
.../resources/configs/config-methods-test.yaml | 22 +-
.../jvm/org/apache/storm/sql/StormSqlImpl.java | 53 +-
.../apache/storm/sql/compiler/CompilerUtil.java | 4 +-
.../sql/compiler/PostOrderRelNodeVisitor.java | 132 -----
.../RexNodeToBlockStatementCompiler.java | 9 +-
.../sql/compiler/StormSqlTypeFactoryImpl.java | 51 ++
.../standalone/PostOrderRelNodeVisitor.java | 132 +++++
.../backends/standalone/RelNodeCompiler.java | 3 -
.../compiler/backends/trident/PlanCompiler.java | 74 ---
.../trident/TridentLogicalPlanCompiler.java | 171 -------
.../sql/planner/StormRelDataTypeSystem.java | 37 ++
.../apache/storm/sql/planner/StormRelUtils.java | 62 +++
.../planner/UnsupportedOperatorsVisitor.java | 24 +
.../storm/sql/planner/rel/StormCalcRelBase.java | 32 ++
.../sql/planner/rel/StormFilterRelBase.java | 32 ++
.../storm/sql/planner/rel/StormJoinRelBase.java | 36 ++
.../sql/planner/rel/StormProjectRelBase.java | 35 ++
.../storm/sql/planner/rel/StormRelNode.java | 24 +
.../planner/rel/StormStreamInsertRelBase.java | 36 ++
.../sql/planner/rel/StormStreamScanRelBase.java | 34 ++
.../storm/sql/planner/trident/QueryPlanner.java | 149 ++++++
.../sql/planner/trident/TridentPlanCreator.java | 90 ++++
.../planner/trident/TridentStormRuleSets.java | 110 +++++
.../sql/planner/trident/rel/TridentCalcRel.java | 96 ++++
.../planner/trident/rel/TridentFilterRel.java | 60 +++
.../trident/rel/TridentLogicalConvention.java | 67 +++
.../planner/trident/rel/TridentProjectRel.java | 70 +++
.../sql/planner/trident/rel/TridentRel.java | 26 +
.../trident/rel/TridentStreamInsertRel.java | 76 +++
.../trident/rel/TridentStreamScanRel.java | 51 ++
.../trident/rules/TridentAggregateRule.java | 39 ++
.../planner/trident/rules/TridentCalcRule.java | 45 ++
.../trident/rules/TridentFilterRule.java | 45 ++
.../planner/trident/rules/TridentJoinRule.java | 37 ++
.../trident/rules/TridentModifyRule.java | 71 +++
.../trident/rules/TridentProjectRule.java | 45 ++
.../planner/trident/rules/TridentScanRule.java | 50 ++
.../test/org/apache/storm/sql/TestStormSql.java | 18 +
.../storm/sql/compiler/TestCompilerUtils.java | 260 ----------
.../storm/sql/compiler/TestExprSemantic.java | 4 +-
.../backends/standalone/TestCompilerUtils.java | 183 +++++++
.../backends/standalone/TestPlanCompiler.java | 1 -
.../standalone/TestRelNodeCompiler.java | 1 -
.../backends/trident/TestCompilerUtils.java | 208 ++++++++
.../backends/trident/TestPlanCompiler.java | 50 +-
.../trident/AbstractTridentProcessor.java | 12 +-
.../trident/functions/EvaluationCalc.java | 106 ++++
.../trident/functions/EvaluationFilter.java | 5 +
.../trident/functions/EvaluationFunction.java | 21 +-
external/storm-cassandra/pom.xml | 2 +-
.../query/impl/PreparedStatementBinder.java | 68 +--
.../query/impl/RoutingKeyGenerator.java | 7 +-
external/storm-kafka-client/README.md | 192 +-------
.../org/apache/storm/kafka/bolt/KafkaBolt.java | 223 +++++++++
.../FieldNameBasedTupleToKafkaMapper.java | 48 ++
.../kafka/bolt/mapper/TupleToKafkaMapper.java | 32 ++
.../bolt/selector/DefaultTopicSelector.java | 34 ++
.../bolt/selector/FieldIndexTopicSelector.java | 52 ++
.../bolt/selector/FieldNameTopicSelector.java | 49 ++
.../kafka/bolt/selector/KafkaTopicSelector.java | 26 +
.../kafka/spout/ByTopicRecordTranslator.java | 146 ++++++
.../kafka/spout/DefaultRecordTranslator.java | 42 ++
.../java/org/apache/storm/kafka/spout/Func.java | 26 +
.../apache/storm/kafka/spout/KafkaSpout.java | 182 ++-----
.../storm/kafka/spout/KafkaSpoutConfig.java | 493 +++++++++++++------
.../storm/kafka/spout/KafkaSpoutMessageId.java | 2 +-
.../storm/kafka/spout/KafkaSpoutStream.java | 121 -----
.../storm/kafka/spout/KafkaSpoutStreams.java | 38 --
.../spout/KafkaSpoutStreamsNamedTopics.java | 165 -------
.../spout/KafkaSpoutStreamsWildcardTopics.java | 67 ---
.../kafka/spout/KafkaSpoutTupleBuilder.java | 58 ---
.../kafka/spout/KafkaSpoutTuplesBuilder.java | 32 --
.../KafkaSpoutTuplesBuilderNamedTopics.java | 78 ---
.../KafkaSpoutTuplesBuilderWildcardTopics.java | 36 --
.../apache/storm/kafka/spout/KafkaTuple.java | 47 ++
.../spout/ManualPartitionNamedSubscription.java | 78 +++
.../ManualPartitionPatternSubscription.java | 76 +++
.../storm/kafka/spout/ManualPartitioner.java | 40 ++
.../storm/kafka/spout/NamedSubscription.java | 61 +++
.../storm/kafka/spout/PatternSubscription.java | 54 ++
.../storm/kafka/spout/RecordTranslator.java | 55 +++
.../spout/RoundRobinManualPartitioner.java | 50 ++
.../kafka/spout/SimpleRecordTranslator.java | 58 +++
.../apache/storm/kafka/spout/Subscription.java | 53 ++
.../kafka/spout/TopicPartitionComparator.java | 49 ++
.../storm/kafka/spout/internal/Timer.java | 74 +++
.../spout/trident/KafkaTridentSpoutEmitter.java | 60 ++-
.../spout/trident/KafkaTridentSpoutManager.java | 58 +--
.../spout/trident/KafkaTridentSpoutOpaque.java | 12 +-
.../storm/kafka/trident/TridentKafkaState.java | 115 +++++
.../kafka/trident/TridentKafkaStateFactory.java | 63 +++
.../FieldNameBasedTupleToKafkaMapper.java | 41 ++
.../mapper/TridentTupleToKafkaMapper.java | 28 ++
.../trident/selector/DefaultTopicSelector.java | 34 ++
.../trident/selector/KafkaTopicSelector.java | 26 +
.../apache/storm/kafka/bolt/KafkaBoltTest.java | 91 ++++
.../spout/ByTopicRecordTranslatorTest.java | 72 +++
.../spout/DefaultRecordTranslatorTest.java | 37 ++
.../storm/kafka/spout/KafkaSpoutConfigTest.java | 40 ++
.../kafka/spout/KafkaSpoutRebalanceTest.java | 35 +-
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 2 +-
.../SingleTopicKafkaSpoutConfiguration.java | 61 +--
.../builders/TopicKeyValueTupleBuilder.java | 40 --
.../test/KafkaSpoutTopologyMainNamedTopics.java | 70 +--
.../KafkaSpoutTopologyMainWildcardTopics.java | 40 +-
.../spout/test/TopicTest2TupleBuilder.java | 40 --
.../test/TopicsTest0Test1TupleBuilder.java | 42 --
.../kafka/DynamicPartitionConnections.java | 2 +-
.../jvm/org/apache/storm/kafka/KafkaSpout.java | 2 +-
.../jvm/org/apache/storm/kafka/KafkaUtils.java | 4 +-
.../apache/storm/kafka/PartitionManager.java | 2 +-
.../apache/storm/kafka/StaticCoordinator.java | 4 +-
.../org/apache/storm/kafka/bolt/KafkaBolt.java | 2 +
.../storm/kafka/DynamicBrokersReaderTest.java | 6 +-
.../apache/storm/kafka/TridentKafkaTest.java | 2 +-
.../apache/storm/kafka/ZkCoordinatorTest.java | 2 +-
.../apache/storm/kafka/bolt/KafkaBoltTest.java | 4 +-
.../storm/opentsdb/bolt/OpenTsdbBolt.java | 12 +-
.../storm/opentsdb/client/OpenTsdbClient.java | 2 +-
.../storm/opentsdb/trident/OpenTsdbState.java | 8 +-
.../opentsdb/trident/OpenTsdbStateFactory.java | 5 +-
log4j2/cluster.xml | 2 +-
log4j2/worker.xml | 2 +-
.../clj/org/apache/storm/daemon/logviewer.clj | 4 +-
.../org/apache/storm/daemon/GrouperFactory.java | 3 +-
.../org/apache/storm/daemon/nimbus/Nimbus.java | 3 +-
.../storm/daemon/supervisor/Supervisor.java | 2 +-
.../jvm/org/apache/storm/executor/Executor.java | 2 +-
.../storm/executor/error/ReportError.java | 2 +-
.../metric/ClusterMetricsConsumerExecutor.java | 31 +-
.../jvm/org/apache/storm/spout/ShellSpout.java | 31 +-
.../src/jvm/org/apache/storm/tuple/Fields.java | 18 +-
.../jvm/org/apache/storm/utils/TupleUtils.java | 7 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 19 +-
.../org/apache/storm/zookeeper/Zookeeper.java | 119 ++++-
.../clj/org/apache/storm/logviewer_test.clj | 2 +-
.../clj/org/apache/storm/messaging_test.clj | 61 ---
.../jvm/org/apache/storm/MessagingTest.java | 68 +++
.../ClusterMetricsConsumerExecutorTest.java | 133 +++++
153 files changed, 5616 insertions(+), 2433 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ad2be678/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ad2be678/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 2818c9a,db07fda..175bd44
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@@ -100,62 -91,108 +91,106 @@@ public class KafkaSpoutConfig<K, V> imp
}
return kafkaProps;
}
-
+
public static class Builder<K,V> {
private final Map<String, Object> kafkaProps;
- private SerializableDeserializer<K> keyDeserializer;
- private SerializableDeserializer<V> valueDeserializer;
+ private Subscription subscription;
+ private final SerializableDeserializer<K> keyDes;
+ private final Class<? extends Deserializer<K>> keyDesClazz;
+ private final SerializableDeserializer<V> valueDes;
+ private final Class<? extends Deserializer<V>> valueDesClazz;
+ private RecordTranslator<K, V> translator;
private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
- private int maxRetries = DEFAULT_MAX_RETRIES;
private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
- private final KafkaSpoutStreams kafkaSpoutStreams;
private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
- private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
- private final KafkaSpoutRetryService retryService;
+ private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
+ private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
+
+ public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) {
+ this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+ }
+
+ public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) {
+ this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+ }
+
+ public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern topics) {
+ this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
+ }
+
+ public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Subscription subscription) {
+ this(bootstrapServers, keyDes, null, valDes, null, subscription);
+ }
+
+ public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String ... topics) {
+ this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+ }
+
+ public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
+ this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+ }
+
+ public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern topics) {
+ this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
+ }
+
+ public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Subscription subscription) {
+ this(bootstrapServers, null, keyDes, null, valDes, subscription);
+ }
+
+ private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+ SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
+ kafkaProps = new HashMap<>();
+ if (bootstrapServers == null || bootstrapServers.isEmpty()) {
+ throw new IllegalArgumentException("bootstrap servers cannot be null");
+ }
+ kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ this.keyDes = keyDes;
+ this.keyDesClazz = keyDesClazz;
+ this.valueDes = valDes;
+ this.valueDesClazz = valDesClazz;
+ this.subscription = subscription;
+ this.translator = new DefaultRecordTranslator<K,V>();
+ }
+
+ private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+ SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
+ this.kafkaProps = new HashMap<>(builder.kafkaProps);
+ this.subscription = builder.subscription;
+ this.pollTimeoutMs = builder.pollTimeoutMs;
+ this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
- this.maxRetries = builder.maxRetries;
+ this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+ this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+ //this could result in a lot of class case exceptions at runtime,
+ // but because some translators will work no matter what the generics
+ // are I thought it best not to force someone to reset the translator
+ // when they change the key/value types.
+ this.translator = (RecordTranslator<K, V>) builder.translator;
+ this.retryService = builder.retryService;
+ this.keyDes = keyDes;
+ this.keyDesClazz = keyDesClazz;
+ this.valueDes = valueDes;
+ this.valueDesClazz = valueDesClazz;
+ }
/**
- * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/>
- * This constructor uses by the default the following implementation for {@link KafkaSpoutRetryService}:<p/>
- * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
- * DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
+ * Specifying this key deserializer overrides the property key.deserializer. If you have
+ * set a custom RecordTranslator before calling this it may result in class cast
+ * exceptions at runtime.
*/
- public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
- KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) {
- this(kafkaProps, kafkaSpoutStreams, tuplesBuilder,
- new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
- DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)));
+ public <NK> Builder<NK,V> setKey(SerializableDeserializer<NK> keyDeserializer) {
+ return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
}
-
- /***
- * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
- * The optional configuration can be specified using the set methods of this builder
- * @param kafkaProps properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
- * @param kafkaSpoutStreams streams to where the tuples are emitted for each tuple. Multiple topics can emit in the same stream.
- * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s.
- * @param retryService logic that manages the retrial of failed tuples
+
+ /**
+ * Specify a class that can be instantiated to create a key.deserializer
+ * This is the same as setting key.deserializer, but overrides it. If you have
+ * set a custom RecordTranslator before calling this it may result in class cast
+ * exceptions at runtime.
*/
- public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
- KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, KafkaSpoutRetryService retryService) {
- if (kafkaProps == null || kafkaProps.isEmpty()) {
- throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps);
- }
-
- if (kafkaSpoutStreams == null) {
- throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit to the same stream");
- }
-
- if (tuplesBuilder == null) {
- throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams");
- }
-
- if (retryService == null) {
- throw new IllegalArgumentException("Must specify at implementation of retry service");
- }
-
- this.kafkaProps = kafkaProps;
- this.kafkaSpoutStreams = kafkaSpoutStreams;
- this.tuplesBuilder = tuplesBuilder;
- this.retryService = retryService;
+ public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) {
+ return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
}
/**
@@@ -218,6 -423,41 +408,39 @@@
}
}
+ // Kafka consumer configuration
+ private final Map<String, Object> kafkaProps;
+ private final Subscription subscription;
+ private final SerializableDeserializer<K> keyDes;
+ private final Class<? extends Deserializer<K>> keyDesClazz;
+ private final SerializableDeserializer<V> valueDes;
+ private final Class<? extends Deserializer<V>> valueDesClazz;
+ private final long pollTimeoutMs;
+
+ // Kafka spout configuration
+ private final RecordTranslator<K, V> translator;
+ private final long offsetCommitPeriodMs;
- private final int maxRetries;
+ private final int maxUncommittedOffsets;
+ private final FirstPollOffsetStrategy firstPollOffsetStrategy;
+ private final KafkaSpoutRetryService retryService;
+ private final long partitionRefreshPeriodMs;
+
+ private KafkaSpoutConfig(Builder<K,V> builder) {
+ this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
+ this.subscription = builder.subscription;
+ this.translator = builder.translator;
+ this.pollTimeoutMs = builder.pollTimeoutMs;
+ this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
- this.maxRetries = builder.maxRetries;
+ this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+ this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+ this.retryService = builder.retryService;
+ this.keyDes = builder.keyDes;
+ this.keyDesClazz = builder.keyDesClazz;
+ this.valueDes = builder.valueDes;
+ this.valueDesClazz = builder.valueDesClazz;
+ this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
+ }
+
public Map<String, Object> getKafkaProps() {
return kafkaProps;
}
@@@ -244,29 -506,13 +489,9 @@@
}
public String getConsumerGroupId() {
- return (String) kafkaProps.get(Consumer.GROUP_ID);
- }
-
- /**
- * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream},
- * or null if this stream is associated with a wildcard pattern topic
- */
- public List<String> getSubscribedTopics() {
- return kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics ?
- new ArrayList<>(((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics()) :
- null;
- }
-
- /**
- * @return the wildcard pattern topic associated with this {@link KafkaSpoutStream}, or null
- * if this stream is associated with a specific named topic
- */
- public Pattern getTopicWildcardPattern() {
- return kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics ?
- ((KafkaSpoutStreamsWildcardTopics)kafkaSpoutStreams).getTopicWildcardPattern() :
- null;
+ return (String) kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG);
}
- public int getMaxTupleRetries() {
- return maxRetries;
- }
-
public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
return firstPollOffsetStrategy;
}
@@@ -291,17 -533,16 +512,15 @@@
public String toString() {
return "KafkaSpoutConfig{" +
"kafkaProps=" + kafkaProps +
- ", keyDeserializer=" + keyDeserializer +
- ", valueDeserializer=" + valueDeserializer +
+ ", key=" + getKeyDeserializer() +
+ ", value=" + getValueDeserializer() +
", pollTimeoutMs=" + pollTimeoutMs +
", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
- ", maxRetries=" + maxRetries +
", maxUncommittedOffsets=" + maxUncommittedOffsets +
", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
- ", kafkaSpoutStreams=" + kafkaSpoutStreams +
- ", tuplesBuilder=" + tuplesBuilder +
+ ", subscription=" + subscription +
+ ", translator=" + translator +
", retryService=" + retryService +
- ", topics=" + getSubscribedTopics() +
- ", topicWildcardPattern=" + getTopicWildcardPattern() +
'}';
}
}
[3/4] storm git commit: Merge branch 'STORM-2014' of
https://github.com/srdo/storm into STORM-2014-merge
Posted by ka...@apache.org.
Merge branch 'STORM-2014' of https://github.com/srdo/storm into STORM-2014-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/99be1728
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/99be1728
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/99be1728
Branch: refs/heads/master
Commit: 99be17282e73314f30de3d80abcefc2de8f226a8
Parents: e8c3900 ad2be67
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Feb 3 11:41:54 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 3 11:41:54 2017 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 13 ++++-----
.../storm/kafka/spout/KafkaSpoutConfig.java | 24 +---------------
.../KafkaSpoutRetryExponentialBackoff.java | 30 ++++++++++++--------
.../kafka/spout/KafkaSpoutRetryService.java | 12 ++++++--
4 files changed, 34 insertions(+), 45 deletions(-)
----------------------------------------------------------------------