Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/13 22:41:53 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #8661: KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks

ableegoldman commented on a change in pull request #8661:
URL: https://github.com/apache/kafka/pull/8661#discussion_r424771783



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##########
@@ -248,17 +243,6 @@ void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
             final long segmentId = segments.segmentId(timestamp);
             final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
             if (segment != null) {
-                // This handles the case that state store is moved to a new client and does not
-                // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
-                // will only close the database and open it again with bulk loading enabled.
-                if (!bulkLoadSegments.contains(segment)) {
-                    segment.toggleDbForBulkLoading(true);
-                    // If the store does not exist yet, the getOrCreateSegmentIfLive will call openDB that
-                    // makes the open flag for the newly created store.
-                    // if the store does exist already, then toggleDbForBulkLoading will make sure that
-                    // the store is already open here.
-                    bulkLoadSegments = new HashSet<>(segments.allSegments());
-                }

Review comment:
   Here are the current bulk loading configs:
   ```
   dbOptions.setMaxBackgroundFlushes(4);
   columnFamilyOptions.setDisableAutoCompactions(true);
   columnFamilyOptions.setLevel0FileNumCompactionTrigger(1 << 30);
   columnFamilyOptions.setLevel0SlowdownWritesTrigger(1 << 30);
   columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30);
   ```
   Setting aside the problems these are causing users [even for active tasks](https://issues.apache.org/jira/browse/KAFKA-9062), they basically mean "shove everything into the lowest file level and never attempt to limit these writes". This is useful if you're just trying to shove a lot of data into a store as fast as possible but don't necessarily need to use it immediately afterwards, which is (debatably) the right thing for restoring tasks but definitely not appropriate for standbys*. We attempt to restore a batch of records once per main thread loop, which means doing a lot of other stuff in between batches. AFAICT there's no reason not to just use normal-mode writes for standbys. Bulk loading would also make IQ (interactive queries) on standbys pretty annoying at best.
    
   *In the larger scope, perhaps once we move standbys to a separate thread, I'd say we actually should be turning on bulk loading. BUT we would need to issue a manual compaction every so often, and ideally not flush the stores during every commit (related to [KAFKA-9450](https://issues.apache.org/jira/browse/KAFKA-9450)).
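
   To make the distinction concrete, here is a minimal standalone sketch (not code from this PR) of the behaviour argued for above: bulk-loading settings are applied only to an active task that is restoring, and a standby keeps RocksDB's normal write path. `BulkLoadingSketch`, `TaskKind` and `optionsFor` are hypothetical names invented for illustration. The map keys just mirror the setters quoted above, and no real RocksDB object is touched.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class BulkLoadingSketch {

    // Hypothetical task kinds; Kafka Streams' real task classes are not modelled here.
    enum TaskKind { ACTIVE_RESTORING, STANDBY }

    // 1 << 30 level-0 files is a threshold no store will reach in practice,
    // so using it as a trigger effectively disables that trigger.
    static final int EFFECTIVELY_NEVER = 1 << 30;

    /**
     * Returns the overrides a segment would be opened with, keyed by option name.
     * An empty map means "leave the normal write-path defaults alone".
     */
    static Map<String, Object> optionsFor(TaskKind kind) {
        Map<String, Object> overrides = new LinkedHashMap<>();
        if (kind == TaskKind.ACTIVE_RESTORING) {
            // Same values as the bulk loading configs quoted in the comment.
            overrides.put("maxBackgroundFlushes", 4);
            overrides.put("disableAutoCompactions", true);
            overrides.put("level0FileNumCompactionTrigger", EFFECTIVELY_NEVER);
            overrides.put("level0SlowdownWritesTrigger", EFFECTIVELY_NEVER);
            overrides.put("level0StopWritesTrigger", EFFECTIVELY_NEVER);
        }
        // STANDBY: no overrides, so compaction and write throttling stay on.
        return overrides;
    }

    public static void main(String[] args) {
        for (TaskKind kind : TaskKind.values()) {
            System.out.println(kind + " -> " + optionsFor(kind));
        }
    }
}
```

   With this split a standby store keeps compacting as it goes, so reads against it do not have to scan an ever-growing pile of level-0 files. That is the IQ concern raised above.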




