Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/08 09:05:07 UTC
[06/14] storm git commit: STORM-2937: Overwrite storm-kafka-client
1.x-branch into 1.0.x-branch: copied external/storm-kafka-client from
1.x-branch
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 7c97ac9..7aa836c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -18,232 +18,810 @@
package org.apache.storm.kafka.spout;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
-
import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.storm.Config;
+import org.apache.storm.annotation.InterfaceStability;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
*/
public class KafkaSpoutConfig<K, V> implements Serializable {
- public static final long DEFAULT_POLL_TIMEOUT_MS = 200; // 200ms
- public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000; // 30s
- public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; // Retry forever
- public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000; // 10,000,000 records => 80MBs of memory footprint in the worst case
-
- // Kafka property names
- public interface Consumer {
- String GROUP_ID = "group.id";
- String BOOTSTRAP_SERVERS = "bootstrap.servers";
- String ENABLE_AUTO_COMMIT = "enable.auto.commit";
- String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
- String KEY_DESERIALIZER = "key.deserializer";
- String VALUE_DESERIALIZER = "value.deserializer";
- }
- /**
- * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will
- * affect the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
- * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/>
- * <ul>
- * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits</li>
- * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li>
- * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any.
- * If no offset has been committed, it behaves as EARLIEST.</li>
- * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any.
- * If no offset has been committed, it behaves as LATEST.</li>
- * </ul>
- * */
- public enum FirstPollOffsetStrategy {
- EARLIEST,
- LATEST,
- UNCOMMITTED_EARLIEST,
- UNCOMMITTED_LATEST }
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutConfig.class);
+ private static final long serialVersionUID = 141902646130682494L;
+ // 200ms
+ public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
+ // 30s
+ public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;
+ // Retry forever
+ public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
+ // 10,000,000 records => 80MBs of memory footprint in the worst case
+ public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;
+ // 2s
+ public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
+
+ public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+
+ public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
+ new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+ DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
+
+ public static final ProcessingGuarantee DEFAULT_PROCESSING_GUARANTEE = ProcessingGuarantee.AT_LEAST_ONCE;
+
+ public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener();
+
+ public static final int DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS = 60;
+
// Kafka consumer configuration
private final Map<String, Object> kafkaProps;
- private final Deserializer<K> keyDeserializer;
- private final Deserializer<V> valueDeserializer;
+ private final Subscription subscription;
private final long pollTimeoutMs;
// Kafka spout configuration
+ private final RecordTranslator<K, V> translator;
private final long offsetCommitPeriodMs;
- private final int maxRetries;
private final int maxUncommittedOffsets;
private final FirstPollOffsetStrategy firstPollOffsetStrategy;
- private final KafkaSpoutStreams kafkaSpoutStreams;
- private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
private final KafkaSpoutRetryService retryService;
+ private final KafkaTupleListener tupleListener;
+ private final long partitionRefreshPeriodMs;
+ private final boolean emitNullTuples;
+ private final SerializableDeserializer<K> keyDes;
+ private final Class<? extends Deserializer<K>> keyDesClazz;
+ private final SerializableDeserializer<V> valueDes;
+ private final Class<? extends Deserializer<V>> valueDesClazz;
+ private final ProcessingGuarantee processingGuarantee;
+ private final boolean tupleTrackingEnforced;
+ private final int metricsTimeBucketSizeInSecs;
- private KafkaSpoutConfig(Builder<K,V> builder) {
- this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
- this.keyDeserializer = builder.keyDeserializer;
- this.valueDeserializer = builder.valueDeserializer;
+ /**
+ * Creates a new KafkaSpoutConfig using a Builder.
+ *
+ * @param builder The Builder to construct the KafkaSpoutConfig from
+ */
+ public KafkaSpoutConfig(Builder<K, V> builder) {
+ setKafkaPropsForProcessingGuarantee(builder);
+ this.kafkaProps = builder.kafkaProps;
+ this.subscription = builder.subscription;
+ this.translator = builder.translator;
this.pollTimeoutMs = builder.pollTimeoutMs;
this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
- this.maxRetries = builder.maxRetries;
this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
- this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
- this.tuplesBuilder = builder.tuplesBuilder;
this.retryService = builder.retryService;
+ this.tupleListener = builder.tupleListener;
+ this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
+ this.emitNullTuples = builder.emitNullTuples;
+ this.keyDes = builder.keyDes;
+ this.keyDesClazz = builder.keyDesClazz;
+ this.valueDes = builder.valueDes;
+ this.valueDesClazz = builder.valueDesClazz;
+ this.processingGuarantee = builder.processingGuarantee;
+ this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
+ this.metricsTimeBucketSizeInSecs = builder.metricsTimeBucketSizeInSecs;
}
- private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
- // set defaults for properties not specified
- if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
- kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+ /**
+ * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment.
+ * By default this parameter is set to UNCOMMITTED_EARLIEST.
+ */
+ public enum FirstPollOffsetStrategy {
+ /**
+ * The Kafka spout polls records starting at the first offset of the partition, regardless of previous commits. This setting only
+ * takes effect on topology deployment.
+ */
+ EARLIEST,
+ /**
+ * The Kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits. This
+ * setting only takes effect on topology deployment.
+ */
+ LATEST,
+ /**
+ * The Kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as EARLIEST.
+ */
+ UNCOMMITTED_EARLIEST,
+ /**
+ * The Kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST.
+ */
+ UNCOMMITTED_LATEST;
+
+ @Override
+ public String toString() {
+ return "FirstPollOffsetStrategy{" + super.toString() + "}";
}
- return kafkaProps;
}
- public static class Builder<K,V> {
+ /**
+ * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
+ * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
+ * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
+ * NO_GUARANTEE may be removed in a later release without warning; we are still evaluating whether it makes sense to keep it.
+ */
+ @InterfaceStability.Unstable
+ public enum ProcessingGuarantee {
+ /**
+ * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
+ * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits synchronously on the defined
+ * interval.
+ */
+ AT_LEAST_ONCE,
+ /**
+ * Every offset will be synchronously committed to Kafka right after being polled but before being emitted to the downstream
+ * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
+ * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done
+ */
+ AT_MOST_ONCE,
+ /**
+ * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
+ * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
+ * spout to control when commits occur. Commits asynchronously on the defined interval.
+ */
+ NO_GUARANTEE,
+ }
+
+ public static class Builder<K, V> {
+
private final Map<String, Object> kafkaProps;
- private Deserializer<K> keyDeserializer;
- private Deserializer<V> valueDeserializer;
+ private final Subscription subscription;
+ private final SerializableDeserializer<K> keyDes;
+ private final Class<? extends Deserializer<K>> keyDesClazz;
+ private final SerializableDeserializer<V> valueDes;
+ private final Class<? extends Deserializer<V>> valueDesClazz;
+ private RecordTranslator<K, V> translator;
private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
- private int maxRetries = DEFAULT_MAX_RETRIES;
- private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
- private final KafkaSpoutStreams kafkaSpoutStreams;
+ private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY;
private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
- private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
- private final KafkaSpoutRetryService retryService;
-
- /**
- * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/>
- * This constructor uses by the default the following implementation for {@link KafkaSpoutRetryService}:<p/>
- * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
- * DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
- */
- public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
- KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) {
- this(kafkaProps, kafkaSpoutStreams, tuplesBuilder,
- new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
- DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)));
- }
-
- /***
- * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
- * The optional configuration can be specified using the set methods of this builder
- * @param kafkaProps properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
- * @param kafkaSpoutStreams streams to where the tuples are emitted for each tuple. Multiple topics can emit in the same stream.
- * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s.
- * @param retryService logic that manages the retrial of failed tuples
- */
- public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
- KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, KafkaSpoutRetryService retryService) {
- if (kafkaProps == null || kafkaProps.isEmpty()) {
- throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps);
- }
+ private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
+ private KafkaTupleListener tupleListener = DEFAULT_TUPLE_LISTENER;
+ private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
+ private boolean emitNullTuples = false;
+ private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
+ private boolean tupleTrackingEnforced = false;
+ private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS;
+
+ public Builder(String bootstrapServers, String... topics) {
+ this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
+ }
- if (kafkaSpoutStreams == null) {
- throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit to the same stream");
- }
+ public Builder(String bootstrapServers, Collection<String> topics) {
+ this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(),
+ new NamedTopicFilter(new HashSet<String>(topics))));
+ }
- if (tuplesBuilder == null) {
- throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams");
- }
+ public Builder(String bootstrapServers, Pattern topics) {
+ this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
+ }
- if (retryService == null) {
- throw new IllegalArgumentException("Must specify at implementation of retry service");
+ /**
+ * @deprecated Please use {@link #Builder(java.lang.String, java.lang.String...)} instead, and set the deserializer with
+ * {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+ * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+ */
+ @Deprecated
+ public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String... topics) {
+ this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
+ }
+
+ /**
+ * @deprecated Please use {@link #Builder(java.lang.String, java.util.Collection) } instead, and set the deserializer with
+ * {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+ * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+ */
+ @Deprecated
+ public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) {
+ this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(new HashSet<String>(topics))));
+ }
+
+ /**
+ * @deprecated Please use {@link #Builder(java.lang.String, java.util.regex.Pattern) } instead, and set the deserializer with
+ * {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+ * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+ */
+ @Deprecated
+ public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern topics) {
+ this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
+ }
+
+ /**
+ * @deprecated Please use {@link #Builder(java.lang.String, org.apache.storm.kafka.spout.Subscription) } instead, and set the
+ * deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+ * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+ */
+ @Deprecated
+ public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Subscription subscription) {
+ this(bootstrapServers, keyDes, null, valDes, null, subscription);
+ }
+
+ /**
+ * @deprecated Please use {@link #Builder(java.lang.String, java.lang.String...)} instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+ */
+ @Deprecated
+ public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String... topics) {
+ this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
+ }
+
+ /**
+ * @deprecated Please use {@link #Builder(java.lang.String, java.util.Collection) } instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+ */
+ @Deprecated
+ public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
+ this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(new HashSet<String>(topics))));
+ }
+
+ /**
+ * @deprecated Please use {@link #Builder(java.lang.String, java.util.regex.Pattern) } instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+ */
+ @Deprecated
+ public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern topics) {
+ this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
+ }
+
+ /**
+ * @deprecated Please use {@link #Builder(java.lang.String, org.apache.storm.kafka.spout.Subscription) } instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+ */
+ @Deprecated
+ public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Subscription subscription) {
+ this(bootstrapServers, null, keyDes, null, valDes, subscription);
+ }
+
+ /**
+ * Create a KafkaSpoutConfig builder with default property values and no key/value deserializers.
+ *
+ * @param bootstrapServers The bootstrap servers the consumer will use
+ * @param subscription The subscription defining which topics and partitions each spout instance will read.
+ */
+ public Builder(String bootstrapServers, Subscription subscription) {
+ this(bootstrapServers, null, null, null, null, subscription);
+ }
+
+ private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+ SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
+
+ this(keyDes, keyDesClazz, valDes, valDesClazz, subscription,
+ new DefaultRecordTranslator<K, V>(), new HashMap<String, Object>());
+
+ if (bootstrapServers == null || bootstrapServers.isEmpty()) {
+ throw new IllegalArgumentException("bootstrap servers cannot be null or empty");
}
+ kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+ setNonNullSerDeKafkaProp(keyDes, keyDesClazz, valueDes, valueDesClazz);
+ }
+ /**
+ * This constructor will always be called by one of the methods {@code setKey} or {@code setValue}, which implies
+ * that only one of its SerDe parameters will be non-null; for that parameter the corresponding Kafka property will be set.
+ */
+ @SuppressWarnings("unchecked")
+ private Builder(final Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+ SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
+
+ this(keyDes, keyDesClazz, valueDes, valueDesClazz, builder.subscription,
+ (RecordTranslator<K, V>) builder.translator, new HashMap<>(builder.kafkaProps));
+
+ this.pollTimeoutMs = builder.pollTimeoutMs;
+ this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
+ this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+ this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+ this.retryService = builder.retryService;
+
+ setNonNullSerDeKafkaProp(keyDes, keyDesClazz, valueDes, valueDesClazz);
+ }
+
+ private Builder(SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+ SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz,
+ Subscription subscription, RecordTranslator<K, V> translator, Map<String, Object> kafkaProps) {
+ this.keyDes = keyDes;
+ this.keyDesClazz = keyDesClazz;
+ this.valueDes = valueDes;
+ this.valueDesClazz = valueDesClazz;
+ this.subscription = subscription;
+ this.translator = translator;
this.kafkaProps = kafkaProps;
- this.kafkaSpoutStreams = kafkaSpoutStreams;
- this.tuplesBuilder = tuplesBuilder;
- this.retryService = retryService;
+ }
+
+ private void setNonNullSerDeKafkaProp(SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+ SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
+ if (keyDesClazz != null) {
+ kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
+ }
+ if (keyDes != null) {
+ kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDes.getClass());
+ }
+ if (valueDesClazz != null) {
+ kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz);
+ }
+ if (valueDes != null) {
+ kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDes.getClass());
+ }
+ }
+
+ /**
+ * Specifying this key deserializer overrides the property key.deserializer. If you have set a custom RecordTranslator before
+ * calling this it may result in class cast exceptions at runtime.
+ *
+ * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} instead
+ */
+ @Deprecated
+ public <NK> Builder<NK, V> setKey(SerializableDeserializer<NK> keyDeserializer) {
+ return new Builder<>(this, keyDeserializer, null, null, null);
}
/**
- * Specifying this key deserializer overrides the property key.deserializer
+ * Specify a class that can be instantiated to create a key.deserializer. This is the same as setting key.deserializer, but overrides
+ * it. If you have set a custom RecordTranslator before calling this it may result in class cast exceptions at runtime.
+ *
+ * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} instead
*/
- public Builder<K,V> setKeyDeserializer(Deserializer<K> keyDeserializer) {
- this.keyDeserializer = keyDeserializer;
+ @Deprecated
+ public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) {
+ return new Builder<>(this, null, clazz, null, null);
+ }
+
+ /**
+ * Specifying this value deserializer overrides the property value.deserializer. If you have set a custom RecordTranslator before
+ * calling this it may result in class cast exceptions at runtime.
+ *
+ * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead
+ */
+ @Deprecated
+ public <NV> Builder<K, NV> setValue(SerializableDeserializer<NV> valueDeserializer) {
+ return new Builder<>(this, null, null, valueDeserializer, null);
+ }
+
+ /**
+ * Specify a class that can be instantiated to create a value.deserializer. This is the same as setting value.deserializer, but
+ * overrides it. If you have set a custom RecordTranslator before calling this it may result in class cast exceptions at runtime.
+ *
+ * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead
+ */
+ @Deprecated
+ public <NV> Builder<K, NV> setValue(Class<? extends Deserializer<NV>> clazz) {
+ return new Builder<>(this, null, null, null, clazz);
+ }
+
+ /**
+ * Set a {@link KafkaConsumer} property.
+ */
+ public Builder<K, V> setProp(String key, Object value) {
+ kafkaProps.put(key, value);
return this;
}
/**
- * Specifying this value deserializer overrides the property value.deserializer
+ * Set multiple {@link KafkaConsumer} properties.
*/
- public Builder<K,V> setValueDeserializer(Deserializer<V> valueDeserializer) {
- this.valueDeserializer = valueDeserializer;
+ public Builder<K, V> setProp(Map<String, Object> props) {
+ kafkaProps.putAll(props);
return this;
}
/**
- * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s
- * @param pollTimeoutMs time in ms
+ * Set multiple {@link KafkaConsumer} properties.
*/
- public Builder<K,V> setPollTimeoutMs(long pollTimeoutMs) {
- this.pollTimeoutMs = pollTimeoutMs;
+ public Builder<K, V> setProp(Properties props) {
+ for(Entry<Object, Object> entry : props.entrySet()) {
+ if (entry.getKey() instanceof String) {
+ kafkaProps.put((String) entry.getKey(), entry.getValue());
+ } else {
+ throw new IllegalArgumentException("Kafka Consumer property keys must be Strings");
+ }
+ }
return this;
}
/**
- * Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s.
- * @param offsetCommitPeriodMs time in ms
+ * Set the group.id for the consumers
+ *
+ * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#GROUP_ID_CONFIG} instead
*/
- public Builder<K,V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
- this.offsetCommitPeriodMs = offsetCommitPeriodMs;
+ @Deprecated
+ public Builder<K, V> setGroupId(String id) {
+ return setProp("group.id", id);
+ }
+
+ /**
+ * Set the bootstrap servers for the consumer.
+ *
+ * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} instead
+ */
+ @Deprecated
+ public Builder<K, V> setBootstrapServers(String servers) {
+ return setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+ }
+
+ /**
+ * The minimum amount of data the broker should return for a fetch request.
+ *
+ * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} instead
+ */
+ @Deprecated
+ public Builder<K, V> setFetchMinBytes(int bytes) {
+ return setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, bytes);
+ }
+
+ /**
+ * The maximum amount of data per-partition the broker will return.
+ *
+ * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#MAX_PARTITION_FETCH_BYTES_CONFIG} instead
+ */
+ @Deprecated
+ public Builder<K, V> setMaxPartitionFectchBytes(int bytes) {
+ return setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, bytes);
+ }
+
+ /**
+ * The maximum number of records a poll will return.
+ *
+ * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#MAX_POLL_RECORDS_CONFIG} instead
+ */
+ @Deprecated
+ public Builder<K, V> setMaxPollRecords(int records) {
+ return setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);
+ }
+
+ //Security Related Configs
+ /**
+ * Configure the SSL Keystore for mutual authentication
+ *
+ * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "ssl.keystore.location" and "ssl.keystore.password" instead
+ */
+ @Deprecated
+ public Builder<K, V> setSSLKeystore(String location, String password) {
+ return setProp("ssl.keystore.location", location)
+ .setProp("ssl.keystore.password", password);
+ }
+
+ /**
+ * Configure the SSL Keystore for mutual authentication
+ *
+ * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "ssl.keystore.location", "ssl.keystore.password" and "ssl.key.password" instead
+ */
+ @Deprecated
+ public Builder<K, V> setSSLKeystore(String location, String password, String keyPassword) {
+ return setProp("ssl.key.password", keyPassword)
+ .setSSLKeystore(location, password);
+ }
+
+ /**
+ * Configure the SSL Truststore to authenticate with the brokers
+ *
+ * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "security.protocol", "ssl.truststore.location" and "ssl.truststore.password" instead
+ */
+ @Deprecated
+ public Builder<K, V> setSSLTruststore(String location, String password) {
+ return setSecurityProtocol("SSL")
+ .setProp("ssl.truststore.location", location)
+ .setProp("ssl.truststore.password", password);
+ }
+
+ /**
+ * Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
+ *
+ * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "security.protocol" instead
+ */
+ @Deprecated
+ public Builder<K, V> setSecurityProtocol(String protocol) {
+ return setProp("security.protocol", protocol);
+ }
+
+ //Spout Settings
+ /**
+ * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 200ms.
+ *
+ * @param pollTimeoutMs time in ms
+ */
+ public Builder<K, V> setPollTimeoutMs(long pollTimeoutMs) {
+ this.pollTimeoutMs = pollTimeoutMs;
return this;
}
/**
- * Defines the max number of retrials in case of tuple failure. The default is to retry forever, which means that
- * no new records are committed until the previous polled records have been acked. This guarantees at once delivery of
- * all the previously polled records.
- * By specifying a finite value for maxRetries, the user decides to sacrifice guarantee of delivery for the previous
- * polled records in favor of processing more records.
- * @param maxRetries max number of retrials
+ * Specifies the period, in milliseconds, at which the offset commit task runs. Default is 30s.
+ *
+ * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE} or
+ * {@link ProcessingGuarantee#NO_GUARANTEE}.
+ *
+ * @param offsetCommitPeriodMs time in ms
*/
- public Builder<K,V> setMaxRetries(int maxRetries) {
- this.maxRetries = maxRetries;
+ public Builder<K, V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
+ this.offsetCommitPeriodMs = offsetCommitPeriodMs;
return this;
}
/**
* Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
* Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
- * of pending offsets bellow the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+ * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+ * This limit is per partition and may in some cases be exceeded,
+ * but each partition cannot exceed this limit by more than maxPollRecords - 1.
+ *
+ * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+ *
* @param maxUncommittedOffsets max number of records that can be pending commit
*/
- public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
+ public Builder<K, V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
this.maxUncommittedOffsets = maxUncommittedOffsets;
return this;
}
/**
- * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start.
- * Please refer to to the documentation in {@link FirstPollOffsetStrategy}
+ * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start. Please refer to the
+ * documentation in {@link FirstPollOffsetStrategy}
+ *
* @param firstPollOffsetStrategy Offset used by Kafka spout first poll
- * */
+ */
public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
this.firstPollOffsetStrategy = firstPollOffsetStrategy;
return this;
}
- public KafkaSpoutConfig<K,V> build() {
+ /**
+ * Sets the retry service for the spout to use.
+ *
+ * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+ *
+ * @param retryService the new retry service
+ * @return the builder (this).
+ */
+ public Builder<K, V> setRetry(KafkaSpoutRetryService retryService) {
+ if (retryService == null) {
+ throw new NullPointerException("retryService cannot be null");
+ }
+ this.retryService = retryService;
+ return this;
+ }
+
+ /**
+ * Sets the tuple listener for the spout to use.
+ *
+ * @param tupleListener the tuple listener
+ * @return the builder (this).
+ */
+ public Builder<K, V> setTupleListener(KafkaTupleListener tupleListener) {
+ if (tupleListener == null) {
+ throw new NullPointerException("KafkaTupleListener cannot be null");
+ }
+ this.tupleListener = tupleListener;
+ return this;
+ }
+
+ public Builder<K, V> setRecordTranslator(RecordTranslator<K, V> translator) {
+ this.translator = translator;
+ return this;
+ }
+
+ /**
+ * Configure a translator with tuples to be emitted on the default stream.
+ *
+ * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
+ * @param fields the names of the fields extracted
+ * @return this to be able to chain configuration
+ */
+ public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+ return setRecordTranslator(new SimpleRecordTranslator<>(func, fields));
+ }
+
+ /**
+ * Configure a translator with tuples to be emitted to a given stream.
+ *
+ * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
+ * @param fields the names of the fields extracted
+ * @param stream the stream to emit the tuples on
+ * @return this to be able to chain configuration
+ */
+ public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+ return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream));
+ }
+
+ /**
+ * Sets partition refresh period in milliseconds. This is how often Kafka will be polled to check for new topics and/or new
+ * partitions. This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
+ * PatternSubscription rely on Kafka to handle this instead.
+ *
+ * @param partitionRefreshPeriodMs time in milliseconds
+ * @return the builder (this)
+ */
+ public Builder<K, V> setPartitionRefreshPeriodMs(long partitionRefreshPeriodMs) {
+ this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
+ return this;
+ }
+
+ /**
+ * Specifies whether the spout should emit null tuples to the components downstream, or instead ack them directly without
+ * emitting. By default this parameter is set to false, which means that null tuples are not emitted.
+ *
+ * @param emitNullTuples sets if null tuples should or not be emitted downstream
+ */
+ public Builder<K, V> setEmitNullTuples(boolean emitNullTuples) {
+ this.emitNullTuples = emitNullTuples;
+ return this;
+ }
+
+ /**
+ * Specifies which processing guarantee the spout should offer. Refer to the documentation for {@link ProcessingGuarantee}.
+ *
+ * @param processingGuarantee The processing guarantee the spout should offer.
+ */
+ public Builder<K, V> setProcessingGuarantee(ProcessingGuarantee processingGuarantee) {
+ this.processingGuarantee = processingGuarantee;
+ return this;
+ }
+
+ /**
+ * Specifies whether the spout should require Storm to track emitted tuples when using a {@link ProcessingGuarantee} other than
+ * {@link ProcessingGuarantee#AT_LEAST_ONCE}. The spout will always track emitted tuples when offering at-least-once guarantees
+ * regardless of this setting. This setting is false by default.
+ *
+ * <p>Enabling tracking can be useful even in cases where reliability is not a concern, because it allows
+ * {@link Config#TOPOLOGY_MAX_SPOUT_PENDING} to have an effect, and enables some spout metrics (e.g. complete-latency) that would
+ * otherwise be disabled.
+ *
+ * @param tupleTrackingEnforced true if Storm should track emitted tuples, false otherwise
+ */
+ public Builder<K, V> setTupleTrackingEnforced(boolean tupleTrackingEnforced) {
+ this.tupleTrackingEnforced = tupleTrackingEnforced;
+ return this;
+ }
+
+ /**
+ * The time period that metrics data is bucketed into.
+ * @param metricsTimeBucketSizeInSecs time in seconds
+ */
+ public Builder<K, V> setMetricsTimeBucketSizeInSecs(int metricsTimeBucketSizeInSecs) {
+ this.metricsTimeBucketSizeInSecs = metricsTimeBucketSizeInSecs;
+ return this;
+ }
+
+ public KafkaSpoutConfig<K, V> build() {
return new KafkaSpoutConfig<>(this);
}
}
+ /**
+ * Factory method that creates a Builder with String key/value deserializers.
+ *
+ * @param bootstrapServers The bootstrap servers for the consumer
+ * @param topics The topics to subscribe to
+ * @return The new builder
+ */
+ public static Builder<String, String> builder(String bootstrapServers, String... topics) {
+ return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
+ }
+
+ /**
+ * Factory method that creates a Builder with String key/value deserializers.
+ *
+ * @param bootstrapServers The bootstrap servers for the consumer
+ * @param topics The topics to subscribe to
+ * @return The new builder
+ */
+ public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
+ return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
+ }
+
+ /**
+ * Factory method that creates a Builder with String key/value deserializers.
+ *
+ * @param bootstrapServers The bootstrap servers for the consumer
+ * @param topics The topic pattern to subscribe to
+ * @return The new builder
+ */
+ public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
+ return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
+ }
+
+ private static Builder<String, String> setStringDeserializers(Builder<String, String> builder) {
+ builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ return builder;
+ }
+
+ private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) {
+ if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+ LOG.warn("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+ + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee."
+ + "This will be treated as an error in the next major release.");
+
+ final boolean enableAutoCommit = Boolean.parseBoolean(builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).toString());
+ if (enableAutoCommit) {
+ builder.processingGuarantee = ProcessingGuarantee.NO_GUARANTEE;
+ } else {
+ builder.processingGuarantee = ProcessingGuarantee.AT_LEAST_ONCE;
+ }
+ }
+ String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+ if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+ if (autoOffsetResetPolicy == null) {
+ /*
+ * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
+ * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
+ * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
+ * requests an offset that was deleted.
+ */
+ LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing",
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+ builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
+ LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
+ + " Some messages may be skipped.");
+ }
+ } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
+ if (autoOffsetResetPolicy != null
+ && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
+ LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
+ + " Some messages may be processed more than once.");
+ }
+ }
+ LOG.info("Setting Kafka consumer property '{}' to 'false', because the spout does not support auto-commit",
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+ builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ }
+
+ /**
+ * Gets the properties that will be passed to the KafkaConsumer.
+ *
+ * @return The Kafka properties map
+ */
public Map<String, Object> getKafkaProps() {
return kafkaProps;
}
+ /**
+ * @deprecated Please use {@link #getKafkaProps() } and look up the entry for {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} instead
+ */
+ @Deprecated
public Deserializer<K> getKeyDeserializer() {
- return keyDeserializer;
+ if (keyDesClazz != null) {
+ try {
+ return keyDesClazz.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException("Could not instantiate key deserializer " + keyDesClazz);
+ }
+ }
+ return keyDes;
}
+ /**
+ * @deprecated Please use {@link #getKafkaProps() } and look up the entry for {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead
+ */
+ @Deprecated
public Deserializer<V> getValueDeserializer() {
- return valueDeserializer;
+ if (valueDesClazz != null) {
+ try {
+ return valueDesClazz.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException("Could not instantiate value deserializer " + valueDesClazz);
+ }
+ }
+ return valueDes;
+ }
+
+ public Subscription getSubscription() {
+ return subscription;
+ }
+
+ public RecordTranslator<K, V> getTranslator() {
+ return translator;
}
public long getPollTimeoutMs() {
@@ -254,75 +832,71 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return offsetCommitPeriodMs;
}
+ /**
+ * @deprecated Use {@link #getProcessingGuarantee()} instead.
+ */
+ @Deprecated
public boolean isConsumerAutoCommitMode() {
- return kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT) == null // default is true
- || Boolean.valueOf((String)kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT));
+ return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false
+ || Boolean.valueOf((String) kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+ }
+
+ public ProcessingGuarantee getProcessingGuarantee() {
+ return processingGuarantee;
}
- public String getConsumerGroupId() {
- return (String) kafkaProps.get(Consumer.GROUP_ID);
+ public boolean isTupleTrackingEnforced() {
+ return tupleTrackingEnforced;
}
- /**
- * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream},
- * or null if this stream is associated with a wildcard pattern topic
- */
- public List<String> getSubscribedTopics() {
- return kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics ?
- new ArrayList<>(((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics()) :
- null;
+ public String getConsumerGroupId() {
+ return (String) kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG);
}
- /**
- * @return the wildcard pattern topic associated with this {@link KafkaSpoutStream}, or null
- * if this stream is associated with a specific named topic
- */
- public Pattern getTopicWildcardPattern() {
- return kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics ?
- ((KafkaSpoutStreamsWildcardTopics)kafkaSpoutStreams).getTopicWildcardPattern() :
- null;
+ public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
+ return firstPollOffsetStrategy;
}
- public int getMaxTupleRetries() {
- return maxRetries;
+ public int getMaxUncommittedOffsets() {
+ return maxUncommittedOffsets;
}
- public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
- return firstPollOffsetStrategy;
+ public KafkaSpoutRetryService getRetryService() {
+ return retryService;
}
- public KafkaSpoutStreams getKafkaSpoutStreams() {
- return kafkaSpoutStreams;
+ public KafkaTupleListener getTupleListener() {
+ return tupleListener;
}
- public int getMaxUncommittedOffsets() {
- return maxUncommittedOffsets;
+ public long getPartitionRefreshPeriodMs() {
+ return partitionRefreshPeriodMs;
}
- public KafkaSpoutTuplesBuilder<K, V> getTuplesBuilder() {
- return tuplesBuilder;
+ public boolean isEmitNullTuples() {
+ return emitNullTuples;
}
- public KafkaSpoutRetryService getRetryService() {
- return retryService;
+ public int getMetricsTimeBucketSizeInSecs() {
+ return metricsTimeBucketSizeInSecs;
}
@Override
public String toString() {
- return "KafkaSpoutConfig{" +
- "kafkaProps=" + kafkaProps +
- ", keyDeserializer=" + keyDeserializer +
- ", valueDeserializer=" + valueDeserializer +
- ", pollTimeoutMs=" + pollTimeoutMs +
- ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
- ", maxRetries=" + maxRetries +
- ", maxUncommittedOffsets=" + maxUncommittedOffsets +
- ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
- ", kafkaSpoutStreams=" + kafkaSpoutStreams +
- ", tuplesBuilder=" + tuplesBuilder +
- ", retryService=" + retryService +
- ", topics=" + getSubscribedTopics() +
- ", topicWildcardPattern=" + getTopicWildcardPattern() +
- '}';
+ return "KafkaSpoutConfig{"
+ + "kafkaProps=" + kafkaProps
+ + ", key=" + getKeyDeserializer()
+ + ", value=" + getValueDeserializer()
+ + ", pollTimeoutMs=" + pollTimeoutMs
+ + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
+ + ", maxUncommittedOffsets=" + maxUncommittedOffsets
+ + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
+ + ", subscription=" + subscription
+ + ", translator=" + translator
+ + ", retryService=" + retryService
+ + ", tupleListener=" + tupleListener
+ + ", processingGuarantee=" + processingGuarantee
+ + ", metricsTimeBucketSizeInSecs=" + metricsTimeBucketSizeInSecs
+ + '}';
}
}
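
As an illustration of the new builder API in the diff above, here is a minimal
sketch of constructing a KafkaSpoutConfig. The broker address, topic, and group
id are placeholders; the remaining settings restate the defaults documented above:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;

public class SpoutConfigSketch {
    public static KafkaSpoutConfig<String, String> buildConfig() {
        // The builder(...) factory registers StringDeserializer for both key
        // and value, so the deprecated setKey/setValue calls are not needed.
        return KafkaSpoutConfig.builder("broker1:9092", "my-topic")
            // setProp replaces the deprecated per-property setters (setGroupId, ...)
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "my-group")
            // The commit interval only applies to AT_LEAST_ONCE and NO_GUARANTEE
            .setOffsetCommitPeriodMs(30_000)
            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
            .build();
    }
}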
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index 71f8327..1626fee 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -18,21 +18,42 @@
package org.apache.storm.kafka.spout;
+import java.io.Serializable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
-public class KafkaSpoutMessageId {
- private transient TopicPartition topicPart;
- private transient long offset;
- private transient int numFails = 0;
+public class KafkaSpoutMessageId implements Serializable {
+ private final TopicPartition topicPart;
+ private final long offset;
+ private int numFails = 0;
+ /**
+ * true if the record was emitted using a form of collector.emit(...). false
+ * when skipping null tuples as configured by the user in KafkaSpoutConfig
+ */
+ private boolean emitted;
- public KafkaSpoutMessageId(ConsumerRecord consumerRecord) {
- this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
+ public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) {
+ this(consumerRecord, true);
+ }
+
+ public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord, boolean emitted) {
+ this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), emitted);
}
public KafkaSpoutMessageId(TopicPartition topicPart, long offset) {
+ this(topicPart, offset, true);
+ }
+
+ /**
+ * Creates a new KafkaSpoutMessageId.
+ * @param topicPart The topic partition this message belongs to
+ * @param offset The offset of this message
+ * @param emitted True iff this message is not being skipped as a null tuple
+ */
+ public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean emitted) {
this.topicPart = topicPart;
this.offset = offset;
+ this.emitted = emitted;
}
public int partition() {
@@ -59,22 +80,22 @@ public class KafkaSpoutMessageId {
return topicPart;
}
- public String getMetadata(Thread currThread) {
- return "{" +
- "topic-partition=" + topicPart +
- ", offset=" + offset +
- ", numFails=" + numFails +
- ", thread='" + currThread.getName() + "'" +
- '}';
+ public boolean isEmitted() {
+ return emitted;
+ }
+
+ public void setEmitted(boolean emitted) {
+ this.emitted = emitted;
}
@Override
public String toString() {
- return "{" +
- "topic-partition=" + topicPart +
- ", offset=" + offset +
- ", numFails=" + numFails +
- '}';
+ return "{"
+ + "topic-partition=" + topicPart
+ + ", offset=" + offset
+ + ", numFails=" + numFails
+ + ", emitted=" + emitted
+ + '}';
}
@Override
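
To make the new emitted flag concrete, here is a short sketch against the
constructors and accessors added above; the topic, partition, and offset values
are illustrative only:

import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutMessageId;

public class MessageIdSketch {
    public static void main(String[] args) {
        // A message id for a record skipped as a null tuple: it was never
        // passed to collector.emit(...), so emitted is false.
        KafkaSpoutMessageId skipped =
            new KafkaSpoutMessageId(new TopicPartition("my-topic", 0), 42L, false);
        System.out.println(skipped.isEmitted()); // false
        System.out.println(skipped);             // toString() now includes the emitted flag

        // The two-argument constructor defaults emitted to true.
        KafkaSpoutMessageId emitted =
            new KafkaSpoutMessageId(new TopicPartition("my-topic", 0), 43L);
        System.out.println(emitted.isEmitted()); // true
    }
}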
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index f59367d..68a6f3f 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -25,49 +25,63 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.Validate;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.utils.Time;
/**
* Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
- * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
+ * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1) where failCount = 1, 2, 3, ...
* nextRetry = Min(nextRetry, currentTime + maxDelay)
*/
public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
- private TimeInterval initialDelay;
- private TimeInterval delayPeriod;
- private TimeInterval maxDelay;
- private int maxRetries;
+ private final TimeInterval initialDelay;
+ private final TimeInterval delayPeriod;
+ private final TimeInterval maxDelay;
+ private final int maxRetries;
- private Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
- private Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups
+ //This class assumes that there is at most one retry schedule per message id in this set at a time.
+ private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
+ private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups
/**
* Comparator ordering by timestamp
*/
private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
+ @Override
public int compare(RetrySchedule entry1, RetrySchedule entry2) {
- return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
+ int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
+
+ if(result == 0) {
+ //TreeSet uses compareTo instead of equals() for the Set contract
+ //Ensure that we can save two retry schedules with the same timestamp
+ result = entry1.hashCode() - entry2.hashCode();
+ }
+ return result;
}
}
private class RetrySchedule {
- private KafkaSpoutMessageId msgId;
+ private final KafkaSpoutMessageId msgId;
private long nextRetryTimeNanos;
- public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
+ public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTimeNanos) {
this.msgId = msgId;
- this.nextRetryTimeNanos = nextRetryTime;
+ this.nextRetryTimeNanos = nextRetryTimeNanos;
LOG.debug("Created {}", this);
}
- public void setNextRetryTime() {
+ public void setNextRetryTimeNanos() {
nextRetryTimeNanos = nextTime(msgId);
LOG.debug("Updated {}", this);
}
@@ -80,7 +94,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
public String toString() {
return "RetrySchedule{" +
"msgId=" + msgId +
- ", nextRetryTime=" + nextRetryTimeNanos +
+ ", nextRetryTimeNanos=" + nextRetryTimeNanos +
'}';
}
@@ -94,20 +108,20 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
}
public static class TimeInterval implements Serializable {
- private long lengthNanos;
- private long length;
- private TimeUnit timeUnit;
+ private final long lengthNanos;
+ private final TimeUnit timeUnit;
+ private final long length;
/**
* @param length length of the time interval in the units specified by {@link TimeUnit}
* @param timeUnit unit used to specify a time interval on which to specify a time unit
*/
public TimeInterval(long length, TimeUnit timeUnit) {
- this.length = length;
- this.timeUnit = timeUnit;
this.lengthNanos = timeUnit.toNanos(length);
+ this.timeUnit = timeUnit;
+ this.length = length;
}
-
+
public static TimeInterval seconds(long length) {
return new TimeInterval(length, TimeUnit.SECONDS);
}
@@ -115,19 +129,15 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
public static TimeInterval milliSeconds(long length) {
return new TimeInterval(length, TimeUnit.MILLISECONDS);
}
-
+
public static TimeInterval microSeconds(long length) {
return new TimeInterval(length, TimeUnit.MICROSECONDS);
}
-
+
public long lengthNanos() {
return lengthNanos;
}
-
- public long length() {
- return length;
- }
-
+
public TimeUnit timeUnit() {
return timeUnit;
}
@@ -144,7 +154,10 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
/**
* The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
* nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod * 2^(failCount-1) where failCount = 1, 2, 3, ...
- * nextRetry = Min(nextRetry, currentTime + maxDelay)
+ * nextRetry = Min(nextRetry, currentTime + maxDelay).
+ *
+ * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user chooses to sacrifice the delivery guarantee for the
+ * previously polled records in favor of processing more records.
*
* @param initialDelay initial delay of the first retry
* @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression)
@@ -157,35 +170,42 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
this.delayPeriod = delayPeriod;
this.maxRetries = maxRetries;
this.maxDelay = maxDelay;
- LOG.debug("Instantiated {}", this);
+ LOG.debug("Instantiated {}", this.toStringImpl());
}
@Override
- public Set<TopicPartition> retriableTopicPartitions() {
- final Set<TopicPartition> tps = new HashSet<>();
- final long currentTimeNanos = System.nanoTime();
+ public Map<TopicPartition, Long> earliestRetriableOffsets() {
+ final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>();
+ final long currentTimeNanos = Time.nanoTime();
for (RetrySchedule retrySchedule : retrySchedules) {
if (retrySchedule.retry(currentTimeNanos)) {
final KafkaSpoutMessageId msgId = retrySchedule.msgId;
- tps.add(new TopicPartition(msgId.topic(), msgId.partition()));
+ final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
+ final Long currentLowestOffset = tpToEarliestRetriableOffset.get(tpForMessage);
+ if(currentLowestOffset != null) {
+ tpToEarliestRetriableOffset.put(tpForMessage, Math.min(currentLowestOffset, msgId.offset()));
+ } else {
+ tpToEarliestRetriableOffset.put(tpForMessage, msgId.offset());
+ }
} else {
break; // Stop searching as soon as a schedule is past the current time
}
}
- LOG.debug("Topic partitions with entries ready to be retried [{}] ", tps);
- return tps;
+ LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
+ return tpToEarliestRetriableOffset;
}
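
[Editor's note: the switch from Set<TopicPartition> to Map<TopicPartition, Long> lets the caller rewind each partition precisely to its earliest failed offset, instead of merely learning that the partition has retriable tuples. A hedged sketch of how a caller could consume the richer return type; this is illustrative, not the spout's actual poll logic, and the method name is a stand-in:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    void seekToRetriableOffsets(KafkaSpoutRetryService retryService,
                                KafkaConsumer<?, ?> consumer) {
        Map<TopicPartition, Long> earliest = retryService.earliestRetriableOffsets();
        for (Map.Entry<TopicPartition, Long> entry : earliest.entrySet()) {
            // Rewind so the next poll re-fetches every record due for retry.
            consumer.seek(entry.getKey(), entry.getValue());
        }
    }
]
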
@Override
public boolean isReady(KafkaSpoutMessageId msgId) {
boolean retry = false;
- if (toRetryMsgs.contains(msgId)) {
- final long currentTimeNanos = System.nanoTime();
+ if (isScheduled(msgId)) {
+ final long currentTimeNanos = Time.nanoTime();
for (RetrySchedule retrySchedule : retrySchedules) {
if (retrySchedule.retry(currentTimeNanos)) {
if (retrySchedule.msgId.equals(msgId)) {
retry = true;
LOG.debug("Found entry to retry {}", retrySchedule);
+ break; //Stop searching if the message is known to be ready for retry
}
} else {
LOG.debug("Entry to retry not found {}", retrySchedule);
@@ -204,14 +224,14 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
@Override
public boolean remove(KafkaSpoutMessageId msgId) {
boolean removed = false;
- if (toRetryMsgs.contains(msgId)) {
+ if (isScheduled(msgId)) {
+ toRetryMsgs.remove(msgId);
for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
final RetrySchedule retrySchedule = iterator.next();
if (retrySchedule.msgId().equals(msgId)) {
iterator.remove();
- toRetryMsgs.remove(msgId);
removed = true;
- break;
+ break; //There is at most one schedule per message id
}
}
}
@@ -239,38 +259,66 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
}
@Override
- public void schedule(KafkaSpoutMessageId msgId) {
+ public boolean schedule(KafkaSpoutMessageId msgId) {
if (msgId.numFails() > maxRetries) {
LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
+ return false;
} else {
- if (toRetryMsgs.contains(msgId)) {
- for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
- final RetrySchedule retrySchedule = iterator.next();
- if (retrySchedule.msgId().equals(msgId)) {
- iterator.remove();
- toRetryMsgs.remove(msgId);
- }
- }
- }
+ //Remove existing schedule for the message id
+ remove(msgId);
final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
retrySchedules.add(retrySchedule);
toRetryMsgs.add(msgId);
LOG.debug("Scheduled. {}", retrySchedule);
LOG.trace("Current state {}", retrySchedules);
+ return true;
+ }
+ }
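
[Editor's note: because schedule(...) now reports whether the message will ever be retried, a caller can treat a permanently dropped message as complete so the committed offset is not blocked behind it. An illustrative fail-handling sketch; the method names onFail/onAck are hypothetical stand-ins, not the spout's code:

    // retryService is assumed to be a KafkaSpoutRetryService field.
    public void onFail(KafkaSpoutMessageId msgId) {
        msgId.incrementNumFails();
        if (!retryService.schedule(msgId)) {
            // maxRetries exceeded: the tuple is dropped, so acknowledge it
            // to let the committed offset advance past this record.
            onAck(msgId);
        }
    }
]
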
+
+ @Override
+ public int readyMessageCount() {
+ int count = 0;
+ final long currentTimeNanos = Time.nanoTime();
+ for (RetrySchedule retrySchedule : retrySchedules) {
+ if (retrySchedule.retry(currentTimeNanos)) {
+ ++count;
+ } else {
+ break; //Stop counting when past current time
+ }
}
+ return count;
+ }
+
+ @Override
+ public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
+ KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+ if (isScheduled(msgId)) {
+ for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
+ if (originalMsgId.equals(msgId)) {
+ return originalMsgId;
+ }
+ }
+ }
+ return msgId;
}
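
[Editor's note: the lookup in getMessageId matters because a record that is re-polled for retry would otherwise be wrapped in a brand-new id whose fail count starts at zero; returning the already-scheduled id keeps numFails, and therefore the backoff, growing across re-emissions. Illustrative use at emit time, with record and retryService assumed in scope:

    KafkaSpoutMessageId msgId = retryService.getMessageId(record);
    // If the record was scheduled for retry, msgId.numFails() reflects the
    // earlier failures, so the next delay is computed from the true count.
]
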
// If the computed next retry time exceeds Long.MAX_VALUE it is truncated to Long.MAX_VALUE
private long nextTime(KafkaSpoutMessageId msgId) {
- final long currentTimeNanos = System.nanoTime();
+ Validate.isTrue(msgId.numFails() > 0, "nextTime assumes the message has failed at least once");
+ final long currentTimeNanos = Time.nanoTime();
final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ...
? currentTimeNanos + initialDelay.lengthNanos
- : (currentTimeNanos + delayPeriod.timeUnit.toNanos((long) Math.pow(delayPeriod.length, msgId.numFails() - 1)));
+ : currentTimeNanos + delayPeriod.lengthNanos * (long)(Math.pow(2, msgId.numFails()-1));
return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
}
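
[Editor's note: a worked check of the arithmetic in nextTime, with illustrative parameter values: for initialDelay = 500 ms, delayPeriod = 2 s and maxDelay = 10 s, fail 1 retries after 500 ms, fail 2 after 2 s * 2^1 = 4 s, fail 3 after 8 s, and fail 4 would be 16 s but is capped at 10 s:

    import java.util.concurrent.TimeUnit;

    public class BackoffDemo {
        public static void main(String[] args) {
            long delayPeriodNanos = TimeUnit.SECONDS.toNanos(2);
            long maxDelayNanos = TimeUnit.SECONDS.toNanos(10);
            for (int numFails = 2; numFails <= 4; numFails++) {
                long delayNanos = delayPeriodNanos * (long) Math.pow(2, numFails - 1);
                long capped = Math.min(delayNanos, maxDelayNanos);
                System.out.println(numFails + " fails -> "
                    + TimeUnit.NANOSECONDS.toSeconds(capped) + " s");
                // prints: 2 fails -> 4 s, 3 fails -> 8 s, 4 fails -> 10 s (capped)
            }
        }
    }
]
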
@Override
public String toString() {
+ return toStringImpl();
+ }
+
+ private String toStringImpl() {
+ //This is here to avoid an overridable call in the constructor
return "KafkaSpoutRetryExponentialBackoff{" +
"delay=" + initialDelay +
", ratio=" + delayPeriod +
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index 5aab167..12d26da 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -18,25 +18,30 @@
package org.apache.storm.kafka.spout;
-import org.apache.kafka.common.TopicPartition;
import java.io.Serializable;
import java.util.Collection;
-import java.util.Set;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
/**
* Represents the logic that manages the retrial of failed tuples.
*/
public interface KafkaSpoutRetryService extends Serializable {
/**
- * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled.
+ * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or
+ * updates retry time if it has already been scheduled. It may also indicate
+ * that the message should not be retried, in which case the message will not be scheduled.
* @param msgId message to schedule for retrial
+ * @return true if the message will be retried, false otherwise
*/
- void schedule(KafkaSpoutMessageId msgId);
+ boolean schedule(KafkaSpoutMessageId msgId);
/**
* Removes a message from the list of messages scheduled for retrial
* @param msgId message to remove from retrial
+ * @return true if the message was scheduled for retrial, false otherwise
*/
boolean remove(KafkaSpoutMessageId msgId);
@@ -50,14 +55,16 @@ public interface KafkaSpoutRetryService extends Serializable {
boolean retainAll(Collection<TopicPartition> topicPartitions);
/**
- * @return set of topic partitions that have offsets that are ready to be retried, i.e.,
- * for which a tuple has failed and has retry time less than current time
+ * @return The earliest retriable offset for each TopicPartition that has
+ * offsets ready to be retried, i.e. for which a tuple has failed
+ * and has retry time less than current time
*/
- Set<TopicPartition> retriableTopicPartitions();
+ Map<TopicPartition, Long> earliestRetriableOffsets();
/**
- * Checks if a specific failed {@link KafkaSpoutMessageId} is is ready to be retried,
+ * Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried,
* i.e. is scheduled and has a retry time that is less than the current time.
+ * @param msgId message to check for readiness
* @return true if message is ready to be retried, false otherwise
*/
boolean isReady(KafkaSpoutMessageId msgId);
@@ -65,8 +72,23 @@ public interface KafkaSpoutRetryService extends Serializable {
/**
* Checks if a specific failed {@link KafkaSpoutMessageId} is scheduled to be retried.
* The message may or may not be ready to be retried yet.
+ * @param msgId message to check for scheduling status
* @return true if the message is scheduled to be retried, regardless of being or not ready to be retried.
* Returns false if this message is not scheduled for retrial
*/
boolean isScheduled(KafkaSpoutMessageId msgId);
+
+ /**
+ * @return The number of messages that are ready for retry
+ */
+ int readyMessageCount();
+
+ /**
+ * Gets the {@link KafkaSpoutMessageId} for the given record.
+ *
+ * @param record The record to fetch the id for
+ * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for
+ * retry.
+ */
+ KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record);
}
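
[Editor's note: tying the interface to the implementation above, a typical setup builds the exponential-backoff service and hands it to the spout configuration; the delay values below are illustrative, and in this module the KafkaSpoutConfig builder's setRetry is the usual hookup point:

    import java.util.concurrent.TimeUnit;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryService;

    KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(
        TimeInterval.milliSeconds(500),          // initialDelay: first retry after 500 ms
        TimeInterval.seconds(2),                 // delayPeriod: base of the geometric progression
        Integer.MAX_VALUE,                       // maxRetries: retry forever
        new TimeInterval(10, TimeUnit.SECONDS)); // maxDelay: cap any single delay at 10 s
]
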
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
deleted file mode 100644
index 5375f6c..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.regex.Pattern;
-
-/**
- * Represents the stream and output fields used by a topic
- */
-public class KafkaSpoutStream implements Serializable {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStream.class);
-
- private final Fields outputFields;
- private final String streamId;
- private final String topic;
- private Pattern topicWildcardPattern;
-
- /** Represents the specified outputFields and topic with the default stream */
- public KafkaSpoutStream(Fields outputFields, String topic) {
- this(outputFields, Utils.DEFAULT_STREAM_ID, topic);
- }
-
- /** Represents the specified outputFields and topic with the specified stream */
- public KafkaSpoutStream(Fields outputFields, String streamId, String topic) {
- if (outputFields == null || streamId == null || topic == null) {
- throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " +
- "[outputFields=%s, streamId=%s, topic=%s]", outputFields, streamId, topic));
- }
- this.outputFields = outputFields;
- this.streamId = streamId;
- this.topic = topic;
- this.topicWildcardPattern = null;
- }
-
- /** Represents the specified outputFields and topic wild card with the default stream */
- KafkaSpoutStream(Fields outputFields, Pattern topicWildcardPattern) {
- this(outputFields, Utils.DEFAULT_STREAM_ID, topicWildcardPattern);
- }
-
- /** Represents the specified outputFields and topic wild card with the specified stream */
- public KafkaSpoutStream(Fields outputFields, String streamId, Pattern topicWildcardPattern) {
-
- if (outputFields == null || streamId == null || topicWildcardPattern == null) {
- throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " +
- "[outputFields=%s, streamId=%s, topicWildcardPattern=%s]", outputFields, streamId, topicWildcardPattern));
- }
- this.outputFields = outputFields;
- this.streamId = streamId;
- this.topic = null;
- this.topicWildcardPattern = topicWildcardPattern;
- }
-
- public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) {
- collector.emit(streamId, tuple, messageId);
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- LOG.info("Declared [streamId = {}], [outputFields = {}] for [topic = {}]", streamId, outputFields, topic);
- declarer.declareStream(streamId, outputFields);
- }
-
-
- public Fields getOutputFields() {
- return outputFields;
- }
-
- public String getStreamId() {
- return streamId;
- }
-
- /**
- * @return the topic associated with this {@link KafkaSpoutStream}, or null
- * if this stream is associated with a wildcard pattern topic
- */
- public String getTopic() {
- return topic;
- }
-
- /**
- * @return the wildcard pattern topic associated with this {@link KafkaSpoutStream}, or null
- * if this stream is associated with a specific named topic
- */
- public Pattern getTopicWildcardPattern() {
- return topicWildcardPattern;
- }
-
- @Override
- public String toString() {
- return "KafkaSpoutStream{" +
- "outputFields=" + outputFields +
- ", streamId='" + streamId + '\'' +
- ", topic='" + topic + '\'' +
- ", topicWildcardPattern=" + topicWildcardPattern +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
deleted file mode 100644
index 6910d3c..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Represents the {@link KafkaSpoutStream} associated with each topic or topic pattern (wildcard), and provides
- * a public API to declare output streams and emmit tuples, on the appropriate stream, for all the topics specified.
- */
-public interface KafkaSpoutStreams extends Serializable {
- void declareOutputFields(OutputFieldsDeclarer declarer);
-
- void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId);
-}