You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/17 00:23:15 UTC

[2/3] kafka git commit: KAFKA-4923: Add Exactly-Once Semantics to Streams

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index cfd702e..2dd4553 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -32,18 +32,24 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
+import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED;
+import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
 import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 public class StreamsConfigTest {
 
-    private Properties props = new Properties();
+    private final Properties props = new Properties();
     private StreamsConfig streamsConfig;
 
     @Before
@@ -58,40 +64,56 @@ public class StreamsConfigTest {
         streamsConfig = new StreamsConfig(props);
     }
 
+    @Test(expected = ConfigException.class)
+    public void shouldThrowExceptionIfApplicationIdIsNotSet() {
+        props.remove(StreamsConfig.APPLICATION_ID_CONFIG);
+        new StreamsConfig(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void shouldThrowExceptionIfBootstrapServersIsNotSet() {
+        props.remove(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+        new StreamsConfig(props);
+    }
+
     @Test
     public void testGetProducerConfigs() throws Exception {
-        Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client");
-        assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), "client-producer");
+        final String clientId = "client";
+        final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId);
+        assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer");
         assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100");
         assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test
     public void testGetConsumerConfigs() throws Exception {
-        Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, "example-application", "client");
-        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
-        assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application");
+        final String groupId = "example-application";
+        final String clientId = "client";
+        final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, groupId, clientId);
+        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer");
+        assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), groupId);
         assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
         assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test
     public void testGetRestoreConsumerConfigs() throws Exception {
-        Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client");
-        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
+        final String clientId = "client";
+        final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
+        assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-restore-consumer");
         assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
         assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test
     public void defaultSerdeShouldBeConfigured() {
-        Map<String, Object> serializerConfigs = new HashMap<>();
+        final Map<String, Object> serializerConfigs = new HashMap<>();
         serializerConfigs.put("key.serializer.encoding", "UTF8");
         serializerConfigs.put("value.serializer.encoding", "UTF-16");
-        Serializer<String> serializer = Serdes.String().serializer();
+        final Serializer<String> serializer = Serdes.String().serializer();
 
-        String str = "my string for testing";
-        String topic = "my topic";
+        final String str = "my string for testing";
+        final String topic = "my topic";
 
         serializer.configure(serializerConfigs, true);
         assertEquals("Should get the original string after serialization and deserialization with the configured encoding",
@@ -104,14 +126,14 @@ public class StreamsConfigTest {
 
     @Test
     public void shouldSupportMultipleBootstrapServers() {
-        List<String> expectedBootstrapServers = Arrays.asList("broker1:9092", "broker2:9092");
-        String bootstrapServersString = Utils.join(expectedBootstrapServers, ",");
-        Properties props = new Properties();
+        final List<String> expectedBootstrapServers = Arrays.asList("broker1:9092", "broker2:9092");
+        final String bootstrapServersString = Utils.join(expectedBootstrapServers, ",");
+        final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "irrelevant");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersString);
-        StreamsConfig config = new StreamsConfig(props);
+        final StreamsConfig config = new StreamsConfig(props);
 
-        List<String> actualBootstrapServers = config.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+        final List<String> actualBootstrapServers = config.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
         assertEquals(expectedBootstrapServers, actualBootstrapServers);
     }
 
@@ -165,7 +187,7 @@ public class StreamsConfigTest {
         props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
         props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> configs = streamsConfig.getProducerConfigs("client");
+        final Map<String, Object> configs = streamsConfig.getProducerConfigs("clientId");
         assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
         assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
@@ -195,7 +217,7 @@ public class StreamsConfigTest {
         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> configs = streamsConfig.getProducerConfigs("client");
+        final Map<String, Object> configs = streamsConfig.getProducerConfigs("clientId");
         assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
         assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
@@ -230,7 +252,7 @@ public class StreamsConfigTest {
     public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
         props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("client");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
         assertEquals("10000", producerConfigs.get(ProducerConfig.LINGER_MS_CONFIG));
     }
 
@@ -239,7 +261,7 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("client");
+        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
         assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
         assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
     }
@@ -261,10 +283,113 @@ public class StreamsConfigTest {
     @Test
     public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() throws Exception {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "group", "client");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
         assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));
     }
 
+    @Test
+    public void shouldAcceptAtLeastOnce() {
+        // don't use `StreamsConfig.AT_LEAST_ONCE` to actually do a useful test
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "at_least_once");
+        new StreamsConfig(props);
+    }
+
+    @Test
+    public void shouldAcceptExactlyOnce() {
+        // don't use `StreamsConfig.EXACLTY_ONCE` to actually do a useful test
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
+        new StreamsConfig(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void shouldThrowExceptionIfNotAtLestOnceOrExactlyOnce() {
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "bad_value");
+        new StreamsConfig(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void shouldThrowExceptionIfConsumerIsolationLevelIsOverriddenIfEosEnabled() {
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+    }
+
+    @Test
+    public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+    }
+
+
+    @Test(expected = ConfigException.class)
+    public void shouldThrowExceptionIfProducerEnableIdempotenceIsOverriddenIfEosEnabled() {
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "anyValue");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        streamsConfig.getProducerConfigs("clientId");
+    }
+
+    @Test
+    public void shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() {
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        streamsConfig.getProducerConfigs("clientId");
+    }
+
+    @Test(expected = ConfigException.class)
+    public void shouldThrowExceptionIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled() {
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "anyValue");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        streamsConfig.getProducerConfigs("clientId");
+    }
+
+    @Test
+    public void shouldAllowSettingProducerMaxInFlightRequestPerConnectionsWhenEosDisabled() {
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "anyValue");
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        streamsConfig.getProducerConfigs("clientId");
+    }
+
+    @Test
+    public void shouldSetDifferentDefaultsIfEosEnabled() {
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+
+        assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
+        assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
+        assertThat((Integer) producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(Integer.MAX_VALUE));
+        assertThat((Integer) producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), equalTo(1));
+        assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(100L));
+    }
+
+    @Test
+    public void shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled() {
+        final int numberOfRetries = 42;
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        props.put(ProducerConfig.RETRIES_CONFIG, numberOfRetries);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+
+        assertThat((Integer) producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(numberOfRetries));
+    }
+
+    @Test
+    public void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled() {
+        final long commitIntervalMs = 73L;
+        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitIntervalMs);
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+        assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(commitIntervalMs));
+    }
+
     static class MisconfiguredSerde implements Serde {
         @Override
         public void configure(final Map configs, final boolean isKey) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 5705ea6..ba3230a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -35,6 +36,7 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.Properties;
 
 public class AbstractTaskTest {
 
@@ -61,6 +63,10 @@ public class AbstractTaskTest {
 
     private AbstractTask createTask(final Consumer consumer) {
         final MockTime time = new MockTime();
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
+        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyhost:9092");
+        final StreamsConfig config = new StreamsConfig(properties);
         return new AbstractTask(new TaskId(0, 0),
                                 "app",
                                 Collections.singletonList(new TopicPartition("t", 0)),
@@ -74,7 +80,8 @@ public class AbstractTaskTest {
                                 new StoreChangelogReader(consumer, Time.SYSTEM, 5000),
                                 false,
                                 new StateDirectory("app", TestUtils.tempDirectory().getPath(), time),
-                                new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))) {
+                                new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())),
+                                config) {
             @Override
             public void resume() {}
 
@@ -85,7 +92,7 @@ public class AbstractTaskTest {
             public void suspend() {}
 
             @Override
-            public void close() {}
+            public void close(final boolean clean) {}
         };
     }
 
@@ -98,4 +105,4 @@ public class AbstractTaskTest {
         };
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 9f54b4a..476b009 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import static org.junit.Assert.assertEquals;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -33,6 +31,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+
 public class PartitionGroupTest {
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
@@ -58,7 +58,7 @@ public class PartitionGroupTest {
         assertEquals(0, group.numBuffered());
 
         // add three 3 records with timestamp 1, 3, 5 to partition-1
-        List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
             new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
             new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue),
             new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
@@ -66,7 +66,7 @@ public class PartitionGroupTest {
         group.addRawRecords(partition1, list1);
 
         // add three 3 records with timestamp 2, 4, 6 to partition-2
-        List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
+        final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
             new ConsumerRecord<>("topic", 2, 2L, recordKey, recordValue),
             new ConsumerRecord<>("topic", 2, 4L, recordKey, recordValue),
             new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue));
@@ -79,7 +79,7 @@ public class PartitionGroupTest {
         assertEquals(1L, group.timestamp());
 
         StampedRecord record;
-        PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
+        final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
 
         // get one record, now the time should be advanced
         record = group.nextRecord(info);
@@ -100,7 +100,7 @@ public class PartitionGroupTest {
         assertEquals(3L, group.timestamp());
 
         // add three 3 records with timestamp 2, 4, 6 to partition-1 again
-        List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
+        final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
             new ConsumerRecord<>("topic", 1, 2L, recordKey, recordValue),
             new ConsumerRecord<>("topic", 1, 4L, recordKey, recordValue));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index cfc1022..f454216 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -45,6 +45,7 @@ import java.util.Set;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -90,15 +91,23 @@ public class ProcessorStateManagerTest {
     public void testRegisterPersistentStore() throws IOException {
         final TaskId taskId = new TaskId(0, 2);
 
-        MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
-        ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, false, stateDirectory, new HashMap<String, String>() {
-            {
-                put(persistentStoreName, persistentStoreTopicName);
-                put(nonPersistentStoreName, nonPersistentStoreName);
-            }
-        }, changelogReader);
-        try {
+        final MockStateStoreSupplier.MockStateStore persistentStore
+            = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            taskId,
+            noPartitions,
+            false,
+            stateDirectory,
+            new HashMap<String, String>() {
+                {
+                    put(persistentStoreName, persistentStoreTopicName);
+                    put(nonPersistentStoreName, nonPersistentStoreName);
+                }
+            },
+            changelogReader,
+            false);
 
+        try {
             stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
             assertTrue(changelogReader.wasRegistered(new TopicPartition(persistentStoreTopicName, 2)));
         } finally {
@@ -108,18 +117,25 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void testRegisterNonPersistentStore() throws IOException {
-        MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
-        ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 2), noPartitions, false, stateDirectory, new HashMap<String, String>() {
-            {
-                put(persistentStoreName, persistentStoreTopicName);
-                put(nonPersistentStoreName, nonPersistentStoreTopicName);
-            }
-        }, changelogReader);
-        try {
+        final MockStateStoreSupplier.MockStateStore nonPersistentStore
+            = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            new TaskId(0, 2),
+            noPartitions,
+            false,
+            stateDirectory,
+            new HashMap<String, String>() {
+                {
+                    put(persistentStoreName, persistentStoreTopicName);
+                    put(nonPersistentStoreName, nonPersistentStoreTopicName);
+                }
+            },
+            changelogReader,
+            false);
 
+        try {
             stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
             assertTrue(changelogReader.wasRegistered(new TopicPartition(nonPersistentStoreTopicName, 2)));
-
         } finally {
             stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
         }
@@ -128,41 +144,49 @@ public class ProcessorStateManagerTest {
     @Test
     public void testChangeLogOffsets() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
-        long lastCheckpointedOffset = 10L;
-        String storeName1 = "store1";
-        String storeName2 = "store2";
-        String storeName3 = "store3";
+        final long lastCheckpointedOffset = 10L;
+        final String storeName1 = "store1";
+        final String storeName2 = "store2";
+        final String storeName3 = "store3";
 
-        String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
-        String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
-        String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3);
+        final String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
+        final String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
+        final String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3);
 
-        Map<String, String> storeToChangelogTopic = new HashMap<>();
+        final Map<String, String> storeToChangelogTopic = new HashMap<>();
         storeToChangelogTopic.put(storeName1, storeTopicName1);
         storeToChangelogTopic.put(storeName2, storeTopicName2);
         storeToChangelogTopic.put(storeName3, storeTopicName3);
 
-        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME));
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME));
         checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
 
-        TopicPartition partition1 = new TopicPartition(storeTopicName1, 0);
-        TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
-        TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);
+        final TopicPartition partition1 = new TopicPartition(storeTopicName1, 0);
+        final TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
+        final TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);
 
-        MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true);
-        MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true);
-        MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore(storeName3, true);
+        final MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true);
+        final MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true);
+        final MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore(storeName3, true);
 
         // if there is a source partition, inherit the partition id
-        Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
+        final Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
+
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            taskId,
+            sourcePartitions,
+            true, // standby
+            stateDirectory,
+            storeToChangelogTopic,
+            changelogReader,
+            false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, sourcePartitions, true, stateDirectory, storeToChangelogTopic, changelogReader); // standby
         try {
             stateMgr.register(store1, true, store1.stateRestoreCallback);
             stateMgr.register(store2, true, store2.stateRestoreCallback);
             stateMgr.register(store3, true, store3.stateRestoreCallback);
 
-            Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointed();
+            final Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointed();
 
             assertEquals(3, changeLogOffsets.size());
             assertTrue(changeLogOffsets.containsKey(partition1));
@@ -180,7 +204,14 @@ public class ProcessorStateManagerTest {
     @Test
     public void testGetStore() throws IOException {
         final MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
-        final ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, false, stateDirectory, Collections.<String, String>emptyMap(), changelogReader);
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            new TaskId(0, 1),
+            noPartitions,
+            false,
+            stateDirectory,
+            Collections.<String, String>emptyMap(),
+            changelogReader,
+            false);
         try {
             stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
 
@@ -202,12 +233,19 @@ public class ProcessorStateManagerTest {
         ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L);
         ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, false, stateDirectory, new HashMap<String, String>() {
-            {
-                put(persistentStoreName, persistentStoreTopicName);
-                put(nonPersistentStoreName, nonPersistentStoreTopicName);
-            }
-        }, changelogReader);
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            taskId,
+            noPartitions,
+            false,
+            stateDirectory,
+            new HashMap<String, String>() {
+                {
+                    put(persistentStoreName, persistentStoreTopicName);
+                    put(nonPersistentStoreName, nonPersistentStoreTopicName);
+                }
+            },
+            changelogReader,
+            false);
         try {
             // make sure the checkpoint file isn't deleted
             assertTrue(checkpointFile.exists());
@@ -234,7 +272,14 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception {
-        final ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, false, stateDirectory, Collections.<String, String>emptyMap(), changelogReader);
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            new TaskId(0, 1),
+            noPartitions,
+            false,
+            stateDirectory,
+            Collections.<String, String>emptyMap(),
+            changelogReader,
+            false);
         stateMgr.register(nonPersistentStore, false, nonPersistentStore.stateRestoreCallback);
         assertNotNull(stateMgr.getStore(nonPersistentStoreName));
     }
@@ -245,7 +290,14 @@ public class ProcessorStateManagerTest {
         checkpoint.write(offsets);
 
         final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
-        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, false, stateDirectory, Collections.<String, String>emptyMap(), changelogReader);
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            taskId,
+            noPartitions,
+            false,
+            stateDirectory,
+            Collections.<String, String>emptyMap(),
+            changelogReader,
+            false);
         stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
         stateMgr.close(null);
         final Map<TopicPartition, Long> read = checkpoint.read();
@@ -254,13 +306,14 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void shouldWriteCheckpointForPersistentLogEnabledStore() throws Exception {
-        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
-                                                                         noPartitions,
-                                                                         false,
-                                                                         stateDirectory,
-                                                                         Collections.singletonMap(persistentStore.name(),
-                                                                                                  persistentStoreTopicName),
-                                                                         changelogReader);
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            taskId,
+            noPartitions,
+            false,
+            stateDirectory,
+            Collections.singletonMap(persistentStore.name(), persistentStoreTopicName),
+            changelogReader,
+            false);
         stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
 
 
@@ -271,13 +324,14 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void shouldWriteCheckpointForStandbyReplica() throws Exception {
-        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
-                                                                         noPartitions,
-                                                                         true,
-                                                                         stateDirectory,
-                                                                         Collections.singletonMap(persistentStore.name(),
-                                                                                                  persistentStoreTopicName),
-                                                                         changelogReader);
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            taskId,
+            noPartitions,
+            true, // standby
+            stateDirectory,
+            Collections.singletonMap(persistentStore.name(), persistentStoreTopicName),
+            changelogReader,
+            false);
 
         stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
         final byte[] bytes = Serdes.Integer().serializer().serialize("", 10);
@@ -300,14 +354,14 @@ public class ProcessorStateManagerTest {
     public void shouldNotWriteCheckpointForNonPersistent() throws Exception {
         final TopicPartition topicPartition = new TopicPartition(nonPersistentStoreTopicName, 1);
 
-
-        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
-                                                                         noPartitions,
-                                                                         true,
-                                                                         stateDirectory,
-                                                                         Collections.singletonMap(nonPersistentStoreName,
-                                                                                                  nonPersistentStoreTopicName),
-                                                                         changelogReader);
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            taskId,
+            noPartitions,
+            true, // standby
+            stateDirectory,
+            Collections.singletonMap(nonPersistentStoreName, nonPersistentStoreTopicName),
+            changelogReader,
+            false);
 
         stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
         stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L));
@@ -318,12 +372,14 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws Exception {
-        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
-                                                                         noPartitions,
-                                                                         true,
-                                                                         stateDirectory,
-                                                                         Collections.<String, String>emptyMap(),
-                                                                         changelogReader);
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(
+            taskId,
+            noPartitions,
+            true, // standby
+            stateDirectory,
+            Collections.<String, String>emptyMap(),
+            changelogReader,
+            false);
 
         stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
 
@@ -344,7 +400,14 @@ public class ProcessorStateManagerTest {
         final FileLock lock = channel.lock();
 
         try {
-            new ProcessorStateManager(taskId, noPartitions, false, stateDirectory, Collections.<String, String>emptyMap(), changelogReader);
+            new ProcessorStateManager(
+                taskId,
+                noPartitions,
+                false,
+                stateDirectory,
+                Collections.<String, String>emptyMap(),
+                changelogReader,
+                false);
             fail("Should have thrown LockException");
         } catch (final LockException e) {
            // pass
@@ -356,11 +419,14 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() throws Exception {
-        final ProcessorStateManager stateManager = new ProcessorStateManager(taskId,
-                                                                             noPartitions,
-                                                                             false,
-                                                                             stateDirectory,
-                                                                             Collections.<String, String>emptyMap(), changelogReader);
+        final ProcessorStateManager stateManager = new ProcessorStateManager(
+            taskId,
+            noPartitions,
+            false,
+            stateDirectory,
+            Collections.<String, String>emptyMap(),
+            changelogReader,
+            false);
 
         try {
             stateManager.register(new MockStateStoreSupplier.MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), true, null);
@@ -372,11 +438,14 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered() throws Exception {
-        final ProcessorStateManager stateManager = new ProcessorStateManager(taskId,
-                                                                             noPartitions,
-                                                                             false,
-                                                                             stateDirectory,
-                                                                             Collections.<String, String>emptyMap(), changelogReader);
+        final ProcessorStateManager stateManager = new ProcessorStateManager(
+            taskId,
+            noPartitions,
+            false,
+            stateDirectory,
+            Collections.<String, String>emptyMap(),
+            changelogReader,
+            false);
         stateManager.register(mockStateStore, false, null);
 
         try {
@@ -391,12 +460,14 @@ public class ProcessorStateManagerTest {
     @Test
     public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws Exception {
 
-        final ProcessorStateManager stateManager = new ProcessorStateManager(taskId,
-                                                                             Collections.singleton(changelogTopicPartition),
-                                                                             false,
-                                                                             stateDirectory,
-                                                                             Collections.singletonMap(storeName, changelogTopic),
-                                                                             changelogReader);
+        final ProcessorStateManager stateManager = new ProcessorStateManager(
+            taskId,
+            Collections.singleton(changelogTopicPartition),
+            false,
+            stateDirectory,
+            Collections.singletonMap(storeName, changelogTopic),
+            changelogReader,
+            false);
 
         final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore(storeName, true) {
             @Override
@@ -414,5 +485,28 @@ public class ProcessorStateManagerTest {
         }
     }
 
+    @Test
+    public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws Exception {
+        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+        assertTrue(checkpointFile.exists());
+
+        ProcessorStateManager stateManager = null;
+        try {
+            stateManager = new ProcessorStateManager(
+                taskId,
+                noPartitions,
+                false,
+                stateDirectory,
+                Collections.<String, String>emptyMap(),
+                changelogReader,
+                true);
+
+            assertFalse(checkpointFile.exists());
+        } finally {
+            if (stateManager != null) {
+                stateManager.close(null);
+            }
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 3add508..b8c86f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -43,7 +43,6 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
-import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -112,14 +111,14 @@ public class StreamTaskTest {
     private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(metrics);
     private final TaskId taskId00 = new TaskId(0, 0);
     private final MockTime time = new MockTime();
-    private File baseDir;
+    private File baseDir = TestUtils.tempDirectory();
     private StateDirectory stateDirectory;
     private final RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer, "taskId");
-    private final ThreadCache testCache =  new ThreadCache("testCache", 0, streamsMetrics);
     private StreamsConfig config;
+    private StreamsConfig eosConfig;
     private StreamTask task;
 
-    private StreamsConfig createConfig(final File baseDir) throws Exception {
+    private StreamsConfig createConfig(final boolean enableEoS) throws Exception {
         return new StreamsConfig(new Properties() {
             {
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-task-test");
@@ -127,6 +126,9 @@ public class StreamTaskTest {
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
                 setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                if (enableEoS) {
+                    setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+                }
             }
         });
     }
@@ -139,23 +141,22 @@ public class StreamTaskTest {
         consumer.assign(Arrays.asList(partition1, partition2));
         source1.addChild(processor);
         source2.addChild(processor);
-        baseDir = TestUtils.tempDirectory();
-        config = createConfig(baseDir);
+        config = createConfig(false);
+        eosConfig = createConfig(true);
         stateDirectory = new StateDirectory("applicationId", baseDir.getPath(), new MockTime());
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
-                              changelogReader, config, streamsMetrics, stateDirectory, null, time, recordCollector);
+                              changelogReader, config, streamsMetrics, stateDirectory, null, time, producer);
     }
 
     @After
     public void cleanup() throws IOException {
-        if (task != null) {
-            try {
-                task.close();
-            } catch (final Exception e) {
-                // ignore exceptions
+        try {
+            if (task != null) {
+                task.close(true);
             }
+        } finally {
+            Utils.delete(baseDir);
         }
-        Utils.delete(baseDir);
     }
 
     @SuppressWarnings("unchecked")
@@ -354,8 +355,12 @@ public class StreamTaskTest {
         };
 
         final List<ProcessorNode> processorNodes = Collections.<ProcessorNode>singletonList(processorNode);
-        final Map<String, SourceNode> sourceNodes
-                = Collections.<String, SourceNode>singletonMap(topic1[0], processorNode);
+        final Map<String, SourceNode> sourceNodes = new HashMap() {
+            {
+                put(topic1[0], processorNode);
+                put(topic2[0], processorNode);
+            }
+        };
         final ProcessorTopology topology = new ProcessorTopology(processorNodes,
                                                                  sourceNodes,
                                                                  Collections.<String, SinkNode>emptyMap(),
@@ -363,10 +368,10 @@ public class StreamTaskTest {
                                                                  Collections.<String, String>emptyMap(),
                                                                  Collections.<StateStore>emptyList());
 
-        task.close();
+        task.close(true);
 
-        task  = new StreamTask(taskId00, applicationId, Utils.mkSet(partition1),
-                topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, testCache, time, recordCollector);
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config,
+            streamsMetrics, stateDirectory, null, time, producer);
         final int offset = 20;
         task.addRecords(partition1, Collections.singletonList(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -415,16 +420,20 @@ public class StreamTaskTest {
     @Test
     public void shouldFlushRecordCollectorOnFlushState() throws Exception {
         final AtomicBoolean flushed = new AtomicBoolean(false);
-        final NoOpRecordCollector recordCollector = new NoOpRecordCollector() {
+        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
+        final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer,
+            changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) {
+
             @Override
-            public void flush() {
-                flushed.set(true);
+            RecordCollector createRecordCollector() {
+                return new NoOpRecordCollector() {
+                    @Override
+                    public void flush() {
+                        flushed.set(true);
+                    }
+                };
             }
         };
-        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
-        final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer,
-                                                     changelogReader, createConfig(baseDir), streamsMetrics,
-                                                     stateDirectory, testCache, time, recordCollector);
         streamTask.flushState();
         assertTrue(flushed.get());
     }
@@ -458,13 +467,6 @@ public class StreamTaskTest {
                                                                  Collections.<StateStore>emptyList());
 
         final TopicPartition partition = new TopicPartition(changelogTopic, 0);
-        final NoOpRecordCollector recordCollector = new NoOpRecordCollector() {
-            @Override
-            public Map<TopicPartition, Long> offsets() {
-
-                return Collections.singletonMap(partition, 543L);
-            }
-        };
 
         restoreStateConsumer.updatePartitions(changelogTopic,
                                               Collections.singletonList(
@@ -472,22 +474,29 @@ public class StreamTaskTest {
         restoreStateConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
         restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
 
-        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
-        final TaskId taskId = new TaskId(0, 0);
-        final MockTime time = new MockTime();
-        final StreamsConfig config = createConfig(baseDir);
-        final StreamTask streamTask = new StreamTask(taskId, "appId", partitions, topology, consumer,
-                                                     changelogReader, config, streamsMetrics,
-                                                     stateDirectory, new ThreadCache("testCache", 0, streamsMetrics),
-                                                     time, recordCollector);
+        final long offset = 543L;
+        final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer,
+            changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) {
+
+            @Override
+            RecordCollector createRecordCollector() {
+                return new NoOpRecordCollector() {
+                    @Override
+                    public Map<TopicPartition, Long> offsets() {
+
+                        return Collections.singletonMap(partition, offset);
+                    }
+                };
+            }
+        };
 
         time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
 
         streamTask.commit();
-        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId),
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId00),
                                                                           ProcessorStateManager.CHECKPOINT_FILE_NAME));
 
-        assertThat(checkpoint.read(), equalTo(Collections.singletonMap(partition, 544L)));
+        assertThat(checkpoint.read(), equalTo(Collections.singletonMap(partition, offset + 1)));
     }
 
     @Test
@@ -528,45 +537,161 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseTopology() throws Exception {
-        task.close();
+    public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() throws Exception {
+        task.close(true);
         task = createTaskThatThrowsExceptionOnClose();
         try {
-            task.close();
+            task.close(true);
             fail("should have thrown runtime exception");
         } catch (final RuntimeException e) {
-            // ok
-        }
-    }
-
-    @Test
-    public void shouldCloseAllProcessorNodesWhenExceptionsRaised() throws Exception {
-        task.close();
-        task = createTaskThatThrowsExceptionOnClose();
-        try {
-            task.close();
-        } catch (final RuntimeException e) {
-            // expected
+            task = null;
         }
         assertTrue(processor.closed);
         assertTrue(source1.closed);
         assertTrue(source2.closed);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldCloseProducerWhenExactlyOneEnabled() {
-        final Map properties = config.values();
-        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
-        final StreamsConfig config = new StreamsConfig(properties);
+    public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() throws Exception {
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
 
+        assertTrue(producer.transactionInitialized());
+        assertTrue(producer.transactionInFlight());
+    }
+
+    @Test
+    public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() throws Exception {
         final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            config, streamsMetrics, stateDirectory, null, time, producer);
 
-        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
-            changelogReader, config, streamsMetrics, stateDirectory, null, time, new RecordCollectorImpl(producer, "taskId"));
+        assertFalse(producer.transactionInitialized());
+        assertFalse(producer.transactionInFlight());
+    }
+
+    @Test
+    public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() throws Exception {
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+        task.addRecords(partition1, Collections.singletonList(
+            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+        task.process();
 
-        task.close();
+        task.suspend();
+        assertTrue(producer.sentOffsets());
+        assertTrue(producer.transactionCommitted());
+        assertFalse(producer.transactionInFlight());
+    }
+
+    @Test
+    public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() throws Exception {
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            config, streamsMetrics, stateDirectory, null, time, producer);
+
+        task.addRecords(partition1, Collections.singletonList(
+            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+        task.process();
+
+        task.suspend();
+        assertFalse(producer.sentOffsets());
+        assertFalse(producer.transactionCommitted());
+        assertFalse(producer.transactionInFlight());
+    }
+
+    @Test
+    public void shouldStartNewTransactionOnResumeIfEosEnabled() throws Exception {
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+        task.addRecords(partition1, Collections.singletonList(
+            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+        task.process();
+        task.suspend();
 
+        task.resume();
+        assertTrue(producer.transactionInFlight());
+    }
+
+    @Test
+    public void shouldNotStartNewTransactionOnResumeIfEosDisabled() throws Exception {
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            config, streamsMetrics, stateDirectory, null, time, producer);
+
+        task.addRecords(partition1, Collections.singletonList(
+            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+        task.process();
+        task.suspend();
+
+        task.resume();
+        assertFalse(producer.transactionInFlight());
+    }
+
+    @Test
+    public void shouldStartNewTransactionOnCommitIfEosEnabled() throws Exception {
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+        task.addRecords(partition1, Collections.singletonList(
+            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+        task.process();
+
+        task.commit();
+        assertTrue(producer.transactionInFlight());
+    }
+
+    @Test
+    public void shouldNotStartNewTransactionOnCommitIfEosDisabled() throws Exception {
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            config, streamsMetrics, stateDirectory, null, time, producer);
+
+        task.addRecords(partition1, Collections.singletonList(
+            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+        task.process();
+
+        task.commit();
+        assertFalse(producer.transactionInFlight());
+    }
+
+    @Test
+    public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() throws Exception {
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+        task.close(false);
+        task = null;
+        assertTrue(producer.transactionAborted());
+    }
+
+    @Test
+    public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() throws Exception {
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            config, streamsMetrics, stateDirectory, null, time, producer);
+
+        task.close(false);
+        assertFalse(producer.transactionAborted());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCloseProducerOnCloseWhenEosEnabled() throws Exception {
+        final MockProducer producer = new MockProducer();
+
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
+            changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+        task.close(true);
+        task = null;
         assertTrue(producer.closed());
     }
 
@@ -579,8 +704,12 @@ public class StreamTaskTest {
             }
         };
         final List<ProcessorNode> processorNodes = Arrays.asList(processorNode, processor, source1, source2);
-        final Map<String, SourceNode> sourceNodes
-                = Collections.<String, SourceNode>singletonMap(topic1[0], processorNode);
+        final Map<String, SourceNode> sourceNodes = new HashMap() {
+            {
+                put(topic1[0], processorNode);
+                put(topic2[0], processorNode);
+            }
+        };
         final ProcessorTopology topology = new ProcessorTopology(processorNodes,
                                                                  sourceNodes,
                                                                  Collections.<String, SinkNode>emptyMap(),
@@ -588,34 +717,12 @@ public class StreamTaskTest {
                                                                  Collections.<String, String>emptyMap(),
                                                                  Collections.<StateStore>emptyList());
 
-        return new StreamTask(taskId00, applicationId, Utils.mkSet(partition1),
-                topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, testCache, time, recordCollector);
+        return new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config,
+            streamsMetrics, stateDirectory, null, time, producer);
     }
 
     private Iterable<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[], byte[]>... recs) {
         return Arrays.asList(recs);
     }
 
-    private final static class MockedProducer extends MockProducer {
-        private final AtomicBoolean flushed;
-        boolean closed = false;
-
-        MockedProducer(final AtomicBoolean flushed) {
-            super(false, null, null);
-            this.flushed = flushed;
-        }
-
-        @Override
-        public void flush() {
-            if (flushed != null) {
-                flushed.set(true);
-            }
-        }
-
-        @Override
-        public void close() {
-            closed = true;
-        }
-    }
-
 }