You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/10/26 23:01:49 UTC

[kafka] branch 2.1 updated: KAFKA-7534: Error in flush calling close may prevent underlying store from closing (#5833)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 6f7a3a1  KAFKA-7534: Error in flush calling close may prevent underlying store from closing (#5833)
6f7a3a1 is described below

commit 6f7a3a105de88ec3c115bda34b2399f19e02de70
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Fri Oct 26 19:00:33 2018 -0400

    KAFKA-7534: Error in flush calling close may prevent underlying store from closing (#5833)
    
    Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../state/internals/CachingKeyValueStore.java        | 12 +++++++++---
 .../state/internals/CachingKeyValueStoreTest.java    | 20 +++++++++++++++++++-
 2 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index a6a24ea..8d9b207 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -129,9 +129,15 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
 
     @Override
     public void close() {
-        flush();
-        underlying.close();
-        cache.close(cacheName);
+        try {
+            flush();
+        } finally {
+            try {
+                underlying.close();
+            } finally {
+                cache.close(cacheName);
+            }
+        }
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index ae6bded..403478e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.InternalMockProcessorContext;
+import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -104,6 +105,22 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
     }
 
     @Test
+    public void shouldCloseAfterErrorWithFlush() {
+        try {
+            cache = EasyMock.niceMock(ThreadCache.class);
+            context = new InternalMockProcessorContext(null, null, null, (RecordCollector) null, cache);
+            context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic, null));
+            store.init(context, null);
+            cache.flush("0_0-store");
+            EasyMock.expectLastCall().andThrow(new NullPointerException("Simulating an error on flush"));
+            EasyMock.replay(cache);
+            store.close();
+        } catch (final NullPointerException npe) {
+            assertFalse(underlyingStore.isOpen());
+        }
+    }
+
+    @Test
     public void shouldPutGetToFromCache() {
         store.put(bytesKey("key"), bytesValue("value"));
         store.put(bytesKey("key2"), bytesValue("value2"));
@@ -274,7 +291,8 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
         try {
             store.putAll(entries);
             fail("Should have thrown NullPointerException while putAll null key");
-        } catch (final NullPointerException e) { }
+        } catch (final NullPointerException e) {
+        }
     }
 
     @Test