Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/17 00:23:15 UTC
[2/3] kafka git commit: KAFKA-4923: Add Exactly-Once Semantics to Streams
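For orientation before the diff: the feature under test is switched on through a single new config value. A minimal sketch of how an application would opt in, based only on the StreamsConfig.PROCESSING_GUARANTEE_CONFIG key and EXACTLY_ONCE constant exercised in the tests below (the application id and bootstrap servers are placeholder values):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");        // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    // opt in to the new guarantee; "at_least_once" remains the default
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    final StreamsConfig config = new StreamsConfig(props);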
http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index cfd702e..2dd4553 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -32,18 +32,24 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Properties;
+import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
+import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED;
+import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
public class StreamsConfigTest {
- private Properties props = new Properties();
+ private final Properties props = new Properties();
private StreamsConfig streamsConfig;
@Before
@@ -58,40 +64,56 @@ public class StreamsConfigTest {
streamsConfig = new StreamsConfig(props);
}
+ @Test(expected = ConfigException.class)
+ public void shouldThrowExceptionIfApplicationIdIsNotSet() {
+ props.remove(StreamsConfig.APPLICATION_ID_CONFIG);
+ new StreamsConfig(props);
+ }
+
+ @Test(expected = ConfigException.class)
+ public void shouldThrowExceptionIfBootstrapServersIsNotSet() {
+ props.remove(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+ new StreamsConfig(props);
+ }
+
@Test
public void testGetProducerConfigs() throws Exception {
- Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client");
- assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), "client-producer");
+ final String clientId = "client";
+ final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId);
+ assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer");
assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100");
assertNull(returnedProps.get("DUMMY"));
}
@Test
public void testGetConsumerConfigs() throws Exception {
- Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, "example-application", "client");
- assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
- assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application");
+ final String groupId = "example-application";
+ final String clientId = "client";
+ final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, groupId, clientId);
+ assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer");
+ assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), groupId);
assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
assertNull(returnedProps.get("DUMMY"));
}
@Test
public void testGetRestoreConsumerConfigs() throws Exception {
- Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client");
- assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
+ final String clientId = "client";
+ final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
+ assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-restore-consumer");
assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
assertNull(returnedProps.get("DUMMY"));
}
@Test
public void defaultSerdeShouldBeConfigured() {
- Map<String, Object> serializerConfigs = new HashMap<>();
+ final Map<String, Object> serializerConfigs = new HashMap<>();
serializerConfigs.put("key.serializer.encoding", "UTF8");
serializerConfigs.put("value.serializer.encoding", "UTF-16");
- Serializer<String> serializer = Serdes.String().serializer();
+ final Serializer<String> serializer = Serdes.String().serializer();
- String str = "my string for testing";
- String topic = "my topic";
+ final String str = "my string for testing";
+ final String topic = "my topic";
serializer.configure(serializerConfigs, true);
assertEquals("Should get the original string after serialization and deserialization with the configured encoding",
@@ -104,14 +126,14 @@ public class StreamsConfigTest {
@Test
public void shouldSupportMultipleBootstrapServers() {
- List<String> expectedBootstrapServers = Arrays.asList("broker1:9092", "broker2:9092");
- String bootstrapServersString = Utils.join(expectedBootstrapServers, ",");
- Properties props = new Properties();
+ final List<String> expectedBootstrapServers = Arrays.asList("broker1:9092", "broker2:9092");
+ final String bootstrapServersString = Utils.join(expectedBootstrapServers, ",");
+ final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "irrelevant");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersString);
- StreamsConfig config = new StreamsConfig(props);
+ final StreamsConfig config = new StreamsConfig(props);
- List<String> actualBootstrapServers = config.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+ final List<String> actualBootstrapServers = config.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
assertEquals(expectedBootstrapServers, actualBootstrapServers);
}
@@ -165,7 +187,7 @@ public class StreamsConfigTest {
props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> configs = streamsConfig.getProducerConfigs("client");
+ final Map<String, Object> configs = streamsConfig.getProducerConfigs("clientId");
assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
}
@@ -195,7 +217,7 @@ public class StreamsConfigTest {
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> configs = streamsConfig.getProducerConfigs("client");
+ final Map<String, Object> configs = streamsConfig.getProducerConfigs("clientId");
assertEquals(10, configs.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
}
@@ -230,7 +252,7 @@ public class StreamsConfigTest {
public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("client");
+ final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
assertEquals("10000", producerConfigs.get(ProducerConfig.LINGER_MS_CONFIG));
}
@@ -239,7 +261,7 @@ public class StreamsConfigTest {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("client");
+ final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
assertEquals("latest", consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
}
@@ -261,10 +283,113 @@ public class StreamsConfigTest {
@Test
public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() throws Exception {
final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "group", "client");
+ final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));
}
+ @Test
+ public void shouldAcceptAtLeastOnce() {
+ // use the literal value rather than `StreamsConfig.AT_LEAST_ONCE` so the accepted string itself is verified
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "at_least_once");
+ new StreamsConfig(props);
+ }
+
+ @Test
+ public void shouldAcceptExactlyOnce() {
+ // use the literal value rather than `StreamsConfig.EXACTLY_ONCE` so the accepted string itself is verified
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
+ new StreamsConfig(props);
+ }
+
+ @Test(expected = ConfigException.class)
+ public void shouldThrowExceptionIfNotAtLeastOnceOrExactlyOnce() {
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "bad_value");
+ new StreamsConfig(props);
+ }
+
+ @Test(expected = ConfigException.class)
+ public void shouldThrowExceptionIfConsumerIsolationLevelIsOverriddenIfEosEnabled() {
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+ }
+
+ @Test
+ public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+ }
+
+ @Test(expected = ConfigException.class)
+ public void shouldThrowExceptionIfProducerEnableIdempotenceIsOverriddenIfEosEnabled() {
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+ props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "anyValue");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ streamsConfig.getProducerConfigs("clientId");
+ }
+
+ @Test
+ public void shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() {
+ props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ streamsConfig.getProducerConfigs("clientId");
+ }
+
+ @Test(expected = ConfigException.class)
+ public void shouldThrowExceptionIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled() {
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "anyValue");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ streamsConfig.getProducerConfigs("clientId");
+ }
+
+ @Test
+ public void shouldAllowSettingProducerMaxInFlightRequestPerConnectionsWhenEosDisabled() {
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "anyValue");
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+ streamsConfig.getProducerConfigs("clientId");
+ }
+
+ @Test
+ public void shouldSetDifferentDefaultsIfEosEnabled() {
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+ final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+ final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+
+ assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
+ assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
+ assertThat((Integer) producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(Integer.MAX_VALUE));
+ assertThat((Integer) producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), equalTo(1));
+ assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(100L));
+ }
+
+ @Test
+ public void shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled() {
+ final int numberOfRetries = 42;
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+ props.put(ProducerConfig.RETRIES_CONFIG, numberOfRetries);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+ final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+
+ assertThat((Integer) producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(numberOfRetries));
+ }
+
+ @Test
+ public void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled() {
+ final long commitIntervalMs = 73L;
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
+ props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitIntervalMs);
+ final StreamsConfig streamsConfig = new StreamsConfig(props);
+
+ assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(commitIntervalMs));
+ }
+
static class MisconfiguredSerde implements Serde {
@Override
public void configure(final Map configs, final boolean isKey) {
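The shouldSetDifferentDefaultsIfEosEnabled test above doubles as documentation for the client defaults that exactly-once rewrites. A hedged summary sketch (values taken from the assertions in that test and the override tests around it; treat them as of this patch only):

    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    final StreamsConfig streamsConfig = new StreamsConfig(props);
    // consumer: isolation.level                         -> "read_committed"  (not user-overridable under EOS)
    // producer: enable.idempotence                      -> true              (not user-overridable under EOS)
    // producer: max.in.flight.requests.per.connection   -> 1                 (not user-overridable under EOS)
    // producer: retries                                 -> Integer.MAX_VALUE (a user override wins, see above)
    // streams:  commit.interval.ms                      -> 100L              (a user override wins, see above)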
http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 5705ea6..ba3230a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
@@ -35,6 +36,7 @@ import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import java.util.Collections;
+import java.util.Properties;
public class AbstractTaskTest {
@@ -61,6 +63,10 @@ public class AbstractTaskTest {
private AbstractTask createTask(final Consumer consumer) {
final MockTime time = new MockTime();
+ final Properties properties = new Properties();
+ properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
+ properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyhost:9092");
+ final StreamsConfig config = new StreamsConfig(properties);
return new AbstractTask(new TaskId(0, 0),
"app",
Collections.singletonList(new TopicPartition("t", 0)),
@@ -74,7 +80,8 @@ public class AbstractTaskTest {
new StoreChangelogReader(consumer, Time.SYSTEM, 5000),
false,
new StateDirectory("app", TestUtils.tempDirectory().getPath(), time),
- new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))) {
+ new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())),
+ config) {
@Override
public void resume() {}
@@ -85,7 +92,7 @@ public class AbstractTaskTest {
public void suspend() {}
@Override
- public void close() {}
+ public void close(final boolean clean) {}
};
}
@@ -98,4 +105,4 @@ public class AbstractTaskTest {
};
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 9f54b4a..476b009 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import static org.junit.Assert.assertEquals;
-
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
@@ -33,6 +31,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import static org.junit.Assert.assertEquals;
+
public class PartitionGroupTest {
private final Serializer<Integer> intSerializer = new IntegerSerializer();
private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
@@ -58,7 +58,7 @@ public class PartitionGroupTest {
assertEquals(0, group.numBuffered());
// add three records with timestamp 1, 3, 5 to partition-1
- List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+ final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
@@ -66,7 +66,7 @@ public class PartitionGroupTest {
group.addRawRecords(partition1, list1);
// add three records with timestamp 2, 4, 6 to partition-2
- List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
+ final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
new ConsumerRecord<>("topic", 2, 2L, recordKey, recordValue),
new ConsumerRecord<>("topic", 2, 4L, recordKey, recordValue),
new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue));
@@ -79,7 +79,7 @@ public class PartitionGroupTest {
assertEquals(1L, group.timestamp());
StampedRecord record;
- PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
+ final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
// get one record, now the time should be advanced
record = group.nextRecord(info);
@@ -100,7 +100,7 @@ public class PartitionGroupTest {
assertEquals(3L, group.timestamp());
// add three records with timestamp 2, 4, 6 to partition-1 again
- List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
+ final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
new ConsumerRecord<>("topic", 1, 2L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 4L, recordKey, recordValue));
http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index cfc1022..f454216 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -45,6 +45,7 @@ import java.util.Set;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -90,15 +91,23 @@ public class ProcessorStateManagerTest {
public void testRegisterPersistentStore() throws IOException {
final TaskId taskId = new TaskId(0, 2);
- MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
- ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, false, stateDirectory, new HashMap<String, String>() {
- {
- put(persistentStoreName, persistentStoreTopicName);
- put(nonPersistentStoreName, nonPersistentStoreName);
- }
- }, changelogReader);
- try {
+ final MockStateStoreSupplier.MockStateStore persistentStore
+ = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
+ final ProcessorStateManager stateMgr = new ProcessorStateManager(
+ taskId,
+ noPartitions,
+ false,
+ stateDirectory,
+ new HashMap<String, String>() {
+ {
+ put(persistentStoreName, persistentStoreTopicName);
+ put(nonPersistentStoreName, nonPersistentStoreName);
+ }
+ },
+ changelogReader,
+ false);
+ try {
stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
assertTrue(changelogReader.wasRegistered(new TopicPartition(persistentStoreTopicName, 2)));
} finally {
@@ -108,18 +117,25 @@ public class ProcessorStateManagerTest {
@Test
public void testRegisterNonPersistentStore() throws IOException {
- MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
- ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 2), noPartitions, false, stateDirectory, new HashMap<String, String>() {
- {
- put(persistentStoreName, persistentStoreTopicName);
- put(nonPersistentStoreName, nonPersistentStoreTopicName);
- }
- }, changelogReader);
- try {
+ final MockStateStoreSupplier.MockStateStore nonPersistentStore
+ = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
+ final ProcessorStateManager stateMgr = new ProcessorStateManager(
+ new TaskId(0, 2),
+ noPartitions,
+ false,
+ stateDirectory,
+ new HashMap<String, String>() {
+ {
+ put(persistentStoreName, persistentStoreTopicName);
+ put(nonPersistentStoreName, nonPersistentStoreTopicName);
+ }
+ },
+ changelogReader,
+ false);
+ try {
stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
assertTrue(changelogReader.wasRegistered(new TopicPartition(nonPersistentStoreTopicName, 2)));
-
} finally {
stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
}
@@ -128,41 +144,49 @@ public class ProcessorStateManagerTest {
@Test
public void testChangeLogOffsets() throws IOException {
final TaskId taskId = new TaskId(0, 0);
- long lastCheckpointedOffset = 10L;
- String storeName1 = "store1";
- String storeName2 = "store2";
- String storeName3 = "store3";
+ final long lastCheckpointedOffset = 10L;
+ final String storeName1 = "store1";
+ final String storeName2 = "store2";
+ final String storeName3 = "store3";
- String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
- String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
- String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3);
+ final String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
+ final String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
+ final String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3);
- Map<String, String> storeToChangelogTopic = new HashMap<>();
+ final Map<String, String> storeToChangelogTopic = new HashMap<>();
storeToChangelogTopic.put(storeName1, storeTopicName1);
storeToChangelogTopic.put(storeName2, storeTopicName2);
storeToChangelogTopic.put(storeName3, storeTopicName3);
- OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME));
+ final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME));
checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
- TopicPartition partition1 = new TopicPartition(storeTopicName1, 0);
- TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
- TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);
+ final TopicPartition partition1 = new TopicPartition(storeTopicName1, 0);
+ final TopicPartition partition2 = new TopicPartition(storeTopicName2, 0);
+ final TopicPartition partition3 = new TopicPartition(storeTopicName3, 1);
- MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true);
- MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true);
- MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore(storeName3, true);
+ final MockStateStoreSupplier.MockStateStore store1 = new MockStateStoreSupplier.MockStateStore(storeName1, true);
+ final MockStateStoreSupplier.MockStateStore store2 = new MockStateStoreSupplier.MockStateStore(storeName2, true);
+ final MockStateStoreSupplier.MockStateStore store3 = new MockStateStoreSupplier.MockStateStore(storeName3, true);
// if there is a source partition, inherit the partition id
- Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
+ final Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
+
+ final ProcessorStateManager stateMgr = new ProcessorStateManager(
+ taskId,
+ sourcePartitions,
+ true, // standby
+ stateDirectory,
+ storeToChangelogTopic,
+ changelogReader,
+ false);
- ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, sourcePartitions, true, stateDirectory, storeToChangelogTopic, changelogReader); // standby
try {
stateMgr.register(store1, true, store1.stateRestoreCallback);
stateMgr.register(store2, true, store2.stateRestoreCallback);
stateMgr.register(store3, true, store3.stateRestoreCallback);
- Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointed();
+ final Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointed();
assertEquals(3, changeLogOffsets.size());
assertTrue(changeLogOffsets.containsKey(partition1));
@@ -180,7 +204,14 @@ public class ProcessorStateManagerTest {
@Test
public void testGetStore() throws IOException {
final MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
- final ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, false, stateDirectory, Collections.<String, String>emptyMap(), changelogReader);
+ final ProcessorStateManager stateMgr = new ProcessorStateManager(
+ new TaskId(0, 1),
+ noPartitions,
+ false,
+ stateDirectory,
+ Collections.<String, String>emptyMap(),
+ changelogReader,
+ false);
try {
stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
@@ -202,12 +233,19 @@ public class ProcessorStateManagerTest {
ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L);
ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L);
- ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, false, stateDirectory, new HashMap<String, String>() {
- {
- put(persistentStoreName, persistentStoreTopicName);
- put(nonPersistentStoreName, nonPersistentStoreTopicName);
- }
- }, changelogReader);
+ final ProcessorStateManager stateMgr = new ProcessorStateManager(
+ taskId,
+ noPartitions,
+ false,
+ stateDirectory,
+ new HashMap<String, String>() {
+ {
+ put(persistentStoreName, persistentStoreTopicName);
+ put(nonPersistentStoreName, nonPersistentStoreTopicName);
+ }
+ },
+ changelogReader,
+ false);
try {
// make sure the checkpoint file isn't deleted
assertTrue(checkpointFile.exists());
@@ -234,7 +272,14 @@ public class ProcessorStateManagerTest {
@Test
public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception {
- final ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, false, stateDirectory, Collections.<String, String>emptyMap(), changelogReader);
+ final ProcessorStateManager stateMgr = new ProcessorStateManager(
+ new TaskId(0, 1),
+ noPartitions,
+ false,
+ stateDirectory,
+ Collections.<String, String>emptyMap(),
+ changelogReader,
+ false);
stateMgr.register(nonPersistentStore, false, nonPersistentStore.stateRestoreCallback);
assertNotNull(stateMgr.getStore(nonPersistentStoreName));
}
@@ -245,7 +290,14 @@ public class ProcessorStateManagerTest {
checkpoint.write(offsets);
final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
- final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, false, stateDirectory, Collections.<String, String>emptyMap(), changelogReader);
+ final ProcessorStateManager stateMgr = new ProcessorStateManager(
+ taskId,
+ noPartitions,
+ false,
+ stateDirectory,
+ Collections.<String, String>emptyMap(),
+ changelogReader,
+ false);
stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
stateMgr.close(null);
final Map<TopicPartition, Long> read = checkpoint.read();
@@ -254,13 +306,14 @@ public class ProcessorStateManagerTest {
@Test
public void shouldWriteCheckpointForPersistentLogEnabledStore() throws Exception {
- final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
- noPartitions,
- false,
- stateDirectory,
- Collections.singletonMap(persistentStore.name(),
- persistentStoreTopicName),
- changelogReader);
+ final ProcessorStateManager stateMgr = new ProcessorStateManager(
+ taskId,
+ noPartitions,
+ false,
+ stateDirectory,
+ Collections.singletonMap(persistentStore.name(), persistentStoreTopicName),
+ changelogReader,
+ false);
stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
@@ -271,13 +324,14 @@ public class ProcessorStateManagerTest {
@Test
public void shouldWriteCheckpointForStandbyReplica() throws Exception {
- final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
- noPartitions,
- true,
- stateDirectory,
- Collections.singletonMap(persistentStore.name(),
- persistentStoreTopicName),
- changelogReader);
+ final ProcessorStateManager stateMgr = new ProcessorStateManager(
+ taskId,
+ noPartitions,
+ true, // standby
+ stateDirectory,
+ Collections.singletonMap(persistentStore.name(), persistentStoreTopicName),
+ changelogReader,
+ false);
stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
final byte[] bytes = Serdes.Integer().serializer().serialize("", 10);
@@ -300,14 +354,14 @@ public class ProcessorStateManagerTest {
public void shouldNotWriteCheckpointForNonPersistent() throws Exception {
final TopicPartition topicPartition = new TopicPartition(nonPersistentStoreTopicName, 1);
-
- final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
- noPartitions,
- true,
- stateDirectory,
- Collections.singletonMap(nonPersistentStoreName,
- nonPersistentStoreTopicName),
- changelogReader);
+ final ProcessorStateManager stateMgr = new ProcessorStateManager(
+ taskId,
+ noPartitions,
+ true, // standby
+ stateDirectory,
+ Collections.singletonMap(nonPersistentStoreName, nonPersistentStoreTopicName),
+ changelogReader,
+ false);
stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L));
@@ -318,12 +372,14 @@ public class ProcessorStateManagerTest {
@Test
public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws Exception {
- final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId,
- noPartitions,
- true,
- stateDirectory,
- Collections.<String, String>emptyMap(),
- changelogReader);
+ final ProcessorStateManager stateMgr = new ProcessorStateManager(
+ taskId,
+ noPartitions,
+ true, // standby
+ stateDirectory,
+ Collections.<String, String>emptyMap(),
+ changelogReader,
+ false);
stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
@@ -344,7 +400,14 @@ public class ProcessorStateManagerTest {
final FileLock lock = channel.lock();
try {
- new ProcessorStateManager(taskId, noPartitions, false, stateDirectory, Collections.<String, String>emptyMap(), changelogReader);
+ new ProcessorStateManager(
+ taskId,
+ noPartitions,
+ false,
+ stateDirectory,
+ Collections.<String, String>emptyMap(),
+ changelogReader,
+ false);
fail("Should have thrown LockException");
} catch (final LockException e) {
// pass
@@ -356,11 +419,14 @@ public class ProcessorStateManagerTest {
@Test
public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() throws Exception {
- final ProcessorStateManager stateManager = new ProcessorStateManager(taskId,
- noPartitions,
- false,
- stateDirectory,
- Collections.<String, String>emptyMap(), changelogReader);
+ final ProcessorStateManager stateManager = new ProcessorStateManager(
+ taskId,
+ noPartitions,
+ false,
+ stateDirectory,
+ Collections.<String, String>emptyMap(),
+ changelogReader,
+ false);
try {
stateManager.register(new MockStateStoreSupplier.MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME, true), true, null);
@@ -372,11 +438,14 @@ public class ProcessorStateManagerTest {
@Test
public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered() throws Exception {
- final ProcessorStateManager stateManager = new ProcessorStateManager(taskId,
- noPartitions,
- false,
- stateDirectory,
- Collections.<String, String>emptyMap(), changelogReader);
+ final ProcessorStateManager stateManager = new ProcessorStateManager(
+ taskId,
+ noPartitions,
+ false,
+ stateDirectory,
+ Collections.<String, String>emptyMap(),
+ changelogReader,
+ false);
stateManager.register(mockStateStore, false, null);
try {
@@ -391,12 +460,14 @@ public class ProcessorStateManagerTest {
@Test
public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws Exception {
- final ProcessorStateManager stateManager = new ProcessorStateManager(taskId,
- Collections.singleton(changelogTopicPartition),
- false,
- stateDirectory,
- Collections.singletonMap(storeName, changelogTopic),
- changelogReader);
+ final ProcessorStateManager stateManager = new ProcessorStateManager(
+ taskId,
+ Collections.singleton(changelogTopicPartition),
+ false,
+ stateDirectory,
+ Collections.singletonMap(storeName, changelogTopic),
+ changelogReader,
+ false);
final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore(storeName, true) {
@Override
@@ -414,5 +485,28 @@ public class ProcessorStateManagerTest {
}
}
+ @Test
+ public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws Exception {
+ checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+ assertTrue(checkpointFile.exists());
+
+ ProcessorStateManager stateManager = null;
+ try {
+ stateManager = new ProcessorStateManager(
+ taskId,
+ noPartitions,
+ false,
+ stateDirectory,
+ Collections.<String, String>emptyMap(),
+ changelogReader,
+ true);
+
+ assertFalse(checkpointFile.exists());
+ } finally {
+ if (stateManager != null) {
+ stateManager.close(null);
+ }
+ }
+ }
}
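The mechanical change running through this file is a new trailing boolean on the ProcessorStateManager constructor, and shouldDeleteCheckpointFileOnCreationIfEosEnabled pins down its one behavioural effect visible here. A sketch of the call shape (the parameter name is not shown in this diff, so "eosEnabled" is an assumption):

    final ProcessorStateManager stateMgr = new ProcessorStateManager(
        taskId,
        noPartitions,
        false,          // not a standby task
        stateDirectory,
        Collections.<String, String>emptyMap(),
        changelogReader,
        true);          // assumed name: eosEnabled; true deletes any existing checkpoint file on creation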
http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 3add508..b8c86f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -43,7 +43,6 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
-import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
@@ -112,14 +111,14 @@ public class StreamTaskTest {
private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(metrics);
private final TaskId taskId00 = new TaskId(0, 0);
private final MockTime time = new MockTime();
- private File baseDir;
+ private File baseDir = TestUtils.tempDirectory();
private StateDirectory stateDirectory;
private final RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer, "taskId");
- private final ThreadCache testCache = new ThreadCache("testCache", 0, streamsMetrics);
private StreamsConfig config;
+ private StreamsConfig eosConfig;
private StreamTask task;
- private StreamsConfig createConfig(final File baseDir) throws Exception {
+ private StreamsConfig createConfig(final boolean enableEoS) throws Exception {
return new StreamsConfig(new Properties() {
{
setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-task-test");
@@ -127,6 +126,9 @@ public class StreamTaskTest {
setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+ if (enableEoS) {
+ setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+ }
}
});
}
@@ -139,23 +141,22 @@ public class StreamTaskTest {
consumer.assign(Arrays.asList(partition1, partition2));
source1.addChild(processor);
source2.addChild(processor);
- baseDir = TestUtils.tempDirectory();
- config = createConfig(baseDir);
+ config = createConfig(false);
+ eosConfig = createConfig(true);
stateDirectory = new StateDirectory("applicationId", baseDir.getPath(), new MockTime());
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
- changelogReader, config, streamsMetrics, stateDirectory, null, time, recordCollector);
+ changelogReader, config, streamsMetrics, stateDirectory, null, time, producer);
}
@After
public void cleanup() throws IOException {
- if (task != null) {
- try {
- task.close();
- } catch (final Exception e) {
- // ignore exceptions
+ try {
+ if (task != null) {
+ task.close(true);
}
+ } finally {
+ Utils.delete(baseDir);
}
- Utils.delete(baseDir);
}
@SuppressWarnings("unchecked")
@@ -354,8 +355,12 @@ public class StreamTaskTest {
};
final List<ProcessorNode> processorNodes = Collections.<ProcessorNode>singletonList(processorNode);
- final Map<String, SourceNode> sourceNodes
- = Collections.<String, SourceNode>singletonMap(topic1[0], processorNode);
+ final Map<String, SourceNode> sourceNodes = new HashMap() {
+ {
+ put(topic1[0], processorNode);
+ put(topic2[0], processorNode);
+ }
+ };
final ProcessorTopology topology = new ProcessorTopology(processorNodes,
sourceNodes,
Collections.<String, SinkNode>emptyMap(),
@@ -363,10 +368,10 @@ public class StreamTaskTest {
Collections.<String, String>emptyMap(),
Collections.<StateStore>emptyList());
- task.close();
+ task.close(true);
- task = new StreamTask(taskId00, applicationId, Utils.mkSet(partition1),
- topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, testCache, time, recordCollector);
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config,
+ streamsMetrics, stateDirectory, null, time, producer);
final int offset = 20;
task.addRecords(partition1, Collections.singletonList(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -415,16 +420,20 @@ public class StreamTaskTest {
@Test
public void shouldFlushRecordCollectorOnFlushState() throws Exception {
final AtomicBoolean flushed = new AtomicBoolean(false);
- final NoOpRecordCollector recordCollector = new NoOpRecordCollector() {
+ final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
+ final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer,
+ changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) {
+
@Override
- public void flush() {
- flushed.set(true);
+ RecordCollector createRecordCollector() {
+ return new NoOpRecordCollector() {
+ @Override
+ public void flush() {
+ flushed.set(true);
+ }
+ };
}
};
- final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
- final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer,
- changelogReader, createConfig(baseDir), streamsMetrics,
- stateDirectory, testCache, time, recordCollector);
streamTask.flushState();
assertTrue(flushed.get());
}
@@ -458,13 +467,6 @@ public class StreamTaskTest {
Collections.<StateStore>emptyList());
final TopicPartition partition = new TopicPartition(changelogTopic, 0);
- final NoOpRecordCollector recordCollector = new NoOpRecordCollector() {
- @Override
- public Map<TopicPartition, Long> offsets() {
-
- return Collections.singletonMap(partition, 543L);
- }
- };
restoreStateConsumer.updatePartitions(changelogTopic,
Collections.singletonList(
@@ -472,22 +474,29 @@ public class StreamTaskTest {
restoreStateConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));
- final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
- final TaskId taskId = new TaskId(0, 0);
- final MockTime time = new MockTime();
- final StreamsConfig config = createConfig(baseDir);
- final StreamTask streamTask = new StreamTask(taskId, "appId", partitions, topology, consumer,
- changelogReader, config, streamsMetrics,
- stateDirectory, new ThreadCache("testCache", 0, streamsMetrics),
- time, recordCollector);
+ final long offset = 543L;
+ final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer,
+ changelogReader, config, streamsMetrics, stateDirectory, null, time, producer) {
+
+ @Override
+ RecordCollector createRecordCollector() {
+ return new NoOpRecordCollector() {
+ @Override
+ public Map<TopicPartition, Long> offsets() {
+
+ return Collections.singletonMap(partition, offset);
+ }
+ };
+ }
+ };
time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
streamTask.commit();
- final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId),
+ final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId00),
ProcessorStateManager.CHECKPOINT_FILE_NAME));
- assertThat(checkpoint.read(), equalTo(Collections.singletonMap(partition, 544L)));
+ assertThat(checkpoint.read(), equalTo(Collections.singletonMap(partition, offset + 1)));
}
@Test
@@ -528,45 +537,161 @@ public class StreamTaskTest {
@SuppressWarnings("unchecked")
@Test
- public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseTopology() throws Exception {
- task.close();
+ public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() throws Exception {
+ task.close(true);
task = createTaskThatThrowsExceptionOnClose();
try {
- task.close();
+ task.close(true);
fail("should have thrown runtime exception");
} catch (final RuntimeException e) {
- // ok
- }
- }
-
- @Test
- public void shouldCloseAllProcessorNodesWhenExceptionsRaised() throws Exception {
- task.close();
- task = createTaskThatThrowsExceptionOnClose();
- try {
- task.close();
- } catch (final RuntimeException e) {
- // expected
+ task = null;
}
assertTrue(processor.closed);
assertTrue(source1.closed);
assertTrue(source2.closed);
}
- @SuppressWarnings("unchecked")
@Test
- public void shouldCloseProducerWhenExactlyOneEnabled() {
- final Map properties = config.values();
- properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
- final StreamsConfig config = new StreamsConfig(properties);
+ public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() throws Exception {
+ final MockProducer producer = new MockProducer();
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+ eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ assertTrue(producer.transactionInitialized());
+ assertTrue(producer.transactionInFlight());
+ }
+
+ @Test
+ public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() throws Exception {
final MockProducer producer = new MockProducer();
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+ config, streamsMetrics, stateDirectory, null, time, producer);
- task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
- changelogReader, config, streamsMetrics, stateDirectory, null, time, new RecordCollectorImpl(producer, "taskId"));
+ assertFalse(producer.transactionInitialized());
+ assertFalse(producer.transactionInFlight());
+ }
+
+ @Test
+ public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() throws Exception {
+ final MockProducer producer = new MockProducer();
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+ eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+ task.addRecords(partition1, Collections.singletonList(
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+ task.process();
- task.close();
+ task.suspend();
+ assertTrue(producer.sentOffsets());
+ assertTrue(producer.transactionCommitted());
+ assertFalse(producer.transactionInFlight());
+ }
+
+ @Test
+ public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() throws Exception {
+ final MockProducer producer = new MockProducer();
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+ config, streamsMetrics, stateDirectory, null, time, producer);
+
+ task.addRecords(partition1, Collections.singletonList(
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+ task.process();
+
+ task.suspend();
+ assertFalse(producer.sentOffsets());
+ assertFalse(producer.transactionCommitted());
+ assertFalse(producer.transactionInFlight());
+ }
+
+ @Test
+ public void shouldStartNewTransactionOnResumeIfEosEnabled() throws Exception {
+ final MockProducer producer = new MockProducer();
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+ eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+ task.addRecords(partition1, Collections.singletonList(
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+ task.process();
+ task.suspend();
+ task.resume();
+ assertTrue(producer.transactionInFlight());
+ }
+
+ @Test
+ public void shouldNotStartNewTransactionOnResumeIfEosDisabled() throws Exception {
+ final MockProducer producer = new MockProducer();
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+ config, streamsMetrics, stateDirectory, null, time, producer);
+
+ task.addRecords(partition1, Collections.singletonList(
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+ task.process();
+ task.suspend();
+
+ task.resume();
+ assertFalse(producer.transactionInFlight());
+ }
+
+ @Test
+ public void shouldStartNewTransactionOnCommitIfEosEnabled() throws Exception {
+ final MockProducer producer = new MockProducer();
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+ eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+ task.addRecords(partition1, Collections.singletonList(
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+ task.process();
+
+ task.commit();
+ assertTrue(producer.transactionInFlight());
+ }
+
+ @Test
+ public void shouldNotStartNewTransactionOnCommitIfEosDisabled() throws Exception {
+ final MockProducer producer = new MockProducer();
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+ config, streamsMetrics, stateDirectory, null, time, producer);
+
+ task.addRecords(partition1, Collections.singletonList(
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
+ task.process();
+
+ task.commit();
+ assertFalse(producer.transactionInFlight());
+ }
+
+ @Test
+ public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() throws Exception {
+ final MockProducer producer = new MockProducer();
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+ eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+ task.close(false);
+ task = null;
+ assertTrue(producer.transactionAborted());
+ }
+
+ @Test
+ public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() throws Exception {
+ final MockProducer producer = new MockProducer();
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+ config, streamsMetrics, stateDirectory, null, time, producer);
+
+ task.close(false);
+ assertFalse(producer.transactionAborted());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldCloseProducerOnCloseWhenEosEnabled() throws Exception {
+ final MockProducer producer = new MockProducer();
+
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
+ changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+ task.close(true);
+ task = null;
assertTrue(producer.closed());
}
@@ -579,8 +704,12 @@ public class StreamTaskTest {
}
};
final List<ProcessorNode> processorNodes = Arrays.asList(processorNode, processor, source1, source2);
- final Map<String, SourceNode> sourceNodes
- = Collections.<String, SourceNode>singletonMap(topic1[0], processorNode);
+ final Map<String, SourceNode> sourceNodes = new HashMap() {
+ {
+ put(topic1[0], processorNode);
+ put(topic2[0], processorNode);
+ }
+ };
final ProcessorTopology topology = new ProcessorTopology(processorNodes,
sourceNodes,
Collections.<String, SinkNode>emptyMap(),
@@ -588,34 +717,12 @@ public class StreamTaskTest {
Collections.<String, String>emptyMap(),
Collections.<StateStore>emptyList());
- return new StreamTask(taskId00, applicationId, Utils.mkSet(partition1),
- topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, testCache, time, recordCollector);
+ return new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config,
+ streamsMetrics, stateDirectory, null, time, producer);
}
private Iterable<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[], byte[]>... recs) {
return Arrays.asList(recs);
}
- private final static class MockedProducer extends MockProducer {
- private final AtomicBoolean flushed;
- boolean closed = false;
-
- MockedProducer(final AtomicBoolean flushed) {
- super(false, null, null);
- this.flushed = flushed;
- }
-
- @Override
- public void flush() {
- if (flushed != null) {
- flushed.set(true);
- }
- }
-
- @Override
- public void close() {
- closed = true;
- }
- }
-
}
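Read together, the new StreamTask tests above pin down the transactional producer lifecycle under exactly-once. A hedged sketch of the sequence they assert, written against the standard producer transaction API (the mapping of calls to task events is inferred from the test names, not shown in this diff; consumedOffsets and applicationId are stand-ins):

    producer.initTransactions();    // once, on task creation (EOS only)
    producer.beginTransaction();    // opened immediately after init
    // ... records are processed and sent within the open transaction ...
    producer.sendOffsetsToTransaction(consumedOffsets, applicationId); // on suspend/commit: attach input offsets
    producer.commitTransaction();   // output records and offsets become atomic
    producer.beginTransaction();    // on commit or resume: open the next transaction (but not on suspend)
    // on a dirty close -- close(false) -- the open transaction is discarded instead:
    producer.abortTransaction();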