Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/15 20:55:28 UTC

kafka git commit: MINOR: make flush no-op as we don't need to call flush on commit.

Repository: kafka
Updated Branches:
  refs/heads/trunk 1d556e471 -> 90b2a2bf6


MINOR: make flush no-op as we don't need to call flush on commit.

In the event of a crash, we always restore the data from the backing changelog, so it seems that we don't need to
persist all data to disk by calling `flush()` when committing. Frequent flushing produces a large number of small files, which increases
compaction pressure. This PR will be used to run benchmarks to see whether there is any performance gain from not calling `flush()` each time we commit.
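
For readers skimming the diff, the gating it introduces can be sketched in isolation. In the diff, `flush()` returns early only when exactly-once processing is configured. The class below is a hypothetical stand-in, not the real `RocksDBStore`: the class name and the flush counter are assumptions made for illustration, and the two string constants are the literal values of `StreamsConfig.PROCESSING_GUARANTEE_CONFIG` and `StreamsConfig.EXACTLY_ONCE`, inlined so the sketch compiles without a Kafka dependency.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a state store; not the real RocksDBStore.
class FlushOnCommitSketch {
    // Literal values of StreamsConfig.PROCESSING_GUARANTEE_CONFIG and
    // StreamsConfig.EXACTLY_ONCE, inlined to avoid a Kafka dependency.
    static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
    static final String EXACTLY_ONCE = "exactly_once";

    private final boolean eosEnabled;
    // Assumption: a counter stands in for the real flush to disk.
    private int flushCount = 0;

    FlushOnCommitSketch(Map<String, Object> configs) {
        this.eosEnabled = checkForEos(configs);
    }

    // Same comparison as the checkForEos helper added in the diff.
    private static boolean checkForEos(Map<String, Object> configs) {
        return EXACTLY_ONCE.equals(configs.get(PROCESSING_GUARANTEE_CONFIG));
    }

    // Called on every commit; skipped entirely when exactly-once is enabled.
    synchronized void flush() {
        if (eosEnabled) {
            return;
        }
        flushCount++;
    }

    int flushCount() {
        return flushCount;
    }

    public static void main(String[] args) {
        Map<String, Object> eos = new HashMap<>();
        eos.put(PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
        FlushOnCommitSketch eosStore = new FlushOnCommitSketch(eos);

        // No processing guarantee set, so the EOS check is false.
        FlushOnCommitSketch defaultStore = new FlushOnCommitSketch(new HashMap<>());

        for (int commit = 0; commit < 3; commit++) {
            eosStore.flush();
            defaultStore.flush();
        }
        System.out.println("flushes with exactly-once: " + eosStore.flushCount());
        System.out.println("flushes without it: " + defaultStore.flushCount());
    }
}
```

The check runs once at construction (in the diff, once in `init`), so each `flush()` call only reads a boolean and does not look up the config map again.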

Author: Bill Bejeck <bi...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Eno Thereska <en...@gmail.com>, Guozhang Wang <wa...@gmail.com>

Closes #3211 from bbejeck/MINOR_no_flush_on_commit


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/90b2a2bf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/90b2a2bf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/90b2a2bf

Branch: refs/heads/trunk
Commit: 90b2a2bf664e4e40d4cd1b46c72732c5edb97cf9
Parents: 1d556e4
Author: Bill Bejeck <bi...@confluent.io>
Authored: Thu Jun 15 13:55:25 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jun 15 13:55:25 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/state/internals/RocksDBStore.java      | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/90b2a2bf/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a3ecb64..cf41c92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -95,6 +95,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private Options options;
     private WriteOptions wOptions;
     private FlushOptions fOptions;
+    private boolean eosEnabled;
 
     protected volatile boolean open = false;
 
@@ -178,7 +179,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                 putInternal(key, value);
             }
         });
-
+        eosEnabled = checkForEos(context.appConfigs());
         open = true;
     }
 
@@ -355,12 +356,12 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public synchronized void flush() {
-        if (db == null) {
+        if (db == null || eosEnabled) {
             return;
         }
-        // flush RocksDB
         flushInternal();
     }
+
     /**
      * @throws ProcessorStateException if flushing failed because of any internal store exceptions
      */
@@ -372,6 +373,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         }
     }
 
+    private boolean checkForEos(Map<String, Object> configs) {
+        return StreamsConfig.EXACTLY_ONCE.equals(configs.get(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
+    }
+
     @Override
     public synchronized void close() {
         if (!open) {