You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/16 23:18:05 UTC
kafka git commit: KAFKA-3805: Check if DB is null.
Repository: kafka
Updated Branches:
refs/heads/trunk 47f5ae9e9 -> 751fe9309
KAFKA-3805: Check if DB is null.
- Check if DB is null before flushing or closing. In some cases, a state store is closed twice. This happens in `StreamTask.close()` where both `node.close()` and `super.close` (in `ProcessorManager`) are called in a sequence. If the user's processor defines a `close` that closes the underlying state store, then the second close will be redundant.
Author: Eno Thereska <en...@gmail.com>
Reviewers: Andr�s G�mez, Ismael Juma, Guozhang Wang
Closes #1485 from enothereska/KAFKA-3805-locks
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/751fe930
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/751fe930
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/751fe930
Branch: refs/heads/trunk
Commit: 751fe9309011b99f60c1cb03c23a47d0444dce05
Parents: 47f5ae9
Author: Eno Thereska <en...@gmail.com>
Authored: Thu Jun 16 16:18:02 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jun 16 16:18:02 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/streams/processor/StateStore.java | 4 +++-
.../streams/processor/internals/ProcessorStateManager.java | 4 ++++
.../apache/kafka/streams/state/internals/RocksDBStore.java | 9 +++++++++
3 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/751fe930/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index f79e6f6..68f3644 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -46,7 +46,9 @@ public interface StateStore {
void flush();
/**
- * Close the storage engine
+ * Close the storage engine.
+ * Note that this function needs to be idempotent since it may be called
+ * several times on the same state store.
*/
void close();
http://git-wip-us.apache.org/repos/asf/kafka/blob/751fe930/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1d97384..92b1069 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -134,6 +134,8 @@ public class ProcessorStateManager {
retry--;
lock = lockStateDirectory(channel);
}
+ // TODO: closing the channel here risks releasing all locks on the file
+ // see {@link https://issues.apache.org/jira/browse/KAFKA-3812}
if (lock == null) {
channel.close();
}
@@ -336,6 +338,8 @@ public class ProcessorStateManager {
*/
public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
try {
+ // attempting to flush and close the stores, just in case they
+ // are not closed by a ProcessorNode yet
if (!stores.isEmpty()) {
log.debug("Closing stores.");
for (Map.Entry<String, StateStore> entry : stores.entrySet()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/751fe930/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 8634d68..d9670b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -444,6 +444,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@Override
public void flush() {
+ if (db == null) {
+ return;
+ }
+
// flush of the cache entries if necessary
flushCache();
@@ -464,6 +468,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@Override
public void close() {
+
+ if (db == null) {
+ return;
+ }
+
flush();
options.dispose();
wOptions.dispose();