You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/28 17:37:34 UTC
[3/3] kafka git commit: KAFKA-6170;
KIP-220 Part 2: Break dependency of Assignor on StreamThread
KAFKA-6170; KIP-220 Part 2: Break dependency of Assignor on StreamThread
This refactoring is discussed in https://github.com/apache/kafka/pull/3624#discussion_r132614639. More specifically:
1. Moved the access of `StreamThread` in `StreamPartitionAssignor` to `TaskManager`, removed any fields stored in `StreamThread` such as `processId` and `clientId` that are only used in `StreamPartitionAssignor`, and passed them to `TaskManager` where necessary.
2. Moved all in-memory state (`metadataWithInternalTopics`, `partitionsByHostState`, `standbyTasks`, `activeTasks`) to `TaskManager` so that `StreamPartitionAssignor` becomes a stateless thin layer that accesses `TaskManager` directly.
3. Removed the reference to `StreamPartitionAssignor` in `StreamThread`; instead, consolidated all related functionality such as `cachedTasksIds` in `TaskManager`, where it can be retrieved by the `StreamThread` and the `StreamPartitionAssignor` directly.
4. Finally, removed the two interfaces used for `StreamThread` and `StreamPartitionAssignor` (`ThreadDataProvider` and `ThreadMetadataProvider`).
5. Some minor fixes to log prefixes, etc.
Future work: when replacing the `StreamsKafkaClient`, we would let `StreamPartitionAssignor` retrieve it from `TaskManager` directly, and its close call would no longer need to be made by the assignor (`KafkaStreams` will be responsible for closing it).
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@gmail.com>, Matthias J. Sax <ma...@confluent.io>
Closes #4224 from guozhangwang/K6170-refactor-assignor
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5df1eee7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5df1eee7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5df1eee7
Branch: refs/heads/trunk
Commit: 5df1eee7d689e18ac2f7b74410e7a30159d3afdc
Parents: 8f6a372
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Nov 28 09:37:27 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 28 09:37:27 2017 -0800
----------------------------------------------------------------------
.../kafka/streams/KafkaClientSupplier.java | 3 +-
.../org/apache/kafka/streams/KafkaStreams.java | 88 +--
.../org/apache/kafka/streams/StreamsConfig.java | 9 +-
.../streams/processor/TopologyBuilder.java | 7 +-
.../internals/DefaultKafkaClientSupplier.java | 1 +
.../processor/internals/GlobalStreamThread.java | 3 +-
.../internals/InternalTopicManager.java | 4 -
.../internals/InternalTopologyBuilder.java | 45 +-
.../internals/StreamPartitionAssignor.java | 213 ++---
.../processor/internals/StreamThread.java | 402 ++++------
.../processor/internals/StreamsKafkaClient.java | 64 +-
.../processor/internals/TaskManager.java | 140 +++-
.../processor/internals/ThreadDataProvider.java | 36 -
.../internals/ThreadMetadataProvider.java | 36 -
.../apache/kafka/streams/StreamsConfigTest.java | 20 +-
.../QueryableStateIntegrationTest.java | 2 +
.../streams/processor/TopologyBuilderTest.java | 6 +-
.../internals/GlobalStreamThreadTest.java | 2 +
.../internals/InternalTopicManagerTest.java | 2 +-
.../internals/InternalTopologyBuilderTest.java | 5 +-
.../internals/StreamPartitionAssignorTest.java | 295 +++----
.../processor/internals/StreamThreadTest.java | 769 +++++--------------
.../internals/StreamsKafkaClientTest.java | 43 +-
.../processor/internals/TaskManagerTest.java | 200 ++++-
.../kafka/test/MockInternalTopicManager.java | 2 +-
25 files changed, 912 insertions(+), 1485 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index 5561bd1..2ea5218 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.StreamThread;
import java.util.Map;
@@ -50,7 +49,7 @@ public interface KafkaClientSupplier {
/**
* Create a {@link Consumer} which is used to read records of source topics.
*
- * @param config {@link StreamsConfig#getConsumerConfigs(StreamThread, String, String) consumer config} which is
+ * @param config {@link StreamsConfig#getConsumerConfigs(String, String) consumer config} which is
* supplied by the {@link StreamsConfig} given to the {@link KafkaStreams} instance
* @return an instance of Kafka consumer
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 9e67f54..c7dfe71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -50,7 +50,6 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.apache.kafka.streams.processor.internals.StreamsKafkaClient;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.state.HostInfo;
@@ -80,8 +79,6 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
-import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
-import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG;
/**
* A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
@@ -133,6 +130,7 @@ public class KafkaStreams {
// in userData of the subscription request to allow assignor be aware
// of the co-location of stream thread's consumers. It is for internal
// usage only and should not be exposed to users at all.
+ private final Time time;
private final Logger log;
private final UUID processId;
private final String clientId;
@@ -214,7 +212,7 @@ public class KafkaStreams {
private volatile State state = State.CREATED;
private boolean waitOnState(final State targetState, final long waitMs) {
- long begin = System.currentTimeMillis();
+ long begin = time.milliseconds();
synchronized (stateLock) {
long elapsedMs = 0L;
while (state != State.NOT_RUNNING) {
@@ -235,7 +233,7 @@ public class KafkaStreams {
log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
return false;
}
- elapsedMs = System.currentTimeMillis() - begin;
+ elapsedMs = time.milliseconds() - begin;
}
return true;
}
@@ -587,62 +585,66 @@ public class KafkaStreams {
final StreamsConfig config,
final KafkaClientSupplier clientSupplier) throws StreamsException {
this.config = config;
+ time = Time.SYSTEM;
// The application ID is a required config and hence should always have value
processId = UUID.randomUUID();
- final String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
+ final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
- if (clientId.length() <= 0) {
- this.clientId = applicationId + "-" + processId;
+ if (userClientId.length() <= 0) {
+ clientId = applicationId + "-" + processId;
} else {
- this.clientId = clientId;
+ clientId = userClientId;
}
final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
this.log = logContext.logger(getClass());
- internalTopologyBuilder.setApplicationId(applicationId);
- // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
- internalTopologyBuilder.build(null);
-
- long cacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
- if (cacheSize < 0) {
- cacheSize = 0;
- log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
- }
-
- final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
-
- threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
try {
- stateDirectory = new StateDirectory(
- config,
- Time.SYSTEM);
+ stateDirectory = new StateDirectory(config, time);
} catch (final ProcessorStateException fatal) {
throw new StreamsException(fatal);
}
- streamsMetadataState = new StreamsMetadataState(
- internalTopologyBuilder,
- parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class);
+ MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
- metrics = new Metrics(metricConfig, reporters, Time.SYSTEM);
+ metrics = new Metrics(metricConfig, reporters, time);
- GlobalStreamThread.State globalThreadState = null;
+ internalTopologyBuilder.setApplicationId(applicationId);
+
+ // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
+ internalTopologyBuilder.build();
+
+ streamsMetadataState = new StreamsMetadataState(
+ internalTopologyBuilder,
+ parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+
+ // create the stream thread, global update thread, and cleanup thread
+ threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+
+ long totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
+ if (totalCacheSize < 0) {
+ totalCacheSize = 0;
+ log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
+ }
final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+ final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
+
+ final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
+ GlobalStreamThread.State globalThreadState = null;
if (globalTaskTopology != null) {
final String globalThreadId = clientId + "-GlobalStreamThread";
globalStreamThread = new GlobalStreamThread(globalTaskTopology,
config,
clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")),
stateDirectory,
+ cacheSizePerThread,
metrics,
- Time.SYSTEM,
+ time,
globalThreadId,
delegatingStateRestoreListener);
globalThreadState = globalStreamThread.state();
@@ -661,9 +663,9 @@ public class KafkaStreams {
processId,
clientId,
metrics,
- Time.SYSTEM,
+ time,
streamsMetadataState,
- cacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1)),
+ cacheSizePerThread,
stateDirectory,
delegatingStateRestoreListener);
threadState.put(threads[i].getId(), threads[i].state());
@@ -706,22 +708,6 @@ public class KafkaStreams {
}
/**
- * Check if the used brokers have version 0.10.1.x or higher.
- * <p>
- * Note, for <em>pre</em> 0.10.x brokers the broker version cannot be checked and the client will hang and retry
- * until it {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}.
- *
- * @throws StreamsException if brokers have version 0.10.0.x
- */
- private void checkBrokerVersionCompatibility() throws StreamsException {
- final StreamsKafkaClient client = StreamsKafkaClient.create(config);
-
- client.checkBrokerCompatibility(EXACTLY_ONCE.equals(config.getString(PROCESSING_GUARANTEE_CONFIG)));
-
- client.close();
- }
-
- /**
* Start the {@code KafkaStreams} instance by starting all its threads.
* This function is expected to be called only once during the life cycle of the client.
* <p>
@@ -745,8 +731,6 @@ public class KafkaStreams {
// first set state to RUNNING before kicking off the threads,
// making sure the state will always transit to RUNNING before REBALANCING
if (setRunningFromCreated()) {
- checkBrokerVersionCompatibility();
-
if (globalStreamThread != null) {
globalStreamThread.start();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 941437c..e1f2b09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -36,7 +36,6 @@ import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
-import org.apache.kafka.streams.processor.internals.StreamThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -582,7 +581,7 @@ public class StreamsConfig extends AbstractConfig {
}
public static class InternalConfig {
- public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__";
+ public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__";
}
/**
@@ -722,22 +721,20 @@ public class StreamsConfig extends AbstractConfig {
* except in the case of {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} where we always use the non-prefixed
* version as we only support reading/writing from/to the same Kafka Cluster.
*
- * @param streamThread the {@link StreamThread} creating a consumer
* @param groupId consumer groupId
* @param clientId clientId
* @return Map of the consumer configuration.
*/
- public Map<String, Object> getConsumerConfigs(final StreamThread streamThread,
- final String groupId,
+ public Map<String, Object> getConsumerConfigs(final String groupId,
final String clientId) {
final Map<String, Object> consumerProps = getCommonConsumerConfigs();
// add client id with stream client id prefix, and group id
+ consumerProps.put(APPLICATION_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
// add configs required for stream partition assignor
- consumerProps.put(InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 66dfa27..6f34e25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -932,7 +933,9 @@ public class TopologyBuilder {
* for the high-level DSL parsing functionalities.
*/
public SubscriptionUpdates subscriptionUpdates() {
- return internalTopologyBuilder.subscriptionUpdates();
+ SubscriptionUpdates clonedSubscriptionUpdates = new SubscriptionUpdates();
+ clonedSubscriptionUpdates.updateTopics(internalTopologyBuilder.subscriptionUpdates().getUpdates());
+ return clonedSubscriptionUpdates;
}
/**
@@ -949,7 +952,7 @@ public class TopologyBuilder {
*/
public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
final String threadId) {
- internalTopologyBuilder.updateSubscriptions(subscriptionUpdates, threadId);
+ internalTopologyBuilder.updateSubscribedTopics(new HashSet<>(subscriptionUpdates.getUpdates()), "stream-thread [" + threadId + "] ");
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
index f3038f3..6f01e2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.KafkaClientSupplier;
public class DefaultKafkaClientSupplier implements KafkaClientSupplier {
@Override
public AdminClient getAdminClient(final Map<String, Object> config) {
+ // create a new client upon each call; but expect this call to be only triggered once so this should be fine
return AdminClient.create(config);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 24cec25..9d202d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -176,6 +176,7 @@ public class GlobalStreamThread extends Thread {
final StreamsConfig config,
final Consumer<byte[], byte[]> globalConsumer,
final StateDirectory stateDirectory,
+ final long cacheSizeBytes,
final Metrics metrics,
final Time time,
final String threadClientId,
@@ -186,8 +187,6 @@ public class GlobalStreamThread extends Thread {
this.topology = topology;
this.globalConsumer = globalConsumer;
this.stateDirectory = stateDirectory;
- long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
- (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1));
this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId));
this.logPrefix = String.format("global-stream-thread [%s] ", threadClientId);
this.logContext = new LogContext(logPrefix);
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index ae2b375..f8d4eec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -108,10 +108,6 @@ public class InternalTopicManager {
throw new StreamsException("Could not get number of partitions.");
}
- public void close() {
- streamsKafkaClient.close();
- }
-
/**
* Check the existing topics to have correct number of partitions; and return the non existing topics to be created
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index f2cbf51..881ecd1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -25,7 +25,6 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
@@ -842,6 +841,10 @@ public class InternalTopologyBuilder {
return nodeGroups;
}
+ public synchronized ProcessorTopology build() {
+ return build((Integer) null);
+ }
+
public synchronized ProcessorTopology build(final Integer topicGroupId) {
final Set<String> nodeGroup;
if (topicGroupId != null) {
@@ -1246,9 +1249,9 @@ public class InternalTopologyBuilder {
}
public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates,
- final String threadId) {
- log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)",
- threadId, subscriptionUpdates);
+ final String logPrefix) {
+ log.debug("{}updating builder with {} topic(s) with possible matching regex subscription(s)",
+ logPrefix, subscriptionUpdates);
this.subscriptionUpdates = subscriptionUpdates;
setRegexMatchedTopicsToSourceNodes();
setRegexMatchedTopicToStateStore();
@@ -1811,4 +1814,38 @@ public class InternalTopologyBuilder {
return sb.toString();
}
+ /**
+ * Used to capture subscribed topic via Patterns discovered during the
+ * partition assignment process.
+ */
+ public static class SubscriptionUpdates {
+
+ private final Set<String> updatedTopicSubscriptions = new HashSet<>();
+
+ private void updateTopics(final Collection<String> topicNames) {
+ updatedTopicSubscriptions.clear();
+ updatedTopicSubscriptions.addAll(topicNames);
+ }
+
+ public Collection<String> getUpdates() {
+ return Collections.unmodifiableSet(updatedTopicSubscriptions);
+ }
+
+ boolean hasUpdates() {
+ return !updatedTopicSubscriptions.isEmpty();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("SubscriptionUpdates{updatedTopicSubscriptions=%s}", updatedTopicSubscriptions);
+ }
+ }
+
+ public void updateSubscribedTopics(final Set<String> topics, final String logPrefix) {
+ final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+ log.debug("{}found {} topics possibly matching regex", topics, logPrefix);
+ // update the topic groups with the returned subscription set for regex pattern subscriptions
+ subscriptionUpdates.updateTopics(topics);
+ updateSubscriptions(subscriptionUpdates, logPrefix);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 9e505a1..ec42a86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TaskAssignmentException;
+import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
@@ -52,9 +53,8 @@ import java.util.UUID;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
-import static org.apache.kafka.streams.processor.internals.InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
-public class StreamPartitionAssignor implements PartitionAssignor, Configurable, ThreadMetadataProvider {
+public class StreamPartitionAssignor implements PartitionAssignor, Configurable {
private Time time = Time.SYSTEM;
private final static int UNKNOWN = -1;
@@ -164,21 +164,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
if (result != 0) {
return result;
} else {
- return p1.partition() < p2.partition() ? UNKNOWN : (p1.partition() > p2.partition() ? 1 : 0);
+ return Integer.compare(p1.partition(), p2.partition());
}
}
};
- private ThreadDataProvider threadDataProvider;
-
private String userEndPoint;
private int numStandbyReplicas;
- private Cluster metadataWithInternalTopics;
- private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
-
- private Map<TaskId, Set<TopicPartition>> standbyTasks;
- private Map<TaskId, Set<TopicPartition>> activeTasks;
+ private TaskManager taskManager;
+ private PartitionGrouper partitionGrouper;
private InternalTopicManager internalTopicManager;
private CopartitionedTopicsValidator copartitionedTopicsValidator;
@@ -199,30 +194,33 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
*/
@Override
public void configure(Map<String, ?> configs) {
- numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+ final StreamsConfig streamsConfig = new StreamsConfig(configs);
// Setting the logger with the passed in client thread name
- logPrefix = String.format("stream-thread [%s] ", configs.get(CommonClientConfigs.CLIENT_ID_CONFIG));
+ logPrefix = String.format("stream-thread [%s] ", streamsConfig.getString(CommonClientConfigs.CLIENT_ID_CONFIG));
final LogContext logContext = new LogContext(logPrefix);
- this.log = logContext.logger(getClass());
+ log = logContext.logger(getClass());
- Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
+ final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
if (o == null) {
- KafkaException ex = new KafkaException("StreamThread is not specified");
+ KafkaException ex = new KafkaException("TaskManager is not specified");
log.error(ex.getMessage(), ex);
throw ex;
}
- if (!(o instanceof ThreadDataProvider)) {
- KafkaException ex = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), ThreadDataProvider.class.getName()));
+ if (!(o instanceof TaskManager)) {
+ KafkaException ex = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), TaskManager.class.getName()));
log.error(ex.getMessage(), ex);
throw ex;
}
- threadDataProvider = (ThreadDataProvider) o;
- threadDataProvider.setThreadMetadataProvider(this);
+ taskManager = (TaskManager) o;
+
+ numStandbyReplicas = streamsConfig.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
- String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
+ partitionGrouper = streamsConfig.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
+
+ final String userEndPoint = streamsConfig.getString(StreamsConfig.APPLICATION_SERVER_CONFIG);
if (userEndPoint != null && !userEndPoint.isEmpty()) {
try {
String host = getHost(userEndPoint);
@@ -241,13 +239,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
}
internalTopicManager = new InternalTopicManager(
- StreamsKafkaClient.create(this.threadDataProvider.config()),
- configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1,
- configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ?
- (Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)
- : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
+ taskManager.streamsKafkaClient,
+ streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG),
+ streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG),
+ time);
- this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(threadDataProvider.name());
+ copartitionedTopicsValidator = new CopartitionedTopicsValidator(logPrefix);
}
@Override
@@ -262,27 +259,16 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
// 2. Task ids of previously running tasks
// 3. Task ids of valid local states on the client's state directory.
- final Set<TaskId> previousActiveTasks = threadDataProvider.prevActiveTasks();
- Set<TaskId> standbyTasks = threadDataProvider.cachedTasks();
+ final Set<TaskId> previousActiveTasks = taskManager.prevActiveTaskIds();
+ final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
standbyTasks.removeAll(previousActiveTasks);
- SubscriptionInfo data = new SubscriptionInfo(threadDataProvider.processId(), previousActiveTasks, standbyTasks, this.userEndPoint);
+ final SubscriptionInfo data = new SubscriptionInfo(taskManager.processId(), previousActiveTasks, standbyTasks, this.userEndPoint);
- if (threadDataProvider.builder().sourceTopicPattern() != null &&
- !threadDataProvider.builder().subscriptionUpdates().getUpdates().equals(topics)) {
- updateSubscribedTopics(topics);
- }
+ taskManager.updateSubscriptionsFromMetadata(topics);
return new Subscription(new ArrayList<>(topics), data.encode());
}
- private void updateSubscribedTopics(Set<String> topics) {
- SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
- log.debug("found {} topics possibly matching regex", topics);
- // update the topic groups with the returned subscription set for regex pattern subscriptions
- subscriptionUpdates.updateTopics(topics);
- threadDataProvider.builder().updateSubscriptions(subscriptionUpdates, threadDataProvider.name());
- }
-
/*
* This assigns tasks to consumer clients in the following steps.
*
@@ -333,9 +319,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
// parse the topology to determine the repartition source topics,
// making sure they are created with the number of partitions as
// the maximum of the depending sub-topologies source topics' number of partitions
- Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = threadDataProvider.builder().topicGroups();
+ final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = taskManager.builder().topicGroups();
- Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
+ final Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
for (InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic));
@@ -353,7 +339,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
// try set the number of partitions for this repartition topic if it is not set yet
if (numPartitions == UNKNOWN) {
for (InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
- Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
+ final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
if (otherSinkTopics.contains(topicName)) {
// if this topic is one of the sink topics of this topology,
@@ -391,10 +377,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
// augment the metadata with the newly computed number of partitions for all the
// repartition source topics
- Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
+ final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
for (Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) {
- String topic = entry.getKey();
- Integer numPartitions = entry.getValue().numPartitions;
+ final String topic = entry.getKey();
+ final Integer numPartitions = entry.getValue().numPartitions;
for (int partition = 0; partition < numPartitions; partition++) {
allRepartitionTopicPartitions.put(new TopicPartition(topic, partition),
@@ -405,34 +391,34 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
// ensure the co-partitioning topics within the group have the same number of partitions,
// and enforce the number of partitions for those repartition topics to be the same if they
// are co-partitioned as well.
- ensureCopartitioning(threadDataProvider.builder().copartitionGroups(), repartitionTopicMetadata, metadata);
+ ensureCopartitioning(taskManager.builder().copartitionGroups(), repartitionTopicMetadata, metadata);
// make sure the repartition source topics exist with the right number of partitions,
// create these topics if necessary
prepareTopic(repartitionTopicMetadata);
- metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions);
+ final Cluster fullMetadata = metadata.withPartitions(allRepartitionTopicPartitions);
+ taskManager.setClusterMetadata(fullMetadata);
log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
// ---------------- Step One ---------------- //
// get the tasks as partition groups from the partition grouper
- Set<String> allSourceTopics = new HashSet<>();
- Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
+ final Set<String> allSourceTopics = new HashSet<>();
+ final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
allSourceTopics.addAll(entry.getValue().sourceTopics);
sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
}
- Map<TaskId, Set<TopicPartition>> partitionsForTask = threadDataProvider.partitionGrouper().partitionGroups(
- sourceTopicsByGroup, metadataWithInternalTopics);
+ final Map<TaskId, Set<TopicPartition>> partitionsForTask = partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
// check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks
- Set<TopicPartition> allAssignedPartitions = new HashSet<>();
- Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
+ final Set<TopicPartition> allAssignedPartitions = new HashSet<>();
+ final Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
- Set<TopicPartition> partitions = entry.getValue();
+ final Set<TopicPartition> partitions = entry.getValue();
for (TopicPartition partition : partitions) {
if (allAssignedPartitions.contains(partition)) {
log.warn("Partition {} is assigned to more than one tasks: {}", partition, partitionsForTask);
@@ -440,7 +426,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
}
allAssignedPartitions.addAll(partitions);
- TaskId id = entry.getKey();
+ final TaskId id = entry.getKey();
Set<TaskId> ids = tasksByTopicGroup.get(id.topicGroupId);
if (ids == null) {
ids = new HashSet<>();
@@ -449,10 +435,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
ids.add(id);
}
for (String topic : allSourceTopics) {
- List<PartitionInfo> partitionInfoList = metadataWithInternalTopics.partitionsForTopic(topic);
+ final List<PartitionInfo> partitionInfoList = fullMetadata.partitionsForTopic(topic);
if (!partitionInfoList.isEmpty()) {
for (PartitionInfo partitionInfo : partitionInfoList) {
- TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
+ final TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
if (!allAssignedPartitions.contains(partition)) {
log.warn("Partition {} is not assigned to any tasks: {}", partition, partitionsForTask);
}
@@ -463,7 +449,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
}
// add tasks to state change log topic subscribers
- Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>();
+ final Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>();
for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
final int topicGroupId = entry.getKey();
final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
@@ -476,7 +462,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
if (numPartitions < task.partition + 1)
numPartitions = task.partition + 1;
}
- InternalTopicMetadata topicMetadata = new InternalTopicMetadata(topicConfig);
+ final InternalTopicMetadata topicMetadata = new InternalTopicMetadata(topicConfig);
topicMetadata.numPartitions = numPartitions;
changelogTopicMetadata.put(topicConfig.name(), topicMetadata);
@@ -493,7 +479,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
// ---------------- Step Two ---------------- //
// assign tasks to clients
- Map<UUID, ClientState> states = new HashMap<>();
+ final Map<UUID, ClientState> states = new HashMap<>();
for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
states.put(entry.getKey(), entry.getValue().state);
}
@@ -509,9 +495,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
// ---------------- Step Three ---------------- //
// construct the global partition assignment per host map
- partitionsByHostState = new HashMap<>();
+ final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<>();
for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
- HostInfo hostInfo = entry.getValue().hostInfo;
+ final HostInfo hostInfo = entry.getValue().hostInfo;
if (hostInfo != null) {
final Set<TopicPartition> topicPartitions = new HashSet<>();
@@ -524,9 +510,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
partitionsByHostState.put(hostInfo, topicPartitions);
}
}
+ taskManager.setPartitionsByHostState(partitionsByHostState);
// within the client, distribute tasks to its owned consumers
- Map<String, Assignment> assignment = new HashMap<>();
+ final Map<String, Assignment> assignment = new HashMap<>();
for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
final Set<String> consumers = entry.getValue().consumers;
final ClientState state = entry.getValue().state;
@@ -541,12 +528,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
int i = 0;
for (String consumer : consumers) {
- Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
- ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>();
+ final Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
+ final ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>();
final int numTaskIds = taskIds.size();
for (int j = i; j < numTaskIds; j += numConsumers) {
- TaskId taskId = taskIds.get(j);
+ final TaskId taskId = taskIds.get(j);
if (j < numActiveTasks) {
for (TopicPartition partition : partitionsForTask.get(taskId)) {
assignedPartitions.add(new AssignedPartition(taskId, partition));
@@ -562,8 +549,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
}
Collections.sort(assignedPartitions);
- List<TaskId> active = new ArrayList<>();
- List<TopicPartition> activePartitions = new ArrayList<>();
+ final List<TaskId> active = new ArrayList<>();
+ final List<TopicPartition> activePartitions = new ArrayList<>();
for (AssignedPartition partition : assignedPartitions) {
active.add(partition.taskId);
activePartitions.add(partition.partition);
@@ -588,8 +575,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
- this.standbyTasks = info.standbyTasks;
- this.activeTasks = new HashMap<>();
+ Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
// the number of assigned partitions should be the same as number of active tasks, which
// could be duplicated if one task has more than one assigned partitions
@@ -612,35 +598,22 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
assignedPartitions.add(partition);
}
- this.partitionsByHostState = info.partitionsByHost;
-
- final Collection<Set<TopicPartition>> values = partitionsByHostState.values();
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
- for (Set<TopicPartition> value : values) {
+ for (Set<TopicPartition> value : info.partitionsByHost.values()) {
for (TopicPartition topicPartition : value) {
topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(),
- topicPartition.partition(),
- null,
- new Node[0],
- new Node[0]));
+ topicPartition.partition(),
+ null,
+ new Node[0],
+ new Node[0]));
}
}
- metadataWithInternalTopics = Cluster.empty().withPartitions(topicToPartitionInfo);
- checkForNewTopicAssignments(assignment);
- }
+ taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
+ taskManager.setPartitionsByHostState(info.partitionsByHost);
+ taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks);
- private void checkForNewTopicAssignments(Assignment assignment) {
- if (threadDataProvider.builder().sourceTopicPattern() != null) {
- final Set<String> assignedTopics = new HashSet<>();
- for (final TopicPartition topicPartition : assignment.partitions()) {
- assignedTopics.add(topicPartition.topic());
- }
- if (!threadDataProvider.builder().subscriptionUpdates().getUpdates().containsAll(assignedTopics)) {
- assignedTopics.addAll(threadDataProvider.builder().subscriptionUpdates().getUpdates());
- updateSubscribedTopics(assignedTopics);
- }
- }
+ taskManager.updateSubscriptionsFromAssignment(partitions);
}
/**
@@ -706,53 +679,24 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
}
}
- public Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
- if (partitionsByHostState == null) {
- return Collections.emptyMap();
- }
- return Collections.unmodifiableMap(partitionsByHostState);
- }
-
- public Cluster clusterMetadata() {
- if (metadataWithInternalTopics == null) {
- return Cluster.empty();
- }
- return metadataWithInternalTopics;
- }
-
- public Map<TaskId, Set<TopicPartition>> activeTasks() {
- if (activeTasks == null) {
- return Collections.emptyMap();
- }
- return Collections.unmodifiableMap(activeTasks);
- }
-
- public Map<TaskId, Set<TopicPartition>> standbyTasks() {
- if (standbyTasks == null) {
- return Collections.emptyMap();
- }
- return Collections.unmodifiableMap(standbyTasks);
- }
-
- void setInternalTopicManager(InternalTopicManager internalTopicManager) {
- this.internalTopicManager = internalTopicManager;
- }
-
/**
* Used to capture subscribed topic via Patterns discovered during the
* partition assignment process.
+ *
+ * // TODO: this is a duplicate of InternalTopologyBuilder#SubscriptionUpdates
+ * and is maintained only for compatibility with the deprecated TopologyBuilder API
*/
public static class SubscriptionUpdates {
private final Set<String> updatedTopicSubscriptions = new HashSet<>();
- private void updateTopics(Collection<String> topicNames) {
+ public void updateTopics(Collection<String> topicNames) {
updatedTopicSubscriptions.clear();
updatedTopicSubscriptions.addAll(topicNames);
}
public Collection<String> getUpdates() {
- return Collections.unmodifiableSet(new HashSet<>(updatedTopicSubscriptions));
+ return Collections.unmodifiableSet(updatedTopicSubscriptions);
}
public boolean hasUpdates() {
@@ -767,15 +711,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
}
}
- public void close() {
- internalTopicManager.close();
- }
-
static class CopartitionedTopicsValidator {
private final String logPrefix;
- CopartitionedTopicsValidator(final String threadName) {
- this.logPrefix = String.format("stream-thread [%s]", threadName);
+ CopartitionedTopicsValidator(final String logPrefix) {
+ this.logPrefix = logPrefix;
}
@SuppressWarnings("deprecation")
@@ -826,4 +766,9 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
}
}
+
+ // following functions are for test only
+ void setInternalTopicManager(InternalTopicManager internalTopicManager) {
+ this.internalTopicManager = internalTopicManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 1514e26..14b912e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -42,9 +42,7 @@ import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.errors.TaskMigratedException;
-import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
@@ -52,7 +50,6 @@ import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
-import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -67,9 +64,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.singleton;
-public class StreamThread extends Thread implements ThreadDataProvider {
+public class StreamThread extends Thread {
- private final Logger log;
+ private final static int UNLIMITED_RECORDS = -1;
private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
/**
@@ -204,7 +201,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
if (newState == State.RUNNING) {
updateThreadMetadata(taskManager.activeTasks(), taskManager.standbyTasks());
} else {
- updateThreadMetadata(null, null);
+ updateThreadMetadata(Collections.<TaskId, StreamTask>emptyMap(), Collections.<TaskId, StandbyTask>emptyMap());
}
}
@@ -258,7 +255,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
return;
}
taskManager.createTasks(assignment);
- streamThread.refreshMetadataState();
} catch (final Throwable t) {
log.error("Error caught during partition assignment, " +
"will abort the current process and re-throw at the end of rebalance: {}", t.getMessage());
@@ -295,7 +291,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
"will abort the current process and re-throw at the end of rebalance: {}", t.getMessage());
streamThread.setRebalanceException(t);
} finally {
- streamThread.refreshMetadataState();
streamThread.clearStandbyRecords();
log.info("partition revocation took {} ms.\n" +
@@ -340,6 +335,14 @@ public class StreamThread extends Thread implements ThreadDataProvider {
this.log = log;
}
+ public InternalTopologyBuilder builder() {
+ return builder;
+ }
+
+ public StateDirectory stateDirectory() {
+ return stateDirectory;
+ }
+
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
@@ -555,15 +558,12 @@ public class StreamThread extends Thread implements ThreadDataProvider {
private final long pollTimeMs;
private final long commitTimeMs;
private final Object stateLock;
- private final UUID processId;
- private final String clientId;
+ private final Logger log;
private final String logPrefix;
- private final StreamsConfig config;
+ // TODO: adminClient will be passed to taskManager to be accessed in StreamPartitionAssignor
+ private final AdminClient adminClient;
private final TaskManager taskManager;
- private final StateDirectory stateDirectory;
- private final PartitionGrouper partitionGrouper;
private final StreamsMetricsThreadImpl streamsMetrics;
- private final StreamsMetadataState streamsMetadataState;
private long lastCommitMs;
private long timerStartedMs;
@@ -571,75 +571,16 @@ public class StreamThread extends Thread implements ThreadDataProvider {
private Throwable rebalanceException = null;
private boolean processStandbyRecords = false;
private volatile State state = State.CREATED;
+ private volatile ThreadMetadata threadMetadata;
private StreamThread.StateListener stateListener;
- private ThreadMetadataProvider metadataProvider;
private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
- private final AdminClient adminClient;
-
// package-private for testing
final ConsumerRebalanceListener rebalanceListener;
final Consumer<byte[], byte[]> restoreConsumer;
+ final Consumer<byte[], byte[]> consumer;
+ final InternalTopologyBuilder builder;
- protected final Consumer<byte[], byte[]> consumer;
- protected final InternalTopologyBuilder builder;
-
- public final String applicationId;
-
- private volatile ThreadMetadata threadMetadata;
-
- private final static int UNLIMITED_RECORDS = -1;
-
- public StreamThread(final InternalTopologyBuilder builder,
- final String clientId,
- final String threadClientId,
- final StreamsConfig config,
- final UUID processId,
- final Time time,
- final StreamsMetadataState streamsMetadataState,
- final TaskManager taskManager,
- final StreamsMetricsThreadImpl streamsMetrics,
- final KafkaClientSupplier clientSupplier,
- final Consumer<byte[], byte[]> restoreConsumer,
- final AdminClient adminClient,
- final StateDirectory stateDirectory) {
- super(threadClientId);
- this.builder = builder;
- this.clientId = clientId;
- this.applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
- this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
- this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
- this.processId = processId;
- this.time = time;
- this.streamsMetadataState = streamsMetadataState;
- this.taskManager = taskManager;
- this.logPrefix = String.format("stream-thread [%s] ", threadClientId);
- this.streamsMetrics = streamsMetrics;
- this.restoreConsumer = restoreConsumer;
- this.adminClient = adminClient;
- this.stateDirectory = stateDirectory;
- this.config = config;
- this.stateLock = new Object();
- this.standbyRecords = new HashMap<>();
- this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
- final LogContext logContext = new LogContext(this.logPrefix);
- this.log = logContext.logger(StreamThread.class);
- this.rebalanceListener = new RebalanceListener(time, taskManager, this, this.log);
-
- log.info("Creating consumer client");
- final Map<String, Object> consumerConfigs = config.getConsumerConfigs(this, applicationId, threadClientId);
-
- if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
- originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
- log.info("Custom offset resets specified updating configs original auto offset reset {}", originalReset);
- consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
- }
- this.consumer = clientSupplier.getConsumer(consumerConfigs);
- taskManager.setConsumer(consumer);
- updateThreadMetadata(null, null);
- }
-
- @SuppressWarnings("ConstantConditions")
public static StreamThread create(final InternalTopologyBuilder builder,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier,
@@ -652,82 +593,123 @@ public class StreamThread extends Thread implements ThreadDataProvider {
final long cacheSizeBytes,
final StateDirectory stateDirectory,
final StateRestoreListener userStateRestoreListener) {
-
final String threadClientId = clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement();
- final StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl(metrics,
- "stream-metrics",
- "thread." + threadClientId,
- Collections.singletonMap("client-id",
- threadClientId));
final String logPrefix = String.format("stream-thread [%s] ", threadClientId);
final LogContext logContext = new LogContext(logPrefix);
final Logger log = logContext.logger(StreamThread.class);
- if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
- log.warn("Negative cache size passed in thread. Reverting to cache size of 0 bytes");
- }
- final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
-
- final boolean eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
-
log.info("Creating restore consumer client");
- final Map<String, Object> consumerConfigs = config.getRestoreConsumerConfigs(threadClientId);
- final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(consumerConfigs);
- final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer,
- userStateRestoreListener,
- logContext);
+ final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(threadClientId);
+ final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
+ final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext);
Producer<byte[], byte[]> threadProducer = null;
+ final boolean eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
if (!eosEnabled) {
final Map<String, Object> producerConfigs = config.getProducerConfigs(threadClientId);
log.info("Creating shared producer client");
threadProducer = clientSupplier.getProducer(producerConfigs);
}
- final AbstractTaskCreator activeTaskCreator = new TaskCreator(builder,
- config,
- streamsMetrics,
- stateDirectory,
- streamsMetrics.taskCreatedSensor,
- changelogReader,
- cache,
- time,
- clientSupplier,
- threadProducer,
- threadClientId,
- log);
- final AbstractTaskCreator standbyTaskCreator = new StandbyTaskCreator(builder,
- config,
- streamsMetrics,
- stateDirectory,
- streamsMetrics.taskCreatedSensor,
- changelogReader,
- time,
- log);
- final TaskManager taskManager = new TaskManager(changelogReader,
- logPrefix,
- restoreConsumer,
- activeTaskCreator,
- standbyTaskCreator,
- new AssignedStreamsTasks(logContext),
- new AssignedStandbyTasks(logContext));
-
- return new StreamThread(builder,
- clientId,
- threadClientId,
- config,
- processId,
- time,
- streamsMetadataState,
- taskManager,
- streamsMetrics,
- clientSupplier,
- restoreConsumer,
- adminClient,
- stateDirectory);
+ StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl(
+ metrics,
+ "stream-metrics",
+ "thread." + threadClientId,
+ Collections.singletonMap("client-id", threadClientId));
+
+ final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
+ final StreamsKafkaClient streamsKafkaClient = StreamsKafkaClient.create(config.originals());
+
+ final AbstractTaskCreator<StreamTask> activeTaskCreator = new TaskCreator(builder,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ streamsMetrics.taskCreatedSensor,
+ changelogReader,
+ cache,
+ time,
+ clientSupplier,
+ threadProducer,
+ threadClientId,
+ log);
+ final AbstractTaskCreator<StandbyTask> standbyTaskCreator = new StandbyTaskCreator(builder,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ streamsMetrics.taskCreatedSensor,
+ changelogReader,
+ time,
+ log);
+ TaskManager taskManager = new TaskManager(changelogReader,
+ processId,
+ logPrefix,
+ restoreConsumer,
+ streamsMetadataState,
+ activeTaskCreator,
+ standbyTaskCreator,
+ streamsKafkaClient,
+ new AssignedStreamsTasks(logContext),
+ new AssignedStandbyTasks(logContext));
+ log.info("Creating consumer client");
+ final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+ final Map<String, Object> consumerConfigs = config.getConsumerConfigs(applicationId, threadClientId);
+ consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
+ String originalReset = null;
+ if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
+ originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+ consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+ }
+ final Consumer<byte[], byte[]> consumer = clientSupplier.getConsumer(consumerConfigs);
+ taskManager.setConsumer(consumer);
+
+ return new StreamThread(time,
+ config,
+ restoreConsumer,
+ consumer,
+ originalReset,
+ adminClient,
+ taskManager,
+ streamsMetrics,
+ builder,
+ threadClientId,
+ logContext);
+ }
+
+ public StreamThread(final Time time,
+ final StreamsConfig config,
+ final Consumer<byte[], byte[]> restoreConsumer,
+ final Consumer<byte[], byte[]> consumer,
+ final String originalReset,
+ final AdminClient adminClient,
+ final TaskManager taskManager,
+ final StreamsMetricsThreadImpl streamsMetrics,
+ final InternalTopologyBuilder builder,
+ final String threadClientId,
+ final LogContext logContext) {
+ super(threadClientId);
+
+ this.stateLock = new Object();
+ this.standbyRecords = new HashMap<>();
+
+ this.time = time;
+ this.builder = builder;
+ this.streamsMetrics = streamsMetrics;
+ this.logPrefix = logContext.logPrefix();
+ this.log = logContext.logger(StreamThread.class);
+ this.rebalanceListener = new RebalanceListener(time, taskManager, this, this.log);
+ this.taskManager = taskManager;
+ this.restoreConsumer = restoreConsumer;
+ this.consumer = consumer;
+ this.originalReset = originalReset;
+ this.adminClient = adminClient;
+
+ this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
+ this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
+
+ updateThreadMetadata(Collections.<TaskId, StreamTask>emptyMap(), Collections.<TaskId, StandbyTask>emptyMap());
}
/**
@@ -1102,107 +1084,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
setState(State.PENDING_SHUTDOWN);
}
- public Map<TaskId, StreamTask> tasks() {
- return taskManager.activeTasks();
- }
-
- /**
- * Returns ids of tasks that were being executed before the rebalance.
- */
- public Set<TaskId> prevActiveTasks() {
- return taskManager.prevActiveTaskIds();
- }
-
- @Override
- public InternalTopologyBuilder builder() {
- return builder;
- }
-
- @Override
- public String name() {
- return getName();
- }
-
- /**
- * Returns ids of tasks whose states are kept on the local storage.
- */
- public Set<TaskId> cachedTasks() {
- // A client could contain some inactive tasks whose states are still kept on the local storage in the following scenarios:
- // 1) the client is actively maintaining standby tasks by maintaining their states from the change log.
- // 2) the client has just got some tasks migrated out of itself to other clients while these task states
- // have not been cleaned up yet (this can happen in a rolling bounce upgrade, for example).
-
- final HashSet<TaskId> tasks = new HashSet<>();
-
- final File[] stateDirs = stateDirectory.listTaskDirectories();
- if (stateDirs != null) {
- for (final File dir : stateDirs) {
- try {
- final TaskId id = TaskId.parse(dir.getName());
- // if the checkpoint file exists, the state is valid.
- if (new File(dir, ProcessorStateManager.CHECKPOINT_FILE_NAME).exists()) {
- tasks.add(id);
- }
- } catch (final TaskIdFormatException e) {
- // there may be some unknown files that sits in the same directory,
- // we should ignore these files instead trying to delete them as well
- }
- }
- }
-
- return tasks;
- }
-
- @Override
- public UUID processId() {
- return processId;
- }
-
- @Override
- public StreamsConfig config() {
- return config;
- }
-
- @Override
- public PartitionGrouper partitionGrouper() {
- return partitionGrouper;
- }
-
- /**
- * Produces a string representation containing useful information about a StreamThread.
- * This is useful in debugging scenarios.
- * @return A string representation of the StreamThread instance.
- */
- @Override
- public String toString() {
- return toString("");
- }
-
- /**
- * Produces a string representation containing useful information about a StreamThread, starting with the given indent.
- * This is useful in debugging scenarios.
- * @return A string representation of the StreamThread instance.
- */
- @SuppressWarnings("ThrowableNotThrown")
- public String toString(final String indent) {
- final StringBuilder sb = new StringBuilder()
- .append(indent).append("StreamsThread appId: ").append(applicationId).append("\n")
- .append(indent).append("\tStreamsThread clientId: ").append(clientId).append("\n")
- .append(indent).append("\tStreamsThread threadId: ").append(getName()).append("\n");
-
- sb.append(taskManager.toString(indent));
- return sb.toString();
- }
-
- String threadClientId() {
- return getName();
- }
-
- public void setThreadMetadataProvider(final ThreadMetadataProvider metadataProvider) {
- this.metadataProvider = metadataProvider;
- taskManager.setThreadMetadataProvider(metadataProvider);
- }
-
private void completeShutdown(final boolean cleanRun) {
// set the state to pending shutdown first as it may be called due to error;
// its state may already be PENDING_SHUTDOWN so it will return false but we
@@ -1236,10 +1117,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
standbyRecords.clear();
}
- private void refreshMetadataState() {
- streamsMetadataState.onChange(metadataProvider.getPartitionsByHostState(), metadataProvider.clusterMetadata());
- }
-
/**
* Return information about the current {@link StreamThread}.
*
@@ -1251,17 +1128,46 @@ public class StreamThread extends Thread implements ThreadDataProvider {
private void updateThreadMetadata(final Map<TaskId, StreamTask> activeTasks, final Map<TaskId, StandbyTask> standbyTasks) {
final Set<TaskMetadata> activeTasksMetadata = new HashSet<>();
- if (activeTasks != null) {
- for (Map.Entry<TaskId, StreamTask> task : activeTasks.entrySet()) {
- activeTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
- }
+ for (Map.Entry<TaskId, StreamTask> task : activeTasks.entrySet()) {
+ activeTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
}
final Set<TaskMetadata> standbyTasksMetadata = new HashSet<>();
- if (standbyTasks != null) {
- for (Map.Entry<TaskId, StandbyTask> task : standbyTasks.entrySet()) {
- standbyTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
- }
+ for (Map.Entry<TaskId, StandbyTask> task : standbyTasks.entrySet()) {
+ standbyTasksMetadata.add(new TaskMetadata(task.getKey().toString(), task.getValue().partitions()));
}
+
threadMetadata = new ThreadMetadata(this.getName(), this.state().name(), activeTasksMetadata, standbyTasksMetadata);
}
+
+ public Map<TaskId, StreamTask> tasks() {
+ return taskManager.activeTasks();
+ }
+
+ /**
+ * Produces a string representation containing useful information about a StreamThread.
+ * This is useful in debugging scenarios.
+ * @return A string representation of the StreamThread instance.
+ */
+ @Override
+ public String toString() {
+ return toString("");
+ }
+
+ /**
+ * Produces a string representation containing useful information about a StreamThread, starting with the given indent.
+ * This is useful in debugging scenarios.
+ * @return A string representation of the StreamThread instance.
+ */
+ public String toString(final String indent) {
+ final StringBuilder sb = new StringBuilder()
+ .append(indent).append("\tStreamsThread threadId: ").append(getName()).append("\n");
+
+ sb.append(taskManager.toString(indent));
+ return sb.toString();
+ }
+
+ // this is for testing only
+ TaskManager taskManager() {
+ return taskManager;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 075f445..1e21878 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -37,11 +37,8 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
-import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
-import org.apache.kafka.common.requests.ApiVersionsRequest;
-import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
@@ -64,9 +61,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
-import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG;
-
public class StreamsKafkaClient {
private static final ConfigDef CONFIG = StreamsConfig.configDef()
@@ -75,8 +69,8 @@ public class StreamsKafkaClient {
public static class Config extends AbstractConfig {
- static Config fromStreamsConfig(StreamsConfig streamsConfig) {
- return new Config(streamsConfig.originals());
+ static Config fromStreamsConfig(Map<String, ?> props) {
+ return new Config(props);
}
Config(Map<?, ?> originals) {
@@ -166,8 +160,8 @@ public class StreamsKafkaClient {
return new LogContext("[StreamsKafkaClient clientId=" + clientId + "] ");
}
- public static StreamsKafkaClient create(final StreamsConfig streamsConfig) {
- return create(Config.fromStreamsConfig(streamsConfig));
+ public static StreamsKafkaClient create(final Map<String, ?> props) {
+ return create(Config.fromStreamsConfig(props));
}
public void close() {
@@ -357,55 +351,7 @@ public class StreamsKafkaClient {
throw new StreamsException("Inconsistent response type for internal topic metadata request. " +
"Expected MetadataResponse but received " + clientResponse.responseBody().getClass().getName());
}
- final MetadataResponse metadataResponse = (MetadataResponse) clientResponse.responseBody();
- return metadataResponse;
- }
-
- /**
- * Check if the used brokers have version 0.10.1.x or higher.
- * <p>
- * Note, for <em>pre</em> 0.10.x brokers the broker version cannot be checked and the client will hang and retry
- * until it {@link StreamsConfig#REQUEST_TIMEOUT_MS_CONFIG times out}.
- *
- * @throws BrokerNotFoundException if connecting failed within {@code request.timeout.ms}
- * @throws TimeoutException if there was no response within {@code request.timeout.ms}
- * @throws StreamsException if brokers have version 0.10.0.x
- * @throws StreamsException for any other fatal error
- */
- public void checkBrokerCompatibility(final boolean eosEnabled) throws StreamsException {
- final ClientRequest clientRequest = kafkaClient.newClientRequest(
- getAnyReadyBrokerId(),
- new ApiVersionsRequest.Builder(),
- Time.SYSTEM.milliseconds(),
- true);
-
- final ClientResponse clientResponse = sendRequestSync(clientRequest);
- if (!clientResponse.hasResponse()) {
- throw new StreamsException("Empty response for client request.");
- }
- if (!(clientResponse.responseBody() instanceof ApiVersionsResponse)) {
- throw new StreamsException("Inconsistent response type for API versions request. " +
- "Expected ApiVersionsResponse but received " + clientResponse.responseBody().getClass().getName());
- }
-
- final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) clientResponse.responseBody();
-
- if (apiVersionsResponse.apiVersion(ApiKeys.CREATE_TOPICS.id) == null) {
- throw new StreamsException("Kafka Streams requires broker version 0.10.1.x or higher.");
- }
-
- if (eosEnabled && !brokerSupportsTransactions(apiVersionsResponse)) {
- throw new StreamsException("Setting " + PROCESSING_GUARANTEE_CONFIG + "=" + EXACTLY_ONCE + " requires broker version 0.11.0.x or higher.");
- }
- }
-
- private boolean brokerSupportsTransactions(final ApiVersionsResponse apiVersionsResponse) {
- return apiVersionsResponse.apiVersion(ApiKeys.INIT_PRODUCER_ID.id) != null
- && apiVersionsResponse.apiVersion(ApiKeys.ADD_PARTITIONS_TO_TXN.id) != null
- && apiVersionsResponse.apiVersion(ApiKeys.ADD_OFFSETS_TO_TXN.id) != null
- && apiVersionsResponse.apiVersion(ApiKeys.END_TXN.id) != null
- && apiVersionsResponse.apiVersion(ApiKeys.WRITE_TXN_MARKERS.id) != null
- && apiVersionsResponse.apiVersion(ApiKeys.TXN_OFFSET_COMMIT.id) != null;
+ return (MetadataResponse) clientResponse.responseBody();
}
}