You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/10/26 23:01:49 UTC
[kafka] branch 2.1 updated: KAFKA-7534: Error in flush calling close may prevent underlying store from closing (#5833)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 6f7a3a1 KAFKA-7534: Error in flush calling close may prevent underlying store from closing (#5833)
6f7a3a1 is described below
commit 6f7a3a105de88ec3c115bda34b2399f19e02de70
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Fri Oct 26 19:00:33 2018 -0400
KAFKA-7534: Error in flush calling close may prevent underlying store from closing (#5833)
Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../state/internals/CachingKeyValueStore.java | 12 +++++++++---
.../state/internals/CachingKeyValueStoreTest.java | 20 +++++++++++++++++++-
2 files changed, 28 insertions(+), 4 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index a6a24ea..8d9b207 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -129,9 +129,15 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
@Override
public void close() {
- flush();
- underlying.close();
- cache.close(cacheName);
+ try {
+ flush();
+ } finally {
+ try {
+ underlying.close();
+ } finally {
+ cache.close(cacheName);
+ }
+ }
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index ae6bded..403478e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.InternalMockProcessorContext;
+import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -104,6 +105,22 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
}
@Test
+ public void shouldCloseAfterErrorWithFlush() {
+ try {
+ cache = EasyMock.niceMock(ThreadCache.class);
+ context = new InternalMockProcessorContext(null, null, null, (RecordCollector) null, cache);
+ context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic, null));
+ store.init(context, null);
+ cache.flush("0_0-store");
+ EasyMock.expectLastCall().andThrow(new NullPointerException("Simulating an error on flush"));
+ EasyMock.replay(cache);
+ store.close();
+ } catch (final NullPointerException npe) {
+ assertFalse(underlyingStore.isOpen());
+ }
+ }
+
+ @Test
public void shouldPutGetToFromCache() {
store.put(bytesKey("key"), bytesValue("value"));
store.put(bytesKey("key2"), bytesValue("value2"));
@@ -274,7 +291,8 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
try {
store.putAll(entries);
fail("Should have thrown NullPointerException while putAll null key");
- } catch (final NullPointerException e) { }
+ } catch (final NullPointerException e) {
+ }
}
@Test