You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/18 18:52:24 UTC
[3/4] kafka git commit: KAFKA-3735: Dispose all RocksObjects upon
completion
KAFKA-3735: Dispose all RocksObjects upon completion
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Roger Hoover, Eno Thereska, Ismael Juma
Closes #1411 from guozhangwang/K3735-dispose-rocksobject
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bef359ef
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bef359ef
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bef359ef
Branch: refs/heads/0.10.0
Commit: bef359ef2e53920d91dfac037c0fceefb51954d1
Parents: 73949c2
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri May 20 11:52:36 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Jun 18 11:46:41 2016 -0700
----------------------------------------------------------------------
.../wordcount/WordCountProcessorDemo.java | 16 +++---
.../kstream/internals/KStreamKStreamJoin.java | 19 ++++---
.../internals/KStreamWindowAggregate.java | 49 ++++++++---------
.../kstream/internals/KStreamWindowReduce.java | 56 +++++++++-----------
.../kafka/streams/state/KeyValueIterator.java | 3 ++
.../streams/state/WindowStoreIterator.java | 8 ++-
.../streams/state/internals/RocksDBStore.java | 31 +++++++----
.../internals/ProcessorTopologyTest.java | 8 +--
.../streams/state/KeyValueStoreTestDriver.java | 8 +--
.../state/internals/RocksDBWindowStoreTest.java | 7 +--
10 files changed, 110 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 34c35b7..1ee6928 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -81,19 +81,17 @@ public class WordCountProcessorDemo {
@Override
public void punctuate(long timestamp) {
- KeyValueIterator<String, Integer> iter = this.kvStore.all();
+ try (KeyValueIterator<String, Integer> iter = this.kvStore.all()) {
+ System.out.println("----------- " + timestamp + " ----------- ");
- System.out.println("----------- " + timestamp + " ----------- ");
+ while (iter.hasNext()) {
+ KeyValue<String, Integer> entry = iter.next();
- while (iter.hasNext()) {
- KeyValue<String, Integer> entry = iter.next();
+ System.out.println("[" + entry.key + ", " + entry.value + "]");
- System.out.println("[" + entry.key + ", " + entry.value + "]");
-
- context.forward(entry.key, entry.value.toString());
+ context.forward(entry.key, entry.value.toString());
+ }
}
-
- iter.close();
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index d13d112..72029a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -25,8 +24,8 @@ import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
-import java.util.Iterator;
class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
@@ -76,15 +75,15 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
long timeTo = Math.max(0L, context().timestamp() + joinAfterMs);
- Iterator<KeyValue<Long, V2>> iter = otherWindow.fetch(key, timeFrom, timeTo);
- while (iter.hasNext()) {
- needOuterJoin = false;
- context().forward(key, joiner.apply(value, iter.next().value));
- }
+ try (WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) {
+ while (iter.hasNext()) {
+ needOuterJoin = false;
+ context().forward(key, joiner.apply(value, iter.next().value));
+ }
- if (needOuterJoin)
- context().forward(key, joiner.apply(value, null));
+ if (needOuterJoin)
+ context().forward(key, joiner.apply(value, null));
+ }
}
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index b4272f8..125c7fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
-import java.util.Iterator;
import java.util.Map;
public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {
@@ -90,38 +89,37 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
}
- WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo);
+ try (WindowStoreIterator<T> iter = windowStore.fetch(key, timeFrom, timeTo)) {
- // for each matching window, try to update the corresponding key and send to the downstream
- while (iter.hasNext()) {
- KeyValue<Long, T> entry = iter.next();
- W window = matchedWindows.get(entry.key);
+ // for each matching window, try to update the corresponding key and send to the downstream
+ while (iter.hasNext()) {
+ KeyValue<Long, T> entry = iter.next();
+ W window = matchedWindows.get(entry.key);
- if (window != null) {
+ if (window != null) {
- T oldAgg = entry.value;
+ T oldAgg = entry.value;
- if (oldAgg == null)
- oldAgg = initializer.apply();
+ if (oldAgg == null)
+ oldAgg = initializer.apply();
- // try to add the new new value (there will never be old value)
- T newAgg = aggregator.apply(key, value, oldAgg);
+ // try to add the new value (there will never be old value)
+ T newAgg = aggregator.apply(key, value, oldAgg);
- // update the store with the new value
- windowStore.put(key, newAgg, window.start());
+ // update the store with the new value
+ windowStore.put(key, newAgg, window.start());
- // forward the aggregated change pair
- if (sendOldValues)
- context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
- else
- context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+ // forward the aggregated change pair
+ if (sendOldValues)
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+ else
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
- matchedWindows.remove(entry.key);
+ matchedWindows.remove(entry.key);
+ }
}
}
- iter.close();
-
// create the new window for the rest of unmatched window that do not exist yet
for (long windowStartMs : matchedWindows.keySet()) {
T oldAgg = initializer.apply();
@@ -167,10 +165,9 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
W window = (W) windowedKey.window();
// this iterator should contain at most one element
- Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start());
-
- return iter.hasNext() ? iter.next().value : null;
+ try (WindowStoreIterator<T> iter = windowStore.fetch(key, window.start(), window.start())) {
+ return iter.hasNext() ? iter.next().value : null;
+ }
}
-
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 3ed1499..a526506 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
-import java.util.Iterator;
import java.util.Map;
public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, V> {
@@ -88,40 +87,38 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
}
- WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo);
+ try (WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo)) {
+ // for each matching window, try to update the corresponding key and send to the downstream
+ while (iter.hasNext()) {
+ KeyValue<Long, V> entry = iter.next();
+ W window = matchedWindows.get(entry.key);
- // for each matching window, try to update the corresponding key and send to the downstream
- while (iter.hasNext()) {
- KeyValue<Long, V> entry = iter.next();
- W window = matchedWindows.get(entry.key);
+ if (window != null) {
- if (window != null) {
+ V oldAgg = entry.value;
+ V newAgg = oldAgg;
- V oldAgg = entry.value;
- V newAgg = oldAgg;
+ // try to add the new value (there will never be old value)
+ if (newAgg == null) {
+ newAgg = value;
+ } else {
+ newAgg = reducer.apply(newAgg, value);
+ }
- // try to add the new new value (there will never be old value)
- if (newAgg == null) {
- newAgg = value;
- } else {
- newAgg = reducer.apply(newAgg, value);
- }
-
- // update the store with the new value
- windowStore.put(key, newAgg, window.start());
+ // update the store with the new value
+ windowStore.put(key, newAgg, window.start());
- // forward the aggregated change pair
- if (sendOldValues)
- context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
- else
- context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+ // forward the aggregated change pair
+ if (sendOldValues)
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+ else
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
- matchedWindows.remove(entry.key);
+ matchedWindows.remove(entry.key);
+ }
}
}
- iter.close();
-
// create the new window for the rest of unmatched window that do not exist yet
for (long windowStartMs : matchedWindows.keySet()) {
windowStore.put(key, value, windowStartMs);
@@ -161,10 +158,9 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
W window = (W) windowedKey.window();
// this iterator should only contain one element
- Iterator<KeyValue<Long, V>> iter = windowStore.fetch(key, window.start(), window.start());
-
- return iter.next().value;
+ try (WindowStoreIterator<V> iter = windowStore.fetch(key, window.start(), window.start())) {
+ return iter.next().value;
+ }
}
-
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
index cdb3de5..ddbc7b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
@@ -27,6 +27,9 @@ import java.util.Iterator;
/**
* Iterator interface of {@link KeyValue}.
*
+ * Users need to call its {@code close} method explicitly upon completion to release resources,
+ * or use a try-with-resources statement (available since JDK7) for this {@link Closeable} class.
+ *
* @param <K> Type of keys
* @param <V> Type of values
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
index 7c474dd..b6e6d0c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
@@ -21,13 +21,19 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.streams.KeyValue;
+import java.io.Closeable;
import java.util.Iterator;
/**
* Iterator interface of {@link KeyValue} with key typed {@link Long} used for {@link WindowStore#fetch(Object, long, long)}.
*
+ * Users need to call its {@code close} method explicitly upon completion to release resources,
+ * or use a try-with-resources statement (available since JDK7) for this {@link Closeable} class.
+ *
* @param <E> Type of values
*/
-public interface WindowStoreIterator<E> extends Iterator<KeyValue<Long, E>> {
+public interface WindowStoreIterator<E> extends Iterator<KeyValue<Long, E>>, Closeable {
+
+ @Override
void close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 37609a0..a00de19 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -77,17 +77,18 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private final String name;
private final String parentDir;
- private final Options options;
- private final WriteOptions wOptions;
- private final FlushOptions fOptions;
-
+ protected File dbDir;
+ private StateSerdes<K, V> serdes;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
- private StateSerdes<K, V> serdes;
- protected File dbDir;
private RocksDB db;
+ // the following option objects will be created at constructor and disposed at close()
+ private Options options;
+ private WriteOptions wOptions;
+ private FlushOptions fOptions;
+
private boolean loggingEnabled = false;
private int cacheSize = DEFAULT_UNENCODED_CACHE_SIZE;
@@ -313,14 +314,16 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private void putAllInternal(List<KeyValue<byte[], byte[]>> entries) {
WriteBatch batch = new WriteBatch();
- for (KeyValue<byte[], byte[]> entry : entries) {
- batch.put(entry.key, entry.value);
- }
-
try {
+ for (KeyValue<byte[], byte[]> entry : entries) {
+ batch.put(entry.key, entry.value);
+ }
+
db.write(wOptions, batch);
} catch (RocksDBException e) {
throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
+ } finally {
+ batch.dispose();
}
}
@@ -425,7 +428,15 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@Override
public void close() {
flush();
+ options.dispose();
+ wOptions.dispose();
+ fOptions.dispose();
db.close();
+
+ options = null;
+ wOptions = null;
+ fOptions = null;
+ db = null;
}
private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 1095fcf..62b283a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -351,9 +351,11 @@ public class ProcessorTopologyTest {
@Override
public void punctuate(long streamTime) {
int count = 0;
- for (KeyValueIterator<String, String> iter = store.all(); iter.hasNext();) {
- iter.next();
- ++count;
+ try (KeyValueIterator<String, String> iter = store.all()) {
+ while (iter.hasNext()) {
+ iter.next();
+ ++count;
+ }
}
context().forward(Long.toString(streamTime), count);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 3a35d75..be5596d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -362,9 +362,11 @@ public class KeyValueStoreTestDriver<K, V> {
*/
public int sizeOf(KeyValueStore<K, V> store) {
int size = 0;
- for (KeyValueIterator<K, V> iterator = store.all(); iterator.hasNext();) {
- iterator.next();
- ++size;
+ try (KeyValueIterator<K, V> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ iterator.next();
+ ++size;
+ }
}
return size;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bef359ef/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index e9888ad..d889e7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -785,9 +785,10 @@ public class RocksDBWindowStoreTest {
segmentDirs(baseDir)
);
- WindowStoreIterator iter = store.fetch(0, 0L, 1000000L);
- while (iter.hasNext()) {
- iter.next();
+ try (WindowStoreIterator iter = store.fetch(0, 0L, 1000000L)) {
+ while (iter.hasNext()) {
+ iter.next();
+ }
}
assertEquals(