You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/10/26 23:05:36 UTC
[kafka] branch 1.0 updated: KAFKA-7534: Error in flush calling
close may prevent underlying store from closing (#5833)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new 706208c KAFKA-7534: Error in flush calling close may prevent underlying store from closing (#5833)
706208c is described below
commit 706208c2673359981f34f569e31d891b69aa99e5
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Fri Oct 26 19:00:33 2018 -0400
KAFKA-7534: Error in flush calling close may prevent underlying store from closing (#5833)
Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../state/internals/CachingKeyValueStore.java | 12 +++++++++---
.../state/internals/CachingKeyValueStoreTest.java | 22 ++++++++++++++++++++--
2 files changed, 29 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index f0669a4..1ef61ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -115,9 +115,15 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
@Override
public void close() {
- flush();
- underlying.close();
- cache.close(cacheName);
+ try {
+ flush();
+ } finally {
+ try {
+ underlying.close();
+ } finally {
+ cache.close(cacheName);
+ }
+ }
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 97a2fbf..ad01fea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockProcessorContext;
+import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -109,6 +110,22 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
}
@Test
+ public void shouldCloseAfterErrorWithFlush() {
+ try {
+ cache = EasyMock.niceMock(ThreadCache.class);
+ context = new InternalMockProcessorContext(null, null, null, (RecordCollector) null, cache);
+ context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic, null));
+ store.init(context, null);
+ cache.flush("0_0-store");
+ EasyMock.expectLastCall().andThrow(new NullPointerException("Simulating an error on flush"));
+ EasyMock.replay(cache);
+ store.close();
+ } catch (final NullPointerException npe) {
+ assertFalse(underlyingStore.isOpen());
+ }
+ }
+
+ @Test
public void shouldPutGetToFromCache() {
store.put(bytesKey("key"), bytesValue("value"));
store.put(bytesKey("key2"), bytesValue("value2"));
@@ -279,7 +296,8 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
try {
store.putAll(entries);
fail("Should have thrown NullPointerException while putAll null key");
- } catch (NullPointerException e) { }
+ } catch (final NullPointerException e) {
+ }
}
@Test
@@ -331,4 +349,4 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
forwarded.put(key, new Change<>(newValue, oldValue));
}
}
-}
\ No newline at end of file
+}