You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/30 20:13:28 UTC
kafka git commit: MINOR: Code cleanup and JavaDoc improvements for clients and Streams
Repository: kafka
Updated Branches:
refs/heads/trunk 7fe88e8bd -> c7ab3efcb
MINOR: Code cleanup and JavaDoc improvements for clients and Streams
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Bill Bejeck <bi...@confluent.io>, Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #4128 from mjsax/minor-cleanup
minor fix
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c7ab3efc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c7ab3efc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c7ab3efc
Branch: refs/heads/trunk
Commit: c7ab3efcbe5d34c28e19a5a6a59962c2abfd2235
Parents: 7fe88e8
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Mon Oct 30 13:11:18 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Oct 30 13:13:19 2017 -0700
----------------------------------------------------------------------
.../kafka/clients/admin/KafkaAdminClient.java | 14 +-
.../kafka/clients/consumer/KafkaConsumer.java | 13 +-
.../kafka/clients/producer/KafkaProducer.java | 9 +-
.../org/apache/kafka/streams/KafkaStreams.java | 8 +-
.../internals/GlobalStateManagerImpl.java | 22 +--
.../processor/internals/GlobalStreamThread.java | 30 ++--
.../internals/InternalTopicManager.java | 7 +-
.../internals/StoreChangelogReader.java | 37 ++---
.../processor/internals/StreamsKafkaClient.java | 86 ++++++++----
.../processor/internals/TaskManager.java | 5 +-
.../apache/kafka/streams/state/HostInfo.java | 20 +--
.../internals/InternalTopicManagerTest.java | 136 ++++++++++++++-----
.../internals/StoreChangelogReaderTest.java | 12 +-
.../internals/StreamsKafkaClientTest.java | 6 +-
.../processor/internals/TaskManagerTest.java | 8 +-
.../tests/streams/streams_broker_bounce_test.py | 8 +-
16 files changed, 263 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 74ecc81..1945f50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -17,7 +17,6 @@
package org.apache.kafka.clients.admin;
-import java.util.Set;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
@@ -65,13 +64,13 @@ import org.apache.kafka.common.requests.AlterConfigsRequest;
import org.apache.kafka.common.requests.AlterConfigsResponse;
import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
-import org.apache.kafka.common.requests.CreatePartitionsRequest;
-import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.CreateAclsRequest;
import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
import org.apache.kafka.common.requests.CreateAclsResponse;
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.CreatePartitionsRequest;
+import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteAclsRequest;
@@ -108,6 +107,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -953,22 +953,22 @@ public class KafkaAdminClient extends AdminClient {
@Override
public void run() {
- /**
+ /*
* Maps nodes to calls that we want to send.
*/
Map<Node, List<Call>> callsToSend = new HashMap<>();
- /**
+ /*
* Maps node ID strings to calls that have been sent.
*/
Map<String, List<Call>> callsInFlight = new HashMap<>();
- /**
+ /*
* Maps correlation IDs to calls that have been sent.
*/
Map<Integer, Call> correlationIdToCalls = new HashMap<>();
- /**
+ /*
* The previous metadata version which wasn't usable, or null if there is none.
*/
Integer prevMetadataVersion = null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index c676863..05bca22 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1388,6 +1388,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
* Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
+ * This method may issue a remote call to the server if there is no current position for the given partition.
+ * <p>
+ * This call will block until either the position could be determined or an unrecoverable error is
+ * encountered (in which case it is thrown to the caller).
*
* @param partition The partition to get the position for
* @return The offset
@@ -1583,6 +1587,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
* such message.
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws IllegalArgumentException if the target timestamp is negative.
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
* expiration of the configured request timeout
@@ -1617,6 +1622,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @param partitions the partitions to get the earliest offsets.
* @return The earliest available offsets for the given partitions
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
* expiration of the configured request timeout
*/
@@ -1646,6 +1652,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @param partitions the partitions to get the end offsets.
* @return The end offsets for the given partitions.
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
* expiration of the configured request timeout
*/
@@ -1666,9 +1673,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* timeout. See {@link #close(long, TimeUnit)} for details. Note that {@link #wakeup()}
* cannot be used to interrupt close.
*
- * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted
* before or while this function is called
+ * @throws org.apache.kafka.common.KafkaException for any other error during close
*/
@Override
public void close() {
@@ -1686,9 +1693,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
* @param timeUnit The time unit for the {@code timeout}
- * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
- * @throws InterruptException If the thread is interrupted before or while this function is called
* @throws IllegalArgumentException If the {@code timeout} is negative.
+ * @throws InterruptException If the thread is interrupted before or while this function is called
+ * @throws org.apache.kafka.common.KafkaException for any other error during close
*/
public void close(long timeout, TimeUnit timeUnit) {
if (timeout < 0)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3185786..8004180 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -37,6 +37,8 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -745,7 +747,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
* indicates no callback)
*
- * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+ * @throws AuthenticationException if authentication fails. See the exception for more details
+ * @throws AuthorizationException fatal error indicating that the producer is not allowed to write
* @throws IllegalStateException if a transactional.id has been configured and no transaction has been started
* @throws InterruptException If the thread is interrupted while blocked
* @throws SerializationException If the key or value are not valid objects given the configured serializers
@@ -968,8 +971,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
/**
* Get the partition metadata for the given topic. This can be used for custom partitioning.
- * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+ * @throws AuthenticationException if authentication fails. See the exception for more details
+ * @throws AuthorizationException if not authorized to the specified topic. See the exception for more details
* @throws InterruptException If the thread is interrupted while blocked
+ * @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 6e48f19..9ad02ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -61,7 +61,6 @@ import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.slf4j.Logger;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -711,12 +710,7 @@ public class KafkaStreams {
client.checkBrokerCompatibility(EXACTLY_ONCE.equals(config.getString(PROCESSING_GUARANTEE_CONFIG)));
- try {
- client.close();
- } catch (final IOException e) {
- log.warn("Could not close StreamKafkaClient.", e);
- }
-
+ client.close();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 10a0775..53dd51c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -55,7 +55,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
private static final Logger log = LoggerFactory.getLogger(GlobalStateManagerImpl.class);
private final ProcessorTopology topology;
- private final Consumer<byte[], byte[]> consumer;
+ private final Consumer<byte[], byte[]> globalConsumer;
private final StateDirectory stateDirectory;
private final Map<String, StateStore> stores = new LinkedHashMap<>();
private final File baseDir;
@@ -65,11 +65,11 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
private final StateRestoreListener stateRestoreListener;
public GlobalStateManagerImpl(final ProcessorTopology topology,
- final Consumer<byte[], byte[]> consumer,
+ final Consumer<byte[], byte[]> globalConsumer,
final StateDirectory stateDirectory,
final StateRestoreListener stateRestoreListener) {
this.topology = topology;
- this.consumer = consumer;
+ this.globalConsumer = globalConsumer;
this.stateDirectory = stateDirectory;
this.baseDir = stateDirectory.globalStateDir();
this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
@@ -136,19 +136,19 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
log.info("Restoring state for global store {}", store.name());
final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
- final Map<TopicPartition, Long> highWatermarks = consumer.endOffsets(topicPartitions);
+ final Map<TopicPartition, Long> highWatermarks = globalConsumer.endOffsets(topicPartitions);
try {
restoreState(stateRestoreCallback, topicPartitions, highWatermarks, store.name());
stores.put(store.name(), store);
} finally {
- consumer.assign(Collections.<TopicPartition>emptyList());
+ globalConsumer.unsubscribe();
}
}
private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
final String sourceTopic = topology.storeToChangelogTopic().get(store.name());
- final List<PartitionInfo> partitionInfos = consumer.partitionsFor(sourceTopic);
+ final List<PartitionInfo> partitionInfos = globalConsumer.partitionsFor(sourceTopic);
if (partitionInfos == null || partitionInfos.isEmpty()) {
throw new StreamsException(String.format("There are no partitions available for topic %s when initializing global store %s", sourceTopic, store.name()));
}
@@ -165,15 +165,15 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
final Map<TopicPartition, Long> highWatermarks,
final String storeName) {
for (final TopicPartition topicPartition : topicPartitions) {
- consumer.assign(Collections.singletonList(topicPartition));
+ globalConsumer.assign(Collections.singletonList(topicPartition));
final Long checkpoint = checkpointableOffsets.get(topicPartition);
if (checkpoint != null) {
- consumer.seek(topicPartition, checkpoint);
+ globalConsumer.seek(topicPartition, checkpoint);
} else {
- consumer.seekToBeginning(Collections.singletonList(topicPartition));
+ globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
}
- long offset = consumer.position(topicPartition);
+ long offset = globalConsumer.position(topicPartition);
final Long highWatermark = highWatermarks.get(topicPartition);
BatchingStateRestoreCallback
stateRestoreAdapter =
@@ -186,7 +186,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
long restoreCount = 0L;
while (offset < highWatermark) {
- final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+ final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100);
final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
for (ConsumerRecord<byte[], byte[]> record : records) {
offset = record.offset() + 1;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 1ee49e1..f3800fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -50,7 +50,7 @@ public class GlobalStreamThread extends Thread {
private final Logger log;
private final LogContext logContext;
private final StreamsConfig config;
- private final Consumer<byte[], byte[]> consumer;
+ private final Consumer<byte[], byte[]> globalConsumer;
private final StateDirectory stateDirectory;
private final Time time;
private final ThreadCache cache;
@@ -183,7 +183,7 @@ public class GlobalStreamThread extends Thread {
this.time = time;
this.config = config;
this.topology = topology;
- this.consumer = globalConsumer;
+ this.globalConsumer = globalConsumer;
this.stateDirectory = stateDirectory;
long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
(config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1));
@@ -197,7 +197,7 @@ public class GlobalStreamThread extends Thread {
}
static class StateConsumer {
- private final Consumer<byte[], byte[]> consumer;
+ private final Consumer<byte[], byte[]> globalConsumer;
private final GlobalStateMaintainer stateMaintainer;
private final Time time;
private final long pollMs;
@@ -207,13 +207,13 @@ public class GlobalStreamThread extends Thread {
private long lastFlush;
StateConsumer(final LogContext logContext,
- final Consumer<byte[], byte[]> consumer,
+ final Consumer<byte[], byte[]> globalConsumer,
final GlobalStateMaintainer stateMaintainer,
final Time time,
final long pollMs,
final long flushInterval) {
this.log = logContext.logger(getClass());
- this.consumer = consumer;
+ this.globalConsumer = globalConsumer;
this.stateMaintainer = stateMaintainer;
this.time = time;
this.pollMs = pollMs;
@@ -226,15 +226,15 @@ public class GlobalStreamThread extends Thread {
*/
void initialize() {
final Map<TopicPartition, Long> partitionOffsets = stateMaintainer.initialize();
- consumer.assign(partitionOffsets.keySet());
+ globalConsumer.assign(partitionOffsets.keySet());
for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
- consumer.seek(entry.getKey(), entry.getValue());
+ globalConsumer.seek(entry.getKey(), entry.getValue());
}
lastFlush = time.milliseconds();
}
void pollAndUpdate() {
- final ConsumerRecords<byte[], byte[]> received = consumer.poll(pollMs);
+ final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs);
for (ConsumerRecord<byte[], byte[]> record : received) {
stateMaintainer.update(record);
}
@@ -247,8 +247,8 @@ public class GlobalStreamThread extends Thread {
public void close() throws IOException {
try {
- consumer.close();
- } catch (Exception e) {
+ globalConsumer.close();
+ } catch (final RuntimeException e) {
// just log an error if the consumer throws an exception during close
// so we can always attempt to close the state stores.
log.error("Failed to close consumer due to the following error:", e);
@@ -291,7 +291,7 @@ public class GlobalStreamThread extends Thread {
try {
stateConsumer.close();
- } catch (IOException e) {
+ } catch (final IOException e) {
log.error("Failed to close state maintainer due to the following error:", e);
}
setState(DEAD);
@@ -303,12 +303,12 @@ public class GlobalStreamThread extends Thread {
private StateConsumer initialize() {
try {
final GlobalStateManager stateMgr = new GlobalStateManagerImpl(topology,
- consumer,
+ globalConsumer,
stateDirectory,
stateRestoreListener);
final StateConsumer stateConsumer
= new StateConsumer(this.logContext,
- consumer,
+ globalConsumer,
new GlobalStateUpdateTask(topology,
new GlobalProcessorContextImpl(
config,
@@ -323,9 +323,9 @@ public class GlobalStreamThread extends Thread {
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
stateConsumer.initialize();
return stateConsumer;
- } catch (StreamsException e) {
+ } catch (final StreamsException e) {
startupException = e;
- } catch (Exception e) {
+ } catch (final Exception e) {
startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", e);
}
return null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index a038d09..ae2b375 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.slf4j.Logger;
-import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -110,11 +109,7 @@ public class InternalTopicManager {
}
public void close() {
- try {
- streamsKafkaClient.close();
- } catch (IOException e) {
- log.warn("Could not close StreamsKafkaClient.");
- }
+ streamsKafkaClient.close();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index bbe570c..83c783d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;
@@ -41,7 +42,7 @@ import java.util.Set;
public class StoreChangelogReader implements ChangelogReader {
private final Logger log;
- private final Consumer<byte[], byte[]> consumer;
+ private final Consumer<byte[], byte[]> restoreConsumer;
private final StateRestoreListener userStateRestoreListener;
private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
@@ -49,10 +50,10 @@ public class StoreChangelogReader implements ChangelogReader {
private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
- public StoreChangelogReader(final Consumer<byte[], byte[]> consumer,
+ public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
final StateRestoreListener userStateRestoreListener,
final LogContext logContext) {
- this.consumer = consumer;
+ this.restoreConsumer = restoreConsumer;
this.log = logContext.logger(getClass());
this.userStateRestoreListener = userStateRestoreListener;
}
@@ -73,26 +74,26 @@ public class StoreChangelogReader implements ChangelogReader {
}
if (needsRestoring.isEmpty()) {
- consumer.assign(Collections.<TopicPartition>emptyList());
+ restoreConsumer.unsubscribe();
return completed();
}
final Set<TopicPartition> partitions = new HashSet<>(needsRestoring.keySet());
- final ConsumerRecords<byte[], byte[]> allRecords = consumer.poll(10);
+ final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
for (final TopicPartition partition : partitions) {
restorePartition(allRecords, partition, active.restoringTaskFor(partition));
}
if (needsRestoring.isEmpty()) {
- consumer.assign(Collections.<TopicPartition>emptyList());
+ restoreConsumer.unsubscribe();
}
return completed();
}
private void initialize() {
- if (!consumer.subscription().isEmpty()) {
- throw new IllegalStateException("Restore consumer should not be subscribed to any topics (" + consumer.subscription() + ")");
+ if (!restoreConsumer.subscription().isEmpty()) {
+ throw new StreamsException("Restore consumer should not be subscribed to any topics (" + restoreConsumer.subscription() + ")");
}
// first refresh the changelog partition information from brokers, since initialize is only called when
@@ -110,7 +111,7 @@ public class StoreChangelogReader implements ChangelogReader {
// try to fetch end offsets for the initializable restorers and remove any partitions
// where we already have all of the data
try {
- endOffsets.putAll(consumer.endOffsets(initializable.keySet()));
+ endOffsets.putAll(restoreConsumer.endOffsets(initializable.keySet()));
} catch (final TimeoutException e) {
// if timeout exception gets thrown we just give up this time and retry in the next run loop
log.debug("Could not fetch end offset for {}; will fall back to partition by partition fetching", initializable);
@@ -151,27 +152,27 @@ public class StoreChangelogReader implements ChangelogReader {
private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) {
log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
- final Set<TopicPartition> assignment = new HashSet<>(consumer.assignment());
+ final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment());
assignment.addAll(initialized.keySet());
- consumer.assign(assignment);
+ restoreConsumer.assign(assignment);
final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
for (final StateRestorer restorer : initialized.values()) {
if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
- consumer.seek(restorer.partition(), restorer.checkpoint());
+ restoreConsumer.seek(restorer.partition(), restorer.checkpoint());
logRestoreOffsets(restorer.partition(),
restorer.checkpoint(),
endOffsets.get(restorer.partition()));
- restorer.setStartingOffset(consumer.position(restorer.partition()));
+ restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
restorer.restoreStarted();
} else {
- consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+ restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
needsPositionUpdate.add(restorer);
}
}
for (final StateRestorer restorer : needsPositionUpdate) {
- final long position = consumer.position(restorer.partition());
+ final long position = restoreConsumer.position(restorer.partition());
logRestoreOffsets(restorer.partition(),
position,
endOffsets.get(restorer.partition()));
@@ -200,7 +201,7 @@ public class StoreChangelogReader implements ChangelogReader {
private void refreshChangelogInfo() {
try {
- partitionInfo.putAll(consumer.listTopics());
+ partitionInfo.putAll(restoreConsumer.listTopics());
} catch (final TimeoutException e) {
log.debug("Could not fetch topic metadata within the timeout, will retry in the next run loop");
}
@@ -270,7 +271,7 @@ public class StoreChangelogReader implements ChangelogReader {
}
if (offset == -1) {
- offset = consumer.position(restorer.partition());
+ offset = restoreConsumer.position(restorer.partition());
}
if (!restoreRecords.isEmpty()) {
@@ -278,7 +279,7 @@ public class StoreChangelogReader implements ChangelogReader {
restorer.restoreBatchCompleted(offset + 1, records.size());
}
- return consumer.position(restorer.partition());
+ return restoreConsumer.position(restorer.partition());
}
private boolean hasPartition(final TopicPartition topicPartition) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 1e99ad2..075f445 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -51,6 +52,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.BrokerNotFoundException;
import org.apache.kafka.streams.errors.StreamsException;
+import org.slf4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -85,16 +87,18 @@ public class StreamsKafkaClient {
private final KafkaClient kafkaClient;
private final List<MetricsReporter> reporters;
private final Config streamsConfig;
+ private final Logger log;
private final Map<String, String> defaultTopicConfigs = new HashMap<>();
private static final int MAX_INFLIGHT_REQUESTS = 100;
-
StreamsKafkaClient(final Config streamsConfig,
final KafkaClient kafkaClient,
- final List<MetricsReporter> reporters) {
+ final List<MetricsReporter> reporters,
+ final LogContext log) {
this.streamsConfig = streamsConfig;
this.kafkaClient = kafkaClient;
this.reporters = reporters;
+ this.log = log.logger(StreamsKafkaClient.class);
extractDefaultTopicConfigs(streamsConfig.originalsWithPrefix(StreamsConfig.TOPIC_PREFIX));
}
@@ -114,17 +118,18 @@ public class StreamsKafkaClient {
final String clientId = streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG);
metricTags.put("client-id", clientId);
- final Metadata metadata = new Metadata(streamsConfig.getLong(
- StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
- streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG), false);
+ final Metadata metadata = new Metadata(streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
+ streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG),
+ false);
final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
final MetricConfig metricConfig = new MetricConfig().samples(streamsConfig.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(streamsConfig.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.tags(metricTags);
- final List<MetricsReporter> reporters = streamsConfig.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class);
+ final List<MetricsReporter> reporters = streamsConfig.getConfiguredInstances(
+ ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
+ MetricsReporter.class);
// TODO: This should come from the KafkaStream
reporters.add(new JmxReporter("kafka.admin.client"));
final Metrics metrics = new Metrics(metricConfig, reporters, time);
@@ -154,7 +159,7 @@ public class StreamsKafkaClient {
true,
new ApiVersions(),
logContext);
- return new StreamsKafkaClient(streamsConfig, kafkaClient, reporters);
+ return new StreamsKafkaClient(streamsConfig, kafkaClient, reporters, logContext);
}
private static LogContext createLogContext(String clientId) {
@@ -165,9 +170,13 @@ public class StreamsKafkaClient {
return create(Config.fromStreamsConfig(streamsConfig));
}
- public void close() throws IOException {
+ public void close() {
try {
kafkaClient.close();
+ } catch (final IOException impossible) {
+ // this can actually never happen, because NetworkClient doesn't throw any exception on close()
+ // we log just in case
+ log.error("This error indicates a bug in the code. Please report to dev@kafka.apache.org.", impossible);
} finally {
for (MetricsReporter metricsReporter: this.reporters) {
metricsReporter.close();
@@ -177,17 +186,22 @@ public class StreamsKafkaClient {
/**
* Create a set of new topics using batch request.
+ *
+ * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
+ * @throws TimeoutException if there was no response within {@code request.timeout.ms}
+ * @throws StreamsException for any other fatal error
*/
- public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final int replicationFactor,
- final long windowChangeLogAdditionalRetention, final MetadataResponse metadata) {
-
+ public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap,
+ final int replicationFactor,
+ final long windowChangeLogAdditionalRetention,
+ final MetadataResponse metadata) {
final Map<String, CreateTopicsRequest.TopicDetails> topicRequestDetails = new HashMap<>();
- for (Map.Entry<InternalTopicConfig, Integer> entry : topicsMap.entrySet()) {
- InternalTopicConfig internalTopicConfig = entry.getKey();
- Integer partitions = entry.getValue();
+ for (final Map.Entry<InternalTopicConfig, Integer> entry : topicsMap.entrySet()) {
+ final InternalTopicConfig internalTopicConfig = entry.getKey();
+ final Integer partitions = entry.getValue();
final Properties topicProperties = internalTopicConfig.toProperties(windowChangeLogAdditionalRetention);
final Map<String, String> topicConfig = new HashMap<>(defaultTopicConfigs);
- for (String key : topicProperties.stringPropertyNames()) {
+ for (final String key : topicProperties.stringPropertyNames()) {
topicConfig.put(key, topicProperties.getProperty(key));
}
final CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(
@@ -205,7 +219,7 @@ public class StreamsKafkaClient {
streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG)),
Time.SYSTEM.milliseconds(),
true);
- final ClientResponse clientResponse = sendRequest(clientRequest);
+ final ClientResponse clientResponse = sendRequestSync(clientRequest);
if (!clientResponse.hasResponse()) {
throw new StreamsException("Empty response for client request.");
@@ -228,6 +242,7 @@ public class StreamsKafkaClient {
*
* @param nodes List of nodes to pick from.
* @return The first node that is ready to accept requests.
+ * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
*/
private String ensureOneNodeIsReady(final List<Node> nodes) {
String brokerId = null;
@@ -243,7 +258,7 @@ public class StreamsKafkaClient {
}
try {
kafkaClient.poll(50, Time.SYSTEM.milliseconds());
- } catch (final Exception e) {
+ } catch (final RuntimeException e) {
throw new StreamsException("Could not poll.", e);
}
}
@@ -257,9 +272,9 @@ public class StreamsKafkaClient {
}
/**
- *
* @return the Id of the controller node, or an exception if no controller is found or
* controller is not ready
+ * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
*/
private String getControllerReadyBrokerId(final MetadataResponse metadata) {
return ensureOneNodeIsReady(Collections.singletonList(metadata.controller()));
@@ -267,6 +282,7 @@ public class StreamsKafkaClient {
/**
* @return the Id of any broker that is ready, or an exception if no broker is ready.
+ * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
*/
private String getAnyReadyBrokerId() {
final Metadata metadata = new Metadata(
@@ -280,26 +296,32 @@ public class StreamsKafkaClient {
return ensureOneNodeIsReady(nodes);
}
- private ClientResponse sendRequest(final ClientRequest clientRequest) {
+ /**
+ * @return the response to the request
+ * @throws TimeoutException if there was no response within {@code request.timeout.ms}
+ * @throws StreamsException for any other fatal error
+ */
+ private ClientResponse sendRequestSync(final ClientRequest clientRequest) {
try {
kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
- } catch (final Exception e) {
+ } catch (final RuntimeException e) {
throw new StreamsException("Could not send request.", e);
}
- final long responseTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
+
// Poll for the response.
+ final long responseTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
while (Time.SYSTEM.milliseconds() < responseTimeout) {
final List<ClientResponse> responseList;
try {
responseList = kafkaClient.poll(100, Time.SYSTEM.milliseconds());
- } catch (final IllegalStateException e) {
+ } catch (final RuntimeException e) {
throw new StreamsException("Could not poll.", e);
}
if (!responseList.isEmpty()) {
if (responseList.size() > 1) {
throw new StreamsException("Sent one request but received multiple or no responses.");
}
- ClientResponse response = responseList.get(0);
+ final ClientResponse response = responseList.get(0);
if (response.requestHeader().correlationId() == clientRequest.correlationId()) {
return response;
} else {
@@ -309,21 +331,24 @@ public class StreamsKafkaClient {
}
}
}
- throw new StreamsException("Failed to get response from broker within timeout");
+ throw new TimeoutException("Failed to get response from broker within timeout");
}
/**
- * Fetch the metadata for all topics
+ * Fetch the metadata for all topics.
+ *
+ * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
+ * @throws TimeoutException if there was no response within {@code request.timeout.ms}
+ * @throws StreamsException for any other fatal error
*/
public MetadataResponse fetchMetadata() {
-
final ClientRequest clientRequest = kafkaClient.newClientRequest(
getAnyReadyBrokerId(),
MetadataRequest.Builder.allTopics(),
Time.SYSTEM.milliseconds(),
true);
- final ClientResponse clientResponse = sendRequest(clientRequest);
+ final ClientResponse clientResponse = sendRequestSync(clientRequest);
if (!clientResponse.hasResponse()) {
throw new StreamsException("Empty response for client request.");
@@ -342,7 +367,10 @@ public class StreamsKafkaClient {
* Note, for <em>pre</em> 0.10.x brokers the broker version cannot be checked and the client will hang and retry
* until it {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}.
*
+ * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
+ * @throws TimeoutException if there was no response within {@code request.timeout.ms}
* @throws StreamsException if brokers have version 0.10.0.x
+ * @throws StreamsException for any other fatal error
*/
public void checkBrokerCompatibility(final boolean eosEnabled) throws StreamsException {
final ClientRequest clientRequest = kafkaClient.newClientRequest(
@@ -351,7 +379,7 @@ public class StreamsKafkaClient {
Time.SYSTEM.milliseconds(),
true);
- final ClientResponse clientResponse = sendRequest(clientRequest);
+ final ClientResponse clientResponse = sendRequestSync(clientRequest);
if (!clientResponse.hasResponse()) {
throw new StreamsException("Empty response for client request.");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 8dc477f..6f4cc51 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -25,7 +25,6 @@ import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -197,7 +196,7 @@ class TaskManager {
firstException.compareAndSet(null, active.suspend());
firstException.compareAndSet(null, standby.suspend());
// remove the changelog partitions from restore consumer
- restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+ restoreConsumer.unsubscribe();
final Exception exception = firstException.get();
if (exception != null) {
@@ -223,7 +222,7 @@ class TaskManager {
log.error("Failed to close KafkaStreamClient due to the following error:", e);
}
// remove the changelog partitions from restore consumer
- restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+ restoreConsumer.unsubscribe();
taskCreator.close();
standbyTaskCreator.close();
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
index 8fcdd03..c1b1021 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/HostInfo.java
@@ -40,21 +40,23 @@ public class HostInfo {
private final String host;
private final int port;
- public HostInfo(String host, int port) {
+ public HostInfo(final String host,
+ final int port) {
this.host = host;
this.port = port;
}
@Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- HostInfo hostInfo = (HostInfo) o;
-
- if (port != hostInfo.port) return false;
- return host.equals(hostInfo.host);
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final HostInfo hostInfo = (HostInfo) o;
+ return port == hostInfo.port && host.equals(hostInfo.host);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 1cd6bee..7d032a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -18,8 +18,10 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
@@ -33,10 +35,12 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.apache.kafka.streams.processor.internals.InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
+import static org.junit.Assert.assertEquals;
public class InternalTopicManagerTest {
@@ -44,6 +48,7 @@ public class InternalTopicManagerTest {
private final String userEndPoint = "localhost:2171";
private MockStreamKafkaClient streamsKafkaClient;
private final Time time = new MockTime();
+
@Before
public void init() {
final StreamsConfig config = new StreamsConfig(configProps());
@@ -57,54 +62,83 @@ public class InternalTopicManagerTest {
@Test
public void shouldReturnCorrectPartitionCounts() {
- InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
- WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
- Assert.assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
+ final InternalTopicManager internalTopicManager = new InternalTopicManager(
+ streamsKafkaClient,
+ 1,
+ WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
+ time);
+ assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
}
@Test
public void shouldCreateRequiredTopics() {
- InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
- WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
- internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1));
+ streamsKafkaClient.returnNoMetadata = true;
+
+ final InternalTopicManager internalTopicManager = new InternalTopicManager(
+ streamsKafkaClient,
+ 1,
+ WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
+ time);
+
+ final InternalTopicConfig topicConfig = new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null);
+ internalTopicManager.makeReady(Collections.singletonMap(topicConfig, 1));
+
+ assertEquals(Collections.singletonMap(topic, topicConfig), streamsKafkaClient.createdTopics);
+ assertEquals(Collections.singletonMap(topic, 1), streamsKafkaClient.numberOfPartitionsPerTopic);
+ assertEquals(Collections.singletonMap(topic, 1), streamsKafkaClient.replicationFactorPerTopic);
}
@Test
public void shouldNotCreateTopicIfExistsWithDifferentPartitions() {
- InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
- WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
- boolean exceptionWasThrown = false;
+ final InternalTopicManager internalTopicManager = new InternalTopicManager(
+ streamsKafkaClient,
+ 1,
+ WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
+ time);
try {
internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 2));
- } catch (StreamsException e) {
- exceptionWasThrown = true;
- }
- Assert.assertTrue(exceptionWasThrown);
+ Assert.fail("Should have thrown StreamsException");
+ } catch (StreamsException expected) { /* pass */ }
}
@Test
public void shouldNotThrowExceptionIfExistsWithDifferentReplication() {
// create topic the first time with replication 2
- InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 2,
- WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
- internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1));
+ final InternalTopicManager internalTopicManager = new InternalTopicManager(
+ streamsKafkaClient,
+ 2,
+ WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
+ time);
+ internalTopicManager.makeReady(Collections.singletonMap(
+ new InternalTopicConfig(topic,
+ Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+ null),
+ 1));
// attempt to create it again with replication 1
- InternalTopicManager internalTopicManager2 = new InternalTopicManager(streamsKafkaClient, 1,
- WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
- try {
- internalTopicManager2.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1));
- } catch (StreamsException e) {
- Assert.fail("did not expect an exception since topic is already there.");
- }
+ final InternalTopicManager internalTopicManager2 = new InternalTopicManager(
+ streamsKafkaClient,
+ 1,
+ WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
+ time);
+
+ internalTopicManager2.makeReady(Collections.singletonMap(
+ new InternalTopicConfig(topic,
+ Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
+ null),
+ 1));
}
@Test
public void shouldNotThrowExceptionForEmptyTopicMap() {
- InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
- WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
- internalTopicManager.makeReady(Collections.EMPTY_MAP);
+ final InternalTopicManager internalTopicManager = new InternalTopicManager(
+ streamsKafkaClient,
+ 1,
+ WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT,
+ time);
+
+ internalTopicManager.makeReady(Collections.<InternalTopicConfig, Integer>emptyMap());
}
private Properties configProps() {
@@ -120,24 +154,54 @@ public class InternalTopicManagerTest {
private class MockStreamKafkaClient extends StreamsKafkaClient {
+ boolean returnNoMetadata = false;
+
+ Map<String, InternalTopicConfig> createdTopics = new HashMap<>();
+ Map<String, Integer> numberOfPartitionsPerTopic = new HashMap<>();
+ Map<String, Integer> replicationFactorPerTopic = new HashMap<>();
+
MockStreamKafkaClient(final StreamsConfig streamsConfig) {
- super(StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig), new MockClient(new MockTime()), Collections.EMPTY_LIST);
+ super(StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig),
+ new MockClient(new MockTime()),
+ Collections.<MetricsReporter>emptyList(),
+ new LogContext());
}
@Override
- public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final int replicationFactor,
- final long windowChangeLogAdditionalRetention, final MetadataResponse metadata) {
- // do nothing
+ public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap,
+ final int replicationFactor,
+ final long windowChangeLogAdditionalRetention,
+ final MetadataResponse metadata) {
+ for (final Map.Entry<InternalTopicConfig, Integer> topic : topicsMap.entrySet()) {
+ final InternalTopicConfig config = topic.getKey();
+ final String topicName = config.name();
+ createdTopics.put(topicName, config);
+ numberOfPartitionsPerTopic.put(topicName, topic.getValue());
+ replicationFactorPerTopic.put(topicName, replicationFactor);
+ }
}
@Override
public MetadataResponse fetchMetadata() {
- Node node = new Node(1, "host1", 1001);
- MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>(), new ArrayList<Node>());
- MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata));
- MetadataResponse response = new MetadataResponse(Collections.<Node>singletonList(node), null, MetadataResponse.NO_CONTROLLER_ID,
- Collections.singletonList(topicMetadata));
- return response;
+ final Node node = new Node(1, "host1", 1001);
+ final MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>(), new ArrayList<Node>());
+ final MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata));
+ final MetadataResponse metadataResponse;
+ if (returnNoMetadata) {
+ metadataResponse = new MetadataResponse(
+ Collections.<Node>singletonList(node),
+ null,
+ MetadataResponse.NO_CONTROLLER_ID,
+ Collections.<MetadataResponse.TopicMetadata>emptyList());
+ } else {
+ metadataResponse = new MetadataResponse(
+ Collections.<Node>singletonList(node),
+ null,
+ MetadataResponse.NO_CONTROLLER_ID,
+ Collections.singletonList(topicMetadata));
+ }
+
+ return metadataResponse;
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 3c54851..705bcf9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -25,10 +25,12 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.test.MockRestoreCallback;
import org.apache.kafka.test.MockStateRestoreListener;
+import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
@@ -94,11 +96,19 @@ public class StoreChangelogReaderTest {
@Test
public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
+ final StateRestorer mockRestorer = EasyMock.mock(StateRestorer.class);
+ mockRestorer.setUserRestoreListener(stateRestoreListener);
+ expect(mockRestorer.partition()).andReturn(new TopicPartition("sometopic", 0)).andReturn(new TopicPartition("sometopic", 0));
+ EasyMock.replay(mockRestorer);
+ changelogReader.register(mockRestorer);
+
+
consumer.subscribe(Collections.singleton("sometopic"));
+
try {
changelogReader.restore(active);
fail("Should have thrown IllegalStateException");
- } catch (final IllegalStateException e) {
+ } catch (final StreamsException expected) {
// ok
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
index 0bb7682..a399dd4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
@@ -214,8 +215,9 @@ public class StreamsKafkaClientTest {
private StreamsKafkaClient createStreamsKafkaClient() {
final StreamsConfig streamsConfig = new StreamsConfig(config);
return new StreamsKafkaClient(StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig),
- kafkaClient,
- reporters);
+ kafkaClient,
+ reporters,
+ new LogContext());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 67dd6c0..b11b8c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -218,7 +218,7 @@ public class TaskManagerTest {
@Test
public void shouldUnassignChangelogPartitionsOnSuspend() {
- restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+ restoreConsumer.unsubscribe();
EasyMock.expectLastCall();
replay();
@@ -231,7 +231,7 @@ public class TaskManagerTest {
EasyMock.expect(active.suspend()).andReturn(new RuntimeException(""));
EasyMock.expect(standby.suspend()).andReturn(new RuntimeException(""));
EasyMock.expectLastCall();
- restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+ restoreConsumer.unsubscribe();
replay();
try {
@@ -265,7 +265,7 @@ public class TaskManagerTest {
@Test
public void shouldUnassignChangelogPartitionsOnShutdown() {
- restoreConsumer.assign(Collections.<TopicPartition>emptyList());
+ restoreConsumer.unsubscribe();
EasyMock.expectLastCall();
replay();
@@ -483,8 +483,6 @@ public class TaskManagerTest {
EasyMock.replay(task);
}
-
-
private void mockStandbyTaskExpectations() {
mockThreadMetadataProvider(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
expect(standbyTaskCreator.createTasks(EasyMock.<Consumer<byte[], byte[]>>anyObject(),
http://git-wip-us.apache.org/repos/asf/kafka/blob/c7ab3efc/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index 67d5877..15d67b9 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -196,7 +196,7 @@ class StreamsBrokerBounceTest(Test):
time.sleep(sleep_time_secs)
# Fail brokers
- self.fail_broker_type(failure_mode, broker_type);
+ self.fail_broker_type(failure_mode, broker_type)
return self.collect_results(sleep_time_secs)
@@ -216,7 +216,7 @@ class StreamsBrokerBounceTest(Test):
time.sleep(sleep_time_secs)
# Fail brokers
- self.fail_broker_type(failure_mode, broker_type);
+ self.fail_broker_type(failure_mode, broker_type)
return self.collect_results(sleep_time_secs)
@@ -234,7 +234,7 @@ class StreamsBrokerBounceTest(Test):
time.sleep(120)
# Fail brokers
- self.fail_many_brokers(failure_mode, num_failures);
+ self.fail_many_brokers(failure_mode, num_failures)
return self.collect_results(120)
@@ -252,6 +252,6 @@ class StreamsBrokerBounceTest(Test):
time.sleep(120)
# Fail brokers
- self.fail_many_brokers(failure_mode, num_failures);
+ self.fail_many_brokers(failure_mode, num_failures)
return self.collect_results(120)