You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/13 22:14:03 UTC

[kafka] branch trunk updated: KAFKA-6860: Fix NPE in Kafka Streams with EOS enabled (#5187)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff96d57  KAFKA-6860: Fix NPE in Kafka Streams with EOS enabled (#5187)
ff96d57 is described below

commit ff96d574371811c75f4f454847f67508d1de98c0
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Wed Jun 13 15:13:55 2018 -0700

    KAFKA-6860: Fix NPE in Kafka Streams with EOS enabled (#5187)
    
    Reviewers: John Roesler <jo...@confluent.io>, Ko Byoung Kwon, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../processor/internals/AbstractStateManager.java  | 22 ++++---
 .../internals/GlobalStateManagerImpl.java          | 23 ++++---
 .../processor/internals/ProcessorStateManager.java | 13 ++--
 .../internals/ProcessorStateManagerTest.java       | 71 ++++++++++++----------
 4 files changed, 67 insertions(+), 62 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index b270e03..66ddec9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -36,17 +36,18 @@ abstract class AbstractStateManager implements StateManager {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
 
     final File baseDir;
-    final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
-
+    private final boolean eosEnabled;
     OffsetCheckpoint checkpoint;
 
+    final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
     final Map<String, StateStore> stores = new LinkedHashMap<>();
     final Map<String, StateStore> globalStores = new LinkedHashMap<>();
 
-    AbstractStateManager(final File baseDir) {
+    AbstractStateManager(final File baseDir,
+                         final boolean eosEnabled) {
         this.baseDir = baseDir;
+        this.eosEnabled = eosEnabled;
         this.checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
-
     }
 
     public void reinitializeStateStoresForPartitions(final Logger log,
@@ -62,11 +63,14 @@ abstract class AbstractStateManager implements StateManager {
             checkpointableOffsets.remove(topicPartition);
             storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic()));
         }
-        try {
-            checkpoint.write(checkpointableOffsets);
-        } catch (final IOException fatalException) {
-            log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException);
-            throw new StreamsException("Failed to reinitialize global store.", fatalException);
+
+        if (!eosEnabled) {
+            try {
+                checkpoint.write(checkpointableOffsets);
+            } catch (final IOException fatalException) {
+                log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stateStores, fatalException);
+                throw new StreamsException("Failed to reinitialize global store.", fatalException);
+            }
         }
 
         for (final Map.Entry<String, StateStore> entry : storesCopy.entrySet()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 79088d9..78c4a36 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -69,7 +69,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
                                   final StateDirectory stateDirectory,
                                   final StateRestoreListener stateRestoreListener,
                                   final StreamsConfig config) {
-        super(stateDirectory.globalStateDir());
+        super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
 
         this.log = logContext.logger(GlobalStateManagerImpl.class);
         this.topology = topology;
@@ -92,16 +92,16 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
             if (!stateDirectory.lockGlobalState()) {
                 throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir));
             }
-        } catch (IOException e) {
+        } catch (final IOException e) {
             throw new LockException(String.format("Failed to lock the global state directory: %s", baseDir));
         }
 
         try {
             this.checkpointableOffsets.putAll(checkpoint.read());
-        } catch (IOException e) {
+        } catch (final IOException e) {
             try {
                 stateDirectory.unlockGlobalState();
-            } catch (IOException e1) {
+            } catch (final IOException e1) {
                 log.error("Failed to unlock the global state directory", e);
             }
             throw new StreamsException("Failed to read checkpoints for global state globalStores", e);
@@ -232,7 +232,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
         }
 
         final List<TopicPartition> topicPartitions = new ArrayList<>();
-        for (PartitionInfo partition : partitionInfos) {
+        for (final PartitionInfo partition : partitionInfos) {
             topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
         }
         return topicPartitions;
@@ -253,8 +253,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
 
             long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
-            BatchingStateRestoreCallback
-                stateRestoreAdapter =
+            final BatchingStateRestoreCallback stateRestoreAdapter =
                 (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof
                                                      BatchingStateRestoreCallback)
                                                 ? stateRestoreCallback
@@ -267,7 +266,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
                 try {
                     final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
                     final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
-                    for (ConsumerRecord<byte[], byte[]> record : records) {
+                    for (final ConsumerRecord<byte[], byte[]> record : records) {
                         if (record.key() != null) {
                             restoreRecords.add(KeyValue.pair(record.key(), record.value()));
                         }
@@ -294,11 +293,11 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
     @Override
     public void flush() {
         log.debug("Flushing all global globalStores registered in the state manager");
-        for (StateStore store : this.globalStores.values()) {
+        for (final StateStore store : this.globalStores.values()) {
             try {
                 log.trace("Flushing global store={}", store.name());
                 store.flush();
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 throw new ProcessorStateException(String.format("Failed to flush global state store %s", store.name()), e);
             }
         }
@@ -316,7 +315,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
                 log.debug("Closing global storage engine {}", entry.getKey());
                 try {
                     entry.getValue().close();
-                } catch (Exception e) {
+                } catch (final Exception e) {
                     log.error("Failed to close global state store {}", entry.getKey(), e);
                     closeFailed.append("Failed to close global state store:")
                             .append(entry.getKey())
@@ -341,7 +340,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
         if (!checkpointableOffsets.isEmpty()) {
             try {
                 checkpoint.write(checkpointableOffsets);
-            } catch (IOException e) {
+            } catch (final IOException e) {
                 log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e);
             }
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 054333b..afb56c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -67,7 +67,7 @@ public class ProcessorStateManager extends AbstractStateManager {
                                  final ChangelogReader changelogReader,
                                  final boolean eosEnabled,
                                  final LogContext logContext) throws IOException {
-        super(stateDirectory.directoryForTask(taskId));
+        super(stateDirectory.directoryForTask(taskId), eosEnabled);
 
         this.log = logContext.logger(ProcessorStateManager.class);
         this.taskId = taskId;
@@ -81,12 +81,11 @@ public class ProcessorStateManager extends AbstractStateManager {
         offsetLimits = new HashMap<>();
         standbyRestoredOffsets = new HashMap<>();
         this.isStandby = isStandby;
-        restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
+        restoreCallbacks = isStandby ? new HashMap<>() : null;
         this.storeToChangelogTopic = storeToChangelogTopic;
 
         // load the checkpoint information
         checkpointableOffsets.putAll(checkpoint.read());
-
         if (eosEnabled) {
             // delete the checkpoint file after finish loading its stored offsets
             checkpoint.delete();
@@ -169,11 +168,7 @@ public class ProcessorStateManager extends AbstractStateManager {
             final int partition = getPartition(topicName);
             final TopicPartition storePartition = new TopicPartition(topicName, partition);
 
-            if (checkpointableOffsets.containsKey(storePartition)) {
-                partitionsAndOffsets.put(storePartition, checkpointableOffsets.get(storePartition));
-            } else {
-                partitionsAndOffsets.put(storePartition, -1L);
-            }
+            partitionsAndOffsets.put(storePartition, checkpointableOffsets.getOrDefault(storePartition, -1L));
         }
         return partitionsAndOffsets;
     }
@@ -340,7 +335,7 @@ public class ProcessorStateManager extends AbstractStateManager {
         return globalStores.get(name);
     }
 
-    private BatchingStateRestoreCallback getBatchingRestoreCallback(StateRestoreCallback callback) {
+    private BatchingStateRestoreCallback getBatchingRestoreCallback(final StateRestoreCallback callback) {
         if (callback instanceof BatchingStateRestoreCallback) {
             return (BatchingStateRestoreCallback) callback;
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 6a20cd9..1b03cd4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -123,7 +123,7 @@ public class ProcessorStateManagerTest {
             assertThat(batchingRestoreCallback.getRestoredRecords().size(), is(1));
             assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue));
         } finally {
-            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            stateMgr.close(Collections.emptyMap());
         }
     }
 
@@ -141,7 +141,7 @@ public class ProcessorStateManagerTest {
             assertThat(persistentStore.keys.size(), is(1));
             assertTrue(persistentStore.keys.contains(intKey));
         } finally {
-            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            stateMgr.close(Collections.emptyMap());
         }
     }
 
@@ -169,7 +169,7 @@ public class ProcessorStateManagerTest {
             stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
             assertTrue(changelogReader.wasRegistered(new TopicPartition(persistentStoreTopicName, 2)));
         } finally {
-            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            stateMgr.close(Collections.emptyMap());
         }
     }
 
@@ -196,7 +196,7 @@ public class ProcessorStateManagerTest {
             stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
             assertTrue(changelogReader.wasRegistered(new TopicPartition(nonPersistentStoreTopicName, 2)));
         } finally {
-            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            stateMgr.close(Collections.emptyMap());
         }
     }
 
@@ -257,7 +257,7 @@ public class ProcessorStateManagerTest {
             assertEquals(-1L, (long) changeLogOffsets.get(partition3));
 
         } finally {
-            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            stateMgr.close(Collections.emptyMap());
         }
     }
 
@@ -269,7 +269,7 @@ public class ProcessorStateManagerTest {
             noPartitions,
             false,
             stateDirectory,
-            Collections.<String, String>emptyMap(),
+            Collections.emptyMap(),
             changelogReader,
             false,
             logContext);
@@ -280,13 +280,13 @@ public class ProcessorStateManagerTest {
             assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName));
 
         } finally {
-            stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
+            stateMgr.close(Collections.emptyMap());
         }
     }
 
     @Test
     public void testFlushAndClose() throws IOException {
-        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+        checkpoint.write(Collections.emptyMap());
 
         // set up ack'ed offsets
         final HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
@@ -339,7 +339,7 @@ public class ProcessorStateManagerTest {
             noPartitions,
             false,
             stateDirectory,
-            Collections.<String, String>emptyMap(),
+            Collections.emptyMap(),
             changelogReader,
             false,
             logContext);
@@ -358,7 +358,7 @@ public class ProcessorStateManagerTest {
             noPartitions,
             false,
             stateDirectory,
-            Collections.<String, String>emptyMap(),
+            Collections.emptyMap(),
             changelogReader,
             false,
             logContext);
@@ -408,7 +408,7 @@ public class ProcessorStateManagerTest {
                                                                   bytes,
                                                                   bytes)));
 
-        stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
+        stateMgr.checkpoint(Collections.emptyMap());
 
         final Map<TopicPartition, Long> read = checkpoint.read();
         assertThat(read, equalTo(Collections.singletonMap(persistentStorePartition, 889L)));
@@ -433,7 +433,7 @@ public class ProcessorStateManagerTest {
         stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L));
 
         final Map<TopicPartition, Long> read = checkpoint.read();
-        assertThat(read, equalTo(Collections.<TopicPartition, Long>emptyMap()));
+        assertThat(read, equalTo(Collections.emptyMap()));
     }
 
     @Test
@@ -443,7 +443,7 @@ public class ProcessorStateManagerTest {
             noPartitions,
             true, // standby
             stateDirectory,
-            Collections.<String, String>emptyMap(),
+            Collections.emptyMap(),
             changelogReader,
             false,
             logContext);
@@ -453,10 +453,9 @@ public class ProcessorStateManagerTest {
         stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 987L));
 
         final Map<TopicPartition, Long> read = checkpoint.read();
-        assertThat(read, equalTo(Collections.<TopicPartition, Long>emptyMap()));
+        assertThat(read, equalTo(Collections.emptyMap()));
     }
 
-
     @Test
     public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() throws IOException {
         final ProcessorStateManager stateManager = new ProcessorStateManager(
@@ -464,7 +463,7 @@ public class ProcessorStateManagerTest {
             noPartitions,
             false,
             stateDirectory,
-            Collections.<String, String>emptyMap(),
+            Collections.emptyMap(),
             changelogReader,
             false,
             logContext);
@@ -484,7 +483,7 @@ public class ProcessorStateManagerTest {
             noPartitions,
             false,
             stateDirectory,
-            Collections.<String, String>emptyMap(),
+            Collections.emptyMap(),
             changelogReader,
             false,
             logContext);
@@ -551,7 +550,7 @@ public class ProcessorStateManagerTest {
         stateManager.register(stateStore, stateStore.stateRestoreCallback);
 
         try {
-            stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+            stateManager.close(Collections.emptyMap());
             fail("Should throw ProcessorStateException if store close throws exception");
         } catch (final ProcessorStateException e) {
             // pass
@@ -623,7 +622,7 @@ public class ProcessorStateManagerTest {
         stateManager.register(stateStore2, stateStore2.stateRestoreCallback);
 
         try {
-            stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+            stateManager.close(Collections.emptyMap());
         } catch (final ProcessorStateException expected) { /* ignode */ }
         Assert.assertTrue(closedStore.get());
     }
@@ -640,7 +639,7 @@ public class ProcessorStateManagerTest {
                 noPartitions,
                 false,
                 stateDirectory,
-                Collections.<String, String>emptyMap(),
+                Collections.emptyMap(),
                 changelogReader,
                 true,
                 logContext);
@@ -653,28 +652,36 @@ public class ProcessorStateManagerTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldSuccessfullyReInitializeStateStores() throws IOException {
+    public void shouldSuccessfullyReInitializeStateStoresWithEosDisable() throws Exception {
+        shouldSuccessfullyReInitializeStateStores(false);
+    }
+
+    @Test
+    public void shouldSuccessfullyReInitializeStateStoresWithEosEnable() throws Exception {
+        shouldSuccessfullyReInitializeStateStores(true);
+    }
+
+    private void shouldSuccessfullyReInitializeStateStores(final boolean eosEnabled) throws Exception {
         final String store2Name = "store2";
         final String store2Changelog = "store2-changelog";
         final TopicPartition store2Partition = new TopicPartition(store2Changelog, 0);
         final List<TopicPartition> changelogPartitions = Arrays.asList(changelogTopicPartition, store2Partition);
-        Map<String, String> storeToChangelog = new HashMap() {
+        final Map<String, String> storeToChangelog = new HashMap<String, String>() {
             {
                 put(storeName, changelogTopic);
                 put(store2Name, store2Changelog);
             }
         };
         final ProcessorStateManager stateManager = new ProcessorStateManager(
-                taskId,
-                changelogPartitions,
-                false,
-                stateDirectory,
-                storeToChangelog,
-                changelogReader,
-                false,
-                logContext);
+            taskId,
+            changelogPartitions,
+            false,
+            stateDirectory,
+            storeToChangelog,
+            changelogReader,
+            eosEnabled,
+            logContext);
 
         final MockStateStore stateStore = new MockStateStore(storeName, true);
         final MockStateStore stateStore2 = new MockStateStore(store2Name, true);
@@ -696,7 +703,7 @@ public class ProcessorStateManagerTest {
         assertTrue(stateStore2.initialized);
     }
 
-    private ProcessorStateManager getStandByStateManager(TaskId taskId) throws IOException {
+    private ProcessorStateManager getStandByStateManager(final TaskId taskId) throws IOException {
         return new ProcessorStateManager(
             taskId,
             noPartitions,

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.