Posted to commits@kafka.apache.org by gu...@apache.org on 2017/09/19 23:13:14 UTC
kafka git commit: MINOR: various random minor fixes and improve KafkaConsumer JavaDocs
Repository: kafka
Updated Branches:
refs/heads/trunk 80e0af50d -> bd0146d98
MINOR: various random minor fixes and improve KafkaConsumer JavaDocs
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3884 from mjsax/minor-fixed-discoverd-via-exception-handling-investigation
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd0146d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd0146d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd0146d9
Branch: refs/heads/trunk
Commit: bd0146d984dd5df82fb19aa936e8f4ff9ca40030
Parents: 80e0af5
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Sep 20 07:13:03 2017 +0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Sep 20 07:13:03 2017 +0800
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 104 ++++++++++------
.../producer/BufferExhaustedException.java | 2 +-
.../kafka/clients/producer/KafkaProducer.java | 15 ++-
.../apache/kafka/clients/producer/Producer.java | 33 +++---
.../kafka/common/metrics/KafkaMetric.java | 1 -
.../main/scala/kafka/tools/StreamsResetter.java | 2 -
.../org/apache/kafka/streams/KafkaStreams.java | 118 ++++++++++---------
.../org/apache/kafka/streams/StreamsConfig.java | 8 +-
.../internals/AbstractProcessorContext.java | 15 +--
.../processor/internals/AbstractTask.java | 1 +
.../processor/internals/AssignedTasks.java | 8 +-
.../internals/InternalTopologyBuilder.java | 2 +-
.../processor/internals/RecordQueue.java | 28 ++---
.../internals/SourceNodeRecordDeserializer.java | 4 +-
.../processor/internals/StandbyTask.java | 37 +++---
.../internals/StoreChangelogReader.java | 9 +-
.../streams/processor/internals/StreamTask.java | 21 ++--
.../processor/internals/StreamThread.java | 26 ++--
.../processor/internals/TaskManager.java | 20 +---
19 files changed, 251 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 76e4073..c2f2f5f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -112,7 +112,7 @@ import java.util.regex.Pattern;
* Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide the work of consuming and
* processing records. These processes can either be running on the same machine or they can be
* distributed over many machines to provide scalability and fault tolerance for processing. All consumer instances
- * sharing the same <code>group.id</code> will be part of the same consumer group.
+ * sharing the same {@code group.id} will be part of the same consumer group.
* <p>
* Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the
* {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe} APIs. Kafka will deliver each message in the
@@ -152,12 +152,12 @@ import java.util.regex.Pattern;
* invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer
* will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers,
* the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for
- * a duration of <code>session.timeout.ms</code>, then the consumer will be considered dead and its partitions will
+ * a duration of {@code session.timeout.ms}, then the consumer will be considered dead and its partitions will
* be reassigned.
* <p>
* It is also possible that the consumer could encounter a "livelock" situation where it is continuing
* to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions
- * indefinitely in this case, we provide a liveness detection mechanism using the <code>max.poll.interval.ms</code>
+ * indefinitely in this case, we provide a liveness detection mechanism using the {@code max.poll.interval.ms}
* setting. Basically if you don't call poll at least as frequently as the configured max interval,
* then the client will proactively leave the group so that another consumer can take over its partitions. When this happens,
* you may see an offset commit failure (as indicated by a {@link CommitFailedException} thrown from a call to {@link #commitSync()}).
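As an illustration of this liveness contract, a minimal poll loop might look as follows (a sketch; consumer construction is elided, and the 100 ms poll timeout and per-record processing are placeholders):

    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class LivenessSketch {
        // poll() must be invoked more often than max.poll.interval.ms, or the
        // client proactively leaves the group and its partitions are reassigned.
        static void pollLoop(final KafkaConsumer<String, String> consumer) {
            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(100);
                for (final ConsumerRecord<String, String> record : records) {
                    // per-record processing; must finish well within max.poll.interval.ms
                }
                try {
                    consumer.commitSync();
                } catch (final CommitFailedException e) {
                    // the group rebalanced between polls: exactly the failure mode described above
                }
            }
        }
    }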
@@ -211,15 +211,15 @@ import java.util.regex.Pattern;
* </pre>
*
* The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the
- * configuration <code>bootstrap.servers</code>. This list is just used to discover the rest of the brokers in the
+ * configuration {@code bootstrap.servers}. This list is just used to discover the rest of the brokers in the
* cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in
* case there are servers down when the client is connecting).
* <p>
- * Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by
- * the config <code>auto.commit.interval.ms</code>.
+ * Setting {@code enable.auto.commit} means that offsets are committed automatically with a frequency controlled by
+ * the config {@code auto.commit.interval.ms}.
* <p>
* In this example the consumer is subscribing to the topics <i>foo</i> and <i>bar</i> as part of a group of consumers
- * called <i>test</i> as configured with <code>group.id</code>.
+ * called <i>test</i> as configured with {@code group.id}.
* <p>
* The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
* are saying that our record's key and value will just be simple strings.
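A complete version of the configuration these paragraphs describe would be (a sketch; the broker address is a placeholder and, per the docs above, need not list every broker):

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class AutoCommitExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // used only for discovery
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");     // commit frequency under auto-commit
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("foo", "bar"));
            consumer.close();
        }
    }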
@@ -423,8 +423,7 @@ import java.util.regex.Pattern;
* <p>
* Transactions were introduced in Kafka 0.11.0 wherein applications can write to multiple topics and partitions atomically.
* In order for this to work, consumers reading from these partitions should be configured to only read committed data.
- * This can be achieved by by setting the <code>isolation.level=read_committed</code> in the consumer's configuration.
- * </p>
+ * This can be achieved by setting the {@code isolation.level=read_committed} in the consumer's configuration.
*
* <p>
* In <code>read_committed</code> mode, the consumer will read only those transactional messages which have been
@@ -433,17 +432,19 @@ import java.util.regex.Pattern;
* consumer would be the offset of the first message in the partition belonging to an open transaction. This offset
* is known as the 'Last Stable Offset'(LSO).</p>
*
- * <p>A </p><code>read_committed</code> consumer will only read up till the LSO and filter out any transactional
+ * <p>
+ * A {@code read_committed} consumer will only read up to the LSO and filter out any transactional
* messages which have been aborted. The LSO also affects the behavior of {@link #seekToEnd(Collection)} and
- * {@link #endOffsets(Collection)} for <code>read_committed</code> consumers, details of which are in each method's documentation.
- * Finally, the fetch lag metrics are also adjusted to be relative to the LSO for <code>read_committed</code> consumers.</p>
+ * {@link #endOffsets(Collection)} for {@code read_committed} consumers, details of which are in each method's documentation.
+ * Finally, the fetch lag metrics are also adjusted to be relative to the LSO for {@code read_committed} consumers.
*
- * <p>Partitions with transactional messages will include commit or abort markers which indicate the result of a transaction.
+ * <p>
+ * Partitions with transactional messages will include commit or abort markers which indicate the result of a transaction.
* These markers are not returned to applications, yet have an offset in the log. As a result, applications reading from
* topics with transactional messages will see gaps in the consumed offsets. These missing messages would be the transaction
* markers, and they are filtered out for consumers in both isolation levels. Additionally, applications using
- * <code>read_committed</code> consumers may also see gaps due to aborted transactions, since those messages would not
- * be returned by the consumer and yet would have valid offsets.</p>
+ * {@code read_committed} consumers may also see gaps due to aborted transactions, since those messages would not
+ * be returned by the consumer and yet would have valid offsets.
*
* <h3><a name="multithreaded">Multi-threaded Processing</a></h3>
*
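As a concrete companion to the read_committed paragraphs above: a minimal consumer that reads only up to the LSO could be configured like this (a sketch; broker address and group id are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ReadCommittedSketch {
        static KafkaConsumer<byte[], byte[]> newReadCommittedConsumer() {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "txn-reader");              // placeholder
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");   // read only up to the LSO
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            return new KafkaConsumer<>(props);
        }
    }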
@@ -869,7 +870,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @param topics The list of topics to subscribe to
* @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
* subscribed topics
- * @throws IllegalArgumentException If topics is null or contains null or empty elements
+ * @throws IllegalArgumentException If topics is null or contains null or empty elements, or if listener is null
+ * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
+ * previously (without a subsequent call to {@link #unsubscribe()})
*/
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
@@ -911,6 +914,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param topics The list of topics to subscribe to
* @throws IllegalArgumentException If topics is null or contains null or empty elements
+ * @throws IllegalStateException If {@code subscribe()} is called previously with pattern, or assign is called
+ * previously (without a subsequent call to {@link #unsubscribe()})
*/
@Override
public void subscribe(Collection<String> topics) {
@@ -918,9 +923,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. The pattern matching will be done periodically against topics
- * existing at the time of check.
- *
+ * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+ * The pattern matching will be done periodically against topics existing at the time of check.
* <p>
* As part of group management, the consumer will keep track of the list of consumers that
* belong to a particular group and will trigger a rebalance operation if one of the
@@ -935,7 +939,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @param pattern Pattern to subscribe to
* @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
* subscribed topics
- * @throws IllegalArgumentException If pattern is null
+ * @throws IllegalArgumentException If pattern or listener is null
+ * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
+ * previously (without a subsequent call to {@link #unsubscribe()})
*/
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
@@ -956,7 +962,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
* Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics existing at the time of check.
- *
* <p>
* This is a short-hand for {@link #subscribe(Pattern, ConsumerRebalanceListener)}, which
* uses a noop listener. If you need the ability to seek to particular offsets, you should prefer
@@ -966,6 +971,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param pattern Pattern to subscribe to
* @throws IllegalArgumentException If pattern is null
+ * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
+ * previously (without a subsequent call to {@link #unsubscribe()})
*/
@Override
public void subscribe(Pattern pattern) {
@@ -973,8 +980,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}. This
- * also clears any partitions directly assigned through {@link #assign(Collection)}.
+ * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}.
+ * This also clears any partitions directly assigned through {@link #assign(Collection)}.
*/
public void unsubscribe() {
acquireAndEnsureOpen();
@@ -991,17 +998,21 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
* Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
* and will replace the previous assignment (if there is one).
- *
+ * <p>
* If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}.
- *
* <p>
* Manual topic assignment through this method does not use the consumer's group management
* functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
* metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)}
* and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
+ * <p>
+ * If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
+ * assignment replaces the old one.
*
* @param partitions The list of partitions to assign this consumer
* @throws IllegalArgumentException If partitions is null or contains null or empty topics
+ * @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern
+ * (without a subsequent call to {@link #unsubscribe()})
*/
@Override
public void assign(Collection<TopicPartition> partitions) {
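The IllegalStateException contract documented in the hunks above can be exercised with a short sketch (topic name "foo" is a placeholder):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class SubscribeVsAssign {
        // subscribe() and assign() are mutually exclusive until unsubscribe() is called
        static void demo(final KafkaConsumer<String, String> consumer) {
            consumer.subscribe(Collections.singletonList("foo"));
            try {
                consumer.assign(Collections.singletonList(new TopicPartition("foo", 0)));
            } catch (final IllegalStateException expected) {
                // documented above: subscribe() was called previously without unsubscribe()
            }
            consumer.unsubscribe();
            consumer.assign(Collections.singletonList(new TopicPartition("foo", 0))); // now legal
        }
    }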
@@ -1289,6 +1300,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
* is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
* you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
+ *
+ * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
+ * or if provided offset is negative
*/
@Override
public void seek(TopicPartition partition, long offset) {
@@ -1307,11 +1321,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
* Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the
* first offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
- * If no partition is provided, seek to the first offset for all of the currently assigned partitions.
+ * If no partitions are provided, seek to the first offset for all of the currently assigned partitions.
+ *
+ * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
*/
public void seekToBeginning(Collection<TopicPartition> partitions) {
acquireAndEnsureOpen();
try {
+ if (partitions == null) {
+ throw new IllegalArgumentException("Partitions collection cannot be null");
+ }
Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
for (TopicPartition tp : parts) {
log.debug("Seeking to beginning of partition {}", tp);
@@ -1325,14 +1344,19 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
* Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the
* final offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
- * If no partition is provided, seek to the final offset for all of the currently assigned partitions.
- *
- * If <code>isolation.level=read_committed</code>, the end offset will be the Last Stable Offset, ie. the offset
+ * If no partitions are provided, seek to the final offset for all of the currently assigned partitions.
+ * <p>
+ * If {@code isolation.level=read_committed}, the end offset will be the Last Stable Offset, i.e., the offset
* of the first message with an open transaction.
+ *
+ * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
*/
public void seekToEnd(Collection<TopicPartition> partitions) {
acquireAndEnsureOpen();
try {
+ if (partitions == null) {
+ throw new IllegalArgumentException("Partitions collection cannot be null");
+ }
Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
for (TopicPartition tp : parts) {
log.debug("Seeking to end of partition {}", tp);
@@ -1348,6 +1372,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param partition The partition to get the position for
* @return The offset
+ * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
* @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
* the partition
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
@@ -1471,6 +1496,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* Note that this method does not affect partition subscription. In particular, it does not cause a group
* rebalance when automatic assignment is used.
* @param partitions The partitions which should be paused
+ * @throws IllegalStateException if one of the provided partitions is not assigned to this consumer
*/
@Override
public void pause(Collection<TopicPartition> partitions) {
@@ -1490,6 +1516,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* {@link #poll(long)} will return records from these partitions if there are any to be fetched.
* If the partitions were not previously paused, this method is a no-op.
* @param partitions The partitions which should be resumed
+ * @throws IllegalStateException if one of the provided partitions is not assigned to this consumer
*/
@Override
public void resume(Collection<TopicPartition> partitions) {
@@ -1534,6 +1561,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
* such message.
* @throws IllegalArgumentException if the target timestamp is negative.
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
+ * expiration of the configured request timeout
* @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up
* the offsets by timestamp.
*/
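For instance, rewinding every assigned partition to the first record at or after a wall-clock timestamp (a sketch; error handling elided):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    public class SeekToTimestamp {
        static void seekTo(final KafkaConsumer<?, ?> consumer, final long timestamp) {
            final Map<TopicPartition, Long> query = new HashMap<>();
            for (final TopicPartition tp : consumer.assignment()) {
                query.put(tp, timestamp);
            }
            // may throw TimeoutException (request timeout) or UnsupportedVersionException
            // (older brokers) as documented in the hunk above
            final Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
            for (final Map.Entry<TopicPartition, OffsetAndTimestamp> e : result.entrySet()) {
                if (e.getValue() != null) { // null: no message at or after the timestamp
                    consumer.seek(e.getKey(), e.getValue().offset());
                }
            }
        }
    }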
@@ -1564,6 +1593,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param partitions the partitions to get the earliest offsets.
* @return The earliest available offsets for the given partitions
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
+ * expiration of the configured request timeout
*/
@Override
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
@@ -1581,16 +1612,17 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* <p>
* Notice that this method may block indefinitely if the partition does not exist.
* This method does not change the current consumer position of the partitions.
- * </p>
- *
- * <p>When <code>isolation.level=read_committed</code> the last offset will be the Last Stable Offset (LSO).
+ * <p>
+ * When {@code isolation.level=read_committed} the last offset will be the Last Stable Offset (LSO).
* This is the offset of the first message with an open transaction. The LSO moves forward as transactions
- * are completed.</p>
+ * are completed.
*
* @see #seekToEnd(Collection)
*
* @param partitions the partitions to get the end offsets.
* @return The end offsets for the given partitions.
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
+ * expiration of the configured request timeout
*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
@@ -1619,7 +1651,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
* Tries to close the consumer cleanly within the specified timeout. This method waits up to
- * <code>timeout</code> for the consumer to complete pending commits and leave the group.
+ * {@code timeout} for the consumer to complete pending commits and leave the group.
* If auto-commit is enabled, this will commit the current offsets if possible within the
* timeout. If the consumer is unable to complete offset commits and gracefully leave the group
* before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
@@ -1627,9 +1659,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
- * @param timeUnit The time unit for the <code>timeout</code>
+ * @param timeUnit The time unit for the {@code timeout}
* @throws InterruptException If the thread is interrupted before or while this function is called
- * @throws IllegalArgumentException If the <code>timeout</code> is negative.
+ * @throws IllegalArgumentException If the {@code timeout} is negative.
*/
public void close(long timeout, TimeUnit timeUnit) {
if (timeout < 0)
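A typical shutdown honoring this contract (the 30-second bound is an arbitrary choice):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class GracefulClose {
        static void shutDown(final KafkaConsumer<?, ?> consumer) {
            // wait up to 30s for pending commits and the group-leave request;
            // after that the consumer is force-closed; a negative timeout throws
            // IllegalArgumentException, and wakeup() cannot interrupt close()
            consumer.close(30, TimeUnit.SECONDS);
        }
    }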
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java
index 929b6b9..be840db 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/BufferExhaustedException.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.KafkaException;
/**
* This exception is thrown if the producer is in non-blocking mode and the rate of data production exceeds the rate at
- * which data can be sent for long enough for the alloted buffer to be exhausted.
+ * which data can be sent for long enough for the allocated buffer to be exhausted.
*/
public class BufferExhaustedException extends KafkaException {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 7d51640..7dcec5c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -20,7 +20,9 @@ import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
@@ -564,12 +566,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
/**
- * Sends a list of consumed offsets to the consumer group coordinator, and also marks
+ * Sends a list of specified offsets to the consumer group coordinator, and also marks
* those offsets as part of the current transaction. These offsets will be considered
- * consumed only if the transaction is committed successfully.
- *
+ * committed only if the transaction is committed successfully. The committed offset should
+ * be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
+ * <p>
* This method should be used when you need to batch consumed and produced messages
- * together, typically in a consume-transform-produce pattern.
+ * together, typically in a consume-transform-produce pattern. Thus, the specified
+ * {@code consumerGroupId} should be the same as config parameter {@code group.id} of the used
+ * {@link KafkaConsumer consumer}. Note that the consumer should have {@code enable.auto.commit=false}
+ * and should also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
+ * {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
*
* @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
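A consume-transform-produce sketch matching this description (output topic name and poll timeout are placeholders; it assumes a transactional.id is configured, initTransactions() has already been called, and the consumer has enable.auto.commit=false as documented above):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class ConsumeTransformProduce {
        // one transactional pass: consume, produce, then commit the input offsets
        // through the producer so they are part of the same transaction
        static void step(final KafkaConsumer<String, String> consumer,
                         final KafkaProducer<String, String> producer,
                         final String groupId) { // must equal the consumer's group.id
            final ConsumerRecords<String, String> records = consumer.poll(100);
            producer.beginTransaction();
            final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (final ConsumerRecord<String, String> record : records) {
                producer.send(new ProducerRecord<>("output", record.key(), record.value()));
                // committed offset is the *next* offset to consume: lastProcessed + 1
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
            producer.sendOffsetsToTransaction(offsets, groupId);
            producer.commitTransaction();
        }
    }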
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 1e77633..4982033 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -63,43 +63,38 @@ public interface Producer<K, V> extends Closeable {
void abortTransaction() throws ProducerFencedException;
/**
- * Send the given record asynchronously and return a future which will eventually contain the response information.
- *
- * @param record The record to send
- * @return A future which will eventually contain the response information
+ * See {@link KafkaProducer#send(ProducerRecord)}
*/
- public Future<RecordMetadata> send(ProducerRecord<K, V> record);
+ Future<RecordMetadata> send(ProducerRecord<K, V> record);
/**
- * Send a record and invoke the given callback when the record has been acknowledged by the server
+ * See {@link KafkaProducer#send(ProducerRecord, Callback)}
*/
- public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
+ Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
/**
- * Flush any accumulated records from the producer. Blocks until all sends are complete.
+ * See {@link KafkaProducer#flush()}
*/
- public void flush();
+ void flush();
/**
- * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
- * over time so this list should not be cached.
+ * See {@link KafkaProducer#partitionsFor(String)}
*/
- public List<PartitionInfo> partitionsFor(String topic);
+ List<PartitionInfo> partitionsFor(String topic);
/**
- * Return a map of metrics maintained by the producer
+ * See {@link KafkaProducer#metrics()}
*/
- public Map<MetricName, ? extends Metric> metrics();
+ Map<MetricName, ? extends Metric> metrics();
/**
- * Close this producer
+ * See {@link KafkaProducer#close()}
*/
- public void close();
+ void close();
/**
- * Tries to close the producer cleanly within the specified timeout. If the close does not complete within the
- * timeout, fail any pending send requests and force close the producer.
+ * See {@link KafkaProducer#close(long, TimeUnit)}
*/
- public void close(long timeout, TimeUnit unit);
+ void close(long timeout, TimeUnit unit);
}
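With the interface docs now delegating to KafkaProducer, one practical upside of coding against Producer<K, V> is test substitution, e.g. with the MockProducer that ships in kafka-clients (a sketch; topic and record values are placeholders):

    import org.apache.kafka.clients.producer.MockProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerInterfaceTest {
        public static void main(final String[] args) {
            // code written against the interface can swap in a mock for tests
            final MockProducer<String, String> mock =
                new MockProducer<>(true, new StringSerializer(), new StringSerializer());
            final Producer<String, String> producer = mock;
            producer.send(new ProducerRecord<>("topic", "key", "value"));
            producer.close();
            System.out.println(mock.history().size()); // 1
        }
    }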
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index 1cd5b24..ef53b89 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -29,7 +29,6 @@ public final class KafkaMetric implements Metric {
private MetricConfig config;
KafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) {
- super();
this.metricName = metricName;
this.lock = lock;
this.measurable = measurable;
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 7ee5424..9cf0e5c 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -22,11 +22,9 @@ import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
-
import kafka.admin.AdminClient;
import kafka.admin.TopicCommand;
import kafka.utils.ZkUtils;
-
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b31a3e3..5aec3c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
@@ -40,6 +41,7 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
@@ -52,7 +54,6 @@ import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidat
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.StreamsMetadata;
-import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
@@ -328,6 +329,7 @@ public class KafkaStreams {
* An app can set a single {@link KafkaStreams.StateListener} so that the app is notified when state changes.
*
* @param listener a new state listener
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
*/
public void setStateListener(final KafkaStreams.StateListener listener) {
if (state == State.CREATED) {
@@ -342,6 +344,7 @@ public class KafkaStreams {
* terminates due to an uncaught exception.
*
* @param eh the uncaught exception handler for all internal threads; {@code null} deletes the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
*/
public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) {
if (state == State.CREATED) {
@@ -362,6 +365,7 @@ public class KafkaStreams {
* processing.
*
* @param globalStateRestoreListener The listener triggered when {@link StateStore} is being restored.
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}.
*/
public void setGlobalStateRestoreListener(final StateRestoreListener globalStateRestoreListener) {
if (state == State.CREATED) {
@@ -491,6 +495,7 @@ public class KafkaStreams {
*
* @param topology the topology specifying the computational logic
* @param props properties for {@link StreamsConfig}
+ * @throws StreamsException if any fatal error occurs
*/
public KafkaStreams(final Topology topology,
final Properties props) {
@@ -502,6 +507,7 @@ public class KafkaStreams {
*
* @param topology the topology specifying the computational logic
* @param config the Kafka Streams configuration
+ * @throws StreamsException if any fatal error occurs
*/
public KafkaStreams(final Topology topology,
final StreamsConfig config) {
@@ -515,6 +521,7 @@ public class KafkaStreams {
* @param config the Kafka Streams configuration
* @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients
* for the new {@code KafkaStreams} instance
+ * @throws StreamsException if any fatal error occurs
*/
public KafkaStreams(final Topology topology,
final StreamsConfig config,
@@ -524,69 +531,33 @@ public class KafkaStreams {
private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
final StreamsConfig config,
- final KafkaClientSupplier clientSupplier) {
- // create the metrics
- final Time time = Time.SYSTEM;
-
- processId = UUID.randomUUID();
-
+ final KafkaClientSupplier clientSupplier) throws StreamsException {
this.config = config;
// The application ID is a required config and hence should always have value
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-
- internalTopologyBuilder.setApplicationId(applicationId);
+ processId = UUID.randomUUID();
String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
- if (clientId.length() <= 0)
+ if (clientId.length() <= 0) {
clientId = applicationId + "-" + processId;
+ }
- this.logPrefix = String.format("stream-client [%s] ", clientId);
-
+ this.logPrefix = String.format("stream-client [%s]", clientId);
final LogContext logContext = new LogContext(logPrefix);
-
this.log = logContext.logger(getClass());
+ final String cleanupThreadName = clientId + "-CleanupThread";
- final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class);
- reporters.add(new JmxReporter(JMX_PREFIX));
-
- final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
- .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
- .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
- TimeUnit.MILLISECONDS);
-
- metrics = new Metrics(metricConfig, reporters, time);
-
- threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
- final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
- GlobalStreamThread.State globalThreadState = null;
-
- final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
- streamsMetadataState = new StreamsMetadataState(internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
-
- final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+ internalTopologyBuilder.setApplicationId(applicationId);
+ // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
+ internalTopologyBuilder.build(null);
- if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
+ long cacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
+ if (cacheSize < 0) {
+ cacheSize = 0;
log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
}
- final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
- (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + (globalTaskTopology == null ? 0 : 1)));
-
- stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
- if (globalTaskTopology != null) {
- final String globalThreadId = clientId + "-GlobalStreamThread";
- globalStreamThread = new GlobalStreamThread(globalTaskTopology,
- config,
- clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")),
- stateDirectory,
- metrics,
- time,
- globalThreadId);
- globalThreadState = globalStreamThread.state();
- }
-
final StateRestoreListener delegatingStateRestoreListener = new StateRestoreListener() {
@Override
public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) {
@@ -610,6 +581,44 @@ public class KafkaStreams {
}
};
+ threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+ try {
+ stateDirectory = new StateDirectory(
+ applicationId,
+ config.getString(StreamsConfig.STATE_DIR_CONFIG),
+ Time.SYSTEM);
+ } catch (final ProcessorStateException fatal) {
+ throw new StreamsException(fatal);
+ }
+ streamsMetadataState = new StreamsMetadataState(
+ internalTopologyBuilder,
+ parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+
+ final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+ .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+ .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+ TimeUnit.MILLISECONDS);
+ final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+ MetricsReporter.class);
+ reporters.add(new JmxReporter(JMX_PREFIX));
+ metrics = new Metrics(metricConfig, reporters, Time.SYSTEM);
+
+ GlobalStreamThread.State globalThreadState = null;
+ final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+ if (globalTaskTopology != null) {
+ final String globalThreadId = clientId + "-GlobalStreamThread";
+ globalStreamThread = new GlobalStreamThread(globalTaskTopology,
+ config,
+ clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")),
+ stateDirectory,
+ metrics,
+ Time.SYSTEM,
+ globalThreadId);
+ globalThreadState = globalStreamThread.state();
+ }
+
+ final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
+ final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
for (int i = 0; i < threads.length; i++) {
threads[i] = StreamThread.create(internalTopologyBuilder,
config,
@@ -617,14 +626,15 @@ public class KafkaStreams {
processId,
clientId,
metrics,
- time,
+ Time.SYSTEM,
streamsMetadataState,
- cacheSizeBytes,
+ cacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1)),
stateDirectory,
delegatingStateRestoreListener);
threadState.put(threads[i].getId(), threads[i].state());
storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
}
+
final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState);
if (globalTaskTopology != null) {
globalStreamThread.setStateListener(streamStateListener);
@@ -635,7 +645,6 @@ public class KafkaStreams {
final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
- final String cleanupThreadName = clientId + "-CleanupThread";
stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(final Runnable r) {
@@ -690,7 +699,8 @@ public class KafkaStreams {
* {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}.
* @throws IllegalStateException if process was already started
- * @throws StreamsException if the Kafka brokers have version 0.10.0.x
+ * @throws StreamsException if the Kafka brokers have version 0.10.0.x or
+ * if {@link StreamsConfig#PROCESSING_GUARANTEE_CONFIG exactly-once} is enabled for pre 0.11.0.x brokers
*/
public synchronized void start() throws IllegalStateException, StreamsException {
log.debug("Starting Streams client");
@@ -858,7 +868,7 @@ public class KafkaStreams {
* <p>
* Calling this method triggers a restore of local {@link StateStore}s on the next {@link #start() application start}.
*
- * @throws IllegalStateException if the instance is currently running
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is currently {@link State#RUNNING running}
*/
public void cleanUp() {
if (isRunning()) {
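Putting the new constructor and setter contracts together (a sketch; application id, broker address, and topic name are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.errors.StreamsException;

    public class LifecycleSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");          // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            final Topology topology = new Topology();
            topology.addSource("source", "input-topic");                         // placeholder topic
            try {
                final KafkaStreams streams = new KafkaStreams(topology, props);
                // listeners may only be registered in state CREATED, i.e. before start()
                streams.setStateListener((newState, oldState) ->
                        System.out.println(oldState + " -> " + newState));
                streams.start();
                streams.close();
            } catch (final StreamsException fatal) {
                // per the new constructor docs, fatal setup errors surface here
            }
        }
    }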
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 446f941..cde416d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -342,10 +342,10 @@ public class StreamsConfig extends AbstractConfig {
Importance.MEDIUM,
CLIENT_ID_DOC)
.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
- Type.CLASS,
- LogAndFailExceptionHandler.class.getName(),
- Importance.MEDIUM,
- DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+ Type.CLASS,
+ LogAndFailExceptionHandler.class.getName(),
+ Importance.MEDIUM,
+ DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
.define(DEFAULT_KEY_SERDE_CLASS_CONFIG,
Type.CLASS,
Serdes.ByteArraySerde.class.getName(),
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 3c8e077..9e853fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -46,12 +46,11 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
final StateManager stateManager;
public AbstractProcessorContext(final TaskId taskId,
- final String applicationId,
- final StreamsConfig config,
- final StreamsMetrics metrics,
- final StateManager stateManager,
- final ThreadCache cache) {
-
+ final String applicationId,
+ final StreamsConfig config,
+ final StreamsMetrics metrics,
+ final StateManager stateManager,
+ final ThreadCache cache) {
this.taskId = taskId;
this.applicationId = applicationId;
this.config = config;
@@ -93,7 +92,9 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
}
@Override
- public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) {
+ public void register(final StateStore store,
+ final boolean loggingEnabled,
+ final StateRestoreCallback stateRestoreCallback) {
if (initialized) {
throw new IllegalStateException("Can only create state stores during initialization.");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 5ed9aae..6734da6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -226,6 +226,7 @@ public abstract class AbstractTask implements Task {
* @throws ProcessorStateException if there is an error while closing the state manager
* @param writeCheckpoint boolean indicating if a checkpoint file should be written
*/
+ // visible for testing
void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
ProcessorStateException exception = null;
log.trace("Closing state manager");
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 2d886b7..3208f93 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -137,7 +137,7 @@ class AssignedTasks {
restoredPartitions.addAll(restored);
for (final Iterator<Map.Entry<TaskId, Task>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
final Map.Entry<TaskId, Task> entry = it.next();
- Task task = entry.getValue();
+ final Task task = entry.getValue();
if (restoredPartitions.containsAll(task.changelogPartitions())) {
transitionToRunning(task);
resume.addAll(task.partitions());
@@ -303,11 +303,12 @@ class AssignedTasks {
builder.append("\n");
}
- private List<Task> allInitializedTasks() {
+ private List<Task> allTasks() {
final List<Task> tasks = new ArrayList<>();
tasks.addAll(running.values());
tasks.addAll(suspended.values());
tasks.addAll(restoring.values());
+ tasks.addAll(created.values());
return tasks;
}
@@ -428,8 +429,7 @@ class AssignedTasks {
}
void close(final boolean clean) {
- close(allInitializedTasks(), clean);
- close(created.values(), clean);
+ close(allTasks(), clean);
clear();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index d47af88..81d2f6c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1744,7 +1744,7 @@ public class InternalTopologyBuilder {
private String subtopologiesAsString() {
final StringBuilder sb = new StringBuilder();
- sb.append("Sub-topologies: \n");
+ sb.append("Sub-topologies:\n");
if (subtopologies.isEmpty()) {
sb.append(" none\n");
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index d26511c..889b6d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -42,7 +42,6 @@ public class RecordQueue {
private final ArrayDeque<StampedRecord> fifoQueue;
private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker;
private final SourceNodeRecordDeserializer recordDeserializer;
- private final DeserializationExceptionHandler deserializationExceptionHandler;
private final ProcessorContext processorContext;
private long partitionTime = TimestampTracker.NOT_KNOWN;
@@ -58,11 +57,9 @@ public class RecordQueue {
this.fifoQueue = new ArrayDeque<>();
this.timeTracker = new MinTimestampTracker<>();
this.recordDeserializer = new SourceNodeRecordDeserializer(source, deserializationExceptionHandler);
- this.deserializationExceptionHandler = deserializationExceptionHandler;
this.processorContext = processorContext;
}
-
/**
* Returns the corresponding source node in the topology
*
@@ -87,15 +84,15 @@ public class RecordQueue {
* @param rawRecords the raw records
* @return the size of this queue
*/
- public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
- for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
+ int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
+ for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
- ConsumerRecord<Object, Object> record = recordDeserializer.tryDeserialize(processorContext, rawRecord);
+ final ConsumerRecord<Object, Object> record = recordDeserializer.tryDeserialize(processorContext, rawRecord);
if (record == null) {
continue;
}
- long timestamp = timestampExtractor.extract(record, timeTracker.get());
+ final long timestamp = timestampExtractor.extract(record, timeTracker.get());
log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record);
// drop message if TS is invalid, i.e., negative
@@ -103,7 +100,7 @@ public class RecordQueue {
continue;
}
- StampedRecord stampedRecord = new StampedRecord(record, timestamp);
+ final StampedRecord stampedRecord = new StampedRecord(record, timestamp);
fifoQueue.addLast(stampedRecord);
timeTracker.addElement(stampedRecord);
}
@@ -111,10 +108,11 @@ public class RecordQueue {
// update the partition timestamp if its currently
// tracked min timestamp has exceed its value; this will
// usually only take effect for the first added batch
- long timestamp = timeTracker.get();
+ final long timestamp = timeTracker.get();
- if (timestamp > partitionTime)
+ if (timestamp > partitionTime) {
partitionTime = timestamp;
+ }
return size();
}
@@ -125,19 +123,21 @@ public class RecordQueue {
* @return StampedRecord
*/
public StampedRecord poll() {
- StampedRecord elem = fifoQueue.pollFirst();
+ final StampedRecord elem = fifoQueue.pollFirst();
- if (elem == null)
+ if (elem == null) {
return null;
+ }
timeTracker.removeElement(elem);
// only advance the partition timestamp if its currently
// tracked min timestamp has exceeded its value
- long timestamp = timeTracker.get();
+ final long timestamp = timeTracker.get();
- if (timestamp > partitionTime)
+ if (timestamp > partitionTime) {
partitionTime = timestamp;
+ }
return elem;
}
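The addRawRecords()/poll() changes above preserve an invariant that is easy to miss in the diff noise: partition time only ever ratchets forward. Stripped to its core (a sketch with hypothetical names; NOT_KNOWN mirrors TimestampTracker.NOT_KNOWN):

    public class PartitionTimeSketch {
        private static final long NOT_KNOWN = -1L; // mirrors TimestampTracker.NOT_KNOWN
        private long partitionTime = NOT_KNOWN;

        // called with the tracked minimum timestamp after every add or poll:
        // the partition time advances monotonically and never regresses
        void maybeAdvance(final long trackedMinTimestamp) {
            if (trackedMinTimestamp > partitionTime) {
                partitionTime = trackedMinTimestamp;
            }
        }
    }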
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
index 1d9e722..7fde881 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
@@ -63,12 +63,12 @@ class SourceNodeRecordDeserializer implements RecordDeserializer {
}
public ConsumerRecord<Object, Object> tryDeserialize(final ProcessorContext processorContext,
- ConsumerRecord<byte[], byte[]> rawRecord) {
+ final ConsumerRecord<byte[], byte[]> rawRecord) {
// catch and process if we have a deserialization handler
try {
return deserialize(rawRecord);
- } catch (Exception e) {
+ } catch (final Exception e) {
final DeserializationExceptionHandler.DeserializationHandlerResponse response =
deserializationExceptionHandler.handle(processorContext, rawRecord, e);
if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) {
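tryDeserialize() gives the configured handler the choice between FAIL and CONTINUE. A custom handler that skips corrupt records could look like this (a sketch; the built-in LogAndContinueExceptionHandler covers the same case), registered via StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG as shown in the StreamsConfig hunk earlier:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class LogAndSkipHandler implements DeserializationExceptionHandler {
        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
            System.err.println("Skipping corrupt record at offset " + record.offset()
                    + " of topic " + record.topic() + ": " + exception);
            return DeserializationHandlerResponse.CONTINUE; // record is dropped, processing continues
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            // no configuration needed
        }
    }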
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 033af24..98ec810 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -63,6 +63,15 @@ public class StandbyTask extends AbstractTask {
processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);
}
+ @Override
+ public boolean initialize() {
+ initializeStateStores();
+ checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
+ processorContext.initialized();
+ taskInitialized = true;
+ return true;
+ }
+
/**
* <pre>
* - update offset limits
@@ -139,16 +148,6 @@ public class StandbyTask extends AbstractTask {
}
@Override
- public boolean maybePunctuateStreamTime() {
- throw new UnsupportedOperationException("maybePunctuateStreamTime not supported by StandbyTask");
- }
-
- @Override
- public boolean maybePunctuateSystemTime() {
- throw new UnsupportedOperationException("maybePunctuateSystemTime not supported by StandbyTask");
- }
-
- @Override
public boolean commitNeeded() {
return false;
}
@@ -174,16 +173,18 @@ public class StandbyTask extends AbstractTask {
}
@Override
- public boolean process() {
- throw new UnsupportedOperationException("process not supported by StandbyTasks");
+ public boolean maybePunctuateStreamTime() {
+ throw new UnsupportedOperationException("maybePunctuateStreamTime not supported by StandbyTask");
}
- public boolean initialize() {
- initializeStateStores();
- checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
- processorContext.initialized();
- taskInitialized = true;
- return true;
+ @Override
+ public boolean maybePunctuateSystemTime() {
+ throw new UnsupportedOperationException("maybePunctuateSystemTime not supported by StandbyTask");
+ }
+
+ @Override
+ public boolean process() {
+ throw new UnsupportedOperationException("process not supported by StandbyTasks");
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 8ecc7e2..caa0100 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -185,7 +185,9 @@ public class StoreChangelogReader implements ChangelogReader {
needsRestoring.putAll(initialized);
}
- private void logRestoreOffsets(final TopicPartition partition, final long startingOffset, final Long endOffset) {
+ private void logRestoreOffsets(final TopicPartition partition,
+ final long startingOffset,
+ final Long endOffset) {
log.debug("Restoring partition {} from offset {} to endOffset {}",
partition,
startingOffset,
@@ -229,7 +231,7 @@ public class StoreChangelogReader implements ChangelogReader {
}
private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
- final TopicPartition topicPartition) {
+ final TopicPartition topicPartition) {
final StateRestorer restorer = stateRestorers.get(topicPartition);
final Long endOffset = endOffsets.get(topicPartition);
final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
@@ -255,7 +257,8 @@ public class StoreChangelogReader implements ChangelogReader {
}
private long processNext(final List<ConsumerRecord<byte[], byte[]>> records,
- final StateRestorer restorer, final Long endOffset) {
+ final StateRestorer restorer,
+ final Long endOffset) {
final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
long nextPosition = -1;
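The signature reflows above are cosmetic; behavior is unchanged. For
orientation, processNext() conceptually drains changelog records into the
store until the partition's end offset is reached. A rough sketch under that
reading (RestorerSketch and restoreTo are hypothetical stand-ins, not the
internal StateRestorer API):

    import java.util.List;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Hypothetical stand-in for a per-partition state restorer.
    interface RestorerSketch {
        void restore(byte[] key, byte[] value);
        boolean hasCompleted(long nextOffset, long endOffset);
    }

    final class RestoreLoopSketch {
        // Feed records to the restorer until endOffset; return the next position.
        static long restoreTo(final List<ConsumerRecord<byte[], byte[]>> records,
                              final RestorerSketch restorer,
                              final long endOffset) {
            long nextPosition = -1;
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                if (restorer.hasCompleted(record.offset(), endOffset)) {
                    nextPosition = record.offset();
                    break;
                }
                restorer.restore(record.key(), record.value());
                nextPosition = record.offset() + 1;
            }
            return nextPosition;
        }
    }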
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 084a991..0830aa2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -131,7 +131,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
// initialize the topology with its own context
processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
- final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
+ final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler();
for (final TopicPartition partition : partitions) {
final SourceNode source = topology.source(partition.topic());
@@ -151,6 +151,16 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
}
+ public boolean initialize() {
+ log.debug("Initializing");
+ initializeStateStores();
+ initTopology();
+ processorContext.initialized();
+ taskInitialized = true;
+ return topology.stateStores().isEmpty();
+ }
+
+
/**
* <pre>
* - re-initialize the task
@@ -597,13 +607,4 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
return new RecordCollectorImpl(producer, id.toString(), logContext);
}
- public boolean initialize() {
- log.debug("Initializing");
- initializeStateStores();
- initTopology();
- processorContext.initialized();
- taskInitialized = true;
- return topology.stateStores().isEmpty();
- }
-
}
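Note the return value of the relocated initialize(): it returns
topology.stateStores().isEmpty(), i.e. a stateless task is runnable
immediately while a stateful one must first be restored. A hedged sketch of
owner-side bookkeeping around that return value (TaskStartupSketch and its
field names are illustrative, not the internal code):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical owner-side handling of initialize()'s return value.
    final class TaskStartupSketch {
        interface InitializableTask { boolean initialize(); }

        private final List<InitializableTask> running = new ArrayList<>();
        private final List<InitializableTask> restoring = new ArrayList<>();

        void maybeStart(final InitializableTask task) {
            if (task.initialize()) {
                running.add(task);    // stateless: nothing to restore, run now
            } else {
                restoring.add(task);  // stateful: wait for the changelog reader
            }
        }
    }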
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 867359b..e141c46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -421,7 +421,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
}
return threadProducer;
-
}
@Override
@@ -456,20 +455,29 @@ public class StreamThread extends Thread implements ThreadDataProvider {
}
@Override
- StandbyTask createTask(final Consumer<byte[], byte[]> consumer, final TaskId taskId, final Set<TopicPartition> partitions) {
+ StandbyTask createTask(final Consumer<byte[], byte[]> consumer,
+ final TaskId taskId,
+ final Set<TopicPartition> partitions) {
taskCreatedSensor.record();
final ProcessorTopology topology = builder.build(taskId.topicGroupId);
if (!topology.stateStores().isEmpty()) {
- return new StandbyTask(taskId, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
+ return new StandbyTask(taskId,
+ applicationId,
+ partitions,
+ topology,
+ consumer,
+ storeChangelogReader,
+ config,
+ streamsMetrics,
+ stateDirectory);
} else {
- log.trace("Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", taskId, partitions);
-
+ log.trace("Skipped standby task {} with assigned partitions {} " +
+ "since it does not have any state stores to materialize", taskId, partitions);
return null;
}
}
-
}
/**
@@ -781,8 +789,10 @@ public class StreamThread extends Thread implements ThreadDataProvider {
final long processLatency = computeLatency();
streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed,
timerStartedMs);
- processedBeforeCommit = adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit, totalProcessed,
- processLatency, commitTimeMs);
+ processedBeforeCommit = adjustRecordsProcessedBeforeCommit(recordsProcessedBeforeCommit,
+ totalProcessed,
+ processLatency,
+ commitTimeMs);
}
}
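The reflowed call above feeds measured process latency back into commit
batching: the thread aims to spend roughly the commit interval processing
between commits, so the records-per-commit bound scales with observed
throughput. A hedged paraphrase of that arithmetic (a sketch, not the exact
internal formula):

    // Target about commitTimeMs of processing between commits, scaled by the
    // throughput just observed; always allow at least one record.
    final class CommitBatchSketch {
        static long adjustRecordsProcessedBeforeCommit(final long current,
                                                       final long totalProcessed,
                                                       final long processLatencyMs,
                                                       final long commitTimeMs) {
            if (processLatencyMs <= 0L || commitTimeMs <= 0L) {
                return current; // nothing measured yet; keep the previous bound
            }
            return Math.max(1L, commitTimeMs * totalProcessed / processLatencyMs);
        }
    }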
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd0146d9/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 7afbecf..f12ed91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -110,7 +110,7 @@ class TaskManager {
newTasks.put(taskId, partitions);
}
} catch (final StreamsException e) {
- log.error("Failed to create an active task {} due to the following error:", taskId, e);
+ log.error("Failed to resume an active task {} due to the following error:", taskId, e);
throw e;
}
} else {
@@ -122,6 +122,7 @@ class TaskManager {
return;
}
+ // CANNOT FIND RETRY AND BACKOFF LOGIC
// create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
// -> other thread will call removeSuspendedTasks(); eventually
log.trace("New active tasks to be created: {}", newTasks);
@@ -185,24 +186,13 @@ class TaskManager {
firstException.compareAndSet(null, active.suspend());
firstException.compareAndSet(null, standby.suspend());
// remove the changelog partitions from restore consumer
- firstException.compareAndSet(null, unAssignChangeLogPartitions());
+ restoreConsumer.assign(Collections.<TopicPartition>emptyList());
if (firstException.get() != null) {
throw new StreamsException(logPrefix + "failed to suspend stream tasks", firstException.get());
}
}
- private RuntimeException unAssignChangeLogPartitions() {
- try {
- // un-assign the change log partitions
- restoreConsumer.assign(Collections.<TopicPartition>emptyList());
- } catch (final RuntimeException e) {
- log.error("Failed to un-assign change log partitions due to the following error:", e);
- return e;
- }
- return null;
- }
-
void shutdown(final boolean clean) {
log.debug("Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", active.runningTaskIds(), standby.runningTaskIds(),
active.previousTaskIds(), standby.previousTaskIds());
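One behavioral nuance in the hunk above: the removed unAssignChangeLogPartitions()
helper caught RuntimeException and funneled it into firstException, whereas the
inlined restoreConsumer.assign(...) now throws straight out of the suspend path.
If the old capture semantics were intended, the inline form would look roughly
like this (a sketch reusing the method's own fields, not part of the commit):

    // Preserve the removed helper's behavior: record the failure rather than
    // letting it escape before the remaining suspend steps are checked.
    try {
        restoreConsumer.assign(Collections.<TopicPartition>emptyList());
    } catch (final RuntimeException fatal) {
        firstException.compareAndSet(null, fatal);
    }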
@@ -215,9 +205,9 @@ class TaskManager {
log.error("Failed to close KafkaStreamClient due to the following error:", e);
}
// remove the changelog partitions from restore consumer
- unAssignChangeLogPartitions();
+ restoreConsumer.assign(Collections.<TopicPartition>emptyList());
taskCreator.close();
-
+ standbyTaskCreator.close();
}
Set<TaskId> suspendedActiveTaskIds() {