You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/18 18:52:24 UTC

[3/4] kafka git commit: KAFKA-3735: Dispose all RocksObejcts upon completeness

KAFKA-3735: Dispose all RocksObejcts upon completeness

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Roger Hoover, Eno Thereska, Ismael Juma

Closes #1411 from guozhangwang/K3735-dispose-rocksobject


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bef359ef
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bef359ef
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bef359ef

Branch: refs/heads/0.10.0
Commit: bef359ef2e53920d91dfac037c0fceefb51954d1
Parents: 73949c2
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri May 20 11:52:36 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Jun 18 11:46:41 2016 -0700

----------------------------------------------------------------------
 .../wordcount/WordCountProcessorDemo.java       | 16 +++---
 .../kstream/internals/KStreamKStreamJoin.java   | 19 ++++---
 .../internals/KStreamWindowAggregate.java       | 49 ++++++++---------
 .../kstream/internals/KStreamWindowReduce.java  | 56 +++++++++-----------
 .../kafka/streams/state/KeyValueIterator.java   |  3 ++
 .../streams/state/WindowStoreIterator.java      |  8 ++-
 .../streams/state/internals/RocksDBStore.java   | 31 +++++++----
 .../internals/ProcessorTopologyTest.java        |  8 +--
 .../streams/state/KeyValueStoreTestDriver.java  |  8 +--
 .../state/internals/RocksDBWindowStoreTest.java |  7 +--
 10 files changed, 110 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 34c35b7..1ee6928 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -81,19 +81,17 @@ public class WordCountProcessorDemo {
 
                 @Override
                 public void punctuate(long timestamp) {
-                    KeyValueIterator<String, Integer> iter = this.kvStore.all();
+                    try (KeyValueIterator<String, Integer> iter = this.kvStore.all()) {
+                        System.out.println("----------- " + timestamp + " ----------- ");
 
-                    System.out.println("----------- " + timestamp + " ----------- ");
+                        while (iter.hasNext()) {
+                            KeyValue<String, Integer> entry = iter.next();
 
-                    while (iter.hasNext()) {
-                        KeyValue<String, Integer> entry = iter.next();
+                            System.out.println("[" + entry.key + ", " + entry.value + "]");
 
-                        System.out.println("[" + entry.key + ", " + entry.value + "]");
-
-                        context.forward(entry.key, entry.value.toString());
+                            context.forward(entry.key, entry.value.toString());
+                        }
                     }
-
-                    iter.close();
                 }
 
                 @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index d13d112..72029a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -25,8 +24,8 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
 
-import java.util.Iterator;
 
 class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
 
@@ -76,15 +75,15 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
             long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
             long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
 
-            Iterator<KeyValue<Long, V2>> iter = otherWindow.fetch(key, timeFrom, timeTo);
-            while (iter.hasNext()) {
-                needOuterJoin = false;
-                context().forward(key, joiner.apply(value, iter.next().value));
-            }
+            try (WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
+                while (iter.hasNext()) {
+                    needOuterJoin = false;
+                    context().forward(key, joiner.apply(value, iter.next().value));
+                }
 
-            if (needOuterJoin)
-                context().forward(key, joiner.apply(value, null));
+                if (needOuterJoin)
+                    context().forward(key, joiner.apply(value, null));
+            }
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index b4272f8..125c7fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-import java.util.Iterator;
 import java.util.Map;
 
 public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {
@@ -90,38 +89,37 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
                 timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
             }
 
-            WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo);
+            try (WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo)) {
 
-            // for each matching window, try to update the corresponding key and send to the downstream
-            while (iter.hasNext()) {
-                KeyValue<Long, T> entry = iter.next();
-                W window = matchedWindows.get(entry.key);
+                // for each matching window, try to update the corresponding key and send to the downstream
+                while (iter.hasNext()) {
+                    KeyValue<Long, T> entry = iter.next();
+                    W window = matchedWindows.get(entry.key);
 
-                if (window != null) {
+                    if (window != null) {
 
-                    T oldAgg = entry.value;
+                        T oldAgg = entry.value;
 
-                    if (oldAgg == null)
-                        oldAgg = initializer.apply();
+                        if (oldAgg == null)
+                            oldAgg = initializer.apply();
 
-                    // try to add the new new value (there will never be old value)
-                    T newAgg = aggregator.apply(key, value, oldAgg);
+                        // try to add the new new value (there will never be old value)
+                        T newAgg = aggregator.apply(key, value, oldAgg);
 
-                    // update the store with the new value
-                    windowStore.put(key, newAgg, window.start());
+                        // update the store with the new value
+                        windowStore.put(key, newAgg, window.start());
 
-                    // forward the aggregated change pair
-                    if (sendOldValues)
-                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
-                    else
-                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+                        // forward the aggregated change pair
+                        if (sendOldValues)
+                            context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+                        else
+                            context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
 
-                    matchedWindows.remove(entry.key);
+                        matchedWindows.remove(entry.key);
+                    }
                 }
             }
 
-            iter.close();
-
             // create the new window for the rest of unmatched window that do not exist yet
             for (long windowStartMs : matchedWindows.keySet()) {
                 T oldAgg = initializer.apply();
@@ -167,10 +165,9 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
             W window = (W) windowedKey.window();
 
             // this iterator should contain at most one element
-            Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start());
-
-            return iter.hasNext() ? iter.next().value : null;
+            try (WindowStoreIterator<T> iter = windowStore.fetch(key, window.start(), window.start())) {
+                return iter.hasNext() ? iter.next().value : null;
+            }
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 3ed1499..a526506 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-import java.util.Iterator;
 import java.util.Map;
 
 public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, V> {
@@ -88,40 +87,38 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
                 timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
             }
 
-            WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo);
+            try (WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo)) {
+                // for each matching window, try to update the corresponding key and send to the downstream
+                while (iter.hasNext()) {
+                    KeyValue<Long, V> entry = iter.next();
+                    W window = matchedWindows.get(entry.key);
 
-            // for each matching window, try to update the corresponding key and send to the downstream
-            while (iter.hasNext()) {
-                KeyValue<Long, V> entry = iter.next();
-                W window = matchedWindows.get(entry.key);
+                    if (window != null) {
 
-                if (window != null) {
+                        V oldAgg = entry.value;
+                        V newAgg = oldAgg;
 
-                    V oldAgg = entry.value;
-                    V newAgg = oldAgg;
+                        // try to add the new new value (there will never be old value)
+                        if (newAgg == null) {
+                            newAgg = value;
+                        } else {
+                            newAgg = reducer.apply(newAgg, value);
+                        }
 
-                    // try to add the new new value (there will never be old value)
-                    if (newAgg == null) {
-                        newAgg = value;
-                    } else {
-                        newAgg = reducer.apply(newAgg, value);
-                    }
-
-                    // update the store with the new value
-                    windowStore.put(key, newAgg, window.start());
+                        // update the store with the new value
+                        windowStore.put(key, newAgg, window.start());
 
-                    // forward the aggregated change pair
-                    if (sendOldValues)
-                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
-                    else
-                        context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+                        // forward the aggregated change pair
+                        if (sendOldValues)
+                            context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+                        else
+                            context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
 
-                    matchedWindows.remove(entry.key);
+                        matchedWindows.remove(entry.key);
+                    }
                 }
             }
 
-            iter.close();
-
             // create the new window for the rest of unmatched window that do not exist yet
             for (long windowStartMs : matchedWindows.keySet()) {
                 windowStore.put(key, value, windowStartMs);
@@ -161,10 +158,9 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
             W window = (W) windowedKey.window();
 
             // this iterator should only contain one element
-            Iterator<KeyValue<Long, V>> iter = windowStore.fetch(key, window.start(), window.start());
-
-            return iter.next().value;
+            try (WindowStoreIterator<V> iter = windowStore.fetch(key, window.start(), window.start())) {
+                return iter.next().value;
+            }
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
index cdb3de5..ddbc7b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
@@ -27,6 +27,9 @@ import java.util.Iterator;
 /**
  * Iterator interface of {@link KeyValue}.
  *
+ * Users need to call its {@code close} method explicitly upon completeness to release resources,
+ * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
+ *
  * @param <K> Type of keys
  * @param <V> Type of values
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
index 7c474dd..b6e6d0c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
@@ -21,13 +21,19 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.streams.KeyValue;
 
+import java.io.Closeable;
 import java.util.Iterator;
 
 /**
  * Iterator interface of {@link KeyValue} with key typed {@link Long} used for {@link WindowStore#fetch(Object, long, long)}.
  *
+ * Users need to call its {@code close} method explicitly upon completeness to release resources,
+ * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
+ *
  * @param <E> Type of values
  */
-public interface WindowStoreIterator<E> extends Iterator<KeyValue<Long, E>> {
+public interface WindowStoreIterator<E> extends Iterator<KeyValue<Long, E>>, Closeable {
+
+    @Override
     void close();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 37609a0..a00de19 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -77,17 +77,18 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private final String name;
     private final String parentDir;
 
-    private final Options options;
-    private final WriteOptions wOptions;
-    private final FlushOptions fOptions;
-
+    protected File dbDir;
+    private StateSerdes<K, V> serdes;
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
 
-    private StateSerdes<K, V> serdes;
-    protected File dbDir;
     private RocksDB db;
 
+    // the following option objects will be created at constructor and disposed at close()
+    private Options options;
+    private WriteOptions wOptions;
+    private FlushOptions fOptions;
+
     private boolean loggingEnabled = false;
     private int cacheSize = DEFAULT_UNENCODED_CACHE_SIZE;
 
@@ -313,14 +314,16 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private void putAllInternal(List<KeyValue<byte[], byte[]>> entries) {
         WriteBatch batch = new WriteBatch();
 
-        for (KeyValue<byte[], byte[]> entry : entries) {
-            batch.put(entry.key, entry.value);
-        }
-
         try {
+            for (KeyValue<byte[], byte[]> entry : entries) {
+                batch.put(entry.key, entry.value);
+            }
+
             db.write(wOptions, batch);
         } catch (RocksDBException e) {
             throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
+        } finally {
+            batch.dispose();
         }
     }
 
@@ -425,7 +428,15 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     @Override
     public void close() {
         flush();
+        options.dispose();
+        wOptions.dispose();
+        fOptions.dispose();
         db.close();
+
+        options = null;
+        wOptions = null;
+        fOptions = null;
+        db = null;
     }
 
     private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 1095fcf..62b283a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -351,9 +351,11 @@ public class ProcessorTopologyTest {
         @Override
         public void punctuate(long streamTime) {
             int count = 0;
-            for (KeyValueIterator<String, String> iter = store.all(); iter.hasNext();) {
-                iter.next();
-                ++count;
+            try (KeyValueIterator<String, String> iter = store.all()) {
+                while (iter.hasNext()) {
+                    iter.next();
+                    ++count;
+                }
             }
             context().forward(Long.toString(streamTime), count);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 3a35d75..be5596d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -362,9 +362,11 @@ public class KeyValueStoreTestDriver<K, V> {
      */
     public int sizeOf(KeyValueStore<K, V> store) {
         int size = 0;
-        for (KeyValueIterator<K, V> iterator = store.all(); iterator.hasNext();) {
-            iterator.next();
-            ++size;
+        try (KeyValueIterator<K, V> iterator = store.all()) {
+            while (iterator.hasNext()) {
+                iterator.next();
+                ++size;
+            }
         }
         return size;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index e9888ad..d889e7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -785,9 +785,10 @@ public class RocksDBWindowStoreTest {
                         segmentDirs(baseDir)
                 );
 
-                WindowStoreIterator iter = store.fetch(0, 0L, 1000000L);
-                while (iter.hasNext()) {
-                    iter.next();
+                try (WindowStoreIterator iter = store.fetch(0, 0L, 1000000L)) {
+                    while (iter.hasNext()) {
+                        iter.next();
+                    }
                 }
 
                 assertEquals(