You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/15 20:55:28 UTC
kafka git commit: MINOR: make flush no-op as we don't need to call
flush on commit.
Repository: kafka
Updated Branches:
refs/heads/trunk 1d556e471 -> 90b2a2bf6
MINOR: make flush no-op as we don't need to call flush on commit.
In the event of a crash, we always restore the data from the backing changelog. So it seems that we don't need to
persist all data to disk by calling flush when committing. Frequent flushing leads to a large number of small files for compaction increasing
compaction pressure. This PR will perform benchmarks to see if there is any performance gain in not calling `flush()` each time we commit.
Author: Bill Bejeck <bi...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Eno Thereska <en...@gmail.com>, Guozhang Wang <wa...@gmail.com>
Closes #3211 from bbejeck/MINOR_no_flush_on_commit
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/90b2a2bf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/90b2a2bf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/90b2a2bf
Branch: refs/heads/trunk
Commit: 90b2a2bf664e4e40d4cd1b46c72732c5edb97cf9
Parents: 1d556e4
Author: Bill Bejeck <bi...@confluent.io>
Authored: Thu Jun 15 13:55:25 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jun 15 13:55:25 2017 -0700
----------------------------------------------------------------------
.../kafka/streams/state/internals/RocksDBStore.java | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/90b2a2bf/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a3ecb64..cf41c92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -95,6 +95,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private Options options;
private WriteOptions wOptions;
private FlushOptions fOptions;
+ private boolean eosEnabled;
protected volatile boolean open = false;
@@ -178,7 +179,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
putInternal(key, value);
}
});
-
+ eosEnabled = checkForEos(context.appConfigs());
open = true;
}
@@ -355,12 +356,12 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@Override
public synchronized void flush() {
- if (db == null) {
+ if (db == null || eosEnabled) {
return;
}
- // flush RocksDB
flushInternal();
}
+
/**
* @throws ProcessorStateException if flushing failed because of any internal store exceptions
*/
@@ -372,6 +373,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
}
+ private boolean checkForEos(Map<String, Object> configs) {
+ return StreamsConfig.EXACTLY_ONCE.equals(configs.get(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
+ }
+
@Override
public synchronized void close() {
if (!open) {