You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/13 22:14:03 UTC
[kafka] branch trunk updated: KAFKA-6860: Fix NPE in Kafka Streams with EOS enabled (#5187)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ff96d57 KAFKA-6860: Fix NPE in Kafka Streams with EOS enabled (#5187)
ff96d57 is described below
commit ff96d574371811c75f4f454847f67508d1de98c0
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Jun 13 15:13:55 2018 -0700
KAFKA-6860: Fix NPE in Kafka Streams with EOS enabled (#5187)
Reviewers: John Roesler <jo...@confluent.io>, Ko Byoung Kwon, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../processor/internals/AbstractStateManager.java | 22 ++++---
.../internals/GlobalStateManagerImpl.java | 23 ++++---
.../processor/internals/ProcessorStateManager.java | 13 ++--
.../internals/ProcessorStateManagerTest.java | 71 ++++++++++++----------
4 files changed, 67 insertions(+), 62 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index b270e03..66ddec9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -36,17 +36,18 @@ abstract class AbstractStateManager implements StateManager {
static final String CHECKPOINT_FILE_NAME = ".checkpoint";
final File baseDir;
- final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
-
+ private final boolean eosEnabled;
OffsetCheckpoint checkpoint;
+ final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
final Map<String, StateStore> stores = new LinkedHashMap<>();
final Map<String, StateStore> globalStores = new LinkedHashMap<>();
- AbstractStateManager(final File baseDir) {
+ AbstractStateManager(final File baseDir,
+ final boolean eosEnabled) {
this.baseDir = baseDir;
+ this.eosEnabled = eosEnabled;
this.checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
-
}
public void reinitializeStateStoresForPartitions(final Logger log,
@@ -62,11 +63,14 @@ abstract class AbstractStateManager implements StateManager {
checkpointableOffsets.remove(topicPartition);
storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic()));
}
- try {
- checkpoint.write(checkpointableOffsets);
- } catch (final IOException fatalException) {
- log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException);
- throw new StreamsException("Failed to reinitialize global store.", fatalException);
+
+ if (!eosEnabled) {
+ try {
+ checkpoint.write(checkpointableOffsets);
+ } catch (final IOException fatalException) {
+ log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException);
+ throw new StreamsException("Failed to reinitialize global store.", fatalException);
+ }
}
for (final Map.Entry<String, StateStore> entry : storesCopy.entrySet()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 79088d9..78c4a36 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -69,7 +69,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
final StateDirectory stateDirectory,
final StateRestoreListener stateRestoreListener,
final StreamsConfig config) {
- super(stateDirectory.globalStateDir());
+ super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
this.log = logContext.logger(GlobalStateManagerImpl.class);
this.topology = topology;
@@ -92,16 +92,16 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
if (!stateDirectory.lockGlobalState()) {
throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir));
}
- } catch (IOException e) {
+ } catch (final IOException e) {
throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir));
}
try {
this.checkpointableOffsets.putAll(checkpoint.read());
- } catch (IOException e) {
+ } catch (final IOException e) {
try {
stateDirectory.unlockGlobalState();
- } catch (IOException e1) {
+ } catch (final IOException e1) {
log.error("Failed to unlock the global state directory", e);
}
throw new StreamsException("Failed to read checkpoints for global state globalStores", e);
@@ -232,7 +232,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
}
final List<TopicPartition> topicPartitions = new ArrayList<>();
- for (PartitionInfo partition : partitionInfos) {
+ for (final PartitionInfo partition : partitionInfos) {
topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
return topicPartitions;
@@ -253,8 +253,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
long offset = globalConsumer.position(topicPartition);
final Long highWatermark = highWatermarks.get(topicPartition);
- BatchingStateRestoreCallback
- stateRestoreAdapter =
+ final BatchingStateRestoreCallback stateRestoreAdapter =
(BatchingStateRestoreCallback) ((stateRestoreCallback instanceof
BatchingStateRestoreCallback)
? stateRestoreCallback
@@ -267,7 +266,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
try {
final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
- for (ConsumerRecord<byte[], byte[]> record : records) {
+ for (final ConsumerRecord<byte[], byte[]> record : records) {
if (record.key() != null) {
restoreRecords.add(KeyValue.pair(record.key(), record.value()));
}
@@ -294,11 +293,11 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
@Override
public void flush() {
log.debug("Flushing all global globalStores registered in the state manager");
- for (StateStore store : this.globalStores.values()) {
+ for (final StateStore store : this.globalStores.values()) {
try {
log.trace("Flushing global store={}", store.name());
store.flush();
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new ProcessorStateException(String.format("Failed to flush global state store %s", store.name()), e);
}
}
@@ -316,7 +315,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
log.debug("Closing global storage engine {}", entry.getKey());
try {
entry.getValue().close();
- } catch (Exception e) {
+ } catch (final Exception e) {
log.error("Failed to close global state store {}", entry.getKey(), e);
closeFailed.append("Failed to close global state store:")
.append(entry.getKey())
@@ -341,7 +340,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
if (!checkpointableOffsets.isEmpty()) {
try {
checkpoint.write(checkpointableOffsets);
- } catch (IOException e) {
+ } catch (final IOException e) {
log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 054333b..afb56c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -67,7 +67,7 @@ public class ProcessorStateManager extends AbstractStateManager {
final ChangelogReader changelogReader,
final boolean eosEnabled,
final LogContext logContext) throws IOException {
- super(stateDirectory.directoryForTask(taskId));
+ super(stateDirectory.directoryForTask(taskId), eosEnabled);
this.log = logContext.logger(ProcessorStateManager.class);
this.taskId = taskId;
@@ -81,12 +81,11 @@ public class ProcessorStateManager extends AbstractStateManager {
offsetLimits = new HashMap<>();
standbyRestoredOffsets = new HashMap<>();
this.isStandby = isStandby;
- restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
+ restoreCallbacks = isStandby ? new HashMap<>() : null;
this.storeToChangelogTopic = storeToChangelogTopic;
// load the checkpoint information
checkpointableOffsets.putAll(checkpoint.read());
-
if (eosEnabled) {
// delete the checkpoint file after finish loading its stored offsets
checkpoint.delete();
@@ -169,11 +168,7 @@ public class ProcessorStateManager extends AbstractStateManager {
final int partition = getPartition(topicName);
final TopicPartition storePartition = new TopicPartition(topicName, partition);
- if (checkpointableOffsets.containsKey(storePartition)) {
- partitionsAndOffsets.put(storePartition, checkpointableOffsets.get(storePartition));
- } else {
- partitionsAndOffsets.put(storePartition, -1L);
- }
+ partitionsAndOffsets.put(storePartition, checkpointableOffsets.getOrDefault(storePartition, -1L));
}
return partitionsAndOffsets;
}
@@ -340,7 +335,7 @@ public class ProcessorStateManager extends AbstractStateManager {
return globalStores.get(name);
}
- private BatchingStateRestoreCallback getBatchingRestoreCallback(StateRestoreCallback callback) {
+ private BatchingStateRestoreCallback getBatchingRestoreCallback(final StateRestoreCallback callback) {
if (callback instanceof BatchingStateRestoreCallback) {
return (BatchingStateRestoreCallback) callback;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 6a20cd9..1b03cd4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -123,7 +123,7 @@ public class ProcessorStateManagerTest {
assertThat(batchingRestoreCallback.getRestoredRecords().size(), is(1));
assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue));
} finally {
- stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+ stateMgr.close(Collections.emptyMap());
}
}
@@ -141,7 +141,7 @@ public class ProcessorStateManagerTest {
assertThat(persistentStore.keys.size(), is(1));
assertTrue(persistentStore.keys.contains(intKey));
} finally {
- stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+ stateMgr.close(Collections.emptyMap());
}
}
@@ -169,7 +169,7 @@ public class ProcessorStateManagerTest {
stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
assertTrue(changelogReader.wasRegistered(new TopicPartition(persistentStoreTopicName, 2)));
} finally {
- stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+ stateMgr.close(Collections.emptyMap());
}
}
@@ -196,7 +196,7 @@ public class ProcessorStateManagerTest {
stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
assertTrue(changelogReader.wasRegistered(new TopicPartition(nonPersistentStoreTopicName, 2)));
} finally {
- stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+ stateMgr.close(Collections.emptyMap());
}
}
@@ -257,7 +257,7 @@ public class ProcessorStateManagerTest {
assertEquals(-1L, (long) changeLogOffsets.get(partition3));
} finally {
- stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+ stateMgr.close(Collections.emptyMap());
}
}
@@ -269,7 +269,7 @@ public class ProcessorStateManagerTest {
noPartitions,
false,
stateDirectory,
- Collections.<String, String>emptyMap(),
+ Collections.emptyMap(),
changelogReader,
false,
logContext);
@@ -280,13 +280,13 @@ public class ProcessorStateManagerTest {
assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName));
} finally {
- stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+ stateMgr.close(Collections.emptyMap());
}
}
@Test
public void testFlushAndClose() throws IOException {
- checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+ checkpoint.write(Collections.emptyMap());
// set up ack'ed offsets
final HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
@@ -339,7 +339,7 @@ public class ProcessorStateManagerTest {
noPartitions,
false,
stateDirectory,
- Collections.<String, String>emptyMap(),
+ Collections.emptyMap(),
changelogReader,
false,
logContext);
@@ -358,7 +358,7 @@ public class ProcessorStateManagerTest {
noPartitions,
false,
stateDirectory,
- Collections.<String, String>emptyMap(),
+ Collections.emptyMap(),
changelogReader,
false,
logContext);
@@ -408,7 +408,7 @@ public class ProcessorStateManagerTest {
bytes,
bytes)));
- stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
+ stateMgr.checkpoint(Collections.emptyMap());
final Map<TopicPartition, Long> read = checkpoint.read();
assertThat(read, equalTo(Collections.singletonMap(persistentStorePartition, 889L)));
@@ -433,7 +433,7 @@ public class ProcessorStateManagerTest {
stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L));
final Map<TopicPartition, Long> read = checkpoint.read();
- assertThat(read, equalTo(Collections.<TopicPartition, Long>emptyMap()));
+ assertThat(read, equalTo(Collections.emptyMap()));
}
@Test
@@ -443,7 +443,7 @@ public class ProcessorStateManagerTest {
noPartitions,
true, // standby
stateDirectory,
- Collections.<String, String>emptyMap(),
+ Collections.emptyMap(),
changelogReader,
false,
logContext);
@@ -453,10 +453,9 @@ public class ProcessorStateManagerTest {
stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 987L));
final Map<TopicPartition, Long> read = checkpoint.read();
- assertThat(read, equalTo(Collections.<TopicPartition, Long>emptyMap()));
+ assertThat(read, equalTo(Collections.emptyMap()));
}
-
@Test
public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() throws IOException {
final ProcessorStateManager stateManager = new ProcessorStateManager(
@@ -464,7 +463,7 @@ public class ProcessorStateManagerTest {
noPartitions,
false,
stateDirectory,
- Collections.<String, String>emptyMap(),
+ Collections.emptyMap(),
changelogReader,
false,
logContext);
@@ -484,7 +483,7 @@ public class ProcessorStateManagerTest {
noPartitions,
false,
stateDirectory,
- Collections.<String, String>emptyMap(),
+ Collections.emptyMap(),
changelogReader,
false,
logContext);
@@ -551,7 +550,7 @@ public class ProcessorStateManagerTest {
stateManager.register(stateStore, stateStore.stateRestoreCallback);
try {
- stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+ stateManager.close(Collections.emptyMap());
fail("Should throw ProcessorStateException if store close throws exception");
} catch (final ProcessorStateException e) {
// pass
@@ -623,7 +622,7 @@ public class ProcessorStateManagerTest {
stateManager.register(stateStore2, stateStore2.stateRestoreCallback);
try {
- stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+ stateManager.close(Collections.emptyMap());
} catch (final ProcessorStateException expected) { /* ignode */ }
Assert.assertTrue(closedStore.get());
}
@@ -640,7 +639,7 @@ public class ProcessorStateManagerTest {
noPartitions,
false,
stateDirectory,
- Collections.<String, String>emptyMap(),
+ Collections.emptyMap(),
changelogReader,
true,
logContext);
@@ -653,28 +652,36 @@ public class ProcessorStateManagerTest {
}
}
- @SuppressWarnings("unchecked")
@Test
- public void shouldSuccessfullyReInitializeStateStores() throws IOException {
+ public void shouldSuccessfullyReInitializeStateStoresWithEosDisable() throws Exception {
+ shouldSuccessfullyReInitializeStateStores(false);
+ }
+
+ @Test
+ public void shouldSuccessfullyReInitializeStateStoresWithEosEnable() throws Exception {
+ shouldSuccessfullyReInitializeStateStores(true);
+ }
+
+ private void shouldSuccessfullyReInitializeStateStores(final boolean eosEnabled) throws Exception {
final String store2Name = "store2";
final String store2Changelog = "store2-changelog";
final TopicPartition store2Partition = new TopicPartition(store2Changelog, 0);
final List<TopicPartition> changelogPartitions = Arrays.asList(changelogTopicPartition, store2Partition);
- Map<String, String> storeToChangelog = new HashMap() {
+ final Map<String, String> storeToChangelog = new HashMap<String, String>() {
{
put(storeName, changelogTopic);
put(store2Name, store2Changelog);
}
};
final ProcessorStateManager stateManager = new ProcessorStateManager(
- taskId,
- changelogPartitions,
- false,
- stateDirectory,
- storeToChangelog,
- changelogReader,
- false,
- logContext);
+ taskId,
+ changelogPartitions,
+ false,
+ stateDirectory,
+ storeToChangelog,
+ changelogReader,
+ eosEnabled,
+ logContext);
final MockStateStore stateStore = new MockStateStore(storeName, true);
final MockStateStore stateStore2 = new MockStateStore(store2Name, true);
@@ -696,7 +703,7 @@ public class ProcessorStateManagerTest {
assertTrue(stateStore2.initialized);
}
- private ProcessorStateManager getStandByStateManager(TaskId taskId) throws IOException {
+ private ProcessorStateManager getStandByStateManager(final TaskId taskId) throws IOException {
return new ProcessorStateManager(
taskId,
noPartitions,
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.