Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/31 05:40:01 UTC

[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

    [ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496133#comment-16496133 ] 

ASF GitHub Bot commented on KAFKA-6054:
---------------------------------------

mjsax closed pull request #4636: KAFKA-6054: Add 'version probing' to Kafka Streams rebalance
URL: https://github.com/apache/kafka/pull/4636

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 18dc891682d..bc549960de4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -145,6 +145,7 @@
      */
     // TODO: currently we cannot get the full topic configurations and hence cannot allow topic configs without the prefix,
     //       this can be lifted once kafka.log.LogConfig is completely deprecated by org.apache.kafka.common.config.TopicConfig
+    @SuppressWarnings("WeakerAccess")
     public static final String TOPIC_PREFIX = "topic.";
 
     /**
@@ -152,6 +153,7 @@
      * It is recommended to use {@link #consumerPrefix(String)} to add this prefix to {@link ConsumerConfig consumer
      * properties}.
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String CONSUMER_PREFIX = "consumer.";
 
     /**
@@ -161,6 +163,7 @@
      * 2. consumer.[config-name]
      * 3. [config-name]
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
 
     /**
@@ -170,6 +173,7 @@
      * 2. consumer.[config-name]
      * 3. [config-name]
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
 
     /**
@@ -179,6 +183,7 @@
      * 2. consumer.[config-name]
      * 3. [config-name]
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
 
     /**
@@ -186,6 +191,7 @@
      * It is recommended to use {@link #producerPrefix(String)} to add this prefix to {@link ProducerConfig producer
      * properties}.
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String PRODUCER_PREFIX = "producer.";
 
     /**
@@ -193,202 +199,250 @@
      * It is recommended to use {@link #adminClientPrefix(String)} to add this prefix to {@link ProducerConfig producer
      * properties}.
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String ADMIN_CLIENT_PREFIX = "admin.";
 
     /**
      * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String UPGRADE_FROM_0100 = "0.10.0";
 
     /**
      * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}.
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String UPGRADE_FROM_0101 = "0.10.1";
 
     /**
      * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}.
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String UPGRADE_FROM_0102 = "0.10.2";
 
     /**
      * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}.
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String UPGRADE_FROM_0110 = "0.11.0";
 
     /**
      * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}.
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String UPGRADE_FROM_10 = "1.0";
 
     /**
      * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}.
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String UPGRADE_FROM_11 = "1.1";
 
     /**
      * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String AT_LEAST_ONCE = "at_least_once";
 
     /**
      * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for exactly-once processing guarantees.
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String EXACTLY_ONCE = "exactly_once";
 
     /** {@code application.id} */
+    @SuppressWarnings("WeakerAccess")
     public static final String APPLICATION_ID_CONFIG = "application.id";
     private static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
 
     /**{@code user.endpoint} */
+    @SuppressWarnings("WeakerAccess")
     public static final String APPLICATION_SERVER_CONFIG = "application.server";
     private static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of state stores within a single KafkaStreams application";
 
     /** {@code bootstrap.servers} */
+    @SuppressWarnings("WeakerAccess")
     public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
     /** {@code buffered.records.per.partition} */
+    @SuppressWarnings("WeakerAccess")
     public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
     private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
 
     /** {@code cache.max.bytes.buffering} */
+    @SuppressWarnings("WeakerAccess")
     public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
     private static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads";
 
     /** {@code client.id} */
+    @SuppressWarnings("WeakerAccess")
     public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
     private static final String CLIENT_ID_DOC = "An ID prefix string used for the client IDs of internal consumer, producer and restore-consumer," +
         " with pattern '<client.id>-StreamThread-<threadSequenceNumber>-<consumer|producer|restore-consumer>'.";
 
     /** {@code commit.interval.ms} */
+    @SuppressWarnings("WeakerAccess")
     public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
     private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor." +
         " (Note, if 'processing.guarantee' is set to '" + EXACTLY_ONCE + "', the default value is " + EOS_DEFAULT_COMMIT_INTERVAL_MS + "," +
         " otherwise the default value is " + DEFAULT_COMMIT_INTERVAL_MS + ".";
 
     /** {@code connections.max.idle.ms} */
+    @SuppressWarnings("WeakerAccess")
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
 
     /**
      * {@code default.deserialization.exception.handler}
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
     private static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.";
 
     /**
      * {@code default.production.exception.handler}
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler";
     private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";
 
     /**
      * {@code default.windowed.key.serde.inner}
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner";
 
     /**
      * {@code default.windowed.value.serde.inner}
      */
+    @SuppressWarnings("WeakerAccess")
     public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner";
 
     /** {@code default key.serde} */
+    @SuppressWarnings("WeakerAccess")
     public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
     private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. "
             + "Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via '"
             + DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "' or '" + DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS + "' as well";
 
     /** {@code default value.serde} */
+    @SuppressWarnings("WeakerAccess")
     public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
     private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. "
             + "Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via '"
             + DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "' or '" + DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS + "' as well";
 
     /** {@code default.timestamp.extractor} */
+    @SuppressWarnings("WeakerAccess")
     public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
     private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";
 
     /** {@code metadata.max.age.ms} */
+    @SuppressWarnings("WeakerAccess")
     public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
 
     /** {@code metrics.num.samples} */
+    @SuppressWarnings("WeakerAccess")
     public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
 
     /** {@code metrics.record.level} */
+    @SuppressWarnings("WeakerAccess")
     public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
 
     /** {@code metric.reporters} */
+    @SuppressWarnings("WeakerAccess")
     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
 
     /** {@code metrics.sample.window.ms} */
+    @SuppressWarnings("WeakerAccess")
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
 
     /** {@code num.standby.replicas} */
+    @SuppressWarnings("WeakerAccess")
     public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
     private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
 
     /** {@code num.stream.threads} */
+    @SuppressWarnings("WeakerAccess")
     public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
     private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
 
     /** {@code partition.grouper} */
+    @SuppressWarnings("WeakerAccess")
     public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
     private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>org.apache.kafka.streams.processor.PartitionGrouper</code> interface.";
 
     /** {@code poll.ms} */
+    @SuppressWarnings("WeakerAccess")
     public static final String POLL_MS_CONFIG = "poll.ms";
     private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
 
     /** {@code processing.guarantee} */
+    @SuppressWarnings("WeakerAccess")
     public static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
     private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. Possible values are <code>" + AT_LEAST_ONCE + "</code> (default) and <code>" + EXACTLY_ONCE + "</code>. " +
         "Note that exactly-once processing requires a cluster of at least three brokers by default what is the recommended setting for production; for development you can change this, by adjusting broker setting `transaction.state.log.replication.factor`.";
 
     /** {@code receive.buffer.bytes} */
+    @SuppressWarnings("WeakerAccess")
     public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
 
     /** {@code reconnect.backoff.ms} */
+    @SuppressWarnings("WeakerAccess")
     public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
 
     /** {@code reconnect.backoff.max} */
+    @SuppressWarnings("WeakerAccess")
     public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG;
 
     /** {@code replication.factor} */
+    @SuppressWarnings("WeakerAccess")
     public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
     private static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application.";
 
     /** {@code request.timeout.ms} */
+    @SuppressWarnings("WeakerAccess")
     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
 
     /** {@code retries} */
+    @SuppressWarnings("WeakerAccess")
     public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
 
     /** {@code retry.backoff.ms} */
+    @SuppressWarnings("WeakerAccess")
     public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
 
     /** {@code rocksdb.config.setter} */
+    @SuppressWarnings("WeakerAccess")
     public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter";
     private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class or class name that implements the <code>org.apache.kafka.streams.state.RocksDBConfigSetter</code> interface";
 
     /** {@code security.protocol} */
+    @SuppressWarnings("WeakerAccess")
     public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
 
     /** {@code send.buffer.bytes} */
+    @SuppressWarnings("WeakerAccess")
     public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
 
     /** {@code state.cleanup.delay} */
+    @SuppressWarnings("WeakerAccess")
     public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
     private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least state.cleanup.delay.ms will be removed";
 
     /** {@code state.dir} */
+    @SuppressWarnings("WeakerAccess")
     public static final String STATE_DIR_CONFIG = "state.dir";
     private static final String STATE_DIR_DOC = "Directory location for state store.";
 
     /** {@code upgrade.from} */
+    @SuppressWarnings("WeakerAccess")
     public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
-    public static final String UPGRADE_FROM_DOC = "Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a backward compatible way. " +
+    private static final String UPGRADE_FROM_DOC = "Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a backward compatible way. " +
         "When upgrading from 1.2 to a newer version it is not required to specify this config." +
         "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" + UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + UPGRADE_FROM_11 + "\" (for upgrading from the corresponding old version).";
 
     /** {@code windowstore.changelog.additional.retention.ms} */
+    @SuppressWarnings("WeakerAccess")
     public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms";
     private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day";
 
@@ -653,6 +707,7 @@
 
     public static class InternalConfig {
         public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__";
+        public static final String VERSION_PROBING_FLAG = "__version.probing.flag__";
     }
 
     /**
@@ -662,6 +717,7 @@
      * @param consumerProp the consumer property to be masked
      * @return {@link #CONSUMER_PREFIX} + {@code consumerProp}
      */
+    @SuppressWarnings("WeakerAccess")
     public static String consumerPrefix(final String consumerProp) {
         return CONSUMER_PREFIX + consumerProp;
     }
@@ -673,6 +729,7 @@ public static String consumerPrefix(final String consumerProp) {
      * @param consumerProp the consumer property to be masked
      * @return {@link #MAIN_CONSUMER_PREFIX} + {@code consumerProp}
      */
+    @SuppressWarnings("WeakerAccess")
     public static String mainConsumerPrefix(final String consumerProp) {
         return MAIN_CONSUMER_PREFIX + consumerProp;
     }
@@ -684,6 +741,7 @@ public static String mainConsumerPrefix(final String consumerProp) {
      * @param consumerProp the consumer property to be masked
      * @return {@link #RESTORE_CONSUMER_PREFIX} + {@code consumerProp}
      */
+    @SuppressWarnings("WeakerAccess")
     public static String restoreConsumerPrefix(final String consumerProp) {
         return RESTORE_CONSUMER_PREFIX + consumerProp;
     }
@@ -695,6 +753,7 @@ public static String restoreConsumerPrefix(final String consumerProp) {
      * @param consumerProp the consumer property to be masked
      * @return {@link #GLOBAL_CONSUMER_PREFIX} + {@code consumerProp}
      */
+    @SuppressWarnings("WeakerAccess")
     public static String globalConsumerPrefix(final String consumerProp) {
         return GLOBAL_CONSUMER_PREFIX + consumerProp;
     }
@@ -706,6 +765,7 @@ public static String globalConsumerPrefix(final String consumerProp) {
      * @param producerProp the producer property to be masked
      * @return PRODUCER_PREFIX + {@code producerProp}
      */
+    @SuppressWarnings("WeakerAccess")
     public static String producerPrefix(final String producerProp) {
         return PRODUCER_PREFIX + producerProp;
     }
@@ -717,6 +777,7 @@ public static String producerPrefix(final String producerProp) {
      * @param adminClientProp the admin client property to be masked
      * @return ADMIN_CLIENT_PREFIX + {@code adminClientProp}
      */
+    @SuppressWarnings("WeakerAccess")
     public static String adminClientPrefix(final String adminClientProp) {
         return ADMIN_CLIENT_PREFIX + adminClientProp;
     }
@@ -728,6 +789,7 @@ public static String adminClientPrefix(final String adminClientProp) {
      * @param topicProp the topic property to be masked
      * @return TOPIC_PREFIX + {@code topicProp}
      */
+    @SuppressWarnings("WeakerAccess")
     public static String topicPrefix(final String topicProp) {
         return TOPIC_PREFIX + topicProp;
     }
@@ -737,6 +799,7 @@ public static String topicPrefix(final String topicProp) {
      *
      * @return a copy of the config definition
      */
+    @SuppressWarnings("unused")
     public static ConfigDef configDef() {
         return new ConfigDef(CONFIG);
     }
@@ -788,8 +851,8 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
         // consumer/producer configurations, log a warning and remove the user defined value from the Map.
         // Thus the default values for these consumer/producer configurations that are suitable for
         // Streams will be used instead.
-        final Object maxInflightRequests = clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
-        if (eosEnabled && maxInflightRequests != null && 5 < (int) maxInflightRequests) {
+        final Object maxInFlightRequests = clientProvidedProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+        if (eosEnabled && maxInFlightRequests != null && 5 < (int) maxInFlightRequests) {
             throw new ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " can't exceed 5 when using the idempotent producer");
         }
         for (final String config: nonConfigurableConfigs) {
@@ -831,8 +894,9 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
      * @param groupId      consumer groupId
      * @param clientId     clientId
      * @return Map of the consumer configuration.
-     * @Deprecated use {@link StreamsConfig#getMainConsumerConfigs(String, String)}
+     * @deprecated use {@link StreamsConfig#getMainConsumerConfigs(String, String)}
      */
+    @SuppressWarnings("WeakerAccess")
     @Deprecated
     public Map<String, Object> getConsumerConfigs(final String groupId,
                                                   final String clientId) {
@@ -853,13 +917,14 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
      * @param clientId     clientId
      * @return Map of the consumer configuration.
      */
+    @SuppressWarnings("WeakerAccess")
     public Map<String, Object> getMainConsumerConfigs(final String groupId,
                                                       final String clientId) {
-        Map<String, Object> consumerProps = getCommonConsumerConfigs();
+        final Map<String, Object> consumerProps = getCommonConsumerConfigs();
 
         // Get main consumer override configs
-        Map<String, Object> mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX);
-        for (Map.Entry<String, Object> entry: mainConsumerProps.entrySet()) {
+        final Map<String, Object> mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX);
+        for (final Map.Entry<String, Object> entry: mainConsumerProps.entrySet()) {
             consumerProps.put(entry.getKey(), entry.getValue());
         }
 
@@ -919,12 +984,13 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
      * @param clientId clientId
      * @return Map of the restore consumer configuration.
      */
+    @SuppressWarnings("WeakerAccess")
     public Map<String, Object> getRestoreConsumerConfigs(final String clientId) {
-        Map<String, Object> baseConsumerProps = getCommonConsumerConfigs();
+        final Map<String, Object> baseConsumerProps = getCommonConsumerConfigs();
 
         // Get restore consumer override configs
-        Map<String, Object> restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
-        for (Map.Entry<String, Object> entry: restoreConsumerProps.entrySet()) {
+        final Map<String, Object> restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
+        for (final Map.Entry<String, Object> entry: restoreConsumerProps.entrySet()) {
             baseConsumerProps.put(entry.getKey(), entry.getValue());
         }
 
@@ -950,12 +1016,13 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
      * @param clientId clientId
      * @return Map of the global consumer configuration.
      */
+    @SuppressWarnings("WeakerAccess")
     public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {
-        Map<String, Object> baseConsumerProps = getCommonConsumerConfigs();
+        final Map<String, Object> baseConsumerProps = getCommonConsumerConfigs();
 
         // Get global consumer override configs
-        Map<String, Object> globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
-        for (Map.Entry<String, Object> entry: globalConsumerProps.entrySet()) {
+        final Map<String, Object> globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
+        for (final Map.Entry<String, Object> entry: globalConsumerProps.entrySet()) {
             baseConsumerProps.put(entry.getKey(), entry.getValue());
         }
 
@@ -977,6 +1044,7 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
      * @param clientId clientId
      * @return Map of the producer configuration.
      */
+    @SuppressWarnings("WeakerAccess")
     public Map<String, Object> getProducerConfigs(final String clientId) {
         final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
 
@@ -999,6 +1067,7 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
      * @param clientId clientId
      * @return Map of the admin client configuration.
      */
+    @SuppressWarnings("WeakerAccess")
     public Map<String, Object> getAdminConfigs(final String clientId) {
         final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames());
 
@@ -1045,10 +1114,11 @@ private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Obje
      *
      * @return an configured instance of key Serde class
      */
+    @SuppressWarnings("WeakerAccess")
     public Serde defaultKeySerde() {
-        Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
+        final Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
         try {
-            Serde<?> serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
+            final Serde<?> serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
             serde.configure(originals(), true);
             return serde;
         } catch (final Exception e) {
@@ -1063,10 +1133,11 @@ public Serde defaultKeySerde() {
      *
      * @return an configured instance of value Serde class
      */
+    @SuppressWarnings("WeakerAccess")
     public Serde defaultValueSerde() {
-        Object valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
+        final Object valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
         try {
-            Serde<?> serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
+            final Serde<?> serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
             serde.configure(originals(), false);
             return serde;
         } catch (final Exception e) {
@@ -1075,14 +1146,17 @@ public Serde defaultValueSerde() {
         }
     }
 
+    @SuppressWarnings("WeakerAccess")
     public TimestampExtractor defaultTimestampExtractor() {
         return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
     }
 
+    @SuppressWarnings("WeakerAccess")
     public DeserializationExceptionHandler defaultDeserializationExceptionHandler() {
         return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
     }
 
+    @SuppressWarnings("WeakerAccess")
     public ProductionExceptionHandler defaultProductionExceptionHandler() {
         return getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
     }
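
For context on how the UPGRADE_FROM_* constants above are meant to be used: during a rolling bounce from an older release, the operator sets "upgrade.from" for the first bounce and removes it for the second. A minimal sketch (application id, bootstrap servers, and topic names are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class UpgradeFromExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // First rolling bounce: new binaries keep sending the old metadata
        // version so they can coexist with the remaining 1.1 instances.
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_11);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output"); // placeholder topology

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Second rolling bounce: remove "upgrade.from" and restart. With the
        // version probing added by this PR, future upgrades from 1.2 onwards
        // should no longer need this config at all.
    }
}
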
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 6f4f454bfc4..079d405cb50 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -40,15 +40,15 @@
     private final Logger log;
     private final String taskTypeName;
     private final TaskAction<T> commitAction;
-    private Map<TaskId, T> created = new HashMap<>();
-    private Map<TaskId, T> suspended = new HashMap<>();
-    private Map<TaskId, T> restoring = new HashMap<>();
-    private Set<TopicPartition> restoredPartitions = new HashSet<>();
-    private Set<TaskId> previousActiveTasks = new HashSet<>();
+    private final Map<TaskId, T> created = new HashMap<>();
+    private final Map<TaskId, T> suspended = new HashMap<>();
+    private final Map<TaskId, T> restoring = new HashMap<>();
+    private final Set<TopicPartition> restoredPartitions = new HashSet<>();
+    private final Set<TaskId> previousActiveTasks = new HashSet<>();
     // IQ may access this map.
-    Map<TaskId, T> running = new ConcurrentHashMap<>();
-    private Map<TopicPartition, T> runningByPartition = new HashMap<>();
-    Map<TopicPartition, T> restoringByPartition = new HashMap<>();
+    final Map<TaskId, T> running = new ConcurrentHashMap<>();
+    private final Map<TopicPartition, T> runningByPartition = new HashMap<>();
+    final Map<TopicPartition, T> restoringByPartition = new HashMap<>();
 
     AssignedTasks(final LogContext logContext,
                   final String taskTypeName) {
@@ -176,7 +176,7 @@ private RuntimeException closeNonRunningTasks(final Collection<T> tasks) {
 
     private RuntimeException suspendTasks(final Collection<T> tasks) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
-        for (Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
+        for (final Iterator<T> it = tasks.iterator(); it.hasNext(); ) {
             final T task = it.next();
             try {
                 task.suspend();
@@ -249,10 +249,10 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>
 
     private void addToRestoring(final T task) {
         restoring.put(task.id(), task);
-        for (TopicPartition topicPartition : task.partitions()) {
+        for (final TopicPartition topicPartition : task.partitions()) {
             restoringByPartition.put(topicPartition, task);
         }
-        for (TopicPartition topicPartition : task.changelogPartitions()) {
+        for (final TopicPartition topicPartition : task.changelogPartitions()) {
             restoringByPartition.put(topicPartition, task);
         }
     }
@@ -264,10 +264,10 @@ private void transitionToRunning(final T task) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
         task.initializeTopology();
-        for (TopicPartition topicPartition : task.partitions()) {
+        for (final TopicPartition topicPartition : task.partitions()) {
             runningByPartition.put(topicPartition, task);
         }
-        for (TopicPartition topicPartition : task.changelogPartitions()) {
+        for (final TopicPartition topicPartition : task.changelogPartitions()) {
             runningByPartition.put(topicPartition, task);
         }
     }
@@ -356,7 +356,7 @@ int commit() {
     void applyToRunningTasks(final TaskAction<T> action) {
         RuntimeException firstException = null;
 
-        for (Iterator<T> it = running().iterator(); it.hasNext(); ) {
+        for (final Iterator<T> it = running().iterator(); it.hasNext(); ) {
             final T task = it.next();
             try {
                 action.apply(task);
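
The switch to final fields in AssignedTasks is more than style: the comment "IQ may access this map" means the running map is read from other threads, and in the Java memory model only a final field is guaranteed to be safely published after construction. A reduced sketch of that rationale (class and method names are illustrative, not Kafka API):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TaskRegistry<T> {
    // Final + ConcurrentHashMap: any thread that obtains a TaskRegistry
    // reference after construction sees a fully initialized map and can
    // read it while the stream thread mutates it.
    final Map<String, T> running = new ConcurrentHashMap<>();

    void add(final String taskId, final T task) {
        running.put(taskId, task);   // stream thread writes
    }

    T query(final String taskId) {
        return running.get(taskId);  // IQ thread reads without extra locking
    }
}
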
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 3080d2e1583..e72c4a5de94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -62,6 +62,7 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.singleton;
@@ -69,7 +70,7 @@
 public class StreamThread extends Thread {
 
     private final static int UNLIMITED_RECORDS = -1;
-    private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
+    private final static AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
 
     /**
      * Stream thread states are the possible states that a stream thread can be in.
@@ -264,7 +265,9 @@ public void onPartitionsAssigned(final Collection<TopicPartition> assignment) {
                 if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
                     return;
                 }
-                taskManager.createTasks(assignment);
+                if (!streamThread.versionProbingFlag.get()) {
+                    taskManager.createTasks(assignment);
+                }
             } catch (final Throwable t) {
                 log.error(
                     "Error caught during partition assignment, " +
@@ -298,7 +301,11 @@ public void onPartitionsRevoked(final Collection<TopicPartition> assignment) {
                 final long start = time.milliseconds();
                 try {
                     // suspend active tasks
-                    taskManager.suspendTasksAndState();
+                    if (streamThread.versionProbingFlag.get()) {
+                        streamThread.versionProbingFlag.set(false);
+                    } else {
+                        taskManager.suspendTasksAndState();
+                    }
                 } catch (final Throwable t) {
                     log.error(
                         "Error caught during partition revocation, " +
@@ -555,6 +562,7 @@ StandbyTask createTask(final Consumer<byte[], byte[]> consumer,
     private final String logPrefix;
     private final TaskManager taskManager;
     private final StreamsMetricsThreadImpl streamsMetrics;
+    private final AtomicBoolean versionProbingFlag;
 
     private long lastCommitMs;
     private long timerStartedMs;
@@ -647,6 +655,8 @@ public static StreamThread create(final InternalTopologyBuilder builder,
         final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, threadClientId);
         consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
+        final AtomicBoolean versionProbingFlag = new AtomicBoolean();
+        consumerConfigs.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, versionProbingFlag);
         String originalReset = null;
         if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
             originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
@@ -666,7 +676,8 @@ public static StreamThread create(final InternalTopologyBuilder builder,
             streamsMetrics,
             builder,
             threadClientId,
-            logContext);
+            logContext,
+            versionProbingFlag);
     }
 
     public StreamThread(final Time time,
@@ -679,7 +690,8 @@ public StreamThread(final Time time,
                         final StreamsMetricsThreadImpl streamsMetrics,
                         final InternalTopologyBuilder builder,
                         final String threadClientId,
-                        final LogContext logContext) {
+                        final LogContext logContext,
+                        final AtomicBoolean versionProbingFlag) {
         super(threadClientId);
 
         this.stateLock = new Object();
@@ -696,6 +708,7 @@ public StreamThread(final Time time,
         this.restoreConsumer = restoreConsumer;
         this.consumer = consumer;
         this.originalReset = originalReset;
+        this.versionProbingFlag = versionProbingFlag;
 
         this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
@@ -750,19 +763,26 @@ private void runLoop() {
         while (isRunning()) {
             try {
                 recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit);
+                if (versionProbingFlag.get()) {
+                    log.info("Version probing detected. Triggering new rebalance.");
+                    enforceRebalance();
+                }
             } catch (final TaskMigratedException ignoreAndRejoinGroup) {
                 log.warn("Detected task {} that got migrated to another thread. " +
                         "This implies that this thread missed a rebalance and dropped out of the consumer group. " +
                         "Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}",
                     ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">"));
 
-                // re-subscribe to enforce a rebalance in the next poll call
-                consumer.unsubscribe();
-                consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
+                enforceRebalance();
             }
         }
     }
 
+    private void enforceRebalance() {
+        consumer.unsubscribe();
+        consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
+    }
+
     /**
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException      If the store's change log does not contain the partition
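
The new InternalConfig.VERSION_PROBING_FLAG entry works because consumer config maps pass arbitrary objects through to Configurable components unchanged, giving the StreamsPartitionAssignor (instantiated inside the consumer) a back-channel to its owning StreamThread. A minimal sketch of that pattern, with hypothetical key and class names:

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.Configurable;

public class FlagAwareComponent implements Configurable {
    public static final String FLAG_KEY = "__example.flag.instance__"; // hypothetical key
    private AtomicBoolean flag;

    @Override
    public void configure(final Map<String, ?> configs) {
        final Object o = configs.get(FLAG_KEY);
        if (!(o instanceof AtomicBoolean)) {
            throw new IllegalStateException("flag instance missing or of wrong type");
        }
        flag = (AtomicBoolean) o;  // same instance the owning thread holds
    }

    void signal() {
        flag.set(true);  // owning thread polls this flag in its run loop
    }
}

The owning thread simply does configs.put(FLAG_KEY, flag) before creating the consumer, mirroring consumerConfigs.put(InternalConfig.VERSION_PROBING_FLAG, versionProbingFlag) above.
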
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index e1464e6b72c..db94ac0c852 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -51,6 +51,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
@@ -59,6 +60,12 @@
 
     private final static int UNKNOWN = -1;
     public final static int NOT_AVAILABLE = -2;
+    private final static int VERSION_ONE = 1;
+    private final static int VERSION_TWO = 2;
+    private final static int VERSION_THREE = 3;
+    private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE;
+    private int minReceivedMetadataVersion = UNKNOWN;
+    protected Set<Integer> supportedVersions = new HashSet<>();
 
     private Logger log;
     private String logPrefix;
@@ -159,7 +166,7 @@ public String toString() {
         }
     }
 
-    private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
+    protected static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
         @Override
         public int compare(final TopicPartition p1,
                            final TopicPartition p2) {
@@ -178,12 +185,21 @@ public int compare(final TopicPartition p1,
 
     private TaskManager taskManager;
     private PartitionGrouper partitionGrouper;
+    private AtomicBoolean versionProbingFlag;
 
-    private int userMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+    protected int usedSubscriptionMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
 
     private InternalTopicManager internalTopicManager;
     private CopartitionedTopicsValidator copartitionedTopicsValidator;
 
+    protected String userEndPoint() {
+        return userEndPoint;
+    }
+
+    protected TaskManager taskManger() {
+        return taskManager;
+    }
+
     /**
      * We need to have the PartitionAssignor and its StreamThread to be mutually accessible
      * since the former needs later's cached metadata while sending subscriptions,
@@ -204,15 +220,15 @@ public void configure(final Map<String, ?> configs) {
             switch (upgradeFrom) {
                 case StreamsConfig.UPGRADE_FROM_0100:
                     log.info("Downgrading metadata version from {} to 1 for upgrade from 0.10.0.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
-                    userMetadataVersion = 1;
+                    usedSubscriptionMetadataVersion = VERSION_ONE;
                     break;
                 case StreamsConfig.UPGRADE_FROM_0101:
                 case StreamsConfig.UPGRADE_FROM_0102:
                 case StreamsConfig.UPGRADE_FROM_0110:
                 case StreamsConfig.UPGRADE_FROM_10:
                 case StreamsConfig.UPGRADE_FROM_11:
-                    log.info("Downgrading metadata version from {} to 2 for upgrade from " + upgradeFrom + ".x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION);
-                    userMetadataVersion = 2;
+                    log.info("Downgrading metadata version from {} to 2 for upgrade from {}.x.", SubscriptionInfo.LATEST_SUPPORTED_VERSION, upgradeFrom);
+                    usedSubscriptionMetadataVersion = VERSION_TWO;
                     break;
                 default:
                     throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
@@ -234,6 +250,21 @@ public void configure(final Map<String, ?> configs) {
 
         taskManager = (TaskManager) o;
 
+        final Object o2 = configs.get(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG);
+        if (o2 == null) {
+            final KafkaException fatalException = new KafkaException("VersionProbingFlag is not specified");
+            log.error(fatalException.getMessage(), fatalException);
+            throw fatalException;
+        }
+
+        if (!(o2 instanceof AtomicBoolean)) {
+            final KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s", o2.getClass().getName(), AtomicBoolean.class.getName()));
+            log.error(fatalException.getMessage(), fatalException);
+            throw fatalException;
+        }
+
+        versionProbingFlag = (AtomicBoolean) o2;
+
         numStandbyReplicas = streamsConfig.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
 
         partitionGrouper = streamsConfig.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
@@ -277,7 +308,7 @@ public Subscription subscription(final Set<String> topics) {
         final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
         standbyTasks.removeAll(previousActiveTasks);
         final SubscriptionInfo data = new SubscriptionInfo(
-            userMetadataVersion,
+            usedSubscriptionMetadataVersion,
             taskManager.processId(),
             previousActiveTasks,
             standbyTasks,
@@ -313,20 +344,25 @@ public Subscription subscription(final Set<String> topics) {
                                           final Map<String, Subscription> subscriptions) {
         // construct the client metadata from the decoded subscription info
         final Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
+        final Set<String> futureConsumers = new HashSet<>();
 
-        int minUserMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+        minReceivedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+        supportedVersions.clear();
+        int futureMetadataVersion = UNKNOWN;
         for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
             final String consumerId = entry.getKey();
             final Subscription subscription = entry.getValue();
 
             final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
             final int usedVersion = info.version();
+            supportedVersions.add(info.latestSupportedVersion());
             if (usedVersion > SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
-                throw new IllegalStateException("Unknown metadata version: " + usedVersion
-                    + "; latest supported version: " + SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+                futureMetadataVersion = usedVersion;
+                futureConsumers.add(consumerId);
+                continue;
             }
-            if (usedVersion < minUserMetadataVersion) {
-                minUserMetadataVersion = usedVersion;
+            if (usedVersion < minReceivedMetadataVersion) {
+                minReceivedMetadataVersion = usedVersion;
             }
 
             // create the new client metadata if necessary
@@ -341,6 +377,27 @@ public Subscription subscription(final Set<String> topics) {
             clientMetadata.addConsumer(consumerId, info);
         }
 
+        final boolean versionProbing;
+        if (futureMetadataVersion != UNKNOWN) {
+            if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
+                log.info("Received a future (version probing) subscription (version: {}). Sending empty assignment back (with supported version {}).",
+                    futureMetadataVersion,
+                    SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+                versionProbing = true;
+            } else {
+                throw new IllegalStateException("Received a future (version probing) subscription (version: " + futureMetadataVersion
+                    + ") and an incompatible pre Kafka 2.0 subscription (version: " + minReceivedMetadataVersion + ") at the same time.");
+            }
+        } else {
+            versionProbing = false;
+        }
+
+        if (minReceivedMetadataVersion < SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
+            log.info("Downgrading metadata to version {}. Latest supported version is {}.",
+                minReceivedMetadataVersion,
+                SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+        }
+
         log.debug("Constructed client metadata {} from the member subscriptions.", clientsMetadata);
 
         // ---------------- Step Zero ---------------- //
@@ -457,12 +514,7 @@ public Subscription subscription(final Set<String> topics) {
             allAssignedPartitions.addAll(partitions);
 
             final TaskId id = entry.getKey();
-            Set<TaskId> ids = tasksByTopicGroup.get(id.topicGroupId);
-            if (ids == null) {
-                ids = new HashSet<>();
-                tasksByTopicGroup.put(id.topicGroupId, ids);
-            }
-            ids.add(id);
+            tasksByTopicGroup.computeIfAbsent(id.topicGroupId, k -> new HashSet<>()).add(id);
         }
         for (final String topic : allSourceTopics) {
             final List<PartitionInfo> partitionInfoList = fullMetadata.partitionsForTopic(topic);
@@ -530,7 +582,7 @@ public Subscription subscription(final Set<String> topics) {
 
         // construct the global partition assignment per host map
         final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<>();
-        if (minUserMetadataVersion == 2 || minUserMetadataVersion == 3) {
+        if (minReceivedMetadataVersion == 2 || minReceivedMetadataVersion == 3) {
             for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
                 final HostInfo hostInfo = entry.getValue().hostInfo;
 
@@ -548,8 +600,23 @@ public Subscription subscription(final Set<String> topics) {
         }
         taskManager.setPartitionsByHostState(partitionsByHostState);
 
-        // within the client, distribute tasks to its owned consumers
+        final Map<String, Assignment> assignment;
+        if (versionProbing) {
+            assignment = versionProbingAssignment(clientsMetadata, partitionsForTask, partitionsByHostState, futureConsumers, minReceivedMetadataVersion);
+        } else {
+            assignment = computeNewAssignment(clientsMetadata, partitionsForTask, partitionsByHostState, minReceivedMetadataVersion);
+        }
+
+        return assignment;
+    }
+
+    private Map<String, Assignment> computeNewAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
+                                                         final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                                         final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
+                                                         final int minUserMetadataVersion) {
         final Map<String, Assignment> assignment = new HashMap<>();
+
+        // within the client, distribute tasks to its owned consumers
         for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
             final Set<String> consumers = entry.getValue().consumers;
             final ClientState state = entry.getValue().state;
@@ -574,12 +641,7 @@ public Subscription subscription(final Set<String> topics) {
                 if (!state.standbyTasks().isEmpty()) {
                     final List<TaskId> assignedStandbyList = interleavedStandby.get(consumerTaskIndex);
                     for (final TaskId taskId : assignedStandbyList) {
-                        Set<TopicPartition> standbyPartitions = standby.get(taskId);
-                        if (standbyPartitions == null) {
-                            standbyPartitions = new HashSet<>();
-                            standby.put(taskId, standbyPartitions);
-                        }
-                        standbyPartitions.addAll(partitionsForTask.get(taskId));
+                        standby.computeIfAbsent(taskId, k -> new HashSet<>()).addAll(partitionsForTask.get(taskId));
                     }
                 }
 
@@ -603,13 +665,63 @@ public Subscription subscription(final Set<String> topics) {
         return assignment;
     }
 
+    private Map<String, Assignment> versionProbingAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
+                                                             final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                                             final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
+                                                             final Set<String> futureConsumers,
+                                                             final int minUserMetadataVersion) {
+        final Map<String, Assignment> assignment = new HashMap<>();
+
+        // assign previously assigned tasks to "old consumers"
+        for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
+            for (final String consumerId : clientMetadata.consumers) {
+
+                if (futureConsumers.contains(consumerId)) {
+                    continue;
+                }
+
+                final List<TaskId> activeTasks = new ArrayList<>(clientMetadata.state.prevActiveTasks());
+
+                final List<TopicPartition> assignedPartitions = new ArrayList<>();
+                for (final TaskId taskId : activeTasks) {
+                    assignedPartitions.addAll(partitionsForTask.get(taskId));
+                }
+
+                final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+                for (final TaskId taskId : clientMetadata.state.prevStandbyTasks()) {
+                    standbyTasks.put(taskId, partitionsForTask.get(taskId));
+                }
+
+                assignment.put(consumerId, new Assignment(
+                    assignedPartitions,
+                    new AssignmentInfo(
+                        minUserMetadataVersion,
+                        activeTasks,
+                        standbyTasks,
+                        partitionsByHostState)
+                        .encode()
+                ));
+            }
+        }
+
+        // add empty assignment for "future version" clients (ie, empty version probing response)
+        for (final String consumerId : futureConsumers) {
+            assignment.put(consumerId, new Assignment(
+                Collections.emptyList(),
+                new AssignmentInfo().encode()
+            ));
+        }
+
+        return assignment;
+    }
+
     // visible for testing
     List<List<TaskId>> interleaveTasksByGroupId(final Collection<TaskId> taskIds, final int numberThreads) {
         final LinkedList<TaskId> sortedTasks = new LinkedList<>(taskIds);
         Collections.sort(sortedTasks);
         final List<List<TaskId>> taskIdsForConsumerAssignment = new ArrayList<>(numberThreads);
         for (int i = 0; i < numberThreads; i++) {
-            taskIdsForConsumerAssignment.add(new ArrayList<TaskId>());
+            taskIdsForConsumerAssignment.add(new ArrayList<>());
         }
         while (!sortedTasks.isEmpty()) {
             for (final List<TaskId> taskIdList : taskIdsForConsumerAssignment) {
@@ -632,7 +744,35 @@ public void onAssignment(final Assignment assignment) {
         Collections.sort(partitions, PARTITION_COMPARATOR);
 
         final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
-        final int usedVersion = info.version();
+        final int receivedAssignmentMetadataVersion = info.version();
+        final int leaderSupportedVersion = info.latestSupportedVersion();
+
+        if (receivedAssignmentMetadataVersion > usedSubscriptionMetadataVersion) {
+            throw new IllegalStateException("Sent a version " + usedSubscriptionMetadataVersion
+                + " subscription but got an assignment with higher version " + receivedAssignmentMetadataVersion + ".");
+        }
+
+        if (receivedAssignmentMetadataVersion < usedSubscriptionMetadataVersion
+            && receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
+
+            if (receivedAssignmentMetadataVersion == leaderSupportedVersion) {
+                log.info("Sent a version {} subscription and got version {} assignment back (successful version probing). " +
+                        "Downgrading subscription metadata to received version and trigger new rebalance.",
+                    usedSubscriptionMetadataVersion,
+                    receivedAssignmentMetadataVersion);
+                usedSubscriptionMetadataVersion = receivedAssignmentMetadataVersion;
+            } else {
+                log.info("Sent a version {} subscription and got version {} assignment back (successful version probing). " +
+                    "Setting subscription metadata to leaders supported version {} and trigger new rebalance.",
+                    usedSubscriptionMetadataVersion,
+                    receivedAssignmentMetadataVersion,
+                    leaderSupportedVersion);
+                usedSubscriptionMetadataVersion = leaderSupportedVersion;
+            }
+
+            versionProbingFlag.set(true);
+            return;
+        }
 
         // version 1 field
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -640,22 +780,29 @@ public void onAssignment(final Assignment assignment) {
         final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
         final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
 
-        switch (usedVersion) {
-            case 1:
+        switch (receivedAssignmentMetadataVersion) {
+            case VERSION_ONE:
                 processVersionOneAssignment(info, partitions, activeTasks);
                 partitionsByHost = Collections.emptyMap();
                 break;
-            case 2:
+            case VERSION_TWO:
                 processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
                 partitionsByHost = info.partitionsByHost();
                 break;
-            case 3:
+            case VERSION_THREE:
+                if (leaderSupportedVersion > usedSubscriptionMetadataVersion) {
+                    log.info("Sent a version {} subscription and group leader's latest supported version is {}. " +
+                        "Upgrading subscription metadata version to {} for next rebalance.",
+                        usedSubscriptionMetadataVersion,
+                        leaderSupportedVersion,
+                        leaderSupportedVersion);
+                    usedSubscriptionMetadataVersion = leaderSupportedVersion;
+                }
                 processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo);
                 partitionsByHost = info.partitionsByHost();
                 break;
             default:
-                throw new IllegalStateException("Unknown metadata version: " + usedVersion
-                    + "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION);
+                throw new IllegalStateException("This code should never be reached. Please file a bug report at https://issues.apache.org/jira/projects/KAFKA/");
         }
 
         taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
@@ -679,13 +826,7 @@ private void processVersionOneAssignment(final AssignmentInfo info,
         for (int i = 0; i < partitions.size(); i++) {
             final TopicPartition partition = partitions.get(i);
             final TaskId id = info.activeTasks().get(i);
-
-            Set<TopicPartition> assignedPartitions = activeTasks.get(id);
-            if (assignedPartitions == null) {
-                assignedPartitions = new HashSet<>();
-                activeTasks.put(id, assignedPartitions);
-            }
-            assignedPartitions.add(partition);
+            activeTasks.computeIfAbsent(id, k -> new HashSet<>()).add(partition);
         }
     }
 
@@ -713,6 +854,14 @@ private void processVersionThreeAssignment(final AssignmentInfo info,
         processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
     }
 
+    // for testing
+    protected void processLatestVersionAssignment(final AssignmentInfo info,
+                                                  final List<TopicPartition> partitions,
+                                                  final Map<TaskId, Set<TopicPartition>> activeTasks,
+                                                  final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
+        processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo);
+    }
+
     /**
      * Internal helper function that creates a Kafka topic
      *
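
The assignor changes above implement the follower side of version probing: if the assignment comes back with a lower metadata version that is still at least EARLIEST_PROBEABLE_VERSION, the member downgrades its subscription version, sets the shared flag, and lets a second rebalance run with metadata the leader can decode. A minimal standalone sketch of that decision logic follows; the class name and the boolean return convention are hypothetical, only the branching mirrors the patch:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Minimal sketch of the follower-side decision in onAssignment() above.
    public class VersionProbingSketch {
        static final int EARLIEST_PROBEABLE_VERSION = 3;

        int usedSubscriptionMetadataVersion = 4;                 // version we sent
        final AtomicBoolean versionProbingFlag = new AtomicBoolean(false);

        boolean onAssignment(final int receivedVersion, final int leaderSupportedVersion) {
            if (receivedVersion > usedSubscriptionMetadataVersion) {
                throw new IllegalStateException("Assignment version exceeds subscription version.");
            }
            if (receivedVersion < usedSubscriptionMetadataVersion
                && receivedVersion >= EARLIEST_PROBEABLE_VERSION) {
                // leader is older: fall back to a version it understands and rebalance again
                usedSubscriptionMetadataVersion =
                    receivedVersion == leaderSupportedVersion ? receivedVersion : leaderSupportedVersion;
                versionProbingFlag.set(true);
                return false;                                    // skip processing; new rebalance follows
            }
            return true;                                         // process the assignment normally
        }

        public static void main(final String[] args) {
            final VersionProbingSketch member = new VersionProbingSketch();
            System.out.println(member.onAssignment(3, 3));       // false: probe succeeded
            System.out.println(member.usedSubscriptionMetadataVersion); // 3: downgraded
        }
    }
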
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 63224dbcde7..6e6e4ca7c5c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -42,7 +42,7 @@
 
 import static java.util.Collections.singleton;
 
-class TaskManager {
+public class TaskManager {
     // initialize the task list
     // activeTasks needs to be concurrent as it can be accessed
     // by QueryableState
@@ -187,14 +187,14 @@ private void addStandbyTasks() {
         return standby.allAssignedTaskIds();
     }
 
-    Set<TaskId> prevActiveTaskIds() {
+    public Set<TaskId> prevActiveTaskIds() {
         return active.previousTaskIds();
     }
 
     /**
      * Returns ids of tasks whose states are kept on the local storage.
      */
-    Set<TaskId> cachedTasksIds() {
+    public Set<TaskId> cachedTasksIds() {
         // A client could contain some inactive tasks whose states are still kept on the local storage in the following scenarios:
         // 1) the client is actively maintaining standby tasks by maintaining their states from the change log.
         // 2) the client has just got some tasks migrated out of itself to other clients while these task states
@@ -221,7 +221,7 @@ private void addStandbyTasks() {
         return tasks;
     }
 
-    UUID processId() {
+    public UUID processId() {
         return processId;
     }
 
@@ -356,21 +356,21 @@ private void assignStandbyPartitions() {
         }
     }
 
-    void setClusterMetadata(final Cluster cluster) {
+    public void setClusterMetadata(final Cluster cluster) {
         this.cluster = cluster;
     }
 
-    void setPartitionsByHostState(final Map<HostInfo, Set<TopicPartition>> partitionsByHostState) {
+    public void setPartitionsByHostState(final Map<HostInfo, Set<TopicPartition>> partitionsByHostState) {
         this.streamsMetadataState.onChange(partitionsByHostState, cluster);
     }
 
-    void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> activeTasks,
+    public void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> activeTasks,
                                final Map<TaskId, Set<TopicPartition>> standbyTasks) {
         this.assignedActiveTasks = activeTasks;
         this.assignedStandbyTasks = standbyTasks;
     }
 
-    void updateSubscriptionsFromAssignment(List<TopicPartition> partitions) {
+    public void updateSubscriptionsFromAssignment(List<TopicPartition> partitions) {
         if (builder().sourceTopicPattern() != null) {
             final Set<String> assignedTopics = new HashSet<>();
             for (final TopicPartition topicPartition : partitions) {
@@ -385,7 +385,7 @@ void updateSubscriptionsFromAssignment(List<TopicPartition> partitions) {
         }
     }
 
-    void updateSubscriptionsFromMetadata(Set<String> topics) {
+    public void updateSubscriptionsFromMetadata(Set<String> topics) {
         if (builder().sourceTopicPattern() != null) {
             final Collection<String> existingTopics = builder().subscriptionUpdates().getUpdates();
             if (!existingTopics.equals(topics)) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 3c5cee2bfc3..c577830e3e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -42,7 +42,7 @@
     private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
 
     public static final int LATEST_SUPPORTED_VERSION = 3;
-    public static final int UNKNOWN = -1;
+    static final int UNKNOWN = -1;
 
     private final int usedVersion;
     private final int latestSupportedVersion;
@@ -65,9 +65,9 @@ public AssignmentInfo(final List<TaskId> activeTasks,
 
     public AssignmentInfo() {
         this(LATEST_SUPPORTED_VERSION,
-            Collections.<TaskId>emptyList(),
-            Collections.<TaskId, Set<TopicPartition>>emptyMap(),
-            Collections.<HostInfo, Set<TopicPartition>>emptyMap());
+            Collections.emptyList(),
+            Collections.emptyMap(),
+            Collections.emptyMap());
     }
 
     public AssignmentInfo(final int version,
@@ -229,7 +229,7 @@ public static AssignmentInfo decode(final ByteBuffer data) {
                     decodeVersionThreeData(assignmentInfo, in);
                     break;
                 default:
-                    TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " +
+                    final TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " +
                         "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
                     log.error(fatalException.getMessage(), fatalException);
                     throw fatalException;
@@ -262,7 +262,7 @@ private static void decodeStandbyTasks(final AssignmentInfo assignmentInfo,
         final int count = in.readInt();
         assignmentInfo.standbyTasks = new HashMap<>(count);
         for (int i = 0; i < count; i++) {
-            TaskId id = TaskId.readFrom(in);
+            final TaskId id = TaskId.readFrom(in);
             assignmentInfo.standbyTasks.put(id, readTopicPartitions(in));
         }
     }
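
On the leader side, a consumer whose subscription version is newer than the leader understands receives an assignment with no partitions and an empty, latest-version AssignmentInfo (the loop over futureConsumers above). A small round-trip of that payload, using only the AssignmentInfo API visible in this patch:

    import java.nio.ByteBuffer;

    import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;

    // Round-trips the empty "probe response" built for futureConsumers above.
    public class EmptyAssignmentSketch {
        public static void main(final String[] args) {
            final ByteBuffer payload = new AssignmentInfo().encode();
            final AssignmentInfo decoded = AssignmentInfo.decode(payload);
            System.out.println(decoded.version());               // LATEST_SUPPORTED_VERSION (3)
            System.out.println(decoded.activeTasks().isEmpty()); // true: no tasks assigned
        }
    }
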
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index 15ee849bffc..66e655fa837 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -26,6 +26,7 @@
     private final Set<TaskId> standbyTasks;
     private final Set<TaskId> assignedTasks;
     private final Set<TaskId> prevActiveTasks;
+    private final Set<TaskId> prevStandbyTasks;
     private final Set<TaskId> prevAssignedTasks;
 
     private int capacity;
@@ -36,21 +37,34 @@ public ClientState() {
     }
 
     ClientState(final int capacity) {
-        this(new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), capacity);
+        this(new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), new HashSet<TaskId>(), capacity);
     }
 
-    private ClientState(Set<TaskId> activeTasks, Set<TaskId> standbyTasks, Set<TaskId> assignedTasks, Set<TaskId> prevActiveTasks, Set<TaskId> prevAssignedTasks, int capacity) {
+    private ClientState(final Set<TaskId> activeTasks,
+                        final Set<TaskId> standbyTasks,
+                        final Set<TaskId> assignedTasks,
+                        final Set<TaskId> prevActiveTasks,
+                        final Set<TaskId> prevStandbyTasks,
+                        final Set<TaskId> prevAssignedTasks,
+                        final int capacity) {
         this.activeTasks = activeTasks;
         this.standbyTasks = standbyTasks;
         this.assignedTasks = assignedTasks;
         this.prevActiveTasks = prevActiveTasks;
+        this.prevStandbyTasks = prevStandbyTasks;
         this.prevAssignedTasks = prevAssignedTasks;
         this.capacity = capacity;
     }
 
     public ClientState copy() {
-        return new ClientState(new HashSet<>(activeTasks), new HashSet<>(standbyTasks), new HashSet<>(assignedTasks),
-                new HashSet<>(prevActiveTasks), new HashSet<>(prevAssignedTasks), capacity);
+        return new ClientState(
+            new HashSet<>(activeTasks),
+            new HashSet<>(standbyTasks),
+            new HashSet<>(assignedTasks),
+            new HashSet<>(prevActiveTasks),
+            new HashSet<>(prevStandbyTasks),
+            new HashSet<>(prevAssignedTasks),
+            capacity);
     }
 
     public void assign(final TaskId taskId, final boolean active) {
@@ -71,6 +85,14 @@ public void assign(final TaskId taskId, final boolean active) {
         return standbyTasks;
     }
 
+    public Set<TaskId> prevActiveTasks() {
+        return prevActiveTasks;
+    }
+
+    public Set<TaskId> prevStandbyTasks() {
+        return prevStandbyTasks;
+    }
+
     public int assignedTaskCount() {
         return assignedTasks.size();
     }
@@ -89,6 +111,7 @@ public void addPreviousActiveTasks(final Set<TaskId> prevTasks) {
     }
 
     public void addPreviousStandbyTasks(final Set<TaskId> standbyTasks) {
+        prevStandbyTasks.addAll(standbyTasks);
         prevAssignedTasks.addAll(standbyTasks);
     }
 
@@ -98,6 +121,7 @@ public String toString() {
                 ") standbyTasks: (" + standbyTasks +
                 ") assignedTasks: (" + assignedTasks +
                 ") prevActiveTasks: (" + prevActiveTasks +
+                ") prevStandbyTasks: (" + prevStandbyTasks +
                 ") prevAssignedTasks: (" + prevAssignedTasks +
                 ") capacity: " + capacity +
                 "]";
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index be709472441..4ebc95674b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -33,7 +33,7 @@
     private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
 
     public static final int LATEST_SUPPORTED_VERSION = 3;
-    public static final int UNKNOWN = -1;
+    static final int UNKNOWN = -1;
 
     private final int usedVersion;
     private final int latestSupportedVersion;
@@ -151,20 +151,20 @@ private int getVersionOneByteLength() {
                4 + standbyTasks.size() * 8; // length + standby tasks
     }
 
-    private void encodeClientUUID(final ByteBuffer buf) {
+    protected void encodeClientUUID(final ByteBuffer buf) {
         buf.putLong(processId.getMostSignificantBits());
         buf.putLong(processId.getLeastSignificantBits());
     }
 
-    private void encodeTasks(final ByteBuffer buf,
-                             final Collection<TaskId> taskIds) {
+    protected void encodeTasks(final ByteBuffer buf,
+                               final Collection<TaskId> taskIds) {
         buf.putInt(taskIds.size());
-        for (TaskId id : taskIds) {
+        for (final TaskId id : taskIds) {
             id.writeTo(buf);
         }
     }
 
-    private byte[] prepareUserEndPoint() {
+    protected byte[] prepareUserEndPoint() {
         if (userEndPoint == null) {
             return new byte[0];
         } else {
@@ -194,8 +194,8 @@ private int getVersionTwoByteLength(final byte[] endPointBytes) {
                4 + endPointBytes.length; // length + userEndPoint
     }
 
-    private void encodeUserEndPoint(final ByteBuffer buf,
-                                    final byte[] endPointBytes) {
+    protected void encodeUserEndPoint(final ByteBuffer buf,
+                                      final byte[] endPointBytes) {
         if (endPointBytes != null) {
             buf.putInt(endPointBytes.length);
             buf.put(endPointBytes);
@@ -217,7 +217,7 @@ private ByteBuffer encodeVersionThree() {
         return buf;
     }
 
-    private int getVersionThreeByteLength(final byte[] endPointBytes) {
+    protected int getVersionThreeByteLength(final byte[] endPointBytes) {
         return 4 + // used version
                4 + // latest supported version
                16 + // client ID
@@ -236,6 +236,7 @@ public static SubscriptionInfo decode(final ByteBuffer data) {
         data.rewind();
 
         final int usedVersion = data.getInt();
+        final int latestSupportedVersion;
         switch (usedVersion) {
             case 1:
                 subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
@@ -246,12 +247,13 @@ public static SubscriptionInfo decode(final ByteBuffer data) {
                 decodeVersionTwoData(subscriptionInfo, data);
                 break;
             case 3:
-                final int latestSupportedVersion = data.getInt();
+                latestSupportedVersion = data.getInt();
                 subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion);
                 decodeVersionThreeData(subscriptionInfo, data);
                 break;
             default:
-                subscriptionInfo = new SubscriptionInfo(usedVersion, UNKNOWN);
+                latestSupportedVersion = data.getInt();
+                subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion);
                 log.info("Unable to decode subscription data: used version: {}; latest supported version: {}", usedVersion, LATEST_SUPPORTED_VERSION);
         }
 
@@ -261,12 +263,7 @@ public static SubscriptionInfo decode(final ByteBuffer data) {
     private static void decodeVersionOneData(final SubscriptionInfo subscriptionInfo,
                                              final ByteBuffer data) {
         decodeClientUUID(subscriptionInfo, data);
-
-        subscriptionInfo.prevTasks = new HashSet<>();
-        decodeTasks(subscriptionInfo.prevTasks, data);
-
-        subscriptionInfo.standbyTasks = new HashSet<>();
-        decodeTasks(subscriptionInfo.standbyTasks, data);
+        decodeTasks(subscriptionInfo, data);
     }
 
     private static void decodeClientUUID(final SubscriptionInfo subscriptionInfo,
@@ -274,30 +271,31 @@ private static void decodeClientUUID(final SubscriptionInfo subscriptionInfo,
         subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
     }
 
-    private static void decodeTasks(final Collection<TaskId> taskIds,
+    private static void decodeTasks(final SubscriptionInfo subscriptionInfo,
                                     final ByteBuffer data) {
-        final int numPrevs = data.getInt();
-        for (int i = 0; i < numPrevs; i++) {
-            taskIds.add(TaskId.readFrom(data));
+        subscriptionInfo.prevTasks = new HashSet<>();
+        final int numPrevTasks = data.getInt();
+        for (int i = 0; i < numPrevTasks; i++) {
+            subscriptionInfo.prevTasks.add(TaskId.readFrom(data));
+        }
+
+        subscriptionInfo.standbyTasks = new HashSet<>();
+        final int numStandbyTasks = data.getInt();
+        for (int i = 0; i < numStandbyTasks; i++) {
+            subscriptionInfo.standbyTasks.add(TaskId.readFrom(data));
         }
     }
 
     private static void decodeVersionTwoData(final SubscriptionInfo subscriptionInfo,
                                              final ByteBuffer data) {
         decodeClientUUID(subscriptionInfo, data);
-
-        subscriptionInfo.prevTasks = new HashSet<>();
-        decodeTasks(subscriptionInfo.prevTasks, data);
-
-        subscriptionInfo.standbyTasks = new HashSet<>();
-        decodeTasks(subscriptionInfo.standbyTasks, data);
-
+        decodeTasks(subscriptionInfo, data);
         decodeUserEndPoint(subscriptionInfo, data);
     }
 
     private static void decodeUserEndPoint(final SubscriptionInfo subscriptionInfo,
                                            final ByteBuffer data) {
-        int bytesLength = data.getInt();
+        final int bytesLength = data.getInt();
         if (bytesLength != 0) {
             final byte[] bytes = new byte[bytesLength];
             data.get(bytes);
@@ -308,16 +306,11 @@ private static void decodeUserEndPoint(final SubscriptionInfo subscriptionInfo,
     private static void decodeVersionThreeData(final SubscriptionInfo subscriptionInfo,
                                                final ByteBuffer data) {
         decodeClientUUID(subscriptionInfo, data);
-
-        subscriptionInfo.prevTasks = new HashSet<>();
-        decodeTasks(subscriptionInfo.prevTasks, data);
-
-        subscriptionInfo.standbyTasks = new HashSet<>();
-        decodeTasks(subscriptionInfo.standbyTasks, data);
-
+        decodeTasks(subscriptionInfo, data);
         decodeUserEndPoint(subscriptionInfo, data);
     }
 
+    @Override
     public int hashCode() {
         final int hashCode = usedVersion ^ latestSupportedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
         if (userEndPoint == null) {
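
The decode() change above is the core fix for the error this ticket reports: instead of mapping an unknown subscription version to UNKNOWN and logging "unable to decode subscription data", the decoder now reads the sender's latest supported version, which sits directly after the used version in the header. A sketch of the first two header ints a hypothetical version 4 client would send, assuming the layout visible in encodeVersionThree():

    import java.nio.ByteBuffer;

    // Hand-builds the first two ints of a hypothetical version 4 subscription
    // header: used version first, latest supported version second. The new
    // default case can read the second int instead of giving up on the payload.
    public class FutureSubscriptionHeaderSketch {
        public static void main(final String[] args) {
            final ByteBuffer buf = ByteBuffer.allocate(8);
            buf.putInt(4); // used version, unknown to a version 3 leader
            buf.putInt(4); // latest supported version, still decodable
            buf.rewind();
            System.out.println("used=" + buf.getInt() + ", latestSupported=" + buf.getInt());
        }
    }
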
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 749d618bac5..f3dce521998 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -78,6 +78,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -298,7 +299,8 @@ public void shouldNotCommitBeforeTheCommitInterval() {
             streamsMetrics,
             internalTopologyBuilder,
             clientId,
-            new LogContext("")
+            new LogContext(""),
+            new AtomicBoolean()
         );
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval - 10L);
@@ -331,7 +333,9 @@ public void shouldNotCauseExceptionIfNothingCommitted() {
             streamsMetrics,
             internalTopologyBuilder,
             clientId,
-            new LogContext(""));
+            new LogContext(""),
+            new AtomicBoolean()
+        );
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval - 10L);
         thread.maybeCommit(mockTime.milliseconds());
@@ -364,7 +368,9 @@ public void shouldCommitAfterTheCommitInterval() {
             streamsMetrics,
             internalTopologyBuilder,
             clientId,
-            new LogContext(""));
+            new LogContext(""),
+            new AtomicBoolean()
+        );
         thread.maybeCommit(mockTime.milliseconds());
         mockTime.sleep(commitInterval + 1);
         thread.maybeCommit(mockTime.milliseconds());
@@ -511,7 +517,8 @@ public void shouldShutdownTaskManagerOnClose() {
             streamsMetrics,
             internalTopologyBuilder,
             clientId,
-            new LogContext("")
+            new LogContext(""),
+            new AtomicBoolean()
         );
         thread.setStateListener(
             new StreamThread.StateListener() {
@@ -547,7 +554,8 @@ public void shouldShutdownTaskManagerOnCloseWithoutStart() {
             streamsMetrics,
             internalTopologyBuilder,
             clientId,
-            new LogContext("")
+            new LogContext(""),
+            new AtomicBoolean()
         );
         thread.shutdown();
         EasyMock.verify(taskManager);
@@ -574,7 +582,9 @@ public void shouldOnlyShutdownOnce() {
             streamsMetrics,
             internalTopologyBuilder,
             clientId,
-            new LogContext(""));
+            new LogContext(""),
+            new AtomicBoolean()
+        );
         thread.shutdown();
         // Execute the run method. Verification of the mock will check that shutdown was only done once
         thread.run();
@@ -1255,7 +1265,7 @@ private void assertThreadMetadataHasEmptyTasksWithState(final ThreadMetadata met
     @Test
     // TODO: Need to add a test case covering EOS when we create a mock taskManager class
     public void producerMetricsVerificationWithoutEOS() {
-        final MockProducer<byte[], byte[]> producer = new MockProducer();
+        final MockProducer<byte[], byte[]> producer = new MockProducer<>();
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
 
@@ -1271,7 +1281,8 @@ public void producerMetricsVerificationWithoutEOS() {
                 streamsMetrics,
                 internalTopologyBuilder,
                 clientId,
-                new LogContext(""));
+                new LogContext(""),
+                new AtomicBoolean());
         final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<String, String>());
         final Metric testMetric = new KafkaMetric(
                 new Object(),
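
The extra constructor argument threaded through these tests is the version-probing flag shared between the assignor and the stream thread: onAssignment() sets it, and the thread reacts by forcing another rebalance. A minimal sketch of that handshake; the resubscribe trigger in the comment is a plausible reaction, not necessarily the merged patch's exact code:

    import java.util.concurrent.atomic.AtomicBoolean;

    // One flag instance is handed to both the assignor (writer) and the
    // stream thread (reader).
    public class ProbingFlagSketch {
        public static void main(final String[] args) {
            final AtomicBoolean versionProbingFlag = new AtomicBoolean(false);

            versionProbingFlag.set(true); // assignor side: probe succeeded

            // thread-loop side: consume the flag exactly once
            if (versionProbingFlag.compareAndSet(true, false)) {
                System.out.println("trigger new rebalance (e.g., unsubscribe and resubscribe)");
            }
        }
    }
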
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 37b03fa3418..a32d193a171 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -48,6 +48,7 @@
 import org.easymock.EasyMock;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -57,6 +58,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
@@ -98,7 +100,8 @@
     private final Cluster metadata = new Cluster(
         "cluster",
         Collections.singletonList(Node.noNode()),
-        infos, Collections.<String>emptySet(),
+        infos,
+        Collections.<String>emptySet(),
         Collections.<String>emptySet());
 
     private final TaskId task0 = new TaskId(0, 0);
@@ -115,15 +118,16 @@
     private final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
 
     private Map<String, Object> configProps() {
-        Map<String, Object> configurationMap = new HashMap<>();
+        final Map<String, Object> configurationMap = new HashMap<>();
         configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint);
         configurationMap.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
+        configurationMap.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, new AtomicBoolean());
         return configurationMap;
     }
 
     private void configurePartitionAssignor(final Map<String, Object> props) {
-        Map<String, Object> configurationMap = configProps();
+        final Map<String, Object> configurationMap = configProps();
         configurationMap.putAll(props);
         partitionAssignor.configure(configurationMap);
     }
@@ -158,7 +162,7 @@ public void shouldInterleaveTasksByGroupId() {
         final List<TaskId> expectedSubList3 = Arrays.asList(taskIdA2, taskIdB1, taskIdC1);
         final List<List<TaskId>> embeddedList = Arrays.asList(expectedSubList1, expectedSubList2, expectedSubList3);
 
-        List<TaskId> tasks = Arrays.asList(taskIdC0, taskIdC1, taskIdB0, taskIdB1, taskIdB2, taskIdA0, taskIdA1, taskIdA2, taskIdA3);
+        final List<TaskId> tasks = Arrays.asList(taskIdC0, taskIdC1, taskIdB0, taskIdB1, taskIdB2, taskIdA0, taskIdA1, taskIdA2, taskIdA3);
         Collections.shuffle(tasks);
 
         final List<List<TaskId>> interleavedTaskIds = partitionAssignor.interleaveTasksByGroupId(tasks, 3);
@@ -182,15 +186,15 @@ public void testSubscription() {
         mockTaskManager(prevTasks, cachedTasks, processId, builder);
 
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
-        PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
+        final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));
 
         Collections.sort(subscription.topics());
         assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics());
 
-        Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
+        final Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
         standbyTasks.removeAll(prevTasks);
 
-        SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks, null);
+        final SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks, null);
         assertEquals(info.encode(), subscription.userData());
     }
 
@@ -199,8 +203,8 @@ public void testAssignBasic() {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
-        List<String> topics = Utils.mkList("topic1", "topic2");
-        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+        final List<String> topics = Utils.mkList("topic1", "topic2");
+        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
         final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
@@ -209,15 +213,15 @@ public void testAssignBasic() {
         final Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
         final Set<TaskId> standbyTasks20 = Utils.mkSet(task0);
 
-        UUID uuid1 = UUID.randomUUID();
-        UUID uuid2 = UUID.randomUUID();
+        final UUID uuid1 = UUID.randomUUID();
+        final UUID uuid2 = UUID.randomUUID();
 
         mockTaskManager(prevTasks10, standbyTasks10, uuid1, builder);
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
         subscriptions.put("consumer11",
@@ -226,7 +230,7 @@ public void testAssignBasic() {
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));
 
 
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
         // check assigned partitions
         assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
@@ -236,17 +240,17 @@ public void testAssignBasic() {
         // check assignment info
 
         // the first consumer
-        AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
-        Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
+        final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
+        final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
 
         // the second consumer
-        AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
+        final AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
         allActiveTasks.addAll(info11.activeTasks());
 
         assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
 
         // the third consumer
-        AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
+        final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
         allActiveTasks.addAll(info20.activeTasks());
 
         assertEquals(3, allActiveTasks.size());
@@ -277,7 +281,8 @@ public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() {
         final Cluster localMetadata = new Cluster(
             "cluster",
             Collections.singletonList(Node.noNode()),
-            localInfos, Collections.<String>emptySet(),
+            localInfos,
+            Collections.<String>emptySet(),
             Collections.<String>emptySet());
 
         final List<String> topics = Utils.mkList("topic1", "topic2");
@@ -332,26 +337,26 @@ public void testAssignWithPartialTopology() {
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
         builder.addStateStore(new MockStoreBuilder("store2", false), "processor2");
-        List<String> topics = Utils.mkList("topic1", "topic2");
-        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+        final List<String> topics = Utils.mkList("topic1", "topic2");
+        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
-        UUID uuid1 = UUID.randomUUID();
+        final UUID uuid1 = UUID.randomUUID();
 
         mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
         configurePartitionAssignor(Collections.singletonMap(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, (Object) SingleGroupPartitionGrouperStub.class));
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
             new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
 
 
         // will throw exception if it fails
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
         // check assignment info
-        AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
-        Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
+        final AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
+        final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
 
         assertEquals(3, allActiveTasks.size());
         assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -363,8 +368,8 @@ public void testAssignEmptyMetadata() {
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
-        List<String> topics = Utils.mkList("topic1", "topic2");
-        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+        final List<String> topics = Utils.mkList("topic1", "topic2");
+        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
         final Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
@@ -372,12 +377,12 @@ public void testAssignEmptyMetadata() {
             Collections.<PartitionInfo>emptySet(),
             Collections.<String>emptySet(),
             Collections.<String>emptySet());
-        UUID uuid1 = UUID.randomUUID();
+        final UUID uuid1 = UUID.randomUUID();
 
         mockTaskManager(prevTasks10, standbyTasks10, uuid1, builder);
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
             new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
 
@@ -390,7 +395,7 @@ public void testAssignEmptyMetadata() {
 
         // check assignment info
         AssignmentInfo info10 = checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10"));
-        Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
+        final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
 
         assertEquals(0, allActiveTasks.size());
         assertEquals(Collections.<TaskId>emptySet(), new HashSet<>(allActiveTasks));
@@ -418,22 +423,22 @@ public void testAssignWithNewTasks() {
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addSource(null, "source3", null, null, null, "topic3");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");
-        List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
-        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);
+        final List<String> topics = Utils.mkList("topic1", "topic2", "topic3");
+        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3);
 
         // assuming that previous tasks do not have topic3
         final Set<TaskId> prevTasks10 = Utils.mkSet(task0);
         final Set<TaskId> prevTasks11 = Utils.mkSet(task1);
         final Set<TaskId> prevTasks20 = Utils.mkSet(task2);
 
-        UUID uuid1 = UUID.randomUUID();
-        UUID uuid2 = UUID.randomUUID();
+        final UUID uuid1 = UUID.randomUUID();
+        final UUID uuid2 = UUID.randomUUID();
         mockTaskManager(prevTasks10, Collections.<TaskId>emptySet(), uuid1, builder);
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet(), userEndPoint).encode()));
         subscriptions.put("consumer11",
@@ -441,14 +446,14 @@ public void testAssignWithNewTasks() {
         subscriptions.put("consumer20",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet(), userEndPoint).encode()));
 
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
         // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match
         // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and
         // then later ones will be re-assigned to other hosts due to load balancing
         AssignmentInfo info = AssignmentInfo.decode(assignments.get("consumer10").userData());
-        Set<TaskId> allActiveTasks = new HashSet<>(info.activeTasks());
-        Set<TopicPartition> allPartitions = new HashSet<>(assignments.get("consumer10").partitions());
+        final Set<TaskId> allActiveTasks = new HashSet<>(info.activeTasks());
+        final Set<TopicPartition> allPartitions = new HashSet<>(assignments.get("consumer10").partitions());
 
         info = AssignmentInfo.decode(assignments.get("consumer11").userData());
         allActiveTasks.addAll(info.activeTasks());
@@ -475,18 +480,18 @@ public void testAssignWithStates() {
         builder.addStateStore(new MockStoreBuilder("store2", false), "processor-2");
         builder.addStateStore(new MockStoreBuilder("store3", false), "processor-2");
 
-        List<String> topics = Utils.mkList("topic1", "topic2");
+        final List<String> topics = Utils.mkList("topic1", "topic2");
 
-        TaskId task00 = new TaskId(0, 0);
-        TaskId task01 = new TaskId(0, 1);
-        TaskId task02 = new TaskId(0, 2);
-        TaskId task10 = new TaskId(1, 0);
-        TaskId task11 = new TaskId(1, 1);
-        TaskId task12 = new TaskId(1, 2);
-        List<TaskId> tasks = Utils.mkList(task00, task01, task02, task10, task11, task12);
+        final TaskId task00 = new TaskId(0, 0);
+        final TaskId task01 = new TaskId(0, 1);
+        final TaskId task02 = new TaskId(0, 2);
+        final TaskId task10 = new TaskId(1, 0);
+        final TaskId task11 = new TaskId(1, 1);
+        final TaskId task12 = new TaskId(1, 2);
+        final List<TaskId> tasks = Utils.mkList(task00, task01, task02, task10, task11, task12);
 
-        UUID uuid1 = UUID.randomUUID();
-        UUID uuid2 = UUID.randomUUID();
+        final UUID uuid1 = UUID.randomUUID();
+        final UUID uuid2 = UUID.randomUUID();
 
         mockTaskManager(
             Collections.<TaskId>emptySet(),
@@ -497,7 +502,7 @@ public void testAssignWithStates() {
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
         subscriptions.put("consumer11",
@@ -505,47 +510,46 @@ public void testAssignWithStates() {
         subscriptions.put("consumer20",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
 
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
         // check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match
         assertEquals(2, assignments.get("consumer10").partitions().size());
         assertEquals(2, assignments.get("consumer11").partitions().size());
         assertEquals(2, assignments.get("consumer20").partitions().size());
 
-        AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
-        AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
-        AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
+        final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
+        final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
+        final AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
 
         assertEquals(2, info10.activeTasks().size());
         assertEquals(2, info11.activeTasks().size());
         assertEquals(2, info20.activeTasks().size());
 
-        Set<TaskId> allTasks = new HashSet<>();
+        final Set<TaskId> allTasks = new HashSet<>();
         allTasks.addAll(info10.activeTasks());
         allTasks.addAll(info11.activeTasks());
         allTasks.addAll(info20.activeTasks());
         assertEquals(new HashSet<>(tasks), allTasks);
 
         // check tasks for state topics
-        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
 
-        assertEquals(Utils.mkSet(task00, task01, task02), tasksForState(applicationId, "store1", tasks, topicGroups));
-        assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store2", tasks, topicGroups));
-        assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store3", tasks, topicGroups));
+        assertEquals(Utils.mkSet(task00, task01, task02), tasksForState("store1", tasks, topicGroups));
+        assertEquals(Utils.mkSet(task10, task11, task12), tasksForState("store2", tasks, topicGroups));
+        assertEquals(Utils.mkSet(task10, task11, task12), tasksForState("store3", tasks, topicGroups));
     }
 
-    private Set<TaskId> tasksForState(final String applicationId,
-                                      final String storeName,
+    private Set<TaskId> tasksForState(final String storeName,
                                       final List<TaskId> tasks,
                                       final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups) {
         final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
 
-        Set<TaskId> ids = new HashSet<>();
-        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
-            Set<String> stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet();
+        final Set<TaskId> ids = new HashSet<>();
+        for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+            final Set<String> stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet();
 
             if (stateChangelogTopics.contains(changelogTopic)) {
-                for (TaskId id : tasks) {
+                for (final TaskId id : tasks) {
                     if (id.topicGroupId == entry.getKey())
                         ids.add(id);
                 }
@@ -556,15 +560,15 @@ public void testAssignWithStates() {
 
     @Test
     public void testAssignWithStandbyReplicas() {
-        Map<String, Object> props = configProps();
+        final Map<String, Object> props = configProps();
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
-        StreamsConfig streamsConfig = new StreamsConfig(props);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
 
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
-        List<String> topics = Utils.mkList("topic1", "topic2");
-        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+        final List<String> topics = Utils.mkList("topic1", "topic2");
+        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
 
         final Set<TaskId> prevTasks00 = Utils.mkSet(task0);
@@ -574,8 +578,8 @@ public void testAssignWithStandbyReplicas() {
         final Set<TaskId> standbyTasks02 = Utils.mkSet(task2);
         final Set<TaskId> standbyTasks00 = Utils.mkSet(task0);
 
-        UUID uuid1 = UUID.randomUUID();
-        UUID uuid2 = UUID.randomUUID();
+        final UUID uuid1 = UUID.randomUUID();
+        final UUID uuid2 = UUID.randomUUID();
 
         mockTaskManager(prevTasks00, standbyTasks01, uuid1, builder);
 
@@ -583,7 +587,7 @@ public void testAssignWithStandbyReplicas() {
 
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer));
 
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks00, standbyTasks01, userEndPoint).encode()));
         subscriptions.put("consumer11",
@@ -591,15 +595,15 @@ public void testAssignWithStandbyReplicas() {
         subscriptions.put("consumer20",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks02, standbyTasks00, "any:9097").encode()));
 
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
         // the first consumer
-        AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
-        Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
-        Set<TaskId> allStandbyTasks = new HashSet<>(info10.standbyTasks().keySet());
+        final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
+        final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());
+        final Set<TaskId> allStandbyTasks = new HashSet<>(info10.standbyTasks().keySet());
 
         // the second consumer
-        AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
+        final AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
         allActiveTasks.addAll(info11.activeTasks());
         allStandbyTasks.addAll(info11.standbyTasks().keySet());
 
@@ -610,7 +614,7 @@ public void testAssignWithStandbyReplicas() {
         assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks));
 
         // the third consumer
-        AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
+        final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
         allActiveTasks.addAll(info20.activeTasks());
         allStandbyTasks.addAll(info20.standbyTasks().keySet());
 
@@ -641,7 +645,7 @@ public void testOnAssignment() {
         final AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, hostState);
         final PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t3p0, t3p3), info.encode());
 
-        Capture<Cluster> capturedCluster = EasyMock.newCapture();
+        final Capture<Cluster> capturedCluster = EasyMock.newCapture();
         taskManager.setPartitionsByHostState(hostState);
         EasyMock.expectLastCall();
         taskManager.setAssignmentMetadata(activeTasks, standbyTasks);
@@ -667,17 +671,17 @@ public void testAssignWithInternalTopics() {
         builder.addSink("sink1", "topicX", null, null, null, "processor1");
         builder.addSource(null, "source2", null, null, null, "topicX");
         builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
-        List<String> topics = Utils.mkList("topic1", applicationId + "-topicX");
-        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+        final List<String> topics = Utils.mkList("topic1", applicationId + "-topicX");
+        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
-        UUID uuid1 = UUID.randomUUID();
+        final UUID uuid1 = UUID.randomUUID();
         mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
-        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
+        final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        Set<TaskId> emptyTasks = Collections.emptySet();
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.emptySet();
         subscriptions.put("consumer10",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
 
@@ -690,7 +694,7 @@ public void testAssignWithInternalTopics() {
 
     @Test
     public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
-        String applicationId = "test";
+        final String applicationId = "test";
         builder.setApplicationId(applicationId);
         builder.addInternalTopic("topicX");
         builder.addSource(null, "source1", null, null, null, "topic1");
@@ -701,18 +705,18 @@ public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
         builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
         builder.addSink("sink2", "topicZ", null, null, null, "processor2");
         builder.addSource(null, "source3", null, null, null, "topicZ");
-        List<String> topics = Utils.mkList("topic1", "test-topicX", "test-topicZ");
-        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+        final List<String> topics = Utils.mkList("topic1", "test-topicX", "test-topicZ");
+        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
-        UUID uuid1 = UUID.randomUUID();
+        final UUID uuid1 = UUID.randomUUID();
         mockTaskManager(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), uuid1, builder);
 
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
-        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
+        final MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
-        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
-        Set<TaskId> emptyTasks = Collections.emptySet();
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.emptySet();
         subscriptions.put("consumer10",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
 
@@ -731,7 +735,7 @@ public void shouldGenerateTasksForAllCreatedPartitions() {
         internalTopologyBuilder.setApplicationId(applicationId);
 
         // KStream with 3 partitions
-        KStream<Object, Object> stream1 = builder
+        final KStream<Object, Object> stream1 = builder
             .stream("topic1")
             // force creation of internal repartition topic
             .map(new KeyValueMapper<Object, Object, KeyValue<Object, Object>>() {
@@ -742,7 +746,7 @@ public void shouldGenerateTasksForAllCreatedPartitions() {
             });
 
         // KTable with 4 partitions
-        KTable<Object, Long> table1 = builder
+        final KTable<Object, Long> table1 = builder
             .table("topic3")
             // force creation of internal repartition topic
             .groupBy(new KeyValueMapper<Object, Object, KeyValue<Object, Object>>() {
@@ -884,7 +888,7 @@ public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
         try {
             configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost"));
             fail("expected to an exception due to invalid config");
-        } catch (ConfigException e) {
+        } catch (final ConfigException e) {
             // pass
         }
     }
@@ -896,7 +900,7 @@ public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
         try {
             configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) "localhost:j87yhk"));
             fail("expected to an exception due to invalid config");
-        } catch (ConfigException e) {
+        } catch (final ConfigException e) {
             // pass
         }
     }
@@ -908,7 +912,7 @@ public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTas
         final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
         internalTopologyBuilder.setApplicationId(applicationId);
 
-        KStream<Object, Object> stream1 = builder
+        final KStream<Object, Object> stream1 = builder
 
             // Task 1 (should get created):
             .stream("topic1")
@@ -964,10 +968,11 @@ public Object apply(final Object value1, final Object value2) {
         final UUID uuid = UUID.randomUUID();
         final String client = "client1";
 
-        mockTaskManager(Collections.<TaskId>emptySet(),
-                               Collections.<TaskId>emptySet(),
-                               UUID.randomUUID(),
-                internalTopologyBuilder);
+        mockTaskManager(
+            Collections.<TaskId>emptySet(),
+            Collections.<TaskId>emptySet(),
+            UUID.randomUUID(),
+            internalTopologyBuilder);
         configurePartitionAssignor(Collections.<String, Object>emptyMap());
 
         final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
@@ -1037,7 +1042,7 @@ public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
             uuid,
             internalTopologyBuilder);
 
-        Map<String, Object> props = new HashMap<>();
+        final Map<String, Object> props = new HashMap<>();
         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint);
         configurePartitionAssignor(props);
@@ -1075,18 +1080,58 @@ public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
         assertThat(allAssignedPartitions, equalTo(allPartitions));
     }
 
-    @Test(expected = KafkaException.class)
-    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() {
-        partitionAssignor.configure(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
+    @Test
+    public void shouldThrowKafkaExceptionIfTaskManagerNotConfigured() {
+        final Map<String, Object> config = configProps();
+        config.remove(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
+
+        try {
+            partitionAssignor.configure(config);
+            fail("Should have thrown KafkaException");
+        } catch (final KafkaException expected) {
+            assertThat(expected.getMessage(), equalTo("TaskManager is not specified"));
+        }
     }
 
-    @Test(expected = KafkaException.class)
-    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProviderInstance() {
-        final Map<String, Object> config = new HashMap<>();
-        config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
-        config.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, "i am not a stream thread");
+    @Test
+    public void shouldThrowKafkaExceptionIfTaskManagerConfigIsNotTaskManagerInstance() {
+        final Map<String, Object> config = configProps();
+        config.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, "i am not a task manager");
 
-        partitionAssignor.configure(config);
+        try {
+            partitionAssignor.configure(config);
+            fail("Should have thrown KafkaException");
+        } catch (final KafkaException expected) {
+            assertThat(expected.getMessage(),
+                equalTo("java.lang.String is not an instance of org.apache.kafka.streams.processor.internals.TaskManager"));
+        }
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionIfVersionProbingFlagNotConfigured() {
+        final Map<String, Object> config = configProps();
+        config.remove(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG);
+
+        try {
+            partitionAssignor.configure(config);
+            fail("Should have thrown KafkaException");
+        } catch (final KafkaException expected) {
+            assertThat(expected.getMessage(), equalTo("VersionProbingFlag is not specified"));
+        }
+    }
+
+    @Test
+    public void shouldThrowKafkaExceptionIfVersionProbingFlagConfigIsNotAtomicBoolean() {
+        final Map<String, Object> config = configProps();
+        config.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, "i am not an AtomicBoolean");
+
+        try {
+            partitionAssignor.configure(config);
+            fail("Should have thrown KafkaException");
+        } catch (final KafkaException expected) {
+            assertThat(expected.getMessage(),
+                equalTo("java.lang.String is not an instance of java.util.concurrent.atomic.AtomicBoolean"));
+        }
     }
 
     @Test
@@ -1147,7 +1192,7 @@ public void shouldDownGradeSubscriptionToVersion1() {
             builder);
         configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, (Object) StreamsConfig.UPGRADE_FROM_0100));
 
-        PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
+        final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
 
         assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1));
     }
@@ -1187,11 +1232,114 @@ private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue
             builder);
         configurePartitionAssignor(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue));
 
-        PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
+        final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
 
         assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2));
     }
 
+    @Test
+    public void shouldReturnUnchangedAssignmentForOldInstancesAndEmptyAssignmentForFutureInstances() {
+        builder.addSource(null, "source1", null, null, null, "topic1");
+
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
+        final Set<TaskId> activeTasks = Utils.mkSet(task0, task1);
+        final Set<TaskId> standbyTasks = Utils.mkSet(task2);
+        final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new HashMap<TaskId, Set<TopicPartition>>() {
+            {
+                put(task2, Collections.singleton(t1p2));
+            }
+        };
+
+        subscriptions.put(
+            "consumer1",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(UUID.randomUUID(), activeTasks, standbyTasks, null).encode()
+            )
+        );
+        subscriptions.put(
+            "future-consumer",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                encodeFutureSubscription()
+            )
+        );
+
+        mockTaskManager(
+            allTasks,
+            allTasks,
+            UUID.randomUUID(),
+            builder);
+        partitionAssignor.configure(configProps());
+        final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+
+        assertThat(assignment.size(), equalTo(2));
+        assertThat(
+            AssignmentInfo.decode(assignment.get("consumer1").userData()),
+            equalTo(new AssignmentInfo(
+                new ArrayList<>(activeTasks),
+                standbyTaskMap,
+                Collections.<HostInfo, Set<TopicPartition>>emptyMap()
+            )));
+        assertThat(assignment.get("consumer1").partitions(), equalTo(Utils.mkList(t1p0, t1p1)));
+
+        assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()), equalTo(new AssignmentInfo()));
+        assertThat(assignment.get("future-consumer").partitions().size(), equalTo(0));
+    }
+
+    @Test
+    public void shouldThrowIfV1SubscriptionAndFutureSubscriptionIsMixed() {
+        shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(1);
+    }
+
+    @Test
+    public void shouldThrowIfV2SubscriptionAndFutureSubscriptionIsMixed() {
+        shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(2);
+    }
+
+    private ByteBuffer encodeFutureSubscription() {
+        final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */
+                                                   + 4 /* supported version */);
+        buf.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1);
+        buf.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1);
+        return buf;
+    }
+
+    private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(final int oldVersion) {
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.emptySet();
+        subscriptions.put(
+            "consumer1",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(oldVersion, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+            )
+        );
+        subscriptions.put(
+            "future-consumer",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                encodeFutureSubscription()
+            )
+        );
+
+        mockTaskManager(
+            emptyTasks,
+            emptyTasks,
+            UUID.randomUUID(),
+            builder);
+        partitionAssignor.configure(configProps());
+
+        try {
+            partitionAssignor.assign(metadata, subscriptions);
+            fail("Should have thrown IllegalStateException");
+        } catch (final IllegalStateException expected) {
+            // pass
+        }
+    }
+
     private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
         final AssignmentInfo info = new AssignmentInfo(Collections.<TaskId>emptyList(),
                                                        Collections.<TaskId, Set<TopicPartition>>emptyMap(),
@@ -1201,19 +1349,20 @@ private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue
                 Collections.<TopicPartition>emptyList(), info.encode());
     }
 
-    private AssignmentInfo checkAssignment(Set<String> expectedTopics, PartitionAssignor.Assignment assignment) {
+    private AssignmentInfo checkAssignment(final Set<String> expectedTopics,
+                                           final PartitionAssignor.Assignment assignment) {
 
         // This assumes 1) DefaultPartitionGrouper is used, and 2) there is only one topic group.
 
-        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+        final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
 
         // check if the number of assigned partitions == the size of active task id list
         assertEquals(assignment.partitions().size(), info.activeTasks().size());
 
         // check if active tasks are consistent
-        List<TaskId> activeTasks = new ArrayList<>();
-        Set<String> activeTopics = new HashSet<>();
-        for (TopicPartition partition : assignment.partitions()) {
+        final List<TaskId> activeTasks = new ArrayList<>();
+        final Set<String> activeTopics = new HashSet<>();
+        for (final TopicPartition partition : assignment.partitions()) {
             // since default grouper, taskid.partition == partition.partition()
             activeTasks.add(new TaskId(0, partition.partition()));
             activeTopics.add(partition.topic());
@@ -1224,11 +1373,11 @@ private AssignmentInfo checkAssignment(Set<String> expectedTopics, PartitionAssi
         assertEquals(expectedTopics, activeTopics);
 
         // check if standby tasks are consistent
-        Set<String> standbyTopics = new HashSet<>();
-        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks().entrySet()) {
-            TaskId id = entry.getKey();
-            Set<TopicPartition> partitions = entry.getValue();
-            for (TopicPartition partition : partitions) {
+        final Set<String> standbyTopics = new HashSet<>();
+        for (final Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks().entrySet()) {
+            final TaskId id = entry.getKey();
+            final Set<TopicPartition> partitions = entry.getValue();
+            for (final TopicPartition partition : partitions) {
                 // since default grouper, taskid.partition == partition.partition()
                 assertEquals(id.partition, partition.partition());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index e98b8ce0727..0611bfc4d5c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.streams.processor.TaskId;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
@@ -75,4 +76,19 @@ public void shouldEncodeAndDecodeVersion3() {
         assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
     }
 
+    @Test
+    public void shouldAllowToDecodeFutureSupportedVersion() {
+        final SubscriptionInfo info = SubscriptionInfo.decode(encodeFutureVersion());
+        assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, info.version());
+        assertEquals(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1, info.latestSupportedVersion());
+    }
+
+    private ByteBuffer encodeFutureVersion() {
+        final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */
+                                                   + 4 /* supported version */);
+        buf.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1);
+        buf.putInt(SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1);
+        return buf;
+    }
+
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index 2409bd59643..8c807800869 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -40,7 +40,7 @@ public static void main(final String[] args) throws InterruptedException, IOExce
 
         final String propFileName = args[0];
         final String command = args[1];
-        final boolean disableAutoTerminate = args.length > 3;
+        final boolean disableAutoTerminate = args.length > 2;
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
         final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 69eea0b37c0..1b01a7300a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,29 +16,57 @@
  */
 package org.apache.kafka.streams.tests;
 
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TaskAssignmentException;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
+import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.TaskManager;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.apache.kafka.streams.state.HostInfo;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
 
 public class StreamsUpgradeTest {
 
     @SuppressWarnings("unchecked")
     public static void main(final String[] args) throws Exception {
-        if (args.length < 2) {
-            System.err.println("StreamsUpgradeTest requires two argument (kafka-url, properties-file) but only " + args.length + " provided: "
-                + (args.length > 0 ? args[0] : ""));
+        if (args.length < 1) {
+            System.err.println("StreamsUpgradeTest requires one argument (properties-file) but no provided: ");
         }
-        final String kafka = args[0];
-        final String propFileName = args.length > 1 ? args[1] : null;
+        final String propFileName = args.length > 0 ? args[0] : null;
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
 
         System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)");
-        System.out.println("kafka=" + kafka);
         System.out.println("props=" + streamsProperties);
 
         final StreamsBuilder builder = new StreamsBuilder();
@@ -48,11 +76,18 @@ public static void main(final String[] args) throws Exception {
 
         final Properties config = new Properties();
         config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
-        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+
+        final KafkaClientSupplier kafkaClientSupplier;
+        if (streamsProperties.containsKey("test.future.metadata")) {
+            streamsProperties.remove("test.future.metadata");
+            kafkaClientSupplier = new FutureKafkaClientSupplier();
+        } else {
+            kafkaClientSupplier = new DefaultKafkaClientSupplier();
+        }
         config.putAll(streamsProperties);
 
-        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config, kafkaClientSupplier);
         streams.start();
 
         Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -66,4 +101,237 @@ public void run() {
             }
         });
     }
+
+    private static class FutureKafkaClientSupplier extends DefaultKafkaClientSupplier {
+        @Override
+        public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
+            config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, FutureStreamsPartitionAssignor.class.getName());
+            return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        }
+    }
+
+    public static class FutureStreamsPartitionAssignor extends StreamsPartitionAssignor {
+
+        public FutureStreamsPartitionAssignor() {
+            usedSubscriptionMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1;
+        }
+
+        @Override
+        public Subscription subscription(final Set<String> topics) {
+            // Adds the following information to the subscription:
+            // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
+            // 2. Task ids of previously running tasks
+            // 3. Task ids of valid local states on the client's state directory.
+
+            final TaskManager taskManager = taskManger();
+            final Set<TaskId> previousActiveTasks = taskManager.prevActiveTaskIds();
+            final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
+            standbyTasks.removeAll(previousActiveTasks);
+            final FutureSubscriptionInfo data = new FutureSubscriptionInfo(
+                usedSubscriptionMetadataVersion,
+                SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1,
+                taskManager.processId(),
+                previousActiveTasks,
+                standbyTasks,
+                userEndPoint());
+
+            taskManager.updateSubscriptionsFromMetadata(topics);
+
+            return new Subscription(new ArrayList<>(topics), data.encode());
+        }
+
+        @Override
+        public void onAssignment(final PartitionAssignor.Assignment assignment) {
+            try {
+                super.onAssignment(assignment);
+                return;
+            } catch (final TaskAssignmentException cannotProcessFutureVersion) {
+                // continue
+            }
+
+            final ByteBuffer data = assignment.userData();
+            data.rewind();
+
+            final int usedVersion;
+            try (final DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
+                usedVersion = in.readInt();
+            } catch (final IOException ex) {
+                throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
+            }
+
+            if (usedVersion > AssignmentInfo.LATEST_SUPPORTED_VERSION + 1) {
+                throw new IllegalStateException("Unknown metadata version: " + usedVersion
+                    + "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION + 1);
+            }
+
+            final AssignmentInfo info = AssignmentInfo.decode(
+                assignment.userData().putInt(0, AssignmentInfo.LATEST_SUPPORTED_VERSION));
+
+            final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
+            Collections.sort(partitions, PARTITION_COMPARATOR);
+
+            // version 1 field
+            final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+            // version 2 fields
+            final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
+            final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
+
+            processLatestVersionAssignment(info, partitions, activeTasks, topicToPartitionInfo);
+            partitionsByHost = info.partitionsByHost();
+
+            final TaskManager taskManager = taskManger();
+            taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
+            taskManager.setPartitionsByHostState(partitionsByHost);
+            taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks());
+            taskManager.updateSubscriptionsFromAssignment(partitions);
+        }
+
+        @Override
+        public Map<String, Assignment> assign(final Cluster metadata,
+                                              final Map<String, Subscription> subscriptions) {
+            Map<String, Assignment> assignment = null;
+
+            final Map<String, Subscription> downgradedSubscriptions = new HashMap<>();
+            for (final Subscription subscription : subscriptions.values()) {
+                final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+                if (info.version() < SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1) {
+                    assignment = super.assign(metadata, subscriptions);
+                    break;
+                }
+            }
+
+            boolean bumpUsedVersion = false;
+            final boolean bumpSupportedVersion;
+            if (assignment != null) {
+                bumpSupportedVersion = supportedVersions.size() == 1 && supportedVersions.iterator().next() == SubscriptionInfo.LATEST_SUPPORTED_VERSION + 1;
+            } else {
+                for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
+                    final Subscription subscription = entry.getValue();
+
+                    final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData()
+                        .putInt(0, SubscriptionInfo.LATEST_SUPPORTED_VERSION)
+                        .putInt(4, SubscriptionInfo.LATEST_SUPPORTED_VERSION));
+
+                    downgradedSubscriptions.put(
+                        entry.getKey(),
+                        new Subscription(
+                            subscription.topics(),
+                            new SubscriptionInfo(
+                                info.processId(),
+                                info.prevTasks(),
+                                info.standbyTasks(),
+                                info.userEndPoint())
+                                .encode()));
+                }
+                assignment = super.assign(metadata, downgradedSubscriptions);
+                bumpUsedVersion = true;
+                bumpSupportedVersion = true;
+            }
+
+            final Map<String, Assignment> newAssignment = new HashMap<>();
+            for (final Map.Entry<String, Assignment> entry : assignment.entrySet()) {
+                final Assignment singleAssignment = entry.getValue();
+                newAssignment.put(
+                    entry.getKey(),
+                    new Assignment(
+                        singleAssignment.partitions(),
+                        new FutureAssignmentInfo(
+                            bumpUsedVersion,
+                            bumpSupportedVersion,
+                            singleAssignment.userData())
+                            .encode()));
+            }
+
+            return newAssignment;
+        }
+    }
+
+    private static class FutureSubscriptionInfo extends SubscriptionInfo {
+        // for testing only; don't apply version checks
+        FutureSubscriptionInfo(final int version,
+                               final int latestSupportedVersion,
+                               final UUID processId,
+                               final Set<TaskId> prevTasks,
+                               final Set<TaskId> standbyTasks,
+                               final String userEndPoint) {
+            super(version, latestSupportedVersion, processId, prevTasks, standbyTasks, userEndPoint);
+        }
+
+        public ByteBuffer encode() {
+            if (version() <= SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
+                final ByteBuffer buf = super.encode();
+                // super.encode() always encodes `LATEST_SUPPORTED_VERSION` as "latest supported version"
+                // need to update to future version
+                buf.putInt(4, latestSupportedVersion());
+                return buf;
+            }
+
+            final ByteBuffer buf = encodeFutureVersion();
+            buf.rewind();
+            return buf;
+        }
+
+        private ByteBuffer encodeFutureVersion() {
+            final byte[] endPointBytes = prepareUserEndPoint();
+
+            final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes));
+
+            buf.putInt(LATEST_SUPPORTED_VERSION + 1); // used version
+            buf.putInt(LATEST_SUPPORTED_VERSION + 1); // supported version
+            encodeClientUUID(buf);
+            encodeTasks(buf, prevTasks());
+            encodeTasks(buf, standbyTasks());
+            encodeUserEndPoint(buf, endPointBytes);
+
+            return buf;
+        }
+
+    }
+
+    private static class FutureAssignmentInfo extends AssignmentInfo {
+        private final boolean bumpUsedVersion;
+        private final boolean bumpSupportedVersion;
+        final ByteBuffer originalUserMetadata;
+
+        private FutureAssignmentInfo(final boolean bumpUsedVersion,
+                                     final boolean bumpSupportedVersion,
+                                     final ByteBuffer bytes) {
+            this.bumpUsedVersion = bumpUsedVersion;
+            this.bumpSupportedVersion = bumpSupportedVersion;
+            originalUserMetadata = bytes;
+        }
+
+        @Override
+        public ByteBuffer encode() {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+            originalUserMetadata.rewind();
+
+            try (final DataOutputStream out = new DataOutputStream(baos)) {
+                if (bumpUsedVersion) {
+                    originalUserMetadata.getInt(); // discard original used version
+                    out.writeInt(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1);
+                } else {
+                    out.writeInt(originalUserMetadata.getInt());
+                }
+                if (bumpSupportedVersion) {
+                    originalUserMetadata.getInt(); // discard original supported version
+                    out.writeInt(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1);
+                }
+
+                try {
+                    while (true) {
+                        out.write(originalUserMetadata.get());
+                    }
+                } catch (final BufferUnderflowException expectedWhenAllDataCopied) { }
+
+                out.flush();
+                out.close();
+
+                return ByteBuffer.wrap(baos.toByteArray());
+            } catch (final IOException ex) {
+                throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex);
+            }
+        }
+    }
 }
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index f268ab8de59..1d8ed270cc5 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -21,7 +21,7 @@
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.kafka import KafkaConfig
 from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1
+from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1
 
 STATE_DIR = "state.dir"
 
@@ -52,6 +52,33 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
         "streams_stderr": {
             "path": STDERR_FILE,
             "collect_default": True},
+        "streams_log.1": {
+            "path": LOG_FILE + ".1",
+            "collect_default": True},
+        "streams_stdout.1": {
+            "path": STDOUT_FILE + ".1",
+            "collect_default": True},
+        "streams_stderr.1": {
+            "path": STDERR_FILE + ".1",
+            "collect_default": True},
+        "streams_log.2": {
+            "path": LOG_FILE + ".2",
+            "collect_default": True},
+        "streams_stdout.2": {
+            "path": STDOUT_FILE + ".2",
+            "collect_default": True},
+        "streams_stderr.2": {
+            "path": STDERR_FILE + ".2",
+            "collect_default": True},
+        "streams_log.3": {
+            "path": LOG_FILE + ".3",
+            "collect_default": True},
+        "streams_stdout.3": {
+            "path": STDOUT_FILE + ".3",
+            "collect_default": True},
+        "streams_stderr.3": {
+            "path": STDERR_FILE + ".3",
+            "collect_default": True},
         "streams_log.0-1": {
             "path": LOG_FILE + ".0-1",
             "collect_default": True},
@@ -412,17 +439,26 @@ def set_version(self, kafka_streams_version):
     def set_upgrade_from(self, upgrade_from):
         self.UPGRADE_FROM = upgrade_from
 
+    def set_upgrade_to(self, upgrade_to):
+        self.UPGRADE_TO = upgrade_to
+
     def prop_file(self):
-        properties = {STATE_DIR: self.PERSISTENT_ROOT}
+        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()}
         if self.UPGRADE_FROM is not None:
             properties['upgrade.from'] = self.UPGRADE_FROM
+        if self.UPGRADE_TO == "future_version":
+            properties['test.future.metadata'] = "any_value"
 
         cfg = KafkaConfig(**properties)
         return cfg.render()
 
     def start_cmd(self, node):
         args = self.args.copy()
-        args['kafka'] = self.kafka.bootstrap_servers()
+        if self.KAFKA_STREAMS_VERSION in [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]:
+            args['kafka'] = self.kafka.bootstrap_servers()
+        else:
+            args['kafka'] = ""
         if self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_0) or self.KAFKA_STREAMS_VERSION == str(LATEST_0_10_1):
             args['zk'] = self.kafka.zk.connect_setting()
         else:
@@ -437,7 +473,7 @@ def start_cmd(self, node):
 
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
-              " %(kafka_run_class)s %(streams_class_name)s  %(kafka)s %(zk)s %(config_file)s " \
+              " %(kafka_run_class)s %(streams_class_name)s %(kafka)s %(zk)s %(config_file)s " \
               " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
 
         self.logger.info("Executing: " + cmd)
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index debe85fd7e2..41134672e98 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -48,6 +48,7 @@ def __init__(self, test_context):
             'data' : { 'partitions': 5 },
         }
         self.leader = None
+        self.leader_counter = {}
 
     def perform_broker_upgrade(self, to_version):
         self.logger.info("First pass bounce - rolling broker upgrade")
@@ -158,7 +159,7 @@ def test_simple_upgrade_downgrade(self, from_version, to_version):
         random.shuffle(self.processors)
         for p in self.processors:
             p.CLEAN_NODE_ENABLED = False
-            self.do_rolling_bounce(p, None, to_version, counter)
+            self.do_stop_start_bounce(p, None, to_version, counter)
             counter = counter + 1
 
         # shutdown
@@ -176,8 +177,7 @@ def test_simple_upgrade_downgrade(self, from_version, to_version):
 
         self.driver.stop()
 
-    #@matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions)
-    @ignore
+    @matrix(from_version=metadata_1_versions, to_version=backward_compatible_metadata_2_versions)
     @matrix(from_version=metadata_1_versions, to_version=metadata_3_versions)
     @matrix(from_version=metadata_2_versions, to_version=metadata_3_versions)
     def test_metadata_upgrade(self, from_version, to_version):
@@ -209,13 +209,70 @@ def test_metadata_upgrade(self, from_version, to_version):
         random.shuffle(self.processors)
         for p in self.processors:
             p.CLEAN_NODE_ENABLED = False
-            self.do_rolling_bounce(p, from_version[:-2], to_version, counter)
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter)
             counter = counter + 1
 
         # second rolling bounce
         random.shuffle(self.processors)
         for p in self.processors:
-            self.do_rolling_bounce(p, None, to_version, counter)
+            self.do_stop_start_bounce(p, None, to_version, counter)
+            counter = counter + 1
+
+        # shutdown
+        self.driver.stop()
+        self.driver.wait()
+
+        random.shuffle(self.processors)
+        for p in self.processors:
+            node = p.node
+            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+                p.stop()
+                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on" + str(node.account))
+
+        self.driver.stop()
+
+    def test_version_probing_upgrade(self):
+        """
+        Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version"
+        """
+
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.zk.start()
+
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
+        self.kafka.start()
+
+        self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
+        self.driver.disable_auto_terminate()
+        self.processor1 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+        self.processor2 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+        self.processor3 = StreamsUpgradeTestJobRunnerService(self.test_context, self.kafka)
+
+        self.driver.start()
+        self.start_all_nodes_with("") # run with TRUNK
+
+        self.processors = [self.processor1, self.processor2, self.processor3]
+        self.old_processors = [self.processor1, self.processor2, self.processor3]
+        self.upgraded_processors = []
+        for p in self.processors:
+            self.leader_counter[p] = 2
+
+        self.update_leader()
+        for p in self.processors:
+            self.leader_counter[p] = 0
+        self.leader_counter[self.leader] = 3
+
+        counter = 1
+        current_generation = 3
+
+        random.seed()
+        random.shuffle(self.processors)
+
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+            current_generation = self.do_rolling_bounce(p, counter, current_generation)
             counter = counter + 1
 
         # shutdown
@@ -233,6 +290,27 @@ def test_metadata_upgrade(self, from_version, to_version):
 
         self.driver.stop()
 
+    def update_leader(self):
+        self.leader = None
+        retries = 10
+        while retries > 0:
+            for p in self.processors:
+                found = list(p.node.account.ssh_capture("grep \"Finished assignment for group\" %s" % p.LOG_FILE, allow_fail=True))
+                if len(found) == self.leader_counter[p] + 1:
+                    if self.leader is not None:
+                        raise Exception("Could not uniquely identify leader")
+                    self.leader = p
+                    self.leader_counter[p] = self.leader_counter[p] + 1
+
+            if self.leader is None:
+                retries = retries - 1
+                time.sleep(5)
+            else:
+                break
+
+        if self.leader is None:
+            raise Exception("Could not identify leader")
+
     def start_all_nodes_with(self, version):
         # start first with <version>
         self.prepare_for(self.processor1, version)
@@ -293,7 +371,7 @@ def prepare_for(processor, version):
         else:
             processor.set_version(version)
 
-    def do_rolling_bounce(self, processor, upgrade_from, new_version, counter):
+    def do_stop_start_bounce(self, processor, upgrade_from, new_version, counter):
         first_other_processor = None
         second_other_processor = None
         for p in self.processors:
@@ -361,3 +439,120 @@ def do_rolling_bounce(self, processor, upgrade_from, new_version, counter):
                         monitor.wait_until("processed 100 records from topic",
                                            timeout_sec=60,
                                            err_msg="Never saw output 'processed 100 records from topic' on" + str(node.account))
+
+    def do_rolling_bounce(self, processor, counter, current_generation):
+        first_other_processor = None
+        second_other_processor = None
+        for p in self.processors:
+            if p != processor:
+                if first_other_processor is None:
+                    first_other_processor = p
+                else:
+                    second_other_processor = p
+
+        node = processor.node
+        first_other_node = first_other_processor.node
+        second_other_node = second_other_processor.node
+
+        with first_other_node.account.monitor_log(first_other_processor.LOG_FILE) as first_other_monitor:
+            with second_other_node.account.monitor_log(second_other_processor.LOG_FILE) as second_other_monitor:
+                # stop processor
+                processor.stop()
+                node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
+
+                node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + "." + str(counter), allow_fail=False)
+                node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + "." + str(counter), allow_fail=False)
+                node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + "." + str(counter), allow_fail=False)
+                self.leader_counter[processor] = 0
+
+                with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+                    processor.set_upgrade_to("future_version")
+                    processor.start()
+                    self.old_processors.remove(processor)
+                    self.upgraded_processors.append(processor)
+
+                    current_generation = current_generation + 1
+
+                    log_monitor.wait_until("Kafka version : " + str(DEV_VERSION),
+                                           timeout_sec=60,
+                                           err_msg="Could not detect Kafka Streams version " + str(DEV_VERSION) + " in " + str(node.account))
+                    log_monitor.offset = 5
+                    log_monitor.wait_until("partition\.assignment\.strategy = \[org\.apache\.kafka\.streams\.tests\.StreamsUpgradeTest$FutureStreamsPartitionAssignor\]",
+                                           timeout_sec=60,
+                                           err_msg="Could not detect FutureStreamsPartitionAssignor in " + str(node.account))
+
+                    log_monitor.wait_until("Successfully joined group with generation " + str(current_generation),
+                                           timeout_sec=60,
+                                           err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(node.account))
+                    first_other_monitor.wait_until("Successfully joined group with generation " + str(current_generation),
+                                                   timeout_sec=60,
+                                                   err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(first_other_node.account))
+                    second_other_monitor.wait_until("Successfully joined group with generation " + str(current_generation),
+                                                    timeout_sec=60,
+                                                    err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(second_other_node.account))
+
+                    if processor == self.leader:
+                        self.update_leader()
+                    else:
+                        self.leader_counter[self.leader] = self.leader_counter[self.leader] + 1
+
+                    if processor == self.leader:
+                        leader_monitor = log_monitor
+                    elif first_other_processor == self.leader:
+                        leader_monitor = first_other_monitor
+                    elif second_other_processor == self.leader:
+                        leader_monitor = second_other_monitor
+                    else:
+                        raise Exception("Could not identify leader.")
+
+                    monitors = {}
+                    monitors[processor] = log_monitor
+                    monitors[first_other_processor] = first_other_monitor
+                    monitors[second_other_processor] = second_other_monitor
+
+                    leader_monitor.wait_until("Received a future (version probing) subscription (version: 4). Sending empty assignment back (with supported version 3).",
+                                              timeout_sec=60,
+                                              err_msg="Could not detect 'version probing' attempt at leader " + str(self.leader.node.account))
+
+                    if len(self.old_processors) > 0:
+                        log_monitor.wait_until("Sent a version 4 subscription and got version 3 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance.",
+                                               timeout_sec=60,
+                                               err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
+                    else:
+                        log_monitor.wait_until("Sent a version 4 subscription and got version 3 assignment back (successful version probing). Setting subscription metadata to leaders supported version 4 and trigger new rebalance.",
+                                               timeout_sec=60,
+                                               err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account))
+                        first_other_monitor.wait_until("Sent a version 3 subscription and group leader.s latest supported version is 4. Upgrading subscription metadata version to 4 for next rebalance.",
+                                                       timeout_sec=60,
+                                                       err_msg="Never saw output 'Upgrade metadata to version 4' on" + str(first_other_node.account))
+                        second_other_monitor.wait_until("Sent a version 3 subscription and group leader.s latest supported version is 4. Upgrading subscription metadata version to 4 for next rebalance.",
+                                                        timeout_sec=60,
+                                                        err_msg="Never saw output 'Upgrade metadata to version 4' on" + str(second_other_node.account))
+
+                    log_monitor.wait_until("Version probing detected. Triggering new rebalance.",
+                                           timeout_sec=60,
+                                           err_msg="Could not detect 'Triggering new rebalance' at upgrading node " + str(node.account))
+
+                    # version probing should trigger second rebalance
+                    current_generation = current_generation + 1
+
+                    for p in self.processors:
+                        monitors[p].wait_until("Successfully joined group with generation " + str(current_generation),
+                                               timeout_sec=60,
+                                               err_msg="Never saw output 'Successfully joined group with generation " + str(current_generation) + "' on" + str(p.node.account))
+
+                    if processor == self.leader:
+                        self.update_leader()
+                    else:
+                        self.leader_counter[self.leader] = self.leader_counter[self.leader] + 1
+
+                    if self.leader in self.old_processors or len(self.old_processors) > 0:
+                        self.verify_metadata_no_upgraded_yet()
+
+        return current_generation
+
+    def verify_metadata_no_upgraded_yet(self):
+        for p in self.processors:
+            found = list(p.node.account.ssh_capture("grep \"Sent a version 3 subscription and group leader.s latest supported version is 4. Upgrading subscription metadata version to 4 for next rebalance.\" " + p.LOG_FILE, allow_fail=True))
+            if len(found) > 0:
+                raise Exception("Kafka Streams failed with 'group member upgraded to metadata 4 too early'")
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 7823efac1d4..0ed29a34968 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -61,7 +61,7 @@ def get_version(node=None):
         return DEV_BRANCH
 
 DEV_BRANCH = KafkaVersion("dev")
-DEV_VERSION = KafkaVersion("1.2.0-SNAPSHOT")
+DEV_VERSION = KafkaVersion("2.0.0-SNAPSHOT")
 
 # 0.8.2.x versions
 V_0_8_2_1 = KafkaVersion("0.8.2.1")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6054
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6054
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: James Cheng
>            Assignee: Matthias J. Sax
>            Priority: Major
>              Labels: kip
>             Fix For: 2.0.0
>
>
> KIP: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling upgrade of the app, so that, at one point, there were both 0.10.0.0-based instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code:java}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
>         at org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
>         at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>         
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, because the internal version number of the protocol changed when adding Interactive Queries. Matthias asked me to file this JIRA.
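> For illustration, a minimal sketch of why the strict decode fails during a mixed-version rolling bounce and how the version-probing header added by the PR above avoids it. Only the two-int header layout (used version, then latest supported version) and the empty-assignment reaction are taken from the PR; the class and method names here are hypothetical:
> {code:java}
> import java.nio.ByteBuffer;
>
> class VersionProbingSketch {
>     static final int LATEST_SUPPORTED_VERSION = 3;
>
>     // Pre-KIP-268 behavior: any subscription with an unknown version is a hard
>     // error. The real code throws TaskAssignmentException here (see the stack
>     // trace above); IllegalStateException stands in for it in this sketch.
>     static int decodeStrict(final ByteBuffer data) {
>         final int usedVersion = data.getInt();
>         if (usedVersion > LATEST_SUPPORTED_VERSION) {
>             throw new IllegalStateException("unable to decode subscription data: version=" + usedVersion);
>         }
>         return usedVersion;
>     }
>
>     // With version probing, the encoder writes the used version AND the sender's
>     // latest supported version (two ints). A leader on an older release can still
>     // read this header when it sees a "future" used version, and can send back an
>     // empty assignment carrying its own supported version instead of failing the
>     // whole rebalance; the probing member then downgrades and rejoins.
>     static int[] decodeProbingHeader(final ByteBuffer data) {
>         final int usedVersion = data.getInt();
>         final int latestSupportedVersion = data.getInt();
>         return new int[] {usedVersion, latestSupportedVersion};
>     }
> }
> {code}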



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)