You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/20 19:25:29 UTC

kafka git commit: KAFKA-6170; KIP-220 Part 1: Add AdminClient to Streams

Repository: kafka
Updated Branches:
  refs/heads/trunk 83b8cf96f -> e3c32391f


KAFKA-6170; KIP-220 Part 1: Add AdminClient to Streams

1. Add The AdminClient into Kafka Streams, which is shared among all the threads.
2. Add ADMIN_PREFIX to StreamsConfig.
3. Also made a few tweaks on the metrics of the AdminClient, which is slightly different from the StreamsKafkaClient (note these changes will not be reflected in this PR but only take place when we eventually replace StreamsKafkaClient):
3.1. "clientId" tag will be set as "clientId-admin": in StreamsKafkaClient it is whatever user sets, and hence could even be null.
3.2. "groupPrefix" will be set as "admin-client": in StreamsKafkaClient it will be "kafka-client".

So the metrics from `StreamsKafkaClient` to `AdminClient` would be changed from

`kafka.admin.client:type=kafka-client-metrics,client-id=`

to

`kafka.admin.client:type=admin-client-metrics,client-id=myApp-UUID-admin`

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>, Matthias J. Sax <ma...@confluent.io>, Ted Yu <yu...@gmail.com>

Closes #4211 from guozhangwang/K6170-admin-client


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e3c32391
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e3c32391
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e3c32391

Branch: refs/heads/trunk
Commit: e3c32391f95d82a217295c7e4c1519981124bc3f
Parents: 83b8cf9
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Nov 20 11:25:22 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 20 11:25:22 2017 -0800

----------------------------------------------------------------------
 .../clients/admin/MockKafkaAdminClientEnv.java  |  18 ++-
 .../kafka/streams/KafkaClientSupplier.java      |   9 ++
 .../org/apache/kafka/streams/KafkaStreams.java  | 133 ++++++++++---------
 .../org/apache/kafka/streams/StreamsConfig.java |  53 ++++++--
 .../internals/DefaultKafkaClientSupplier.java   |   6 +
 .../processor/internals/StreamThread.java       |   7 +
 .../processor/internals/StreamThreadTest.java   |  14 +-
 .../apache/kafka/test/MockClientSupplier.java   |  25 +++-
 8 files changed, 172 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e3c32391/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
index 9190da2..cca35ac 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
@@ -37,11 +37,9 @@ import java.util.Map;
  */
 public class MockKafkaAdminClientEnv implements AutoCloseable {
     private final Time time;
-    private final AdminClientConfig adminClientConfig;
-    private final Metadata metadata;
-    private final MockClient mockClient;
-    private final KafkaAdminClient client;
     private final Cluster cluster;
+    private final MockClient mockClient;
+    private final KafkaAdminClient adminClient;
 
     public MockKafkaAdminClientEnv(Cluster cluster, String...vals) {
         this(Time.SYSTEM, cluster, vals);
@@ -53,12 +51,12 @@ public class MockKafkaAdminClientEnv implements AutoCloseable {
 
     public MockKafkaAdminClientEnv(Time time, Cluster cluster, Map<String, Object> config) {
         this.time = time;
-        this.adminClientConfig = new AdminClientConfig(config);
         this.cluster = cluster;
-        this.metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
+        AdminClientConfig adminClientConfig = new AdminClientConfig(config);
+        Metadata metadata = new Metadata(adminClientConfig.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
                 adminClientConfig.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
-        this.mockClient = new MockClient(time, this.metadata);
-        this.client = KafkaAdminClient.createInternal(adminClientConfig, mockClient, metadata, time);
+        this.mockClient = new MockClient(time, metadata);
+        this.adminClient = KafkaAdminClient.createInternal(adminClientConfig, mockClient, metadata, time);
     }
 
     public Time time() {
@@ -70,7 +68,7 @@ public class MockKafkaAdminClientEnv implements AutoCloseable {
     }
 
     public AdminClient adminClient() {
-        return client;
+        return adminClient;
     }
 
     public MockClient kafkaClient() {
@@ -79,7 +77,7 @@ public class MockKafkaAdminClientEnv implements AutoCloseable {
 
     @Override
     public void close() {
-        this.client.close();
+        this.adminClient.close();
     }
 
     private static Map<String, Object> newStrMap(String... vals) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3c32391/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
index 260d58a..5561bd1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaClientSupplier.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.streams.processor.StateStore;
@@ -30,6 +31,14 @@ import java.util.Map;
  */
 public interface KafkaClientSupplier {
     /**
+     * Create an {@link AdminClient} which is used for internal topic management.
+     *
+     * @param config Supplied by the {@link StreamsConfig} given to the {@link KafkaStreams}
+     * @return an instance of {@link AdminClient}
+     */
+    AdminClient getAdminClient(final Map<String, Object> config);
+
+    /**
      * Create a {@link Producer} which is used to write records to sink topics.
      *
      * @param config {@link StreamsConfig#getProducerConfigs(String) producer config} which is supplied by the

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3c32391/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c551d01..9e67f54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -133,8 +134,8 @@ public class KafkaStreams {
     // of the co-location of stream thread's consumers. It is for internal
     // usage only and should not be exposed to users at all.
     private final Logger log;
-    private final String logPrefix;
     private final UUID processId;
+    private final String clientId;
     private final Metrics metrics;
     private final StreamsConfig config;
     private final StreamThread[] threads;
@@ -142,6 +143,7 @@ public class KafkaStreams {
     private final StreamsMetadataState streamsMetadataState;
     private final ScheduledExecutorService stateDirCleaner;
     private final QueryableStoreProvider queryableStoreProvider;
+    private final AdminClient adminClient;
 
     private GlobalStreamThread globalStreamThread;
     private KafkaStreams.StateListener stateListener;
@@ -256,8 +258,7 @@ public class KafkaStreams {
                 // will be refused but we do not throw exception here, to allow idempotent close calls
                 return false;
             } else if (!state.isValidTransition(newState)) {
-                log.error("Unexpected state transition from {} to {}", oldState, newState);
-                throw new IllegalStateException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
+                throw new IllegalStateException("Stream-client " + clientId + ": Unexpected state transition from " + oldState + " to " + newState);
             } else {
                 log.info("State transition from {} to {}", oldState, newState);
             }
@@ -276,8 +277,7 @@ public class KafkaStreams {
     private boolean setRunningFromCreated() {
         synchronized (stateLock) {
             if (state != State.CREATED) {
-                log.error("{} Unexpected state transition from {} to {}", logPrefix, state, State.RUNNING);
-                throw new IllegalStateException(logPrefix + " Unexpected state transition from " + state + " to " + State.RUNNING);
+                throw new IllegalStateException("Stream-client " + clientId + ": Unexpected state transition from " + state + " to " + State.RUNNING);
             }
             state = State.RUNNING;
             stateLock.notifyAll();
@@ -465,6 +465,57 @@ public class KafkaStreams {
         }
     }
 
+    final class DelegatingStateRestoreListener implements StateRestoreListener {
+        private void throwOnFatalException(final Exception fatalUserException,
+                                           final TopicPartition topicPartition,
+                                           final String storeName) {
+            throw new StreamsException(
+                    String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+                            storeName,
+                            topicPartition),
+                    fatalUserException);
+        }
+
+        @Override
+        public void onRestoreStart(final TopicPartition topicPartition,
+                                   final String storeName,
+                                   final long startingOffset,
+                                   final long endingOffset) {
+            if (globalStateRestoreListener != null) {
+                try {
+                    globalStateRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
+                } catch (final Exception fatalUserException) {
+                    throwOnFatalException(fatalUserException, topicPartition, storeName);
+                }
+            }
+        }
+
+        @Override
+        public void onBatchRestored(final TopicPartition topicPartition,
+                                    final String storeName,
+                                    final long batchEndOffset,
+                                    final long numRestored) {
+            if (globalStateRestoreListener != null) {
+                try {
+                    globalStateRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
+                } catch (final Exception fatalUserException) {
+                    throwOnFatalException(fatalUserException, topicPartition, storeName);
+                }
+            }
+        }
+
+        @Override
+        public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
+            if (globalStateRestoreListener != null) {
+                try {
+                    globalStateRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
+                } catch (final Exception fatalUserException) {
+                    throwOnFatalException(fatalUserException, topicPartition, storeName);
+                }
+            }
+        }
+    }
+
     /**
      * @deprecated use {@link #KafkaStreams(Topology, Properties)} instead
      */
@@ -538,18 +589,17 @@ public class KafkaStreams {
         this.config = config;
 
         // The application ID is a required config and hence should always have value
-        final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         processId = UUID.randomUUID();
-
-        String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
+        final String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
+        final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         if (clientId.length() <= 0) {
-            clientId = applicationId + "-" + processId;
+            this.clientId = applicationId + "-" + processId;
+        } else {
+            this.clientId = clientId;
         }
 
-        this.logPrefix = String.format("stream-client [%s]", clientId);
-        final LogContext logContext = new LogContext(logPrefix);
+        final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
         this.log = logContext.logger(getClass());
-        final String cleanupThreadName = clientId + "-CleanupThread";
 
         internalTopologyBuilder.setApplicationId(applicationId);
         // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
@@ -561,52 +611,7 @@ public class KafkaStreams {
             log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
         }
 
-        final StateRestoreListener delegatingStateRestoreListener = new StateRestoreListener() {
-            @Override
-            public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) {
-                if (globalStateRestoreListener != null) {
-                    try {
-                        globalStateRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
-                    } catch (final Exception fatalUserException) {
-                        throw new StreamsException(
-                            String.format("Fatal user code error in store restore listener for store %s, partition %s.",
-                                storeName,
-                                topicPartition),
-                            fatalUserException);
-                    }
-                }
-            }
-
-            @Override
-            public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) {
-                if (globalStateRestoreListener != null) {
-                    try {
-                        globalStateRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
-                    } catch (final Exception fatalUserException) {
-                        throw new StreamsException(
-                            String.format("Fatal user code error in store restore listener for store %s, partition %s.",
-                                storeName,
-                                topicPartition),
-                            fatalUserException);
-                    }
-                }
-            }
-
-            @Override
-            public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
-                if (globalStateRestoreListener != null) {
-                    try {
-                        globalStateRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
-                    } catch (final Exception fatalUserException) {
-                        throw new StreamsException(
-                            String.format("Fatal user code error in store restore listener for store %s, partition %s.",
-                                storeName,
-                                topicPartition),
-                            fatalUserException);
-                    }
-                }
-            }
-        };
+        final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
 
         threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
         try {
@@ -622,8 +627,7 @@ public class KafkaStreams {
 
         final MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
             .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
-            .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
-                TimeUnit.MILLISECONDS);
+            .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
         final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
             MetricsReporter.class);
         reporters.add(new JmxReporter(JMX_PREFIX));
@@ -644,12 +648,16 @@ public class KafkaStreams {
             globalThreadState = globalStreamThread.state();
         }
 
+        // use client id instead of thread client id since this admin client may be shared among threads
+        this.adminClient = clientSupplier.getAdminClient(config.getAdminConfigs(clientId));
+
         final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
         final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
         for (int i = 0; i < threads.length; i++) {
             threads[i] = StreamThread.create(internalTopologyBuilder,
                                              config,
                                              clientSupplier,
+                                             adminClient,
                                              processId,
                                              clientId,
                                              metrics,
@@ -672,10 +680,11 @@ public class KafkaStreams {
 
         final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(internalTopologyBuilder.globalStateStores());
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
+
         stateDirCleaner = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
             @Override
             public Thread newThread(final Runnable r) {
-                final Thread thread = new Thread(r, cleanupThreadName);
+                final Thread thread = new Thread(r, clientId + "-CleanupThread");
                 thread.setDaemon(true);
                 return thread;
             }
@@ -829,6 +838,8 @@ public class KafkaStreams {
                         globalStreamThread = null;
                     }
 
+                    adminClient.close();
+
                     metrics.close();
                     setState(State.NOT_RUNNING);
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3c32391/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index a1a0d10..941437c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -108,29 +109,34 @@ public class StreamsConfig extends AbstractConfig {
     private final static long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
 
     /**
-     * Prefix used to isolate {@link KafkaConsumer consumer} configs from {@link KafkaProducer producer} configs.
-     * It is recommended to use {@link #consumerPrefix(String)} to add this prefix to {@link ConsumerConfig consumer
-     * properties}.
-     */
-    public static final String CONSUMER_PREFIX = "consumer.";
-
-
-    /**
      * Prefix used to provide default topic configs to be applied when creating internal topics.
      * These should be valid properties from {@link org.apache.kafka.common.config.TopicConfig TopicConfig}.
      * It is recommended to use {@link #topicPrefix(String)}.
      */
     public static final String TOPIC_PREFIX = "topic.";
 
+    /**
+     * Prefix used to isolate {@link KafkaConsumer consumer} configs from other client configs.
+     * It is recommended to use {@link #consumerPrefix(String)} to add this prefix to {@link ConsumerConfig consumer
+     * properties}.
+     */
+    public static final String CONSUMER_PREFIX = "consumer.";
 
     /**
-     * Prefix used to isolate {@link KafkaProducer producer} configs from {@link KafkaConsumer consumer} configs.
+     * Prefix used to isolate {@link KafkaProducer producer} configs from other client configs.
      * It is recommended to use {@link #producerPrefix(String)} to add this prefix to {@link ProducerConfig producer
      * properties}.
      */
     public static final String PRODUCER_PREFIX = "producer.";
 
     /**
+     * Prefix used to isolate {@link org.apache.kafka.clients.admin.AdminClient admin} configs from other client configs.
+     * It is recommended to use {@link #adminClientPrefix(String)} to add this prefix to {@link ProducerConfig producer
+     * properties}.
+     */
+    public static final String ADMIN_CLIENT_PREFIX = "admin.";
+
+    /**
      * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
      */
     public static final String AT_LEAST_ONCE = "at_least_once";
@@ -581,7 +587,7 @@ public class StreamsConfig extends AbstractConfig {
 
     /**
      * Prefix a property with {@link #CONSUMER_PREFIX}. This is used to isolate {@link ConsumerConfig consumer configs}
-     * from {@link ProducerConfig producer configs}.
+     * from other client configs.
      *
      * @param consumerProp the consumer property to be masked
      * @return {@link #CONSUMER_PREFIX} + {@code consumerProp}
@@ -592,7 +598,7 @@ public class StreamsConfig extends AbstractConfig {
 
     /**
      * Prefix a property with {@link #PRODUCER_PREFIX}. This is used to isolate {@link ProducerConfig producer configs}
-     * from {@link ConsumerConfig consumer configs}.
+     * from other client configs.
      *
      * @param producerProp the producer property to be masked
      * @return PRODUCER_PREFIX + {@code producerProp}
@@ -602,6 +608,17 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     /**
+     * Prefix a property with {@link #ADMIN_CLIENT_PREFIX}. This is used to isolate {@link AdminClientConfig admin configs}
+     * from other client configs.
+     *
+     * @param adminClientProp the admin client property to be masked
+     * @return ADMIN_CLIENT_PREFIX + {@code adminClientProp}
+     */
+    public static String adminClientPrefix(final String adminClientProp) {
+        return ADMIN_CLIENT_PREFIX + adminClientProp;
+    }
+
+    /**
      * Prefix a property with {@link #TOPIC_PREFIX}
      * used to provide default topic configs to be applied when creating internal topics.
      *
@@ -776,6 +793,20 @@ public class StreamsConfig extends AbstractConfig {
         return props;
     }
 
+    /**
+     * Get the configs for the {@link org.apache.kafka.clients.admin.AdminClient admin client}.
+     * @param clientId clientId
+     * @return Map of the admin client configuration.
+     */
+    public Map<String, Object> getAdminConfigs(final String clientId) {
+        final Map<String, Object> props = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames());
+
+        // add client id with stream client id prefix
+        props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-admin");
+
+        return props;
+    }
+
     private Map<String, Object> getClientPropsWithPrefix(final String prefix,
                                                          final Set<String> configNames) {
         final Map<String, Object> props = clientProps(configNames, originals());

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3c32391/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
index 92744ce..f3038f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import java.util.Map;
 
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -28,6 +29,11 @@ import org.apache.kafka.streams.KafkaClientSupplier;
 
 public class DefaultKafkaClientSupplier implements KafkaClientSupplier {
     @Override
+    public AdminClient getAdminClient(final Map<String, Object> config) {
+        return AdminClient.create(config);
+    }
+
+    @Override
     public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
         return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3c32391/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 2ad3177..1514e26 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -574,6 +575,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
     private ThreadMetadataProvider metadataProvider;
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
 
+    private final AdminClient adminClient;
+
     // package-private for testing
     final ConsumerRebalanceListener rebalanceListener;
     final Consumer<byte[], byte[]> restoreConsumer;
@@ -598,6 +601,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                         final StreamsMetricsThreadImpl streamsMetrics,
                         final KafkaClientSupplier clientSupplier,
                         final Consumer<byte[], byte[]> restoreConsumer,
+                        final AdminClient adminClient,
                         final StateDirectory stateDirectory) {
         super(threadClientId);
         this.builder = builder;
@@ -612,6 +616,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         this.logPrefix = String.format("stream-thread [%s] ", threadClientId);
         this.streamsMetrics = streamsMetrics;
         this.restoreConsumer = restoreConsumer;
+        this.adminClient = adminClient;
         this.stateDirectory = stateDirectory;
         this.config = config;
         this.stateLock = new Object();
@@ -638,6 +643,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
     public static StreamThread create(final InternalTopologyBuilder builder,
                                       final StreamsConfig config,
                                       final KafkaClientSupplier clientSupplier,
+                                      final AdminClient adminClient,
                                       final UUID processId,
                                       final String clientId,
                                       final Metrics metrics,
@@ -718,6 +724,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                                 streamsMetrics,
                                 clientSupplier,
                                 restoreConsumer,
+                                adminClient,
                                 stateDirectory);
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3c32391/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 03860ea..a3d7523 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -81,7 +81,7 @@ public class StreamThreadTest {
     private final String applicationId = "stream-thread-test";
     private final MockTime mockTime = new MockTime();
     private final Metrics metrics = new Metrics();
-    private MockClientSupplier clientSupplier = new MockClientSupplier();
+    private final MockClientSupplier clientSupplier = new MockClientSupplier();
     private UUID processId = UUID.randomUUID();
     private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder());
     private InternalTopologyBuilder internalTopologyBuilder;
@@ -350,11 +350,13 @@ public class StreamThreadTest {
 
     private StreamThread createStreamThread(final String clientId, final StreamsConfig config, final boolean eosEnabled) {
         if (eosEnabled) {
-            clientSupplier = new MockClientSupplier(applicationId);
+            clientSupplier.setApplicationIdForProducer(applicationId);
         }
+
         return StreamThread.create(internalTopologyBuilder,
                                    config,
                                    clientSupplier,
+                                   clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
                                    processId,
                                    clientId,
                                    metrics,
@@ -538,6 +540,7 @@ public class StreamThreadTest {
                                                      streamsMetrics,
                                                      clientSupplier,
                                                      consumer,
+                                                     clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
                                                      stateDirectory);
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval - 10L);
@@ -570,6 +573,7 @@ public class StreamThreadTest {
                                                      streamsMetrics,
                                                      clientSupplier,
                                                      consumer,
+                                                     clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
                                                      stateDirectory);
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval - 10L);
@@ -603,6 +607,7 @@ public class StreamThreadTest {
                                                      streamsMetrics,
                                                      clientSupplier,
                                                      consumer,
+                                                     clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
                                                      stateDirectory);
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval + 1);
@@ -719,6 +724,7 @@ public class StreamThreadTest {
                                                      streamsMetrics,
                                                      clientSupplier,
                                                      consumer,
+                                                     clientSupplier.getAdminClient(config.getAdminConfigs(clientId)),
                                                      stateDirectory);
         thread.setState(StreamThread.State.RUNNING);
         thread.shutdown();
@@ -818,7 +824,7 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
 
-        final MockConsumer consumer = clientSupplier.consumer;
+        final MockConsumer<byte[], byte[]> consumer = clientSupplier.consumer;
         consumer.updatePartitions(TOPIC, Collections.singletonList(new PartitionInfo(TOPIC, 0, null, null, null)));
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -833,8 +839,6 @@ public class StreamThreadTest {
         assertThat(thread.tasks().size(), equalTo(1));
         final MockProducer producer = clientSupplier.producers.get(0);
 
-
-
         // change consumer subscription from "pattern" to "manual" to be able to call .addRecords()
         consumer.updateBeginningOffsets(Collections.singletonMap(task0Assignment.iterator().next(), 0L));
         consumer.unsubscribe();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e3c32391/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index da5ab3b..ae83c60 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -16,13 +16,17 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 
 import java.util.LinkedList;
@@ -34,20 +38,29 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
 
 public class MockClientSupplier implements KafkaClientSupplier {
-    private final String applicationId;
     private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
 
+    private Cluster cluster;
+
+    private String applicationId;
+
+    public final List<MockProducer> producers = new LinkedList<>();
+
     public final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     public final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
 
-    public final List<MockProducer> producers = new LinkedList<>();
+    public void setApplicationIdForProducer(final String applicationId) {
+        this.applicationId = applicationId;
+    }
 
-    public MockClientSupplier() {
-        this(null);
+    public void setClusterForAdminClient(final Cluster cluster) {
+        this.cluster = cluster;
     }
 
-    public MockClientSupplier(final String applicationId) {
-        this.applicationId = applicationId;
+    @Override
+    public AdminClient getAdminClient(final Map<String, Object> config) {
+        MockKafkaAdminClientEnv clientEnv = new MockKafkaAdminClientEnv(Time.SYSTEM, cluster, config);
+        return clientEnv.adminClient();
     }
 
     @Override