You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/18 22:54:07 UTC

[1/2] kafka git commit: KAFKA-6122: Global Consumer should handle TimeoutException

Repository: kafka
Updated Branches:
  refs/heads/trunk b604540fb -> 40fd9fa98


http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 897a2c5..0166480 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -162,8 +162,8 @@ public class StreamTaskTest {
         source2.addChild(processorSystemTime);
         config = createConfig(false);
         eosConfig = createConfig(true);
-        stateDirectory = new StateDirectory("applicationId", baseDir.getPath(), new MockTime());
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
+        stateDirectory = new StateDirectory(config, new MockTime());
+        task = new StreamTask(taskId00, partitions, topology, consumer,
                               changelogReader, config, streamsMetrics, stateDirectory, null, time, producer);
         task.initialize();
     }
@@ -456,7 +456,7 @@ public class StreamTaskTest {
 
         task.close(true, false);
 
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config,
+        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader, config,
             streamsMetrics, stateDirectory, null, time, producer);
         task.initialize();
         final int offset = 20;
@@ -548,7 +548,7 @@ public class StreamTaskTest {
     public void shouldFlushRecordCollectorOnFlushState() {
         final AtomicBoolean flushed = new AtomicBoolean(false);
         final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
-        final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer,
+        final StreamTask streamTask = new StreamTask(taskId00, partitions, topology, consumer,
             changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) {
 
             @Override
@@ -602,7 +602,7 @@ public class StreamTaskTest {
         restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
 
         final long offset = 543L;
-        final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer,
+        final StreamTask streamTask = new StreamTask(taskId00, partitions, topology, consumer,
             changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) {
 
             @Override
@@ -669,7 +669,7 @@ public class StreamTaskTest {
         restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
 
         final long offset = 543L;
-        final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer,
+        final StreamTask streamTask = new StreamTask(taskId00, partitions, topology, consumer,
             changelogReader, testConfig, streamsMetrics, stateDirectory, null, time, producer) {
 
             @Override
@@ -758,7 +758,7 @@ public class StreamTaskTest {
     @Test
     public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() {
         final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
 
         assertTrue(producer.transactionInitialized());
@@ -768,7 +768,7 @@ public class StreamTaskTest {
     @Test
     public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() {
         final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
             config, streamsMetrics, stateDirectory, null, time, producer);
 
         assertFalse(producer.transactionInitialized());
@@ -778,7 +778,7 @@ public class StreamTaskTest {
     @Test
     public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() {
         final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
 
         task.addRecords(partition1, Collections.singletonList(
@@ -794,7 +794,7 @@ public class StreamTaskTest {
     @Test
     public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() {
         final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
 
         task.suspend();
@@ -805,7 +805,7 @@ public class StreamTaskTest {
     @Test
     public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() {
         final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
             config, streamsMetrics, stateDirectory, null, time, producer);
 
         task.addRecords(partition1, Collections.singletonList(
@@ -821,7 +821,7 @@ public class StreamTaskTest {
     @Test
     public void shouldStartNewTransactionOnResumeIfEosEnabled() {
         final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
 
         task.addRecords(partition1, Collections.singletonList(
@@ -836,7 +836,7 @@ public class StreamTaskTest {
     @Test
     public void shouldNotStartNewTransactionOnResumeIfEosDisabled() {
         final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
             config, streamsMetrics, stateDirectory, null, time, producer);
 
         task.addRecords(partition1, Collections.singletonList(
@@ -851,7 +851,7 @@ public class StreamTaskTest {
     @Test
     public void shouldStartNewTransactionOnCommitIfEosEnabled() {
         final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
 
         task.addRecords(partition1, Collections.singletonList(
@@ -865,7 +865,7 @@ public class StreamTaskTest {
     @Test
     public void shouldNotStartNewTransactionOnCommitIfEosDisabled() {
         final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
             config, streamsMetrics, stateDirectory, null, time, producer);
 
         task.addRecords(partition1, Collections.singletonList(
@@ -879,7 +879,7 @@ public class StreamTaskTest {
     @Test
     public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() {
         final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
 
         task.close(false, false);
@@ -890,7 +890,7 @@ public class StreamTaskTest {
     @Test
     public void shouldNotAbortTransactionOnZombieClosedIfEosEnabled() throws Exception {
         final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
 
         task.close(false, true);
@@ -901,7 +901,7 @@ public class StreamTaskTest {
     @Test
     public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() {
         final MockProducer producer = new MockProducer();
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+        task = new StreamTask(taskId00, partitions, topology, consumer, changelogReader,
             config, streamsMetrics, stateDirectory, null, time, producer);
 
         task.close(false, false);
@@ -913,7 +913,7 @@ public class StreamTaskTest {
     public void shouldCloseProducerOnCloseWhenEosEnabled() {
         final MockProducer producer = new MockProducer();
 
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
+        task = new StreamTask(taskId00, partitions, topology, consumer,
             changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer);
 
         task.close(true, false);
@@ -929,7 +929,7 @@ public class StreamTaskTest {
         EasyMock.expect(consumer.committed(EasyMock.anyObject(TopicPartition.class)))
                 .andStubReturn(new OffsetAndMetadata(1L));
         EasyMock.replay(consumer);
-        final StreamTask task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
+        final StreamTask task = new StreamTask(taskId00, partitions, topology, consumer,
                               changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer) {
 
             @Override
@@ -968,7 +968,7 @@ public class StreamTaskTest {
                                                                  Collections.<StateStore>emptyList(),
                                                                  Collections.<String, String>emptyMap(),
                                                                  Collections.<StateStore>emptyList());
-        final StreamTask task = new StreamTask(taskId00, applicationId, Utils.mkSet(partition1), topology, consumer,
+        final StreamTask task = new StreamTask(taskId00, Utils.mkSet(partition1), topology, consumer,
                                                changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer);
 
         task.initialize();
@@ -984,7 +984,7 @@ public class StreamTaskTest {
     @Test
     public void shouldCloseStateManagerIfFailureOnTaskClose() {
         final AtomicBoolean stateManagerCloseCalled = new AtomicBoolean(false);
-        final StreamTask streamTask = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
+        final StreamTask streamTask = new StreamTask(taskId00, partitions, topology, consumer,
                                                changelogReader, eosConfig, streamsMetrics, stateDirectory, null,
                                                      time, new MockProducer<byte[], byte[]>()) {
 
@@ -1031,7 +1031,6 @@ public class StreamTaskTest {
 
 
         final StreamTask task = new StreamTask(taskId00,
-                                               applicationId,
                                                Utils.mkSet(partition1),
                                                topology,
                                                consumer,
@@ -1059,7 +1058,6 @@ public class StreamTaskTest {
 
 
         final StreamTask task = new StreamTask(taskId00,
-                                               applicationId,
                                                Utils.mkSet(partition1),
                                                topology,
                                                consumer,
@@ -1096,7 +1094,7 @@ public class StreamTaskTest {
                                                                  Collections.<String, String>emptyMap(),
                                                                  Collections.<StateStore>emptyList());
 
-        return new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config,
+        return new StreamTask(taskId00, partitions, topology, consumer, changelogReader, config,
             streamsMetrics, stateDirectory, null, time, producer);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 7d04040..03860ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -87,7 +87,7 @@ public class StreamThreadTest {
     private InternalTopologyBuilder internalTopologyBuilder;
     private final StreamsConfig config = new StreamsConfig(configProps(false));
     private final String stateDir = TestUtils.tempDirectory().getPath();
-    private final StateDirectory stateDirectory  = new StateDirectory("applicationId", stateDir, mockTime);
+    private final StateDirectory stateDirectory  = new StateDirectory(config, mockTime);
     private StreamsMetadataState streamsMetadataState;
     private final ConsumedInternal<Object, Object> consumed = new ConsumedInternal<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 9271ca6..9821e4c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -90,9 +90,7 @@ public class StreamThreadStateStoreProviderTest {
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         stateDir = TestUtils.tempDirectory();
-        final String stateConfigDir = stateDir.getPath();
-        properties.put(StreamsConfig.STATE_DIR_CONFIG,
-                stateConfigDir);
+        properties.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
 
         final StreamsConfig streamsConfig = new StreamsConfig(properties);
         final MockClientSupplier clientSupplier = new MockClientSupplier();
@@ -102,7 +100,7 @@ public class StreamThreadStateStoreProviderTest {
         builder.setApplicationId(applicationId);
         final ProcessorTopology topology = builder.build(null);
         tasks = new HashMap<>();
-        stateDirectory = new StateDirectory(applicationId, stateConfigDir, new MockTime());
+        stateDirectory = new StateDirectory(streamsConfig, new MockTime());
         taskOne = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
                                     new TaskId(0, 0));
         taskOne.initialize();
@@ -183,7 +181,6 @@ public class StreamThreadStateStoreProviderTest {
                                          final TaskId taskId) {
         return new StreamTask(
             taskId,
-            applicationId,
             Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
             topology,
             clientSupplier.consumer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 70568bf..ce6cca8 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -100,11 +100,10 @@ public class MockProcessorContext extends AbstractProcessorContext implements Re
                                 final RecordCollector.Supplier collectorSupplier,
                                 final ThreadCache cache) {
         super(new TaskId(0, 0),
-                config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
-                config,
-                new MockStreamsMetrics(metrics),
-                null,
-                cache);
+              config,
+              new MockStreamsMetrics(metrics),
+              null,
+              cache);
         this.stateDir = stateDir;
         this.keySerde = keySerde;
         this.valSerde = valSerde;

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index bc56866..afa0639 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -36,7 +36,7 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
     public Map forwardedValues = new HashMap();
 
     public NoOpProcessorContext() {
-        super(new TaskId(1, 1), "appId", streamsConfig(), new MockStreamsMetrics(new Metrics()), null, null);
+        super(new TaskId(1, 1), streamsConfig(), new MockStreamsMetrics(new Metrics()), null, null);
     }
 
     static StreamsConfig streamsConfig() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 1d91b52..887b10d 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -205,7 +205,7 @@ public class ProcessorTopologyTestDriver {
 
         consumer.assign(offsetsByTopicPartition.keySet());
 
-        final StateDirectory stateDirectory = new StateDirectory(APPLICATION_ID, TestUtils.tempDirectory().getPath(), Time.SYSTEM);
+        final StateDirectory stateDirectory = new StateDirectory(config, Time.SYSTEM);
         final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
         final ThreadCache cache = new ThreadCache(new LogContext("mock "), 1024 * 1024, streamsMetrics);
 
@@ -221,10 +221,12 @@ public class ProcessorTopologyTestDriver {
                 globalPartitionsByTopic.put(topicName, partition);
                 offsetsByTopicPartition.put(partition, new AtomicLong());
             }
-            final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology,
+            final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(new LogContext("mock "),
+                                                                                   globalTopology,
                                                                                    globalConsumer,
                                                                                    stateDirectory,
-                                                                                   stateRestoreListener);
+                                                                                   stateRestoreListener,
+                                                                                   config);
             globalStateTask = new GlobalStateUpdateTask(globalTopology,
                                                         new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
                                                         stateManager,
@@ -235,7 +237,6 @@ public class ProcessorTopologyTestDriver {
 
         if (!partitionsByTopic.isEmpty()) {
             task = new StreamTask(TASK_ID,
-                                  APPLICATION_ID,
                                   partitionsByTopic.values(),
                                   topology,
                                   consumer,


[2/2] kafka git commit: KAFKA-6122: Global Consumer should handle TimeoutException

Posted by gu...@apache.org.
KAFKA-6122: Global Consumer should handle TimeoutException

Implements KIP-224:
- adds new StreamsConfig `retries`
- uses `retries` and `retry.backoff.ms` to handle TimeoutException in GlobalStateManager
- adds two new tests to trigger TimeoutException in global consumer
- some minor code cleanup to reduce number of parameters we need to pass around

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #4206 from mjsax/kafka-6122-global-consumer-timeout-exception


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/40fd9fa9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/40fd9fa9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/40fd9fa9

Branch: refs/heads/trunk
Commit: 40fd9fa98105111bca373e76f8f39914cade709b
Parents: b604540
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Sat Nov 18 14:54:01 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Nov 18 14:54:01 2017 -0800

----------------------------------------------------------------------
 .../kafka/clients/CommonClientConfigs.java      |   3 +
 .../kafka/clients/admin/AdminClientConfig.java  |   5 +-
 .../kafka/clients/producer/ProducerConfig.java  |   2 +-
 .../org/apache/kafka/streams/KafkaStreams.java  |   3 +-
 .../org/apache/kafka/streams/StreamsConfig.java |  10 ++
 .../internals/AbstractProcessorContext.java     |   3 +-
 .../processor/internals/AbstractTask.java       |   5 +-
 .../internals/GlobalProcessorContextImpl.java   |   2 +-
 .../internals/GlobalStateManagerImpl.java       |  79 +++++++++++--
 .../processor/internals/GlobalStreamThread.java |  20 +++-
 .../internals/ProcessorContextImpl.java         |   2 +-
 .../processor/internals/StandbyContextImpl.java |  10 +-
 .../processor/internals/StandbyTask.java        |   6 +-
 .../processor/internals/StateDirectory.java     |  41 +++----
 .../streams/processor/internals/StreamTask.java |   3 +-
 .../processor/internals/StreamThread.java       |   2 -
 .../apache/kafka/streams/KafkaStreamsTest.java  |  10 +-
 .../org/apache/kafka/streams/TopologyTest.java  |   2 +
 .../streams/processor/TopologyBuilderTest.java  |   2 +
 .../internals/AbstractProcessorContextTest.java |   2 +-
 .../processor/internals/AbstractTaskTest.java   |   5 +-
 .../internals/GlobalStateManagerImplTest.java   | 114 ++++++++++++++++---
 .../internals/GlobalStreamThreadTest.java       |   5 +-
 .../internals/InternalTopologyBuilderTest.java  |   2 +
 .../internals/ProcessorStateManagerTest.java    |  43 ++++---
 .../processor/internals/StandbyTaskTest.java    |  16 ++-
 .../processor/internals/StateDirectoryTest.java |  54 ++++++---
 .../processor/internals/StreamTaskTest.java     |  48 ++++----
 .../processor/internals/StreamThreadTest.java   |   2 +-
 .../StreamThreadStateStoreProviderTest.java     |   7 +-
 .../apache/kafka/test/MockProcessorContext.java |   9 +-
 .../apache/kafka/test/NoOpProcessorContext.java |   2 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   9 +-
 33 files changed, 350 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 7b9e0f8..7da7a60 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -59,6 +59,9 @@ public class CommonClientConfigs {
     public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
     public static final String RECONNECT_BACKOFF_MAX_MS_DOC = "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.";
 
+    public static final String RETRIES_CONFIG = "retries";
+    public static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error.";
+
     public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
     public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.";
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index f0a117c..b5ca15a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -98,8 +98,7 @@ public class AdminClientConfig extends AbstractConfig {
     private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
     private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
 
-    public static final String RETRIES_CONFIG = "retries";
-    private static final String RETRIES_DOC = "The maximum number of times to retry a call before failing it.";
+    public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
 
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
@@ -144,7 +143,7 @@ public class AdminClientConfig extends AbstractConfig {
                                         5,
                                         atLeast(0),
                                         Importance.LOW,
-                                        RETRIES_DOC)
+                                        CommonClientConfigs.RETRIES_DOC)
                                 .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
                                         Type.LONG,
                                         30000,

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 228176a..3847414 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -165,7 +165,7 @@ public class ProducerConfig extends AbstractConfig {
                                                                             + " message re-ordering due to retries (i.e., if retries are enabled).";
 
     /** <code>retries</code> */
-    public static final String RETRIES_CONFIG = "retries";
+    public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
     private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
                                               + " Note that this retry is no different than if the client resent the record upon receiving the error."
                                               + " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 9ad02ea..c551d01 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -611,8 +611,7 @@ public class KafkaStreams {
         threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
         try {
             stateDirectory = new StateDirectory(
-                applicationId,
-                config.getString(StreamsConfig.STATE_DIR_CONFIG),
+                config,
                 Time.SYSTEM);
         } catch (final ProcessorStateException fatal) {
             throw new StreamsException(fatal);

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 8fc37f2..a1a0d10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -46,6 +46,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
 
@@ -250,6 +251,9 @@ public class StreamsConfig extends AbstractConfig {
     /** {@code request.timeout.ms} */
     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
 
+    /** {@code retries} */
+    public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
+
     /** {@code retry.backoff.ms} */
     public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
 
@@ -462,6 +466,12 @@ public class StreamsConfig extends AbstractConfig {
                     atLeast(0L),
                     ConfigDef.Importance.LOW,
                     CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
+            .define(RETRIES_CONFIG,
+                    Type.INT,
+                    0,
+                    between(0, Integer.MAX_VALUE),
+                    ConfigDef.Importance.LOW,
+                    CommonClientConfigs.RETRIES_DOC)
             .define(RETRY_BACKOFF_MS_CONFIG,
                     Type.LONG,
                     100L,

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 410212e..aa58226 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -46,13 +46,12 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     final StateManager stateManager;
 
     public AbstractProcessorContext(final TaskId taskId,
-                                    final String applicationId,
                                     final StreamsConfig config,
                                     final StreamsMetrics metrics,
                                     final StateManager stateManager,
                                     final ThreadCache cache) {
         this.taskId = taskId;
-        this.applicationId = applicationId;
+        this.applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         this.config = config;
         this.metrics = metrics;
         this.stateManager = stateManager;

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index b0ae23c..d11af3a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -60,7 +60,6 @@ public abstract class AbstractTask implements Task {
      * @throws ProcessorStateException if the state manager cannot be created
      */
     AbstractTask(final TaskId id,
-                 final String applicationId,
                  final Collection<TopicPartition> partitions,
                  final ProcessorTopology topology,
                  final Consumer<byte[], byte[]> consumer,
@@ -69,7 +68,7 @@ public abstract class AbstractTask implements Task {
                  final StateDirectory stateDirectory,
                  final StreamsConfig config) {
         this.id = id;
-        this.applicationId = applicationId;
+        this.applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         this.partitions = new HashSet<>(partitions);
         this.topology = topology;
         this.consumer = consumer;
@@ -206,7 +205,7 @@ public abstract class AbstractTask implements Task {
         }
 
         try {
-            if (!stateDirectory.lock(id, 5)) {
+            if (!stateDirectory.lock(id)) {
                 throw new LockException(String.format("%sFailed to lock the state directory for task %s",
                                                       logPrefix, id));
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 7925b14..fbb4cb6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -34,7 +34,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
                                       final StateManager stateMgr,
                                       final StreamsMetrics metrics,
                                       final ThreadCache cache) {
-        super(new TaskId(-1, -1), config.getString(StreamsConfig.APPLICATION_ID_CONFIG), config, metrics, stateMgr, cache);
+        super(new TaskId(-1, -1), config, metrics, stateMgr, cache);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 07276ba..bbae9aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -21,7 +21,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -31,7 +35,6 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -51,8 +54,7 @@ import static org.apache.kafka.streams.processor.internals.ProcessorStateManager
  * of Global State Stores. There is only ever 1 instance of this class per Application Instance.
  */
 public class GlobalStateManagerImpl implements GlobalStateManager {
-    private static final int MAX_LOCK_ATTEMPTS = 5;
-    private static final Logger log = LoggerFactory.getLogger(GlobalStateManagerImpl.class);
+    private final Logger log;
 
     private final ProcessorTopology topology;
     private final Consumer<byte[], byte[]> globalConsumer;
@@ -63,23 +65,30 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     private final Set<String> globalStoreNames = new HashSet<>();
     private final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
     private final StateRestoreListener stateRestoreListener;
+    private final int retries;
+    private final long retryBackoffMs;
 
-    public GlobalStateManagerImpl(final ProcessorTopology topology,
+    public GlobalStateManagerImpl(final LogContext logContext,
+                                  final ProcessorTopology topology,
                                   final Consumer<byte[], byte[]> globalConsumer,
                                   final StateDirectory stateDirectory,
-                                  final StateRestoreListener stateRestoreListener) {
+                                  final StateRestoreListener stateRestoreListener,
+                                  final StreamsConfig config) {
+        this.log = logContext.logger(GlobalStateManagerImpl.class);
         this.topology = topology;
         this.globalConsumer = globalConsumer;
         this.stateDirectory = stateDirectory;
         this.baseDir = stateDirectory.globalStateDir();
         this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
         this.stateRestoreListener = stateRestoreListener;
+        this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
+        this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
     }
 
     @Override
     public Set<String> initialize(final InternalProcessorContext processorContext) {
         try {
-            if (!stateDirectory.lockGlobalState(MAX_LOCK_ATTEMPTS)) {
+            if (!stateDirectory.lockGlobalState()) {
                 throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir));
             }
         } catch (IOException e) {
@@ -136,7 +145,32 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
 
         log.info("Restoring state for global store {}", store.name());
         final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
-        final Map<TopicPartition, Long> highWatermarks = globalConsumer.endOffsets(topicPartitions);
+        Map<TopicPartition, Long> highWatermarks = null;
+
+        int attempts = 0;
+        while (highWatermarks == null) {
+            try {
+                highWatermarks = globalConsumer.endOffsets(topicPartitions);
+            } catch (final TimeoutException retryableException) {
+                if (++attempts > retries) {
+                    log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " +
+                        "You can increase the number of retries via configuration parameter `retries`.",
+                        store.name(),
+                        retries,
+                        retryableException);
+                    throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " +
+                            "You can increase the number of retries via configuration parameter `retries`.", store.name(), retries),
+                        retryableException);
+                }
+                log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})",
+                    topicPartitions,
+                    retryBackoffMs,
+                    attempts,
+                    retries,
+                    retryableException);
+                Utils.sleep(retryBackoffMs);
+            }
+        }
         try {
             restoreState(stateRestoreCallback, topicPartitions, highWatermarks, store.name());
             stores.put(store.name(), store);
@@ -148,7 +182,36 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
 
     private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
         final String sourceTopic = topology.storeToChangelogTopic().get(store.name());
-        final List<PartitionInfo> partitionInfos = globalConsumer.partitionsFor(sourceTopic);
+        List<PartitionInfo> partitionInfos;
+        int attempts = 0;
+        while (true) {
+            try {
+                partitionInfos = globalConsumer.partitionsFor(sourceTopic);
+                break;
+            } catch (final TimeoutException retryableException) {
+                if (++attempts > retries) {
+                    log.error("Failed to get partitions for topic {} after {} retry attempts due to timeout. " +
+                            "The broker may be transiently unavailable at the moment. " +
+                            "You can increase the number of retries via configuration parameter `retries`.",
+                        sourceTopic,
+                        retries,
+                        retryableException);
+                    throw new StreamsException(String.format("Failed to get partitions for topic %s after %d retry attempts due to timeout. " +
+                        "The broker may be transiently unavailable at the moment. " +
+                        "You can increase the number of retries via configuration parameter `retries`.", sourceTopic, retries),
+                        retryableException);
+                }
+                log.debug("Failed to get partitions for topic {} due to timeout. The broker may be transiently unavailable at the moment. " +
+                        "Backing off for {} ms to retry (attempt {} of {})",
+                    sourceTopic,
+                    retryBackoffMs,
+                    attempts,
+                    retries,
+                    retryableException);
+                Utils.sleep(retryBackoffMs);
+            }
+        }
+
         if (partitionInfos == null || partitionInfos.isEmpty()) {
             throw new StreamsException(String.format("There are no partitions available for topic %s when initializing global store %s", sourceTopic, store.name()));
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index f3800fb..24cec25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -302,10 +303,12 @@ public class GlobalStreamThread extends Thread {
 
     private StateConsumer initialize() {
         try {
-            final GlobalStateManager stateMgr = new GlobalStateManagerImpl(topology,
+            final GlobalStateManager stateMgr = new GlobalStateManagerImpl(logContext,
+                                                                           topology,
                                                                            globalConsumer,
                                                                            stateDirectory,
-                                                                           stateRestoreListener);
+                                                                           stateRestoreListener,
+                                                                           config);
             final StateConsumer stateConsumer
                     = new StateConsumer(this.logContext,
                                         globalConsumer,
@@ -323,10 +326,15 @@ public class GlobalStreamThread extends Thread {
                                         config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
             stateConsumer.initialize();
             return stateConsumer;
-        } catch (final StreamsException e) {
-            startupException = e;
-        } catch (final Exception e) {
-            startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", e);
+        } catch (final LockException fatalException) {
+            final String errorMsg = "Could not lock global state directory. This could happen if multiple KafkaStreams " +
+                "instances are running on the same host using the same state directory.";
+            log.error(errorMsg, fatalException);
+            startupException = new StreamsException(errorMsg, fatalException);
+        } catch (final StreamsException fatalException) {
+            startupException = fatalException;
+        } catch (final Exception fatalException) {
+            startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", fatalException);
         }
         return null;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index d9d7d27..5010cf1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -39,7 +39,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                          final ProcessorStateManager stateMgr,
                          final StreamsMetrics metrics,
                          final ThreadCache cache) {
-        super(id, task.applicationId(), config, metrics, stateMgr, cache);
+        super(id, config, metrics, stateMgr, cache);
         this.task = task;
         this.collector = collector;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index a9a03ae..e38b821 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -67,11 +67,17 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     };
 
     StandbyContextImpl(final TaskId id,
-                       final String applicationId,
                        final StreamsConfig config,
                        final ProcessorStateManager stateMgr,
                        final StreamsMetrics metrics) {
-        super(id, applicationId, config, metrics, stateMgr, new ThreadCache(new LogContext("zeroCache "), 0, metrics));
+        super(id,
+              config,
+              metrics,
+              stateMgr,
+              new ThreadCache(
+                  new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
+                  0,
+                  metrics));
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index fbbb357..73fbf63 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -40,7 +40,6 @@ public class StandbyTask extends AbstractTask {
      * Create {@link StandbyTask} with its assigned partitions
      *
      * @param id             the ID of this task
-     * @param applicationId  the ID of the stream processing application
      * @param partitions     the collection of assigned {@link TopicPartition}
      * @param topology       the instance of {@link ProcessorTopology}
      * @param consumer       the instance of {@link Consumer}
@@ -49,7 +48,6 @@ public class StandbyTask extends AbstractTask {
      * @param stateDirectory the {@link StateDirectory} created by the thread
      */
     StandbyTask(final TaskId id,
-                final String applicationId,
                 final Collection<TopicPartition> partitions,
                 final ProcessorTopology topology,
                 final Consumer<byte[], byte[]> consumer,
@@ -57,10 +55,10 @@ public class StandbyTask extends AbstractTask {
                 final StreamsConfig config,
                 final StreamsMetrics metrics,
                 final StateDirectory stateDirectory) {
-        super(id, applicationId, partitions, topology, consumer, changelogReader, true, stateDirectory, config);
+        super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config);
 
         // initialize the topology with its own context
-        processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);
+        processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index b7bc45c..1bfe98c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
@@ -68,14 +69,16 @@ public class StateDirectory {
      * @throws ProcessorStateException if the base state directory or application state directory does not exist
      *                                 and could not be created
      */
-    public StateDirectory(final String applicationId, final String stateDirConfig, final Time time) {
+    public StateDirectory(final StreamsConfig config,
+                          final Time time) {
         this.time = time;
-        final File baseDir = new File(stateDirConfig);
+        final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
+        final File baseDir = new File(stateDirName);
         if (!baseDir.exists() && !baseDir.mkdirs()) {
             throw new ProcessorStateException(
-                String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirConfig));
+                String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
         }
-        stateDir = new File(baseDir, applicationId);
+        stateDir = new File(baseDir, config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
         if (!stateDir.exists() && !stateDir.mkdir()) {
             throw new ProcessorStateException(
                 String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
@@ -117,11 +120,10 @@ public class StateDirectory {
     /**
      * Get the lock for the {@link TaskId}s directory if it is available
      * @param taskId
-     * @param retry
      * @return true if successful
      * @throws IOException
      */
-    synchronized boolean lock(final TaskId taskId, int retry) throws IOException {
+    synchronized boolean lock(final TaskId taskId) throws IOException {
 
         final File lockFile;
         // we already have the lock so bail out here
@@ -153,7 +155,7 @@ public class StateDirectory {
             return false;
         }
 
-        final FileLock lock = tryLock(retry, channel);
+        final FileLock lock = tryLock(channel);
         if (lock != null) {
             locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), lock));
 
@@ -162,7 +164,7 @@ public class StateDirectory {
         return lock != null;
     }
 
-    synchronized boolean lockGlobalState(final int retry) throws IOException {
+    synchronized boolean lockGlobalState() throws IOException {
         if (globalStateLock != null) {
             log.trace("{} Found cached state dir lock for the global task", logPrefix());
             return true;
@@ -178,7 +180,7 @@ public class StateDirectory {
             // file, in this case we will return immediately indicating locking failed.
             return false;
         }
-        final FileLock fileLock = tryLock(retry, channel);
+        final FileLock fileLock = tryLock(channel);
         if (fileLock == null) {
             channel.close();
             return false;
@@ -237,7 +239,7 @@ public class StateDirectory {
             TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
                 try {
-                    if (lock(id, 0)) {
+                    if (lock(id)) {
                         long now = time.milliseconds();
                         long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
@@ -275,28 +277,15 @@ public class StateDirectory {
         });
     }
 
-    private FileLock tryLock(int retry, final FileChannel channel) throws IOException {
-        FileLock lock = tryAcquireLock(channel);
-        while (lock == null && retry > 0) {
-            try {
-                Thread.sleep(200);
-            } catch (Exception ex) {
-                // do nothing
-            }
-            retry--;
-            lock = tryAcquireLock(channel);
-        }
-        return lock;
-    }
-
-    private FileChannel getOrCreateFileChannel(final TaskId taskId, final Path lockPath) throws IOException {
+    private FileChannel getOrCreateFileChannel(final TaskId taskId,
+                                               final Path lockPath) throws IOException {
         if (!channels.containsKey(taskId)) {
             channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE));
         }
         return channels.get(taskId);
     }
 
-    private FileLock tryAcquireLock(final FileChannel channel) throws IOException {
+    private FileLock tryLock(final FileChannel channel) throws IOException {
         try {
             return channel.tryLock();
         } catch (OverlappingFileLockException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4b78e27..36d5517 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -102,7 +102,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public StreamTask(final TaskId id,
-                      final String applicationId,
                       final Collection<TopicPartition> partitions,
                       final ProcessorTopology topology,
                       final Consumer<byte[], byte[]> consumer,
@@ -113,7 +112,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                       final ThreadCache cache,
                       final Time time,
                       final Producer<byte[], byte[]> producer) {
-        super(id, applicationId, partitions, topology, consumer, changelogReader, false, stateDirectory, config);
+        super(id, partitions, topology, consumer, changelogReader, false, stateDirectory, config);
         streamTimePunctuationQueue = new PunctuationQueue();
         systemTimePunctuationQueue = new PunctuationQueue();
         maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 1982afb..2ad3177 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -403,7 +403,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
 
             return new StreamTask(
                     taskId,
-                    applicationId,
                     partitions,
                     builder.build(taskId.topicGroupId),
                     consumer,
@@ -470,7 +469,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
 
             if (!topology.stateStores().isEmpty()) {
                 return new StandbyTask(taskId,
-                                       applicationId,
                                        partitions,
                                        topology,
                                        consumer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index dd3b9af..73e7dc3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -115,11 +115,9 @@ public class KafkaStreamsTest {
 
     @Test
     public void testStateThreadClose() throws Exception {
-        final int numThreads = 2;
         final StreamsBuilder builder = new StreamsBuilder();
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic");
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
 
@@ -128,7 +126,7 @@ public class KafkaStreamsTest {
         threadsField.setAccessible(true);
         final StreamThread[] threads = (StreamThread[]) threadsField.get(streams);
 
-        assertEquals(numThreads, threads.length);
+        assertEquals(NUM_THREADS, threads.length);
         assertEquals(streams.state(), KafkaStreams.State.CREATED);
 
         streams.start();
@@ -139,7 +137,7 @@ public class KafkaStreamsTest {
             }
         }, 10 * 1000, "Streams never started.");
 
-        for (int i = 0; i < numThreads; i++) {
+        for (int i = 0; i < NUM_THREADS; i++) {
             final StreamThread tmpThread = threads[i];
             tmpThread.shutdown();
             TestUtils.waitForCondition(new TestCondition() {
@@ -172,11 +170,9 @@ public class KafkaStreamsTest {
 
     @Test
     public void testStateGlobalThreadClose() throws Exception {
-        final int numThreads = 2;
         final StreamsBuilder builder = new StreamsBuilder();
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic");
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
 
@@ -446,6 +442,7 @@ public class KafkaStreamsTest {
         }
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void testToString() {
         streams.start();
@@ -462,7 +459,6 @@ public class KafkaStreamsTest {
     public void shouldCleanupOldStateDirs() throws InterruptedException {
         props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
 
-
         final String topic = "topic";
         CLUSTER.createTopic(topic);
         final StreamsBuilder builder = new StreamsBuilder();

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index dbdc854..39b1443 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.ProcessorTopologyTestDriver;
+import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
 import org.junit.Test;
 
@@ -260,6 +261,7 @@ public class TopologyTest {
         final Properties config = new Properties();
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         final StreamsConfig streamsConfig = new StreamsConfig(config);
         mockStoreBuilder();
         EasyMock.expect(storeBuilder.build()).andReturn(new MockStateStoreSupplier.MockStateStore("store", false));

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 7786348..42e5ccf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.ProcessorTopologyTestDriver;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
@@ -619,6 +620,7 @@ public class TopologyBuilderTest {
         final Properties config = new Properties();
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         final StreamsConfig streamsConfig = new StreamsConfig(config);
 
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 4dc17c0..57e2121 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -162,7 +162,7 @@ public class AbstractProcessorContextTest {
         }
 
         TestProcessorContext(final MockStreamsMetrics metrics) {
-            super(new TaskId(0, 0), "appId", new StreamsConfig(config), metrics, new StateManagerStub(), new ThreadCache(new LogContext("name "), 0, metrics));
+            super(new TaskId(0, 0), new StreamsConfig(config), metrics, new StateManagerStub(), new ThreadCache(new LogContext("name "), 0, metrics));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index efc6f79..ee366fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -78,7 +78,7 @@ public class AbstractTaskTest {
     public void shouldThrowLockExceptionIfFailedToLockStateDirectoryWhenTopologyHasStores() throws IOException {
         final Consumer consumer = EasyMock.createNiceMock(Consumer.class);
         final StateStore store = EasyMock.createNiceMock(StateStore.class);
-        EasyMock.expect(stateDirectory.lock(id, 5)).andReturn(false);
+        EasyMock.expect(stateDirectory.lock(id)).andReturn(false);
         EasyMock.replay(stateDirectory);
 
         final AbstractTask task = createTask(consumer, Collections.singletonList(store));
@@ -107,11 +107,10 @@ public class AbstractTaskTest {
 
     private AbstractTask createTask(final Consumer consumer, final List<StateStore> stateStores) {
         final Properties properties = new Properties();
-        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
         properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyhost:9092");
         final StreamsConfig config = new StreamsConfig(properties);
         return new AbstractTask(id,
-                                "app",
                                 Collections.singletonList(new TopicPartition("t", 0)),
                                 new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
                                                       Collections.<String, SourceNode>emptyMap(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index e9d61f5..0604a00 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -21,16 +21,18 @@ import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
-import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.NoOpProcessorContext;
 import org.apache.kafka.test.NoOpReadOnlyStore;
@@ -44,11 +46,14 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
@@ -71,7 +76,7 @@ public class GlobalStateManagerImplTest {
     private GlobalStateManagerImpl stateManager;
     private NoOpProcessorContext context;
     private StateDirectory stateDirectory;
-    private String stateDirPath;
+    private StreamsConfig config;
     private NoOpReadOnlyStore<Object, Object> store1;
     private NoOpReadOnlyStore store2;
     private MockConsumer<byte[], byte[]> consumer;
@@ -84,11 +89,8 @@ public class GlobalStateManagerImplTest {
         storeToTopic.put("t1-store", "t1");
         storeToTopic.put("t2-store", "t2");
 
-        final Map<StateStore, ProcessorNode> storeToProcessorNode = new HashMap<>();
         store1 = new NoOpReadOnlyStore<>("t1-store");
-        storeToProcessorNode.put(store1, new MockProcessorNode(-1));
         store2 = new NoOpReadOnlyStore("t2-store");
-        storeToProcessorNode.put(store2, new MockProcessorNode(-1));
         topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
                                          Collections.<String, SourceNode>emptyMap(),
                                          Collections.<String, SinkNode>emptyMap(),
@@ -97,10 +99,22 @@ public class GlobalStateManagerImplTest {
                                          Arrays.<StateStore>asList(store1, store2));
 
         context = new NoOpProcessorContext();
-        stateDirPath = TestUtils.tempDirectory().getPath();
-        stateDirectory = new StateDirectory("appId", stateDirPath, time);
+        config = new StreamsConfig(new Properties() {
+            {
+                put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+                put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+                put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+            }
+        });
+        stateDirectory = new StateDirectory(config, time);
         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        stateManager = new GlobalStateManagerImpl(topology, consumer, stateDirectory, stateRestoreListener);
+        stateManager = new GlobalStateManagerImpl(
+            new LogContext("mock"),
+            topology,
+            consumer,
+            stateDirectory,
+            stateRestoreListener,
+            config);
         checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
     }
 
@@ -117,9 +131,9 @@ public class GlobalStateManagerImplTest {
 
     @Test(expected = LockException.class)
     public void shouldThrowLockExceptionIfCantGetLock() throws IOException {
-        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath, time);
+        final StateDirectory stateDir = new StateDirectory(config, time);
         try {
-            stateDir.lockGlobalState(1);
+            stateDir.lockGlobalState();
             stateManager.initialize(context);
         } finally {
             stateDir.unlockGlobalState();
@@ -328,10 +342,10 @@ public class GlobalStateManagerImplTest {
     public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException {
         stateManager.initialize(context);
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
-        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath, new MockTime());
+        final StateDirectory stateDir = new StateDirectory(config, new MockTime());
         try {
             // should be able to get the lock now as it should've been released in close
-            assertTrue(stateDir.lockGlobalState(1));
+            assertTrue(stateDir.lockGlobalState());
         } finally {
             stateDir.unlockGlobalState();
         }
@@ -389,10 +403,10 @@ public class GlobalStateManagerImplTest {
         } catch (StreamsException e) {
             // expected
         }
-        final StateDirectory stateDir = new StateDirectory("appId", stateDirPath, new MockTime());
+        final StateDirectory stateDir = new StateDirectory(config, new MockTime());
         try {
             // should be able to get the lock now as it should've been released
-            assertTrue(stateDir.lockGlobalState(1));
+            assertTrue(stateDir.lockGlobalState());
         } finally {
             stateDir.unlockGlobalState();
         }
@@ -470,12 +484,12 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() {
-        stateManager = new GlobalStateManagerImpl(topology, consumer, new StateDirectory("appId", stateDirPath, time) {
+        stateManager = new GlobalStateManagerImpl(new LogContext("mock"), topology, consumer, new StateDirectory(config, time) {
             @Override
-            public boolean lockGlobalState(final int retry) throws IOException {
+            public boolean lockGlobalState() throws IOException {
                 throw new IOException("KABOOM!");
             }
-        }, stateRestoreListener);
+        }, stateRestoreListener, config);
 
         try {
             stateManager.initialize(context);
@@ -485,6 +499,72 @@ public class GlobalStateManagerImplTest {
         }
     }
 
+    @Test
+    public void shouldRetryWhenEndOffsetsThrowsTimeoutException() {
+        final int retries = 2;
+        final AtomicInteger numberOfCalls = new AtomicInteger(0);
+        consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+            @Override
+            public synchronized Map<TopicPartition, Long> endOffsets(Collection<org.apache.kafka.common.TopicPartition> partitions) {
+                numberOfCalls.incrementAndGet();
+                throw new TimeoutException();
+            }
+        };
+        config = new StreamsConfig(new Properties() {
+            {
+                put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+                put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+                put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+                put(StreamsConfig.RETRIES_CONFIG, retries);
+            }
+        });
+
+        try {
+            new GlobalStateManagerImpl(
+                new LogContext("mock"),
+                topology,
+                consumer,
+                stateDirectory,
+                stateRestoreListener,
+                config);
+        } catch (final StreamsException expected) {
+            assertEquals(numberOfCalls.get(), retries);
+        }
+    }
+
+    @Test
+    public void shouldRetryWhenPartitionsForThrowsTimeoutException() {
+        final int retries = 2;
+        final AtomicInteger numberOfCalls = new AtomicInteger(0);
+        consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+            @Override
+            public synchronized List<PartitionInfo> partitionsFor(String topic) {
+                numberOfCalls.incrementAndGet();
+                throw new TimeoutException();
+            }
+        };
+        config = new StreamsConfig(new Properties() {
+            {
+                put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+                put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+                put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+                put(StreamsConfig.RETRIES_CONFIG, retries);
+            }
+        });
+
+        try {
+            new GlobalStateManagerImpl(
+                new LogContext("mock"),
+                topology,
+                consumer,
+                stateDirectory,
+                stateRestoreListener,
+                config);
+        } catch (final StreamsException expected) {
+            assertEquals(numberOfCalls.get(), retries);
+        }
+    }
+
     private void writeCorruptCheckpoint() throws IOException {
         final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
         try (final FileOutputStream stream = new FileOutputStream(checkpointFile)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 29f1ac0..418b0ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -60,11 +60,12 @@ public class GlobalStreamThreadTest {
         final HashMap<String, Object> properties = new HashMap<>();
         properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah");
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "blah");
+        properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         config = new StreamsConfig(properties);
         globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(),
                                                     config,
                                                     mockConsumer,
-                                                    new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time),
+                                                    new StateDirectory(config, time),
                                                     new Metrics(),
                                                     new MockTime(),
                                                     "clientId",
@@ -96,7 +97,7 @@ public class GlobalStreamThreadTest {
         globalStreamThread = new GlobalStreamThread(builder.buildGlobalStateTopology(),
                                                     config,
                                                     mockConsumer,
-                                                    new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time),
+                                                    new StateDirectory(config, time),
                                                     new Metrics(),
                                                     new MockTime(),
                                                     "clientId",

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index e223699..fa83a71 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.ProcessorTopologyTestDriver;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
@@ -578,6 +579,7 @@ public class InternalTopologyBuilderTest {
         final Properties config = new Properties();
         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         final StreamsConfig streamsConfig = new StreamsConfig(config);
 
         builder.addSource(null, sourceNodeName, null, null, null, "topic");

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index dc009f5..1cb0cd4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -40,6 +41,7 @@ import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -83,7 +85,14 @@ public class ProcessorStateManagerTest {
     @Before
     public void setup() {
         baseDir = TestUtils.tempDirectory();
-        stateDirectory = new StateDirectory(applicationId, baseDir.getPath(), new MockTime());
+
+        stateDirectory = new StateDirectory(new StreamsConfig(new Properties() {
+            {
+                put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+                put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+                put(StreamsConfig.STATE_DIR_CONFIG, baseDir.getPath());
+            }
+        }), new MockTime());
         checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME);
         checkpoint = new OffsetCheckpoint(checkpointFile);
     }
@@ -149,7 +158,7 @@ public class ProcessorStateManagerTest {
             },
             changelogReader,
             false,
-                logContext);
+            logContext);
 
         try {
             stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
@@ -176,7 +185,7 @@ public class ProcessorStateManagerTest {
             },
             changelogReader,
             false,
-                logContext);
+            logContext);
 
         try {
             stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
@@ -225,7 +234,7 @@ public class ProcessorStateManagerTest {
             storeToChangelogTopic,
             changelogReader,
             false,
-                logContext);
+            logContext);
 
         try {
             stateMgr.register(store1, store1.stateRestoreCallback);
@@ -258,7 +267,7 @@ public class ProcessorStateManagerTest {
             Collections.<String, String>emptyMap(),
             changelogReader,
             false,
-                logContext);
+            logContext);
         try {
             stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
 
@@ -293,7 +302,7 @@ public class ProcessorStateManagerTest {
             },
             changelogReader,
             false,
-                logContext);
+            logContext);
         try {
             // make sure the checkpoint file isn't deleted
             assertTrue(checkpointFile.exists());
@@ -328,7 +337,7 @@ public class ProcessorStateManagerTest {
             Collections.<String, String>emptyMap(),
             changelogReader,
             false,
-                logContext);
+            logContext);
         stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
         assertNotNull(stateMgr.getStore(nonPersistentStoreName));
     }
@@ -347,7 +356,7 @@ public class ProcessorStateManagerTest {
             Collections.<String, String>emptyMap(),
             changelogReader,
             false,
-                logContext);
+            logContext);
         stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
         stateMgr.close(null);
         final Map<TopicPartition, Long> read = checkpoint.read();
@@ -364,7 +373,7 @@ public class ProcessorStateManagerTest {
             Collections.singletonMap(persistentStore.name(), persistentStoreTopicName),
             changelogReader,
             false,
-                logContext);
+            logContext);
         stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
 
         stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L));
@@ -382,7 +391,7 @@ public class ProcessorStateManagerTest {
             Collections.singletonMap(persistentStore.name(), persistentStoreTopicName),
             changelogReader,
             false,
-                logContext);
+            logContext);
 
         stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
         final byte[] bytes = Serdes.Integer().serializer().serialize("", 10);
@@ -413,7 +422,7 @@ public class ProcessorStateManagerTest {
             Collections.singletonMap(nonPersistentStoreName, nonPersistentStoreTopicName),
             changelogReader,
             false,
-                logContext);
+            logContext);
 
         stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
         stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L));
@@ -432,7 +441,7 @@ public class ProcessorStateManagerTest {
             Collections.<String, String>emptyMap(),
             changelogReader,
             false,
-                logContext);
+            logContext);
 
         stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
 
@@ -453,7 +462,7 @@ public class ProcessorStateManagerTest {
             Collections.<String, String>emptyMap(),
             changelogReader,
             false,
-                logContext);
+            logContext);
 
         try {
             stateManager.register(new MockStateStoreSupplier.MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), null);
@@ -473,7 +482,7 @@ public class ProcessorStateManagerTest {
             Collections.<String, String>emptyMap(),
             changelogReader,
             false,
-                logContext);
+            logContext);
 
         stateManager.register(mockStateStore, null);
 
@@ -526,7 +535,7 @@ public class ProcessorStateManagerTest {
             Collections.singletonMap(storeName, changelogTopic),
             changelogReader,
             false,
-                logContext);
+            logContext);
 
         final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore(storeName, true) {
             @Override
@@ -629,7 +638,7 @@ public class ProcessorStateManagerTest {
                 Collections.<String, String>emptyMap(),
                 changelogReader,
                 true,
-                    logContext);
+                logContext);
 
             assertFalse(checkpointFile.exists());
         } finally {
@@ -652,7 +661,7 @@ public class ProcessorStateManagerTest {
             },
             changelogReader,
             false,
-                logContext);
+            logContext);
     }
 
     private MockStateStoreSupplier.MockStateStore getPersistentStore() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 86a1af1..396965a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -140,7 +140,7 @@ public class StandbyTaskTest {
     private final byte[] recordKey = intSerializer.serialize(null, 1);
 
     @Before
-    public void setup() {
+    public void setup() throws Exception {
         restoreStateConsumer.reset();
         restoreStateConsumer.updatePartitions(storeChangelogTopicName1, Utils.mkList(
                 new PartitionInfo(storeChangelogTopicName1, 0, Node.noNode(), new Node[0], new Node[0]),
@@ -154,7 +154,7 @@ public class StandbyTaskTest {
                 new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new Node[0], new Node[0])
         ));
         baseDir = TestUtils.tempDirectory();
-        stateDirectory = new StateDirectory(applicationId, baseDir.getPath(), new MockTime());
+        stateDirectory = new StateDirectory(createConfig(baseDir), new MockTime());
     }
 
     @After
@@ -165,7 +165,7 @@ public class StandbyTaskTest {
     @Test
     public void testStorePartitions() throws IOException {
         StreamsConfig config = createConfig(baseDir);
-        StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
         task.initialize();
         assertEquals(Utils.mkSet(partition2), new HashSet<>(task.checkpointedOffsets().keySet()));
 
@@ -175,7 +175,7 @@ public class StandbyTaskTest {
     @Test(expected = ProcessorStateException.class)
     public void testUpdateNonPersistentStore() throws IOException {
         StreamsConfig config = createConfig(baseDir);
-        StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
 
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
@@ -188,7 +188,7 @@ public class StandbyTaskTest {
     @Test
     public void testUpdate() throws IOException {
         StreamsConfig config = createConfig(baseDir);
-        StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
+        StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
         task.initialize();
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
@@ -245,7 +245,7 @@ public class StandbyTaskTest {
         ));
 
         StreamsConfig config = createConfig(baseDir);
-        StandbyTask task = new StandbyTask(taskId, applicationId, ktablePartitions, ktableTopology, consumer, changelogReader, config, null, stateDirectory);
+        StandbyTask task = new StandbyTask(taskId, ktablePartitions, ktableTopology, consumer, changelogReader, config, null, stateDirectory);
         task.initialize();
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
@@ -341,7 +341,7 @@ public class StandbyTaskTest {
         final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);
         final ProcessorTopology topology = internalTopologyBuilder.setApplicationId(applicationId).build(0);
 
-        new StandbyTask(taskId, applicationId, partitions, topology, consumer, changelogReader, config,
+        new StandbyTask(taskId, partitions, topology, consumer, changelogReader, config,
             new MockStreamsMetrics(new Metrics()), stateDirectory);
     }
 
@@ -359,7 +359,6 @@ public class StandbyTaskTest {
         final MockTime time = new MockTime();
         final StreamsConfig config = createConfig(baseDir);
         final StandbyTask task = new StandbyTask(taskId,
-                                                 applicationId,
                                                  ktablePartitions,
                                                  ktableTopology,
                                                  consumer,
@@ -402,7 +401,6 @@ public class StandbyTaskTest {
         final StreamsConfig config = createConfig(baseDir);
         final AtomicBoolean closedStateManager = new AtomicBoolean(false);
         final StandbyTask task = new StandbyTask(taskId,
-                                                 applicationId,
                                                  ktablePartitions,
                                                  ktableTopology,
                                                  consumer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/40fd9fa9/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 886188d..1a5d46d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.test.TestUtils;
@@ -32,6 +33,7 @@ import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -53,7 +55,15 @@ public class StateDirectoryTest {
     @Before
     public void before() {
         stateDir = new File(TestUtils.IO_TMP_DIR, TestUtils.randomString(5));
-        directory = new StateDirectory(applicationId, stateDir.getPath(), time);
+        directory = new StateDirectory(
+            new StreamsConfig(new Properties() {
+                {
+                    put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+                    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+                    put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
+                }
+            }),
+            time);
         appDir = new File(stateDir, applicationId);
     }
 
@@ -83,7 +93,7 @@ public class StateDirectoryTest {
         final TaskId taskId = new TaskId(0, 0);
         final File taskDirectory = directory.directoryForTask(taskId);
 
-        directory.lock(taskId, 0);
+        directory.lock(taskId);
 
         try (
             final FileChannel channel = FileChannel.open(
@@ -103,9 +113,9 @@ public class StateDirectoryTest {
     public void shouldBeTrueIfAlreadyHoldsLock() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
         directory.directoryForTask(taskId);
-        directory.lock(taskId, 0);
+        directory.lock(taskId);
         try {
-            assertTrue(directory.lock(taskId, 0));
+            assertTrue(directory.lock(taskId));
         } finally {
             directory.unlock(taskId);
         }
@@ -124,7 +134,7 @@ public class StateDirectoryTest {
         final TaskId taskId = new TaskId(0, 0);
 
         Utils.delete(stateDir);
-        assertFalse(directory.lock(taskId, 0));
+        assertFalse(directory.lock(taskId));
     }
     
     @Test
@@ -144,8 +154,8 @@ public class StateDirectoryTest {
                 StandardOpenOption.CREATE,
                 StandardOpenOption.WRITE)
         ) {
-            directory.lock(taskId, 0);
-            directory.lock(taskId2, 0);
+            directory.lock(taskId);
+            directory.lock(taskId2);
 
             channel1.tryLock();
             channel2.tryLock();
@@ -163,7 +173,7 @@ public class StateDirectoryTest {
         final TaskId taskId = new TaskId(0, 0);
         final File taskDirectory = directory.directoryForTask(taskId);
 
-        directory.lock(taskId, 1);
+        directory.lock(taskId);
         directory.unlock(taskId);
 
         try (
@@ -181,8 +191,8 @@ public class StateDirectoryTest {
         final TaskId task0 = new TaskId(0, 0);
         final TaskId task1 = new TaskId(1, 0);
         try {
-            directory.lock(task0, 0);
-            directory.lock(task1, 0);
+            directory.lock(task0);
+            directory.lock(task1);
             directory.directoryForTask(new TaskId(2, 0));
 
             time.sleep(1000);
@@ -232,7 +242,15 @@ public class StateDirectoryTest {
     public void shouldCreateDirectoriesIfParentDoesntExist() {
         final File tempDir = TestUtils.tempDirectory();
         final File stateDir = new File(new File(tempDir, "foo"), "state-dir");
-        final StateDirectory stateDirectory = new StateDirectory(applicationId, stateDir.getPath(), time);
+        final StateDirectory stateDirectory = new StateDirectory(
+            new StreamsConfig(new Properties() {
+                {
+                    put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+                    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+                    put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
+                }
+            }),
+            time);
         final File taskDir = stateDirectory.directoryForTask(new TaskId(0, 0));
         assertTrue(stateDir.exists());
         assertTrue(taskDir.exists());
@@ -240,7 +258,7 @@ public class StateDirectoryTest {
 
     @Test(expected = OverlappingFileLockException.class)
     public void shouldLockGlobalStateDirectory() throws IOException {
-        directory.lockGlobalState(1);
+        directory.lockGlobalState();
 
         try (
             final FileChannel channel = FileChannel.open(
@@ -256,7 +274,7 @@ public class StateDirectoryTest {
 
     @Test
     public void shouldUnlockGlobalStateDirectory() throws IOException {
-        directory.lockGlobalState(1);
+        directory.lockGlobalState();
         directory.unlockGlobalState();
 
         try (
@@ -278,7 +296,7 @@ public class StateDirectoryTest {
             @Override
             public void run() {
                 try {
-                    directory.lock(taskId, 1);
+                    directory.lock(taskId);
                 } catch (final IOException e) {
                     exceptionOnThread.set(e);
                 }
@@ -287,7 +305,7 @@ public class StateDirectoryTest {
         thread.start();
         thread.join(30000);
         assertNull("should not have had an exception during locking on other thread", exceptionOnThread.get());
-        assertFalse(directory.lock(taskId, 1));
+        assertFalse(directory.lock(taskId));
     }
 
     @Test
@@ -300,7 +318,7 @@ public class StateDirectoryTest {
             @Override
             public void run() {
                 try {
-                    directory.lock(taskId, 1);
+                    directory.lock(taskId);
                     lockLatch.countDown();
                     unlockLatch.await();
                     directory.unlock(taskId);
@@ -314,13 +332,13 @@ public class StateDirectoryTest {
 
         assertNull("should not have had an exception on other thread", exceptionOnThread.get());
         directory.unlock(taskId);
-        assertFalse(directory.lock(taskId, 1));
+        assertFalse(directory.lock(taskId));
 
         unlockLatch.countDown();
         thread.join(30000);
 
         assertNull("should not have had an exception on other thread", exceptionOnThread.get());
-        assertTrue(directory.lock(taskId, 1));
+        assertTrue(directory.lock(taskId));
     }
 
 }
\ No newline at end of file