You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/10/26 23:05:36 UTC

[kafka] branch 1.0 updated: KAFKA-7534: Error in flush calling close may prevent underlying store from closing (#5833)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 706208c  KAFKA-7534: Error in flush calling close may prevent underlying store  from closing (#5833)
706208c is described below

commit 706208c2673359981f34f569e31d891b69aa99e5
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Fri Oct 26 19:00:33 2018 -0400

    KAFKA-7534: Error in flush calling close may prevent underlying store  from closing (#5833)
    
    Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../state/internals/CachingKeyValueStore.java      | 12 +++++++++---
 .../state/internals/CachingKeyValueStoreTest.java  | 22 ++++++++++++++++++++--
 2 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index f0669a4..1ef61ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -115,9 +115,15 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
 
     @Override
     public void close() {
-        flush();
-        underlying.close();
-        cache.close(cacheName);
+        try {
+            flush();
+        } finally {
+            try {
+                underlying.close();
+            } finally {
+                cache.close(cacheName);
+            }
+        }
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index 97a2fbf..ad01fea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockProcessorContext;
+import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -109,6 +110,22 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     }
 
     @Test
+    public void shouldCloseAfterErrorWithFlush() {
+        try {
+            cache = EasyMock.niceMock(ThreadCache.class);
+            context = new InternalMockProcessorContext(null, null, null, (RecordCollector) null, cache);
+            context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic, null));
+            store.init(context, null);
+            cache.flush("0_0-store");
+            EasyMock.expectLastCall().andThrow(new NullPointerException("Simulating an error on flush"));
+            EasyMock.replay(cache);
+            store.close();
+        } catch (final NullPointerException npe) {
+            assertFalse(underlyingStore.isOpen());
+        }
+    }
+
+    @Test
     public void shouldPutGetToFromCache() {
         store.put(bytesKey("key"), bytesValue("value"));
         store.put(bytesKey("key2"), bytesValue("value2"));
@@ -279,7 +296,8 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         try {
             store.putAll(entries);
             fail("Should have thrown NullPointerException while putAll null key");
-        } catch (NullPointerException e) { }
+        } catch (final NullPointerException e) {
+        }
     }
 
     @Test
@@ -331,4 +349,4 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
             forwarded.put(key, new Change<>(newValue, oldValue));
         }
     }
-}
\ No newline at end of file
+}