Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/07/17 21:05:00 UTC

[jira] [Commented] (KAFKA-7103) Use bulkloading for RocksDBSegmentedBytesStore during init

    [ https://issues.apache.org/jira/browse/KAFKA-7103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547108#comment-16547108 ] 

ASF GitHub Bot commented on KAFKA-7103:
---------------------------------------

guozhangwang closed pull request #5276: KAFKA-7103: Use bulkloading for RocksDBSegmentedBytesStore during init
URL: https://github.com/apache/kafka/pull/5276

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index c5d15d6e5dc..5f1ec3781d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -16,26 +16,36 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
-    private final static Logger LOG = LoggerFactory.getLogger(RocksDBSegmentedBytesStore.class);
-
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBSegmentedBytesStore.class);
     private final String name;
     private final Segments segments;
     private final KeySchema keySchema;
     private InternalProcessorContext context;
     private volatile boolean open;
+    private Set<Segment> bulkLoadSegments;
 
     RocksDBSegmentedBytesStore(final String name,
                                final long retention,
@@ -131,15 +141,11 @@ public void init(ProcessorContext context, StateStore root) {
 
         segments.openExisting(this.context);
 
+        bulkLoadSegments = new HashSet<>(segments.allSegments());
+
         // register and possibly restore the state from the logs
-        context.register(root, new StateRestoreCallback() {
-            @Override
-            public void restore(byte[] key, byte[] value) {
-                put(Bytes.wrap(key), value);
-            }
-        });
+        context.register(root, new RocksDBSegmentsBatchingRestoreCallback());
 
-        flush();
         open = true;
     }
 
@@ -164,4 +170,84 @@ public boolean isOpen() {
         return open;
     }
 
+    // Visible for testing
+    List<Segment> getSegments() {
+        return segments.allSegments();
+    }
+
+    // Visible for testing
+    void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
+        try {
+            final Map<Segment, WriteBatch> writeBatchMap = getWriteBatches(records);
+            for (final Map.Entry<Segment, WriteBatch> entry : writeBatchMap.entrySet()) {
+                final Segment segment = entry.getKey();
+                final WriteBatch batch = entry.getValue();
+                segment.write(batch);
+            }
+        } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
+        }
+    }
+
+    // Visible for testing
+    Map<Segment, WriteBatch> getWriteBatches(final Collection<KeyValue<byte[], byte[]>> records) {
+        final Map<Segment, WriteBatch> writeBatchMap = new HashMap<>();
+        for (final KeyValue<byte[], byte[]> record : records) {
+            final long segmentId = segments.segmentId(keySchema.segmentTimestamp(Bytes.wrap(record.key)));
+            final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
+            if (segment != null) {
+                // This handles the case where the state store has been moved to a new client and does
+                // not have a local RocksDB instance for the segment. In that case, toggleDbForBulkLoading
+                // will only close the database and open it again with bulk loading enabled.
+                if (!bulkLoadSegments.contains(segment)) {
+                    segment.toggleDbForBulkLoading(true);
+                    // If the store does not exist yet, getOrCreateSegmentIfLive will call openDB, which
+                    // sets the open flag for the newly created store.
+                    // If the store does exist already, then toggleDbForBulkLoading will make sure that
+                    // the store is already open at this point.
+                    bulkLoadSegments = new HashSet<>(segments.allSegments());
+                }
+                try {
+                    final WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
+                    if (record.value == null) {
+                        batch.remove(record.key);
+                    } else {
+                        batch.put(record.key, record.value);
+                    }
+                } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
+                }
+            }
+        }
+        return writeBatchMap;
+    }
+
+    private void toggleForBulkLoading(final boolean prepareForBulkload) {
+        for (final Segment segment: segments.allSegments()) {
+            segment.toggleDbForBulkLoading(prepareForBulkload);
+        }
+    }
+
+    private class RocksDBSegmentsBatchingRestoreCallback extends AbstractNotifyingBatchingRestoreCallback {
+
+        @Override
+        public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
+            restoreAllInternal(records);
+        }
+
+        @Override
+        public void onRestoreStart(final TopicPartition topicPartition,
+                                   final String storeName,
+                                   final long startingOffset,
+                                   final long endingOffset) {
+            toggleForBulkLoading(true);
+        }
+
+        @Override
+        public void onRestoreEnd(final TopicPartition topicPartition,
+                                 final String storeName,
+                                 final long totalRestored) {
+            toggleForBulkLoading(false);
+        }
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 17d03cc1da3..cb007474af3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -90,7 +90,7 @@
     private FlushOptions fOptions;
 
     private volatile boolean prepareForBulkload = false;
-    private ProcessorContext internalProcessorContext;
+    ProcessorContext internalProcessorContext;
     // visible for testing
     volatile BatchingStateRestoreCallback batchingStateRestoreCallback = null;
 
@@ -230,7 +230,7 @@ private void validateStoreOpen() {
         }
     }
 
-    private void toggleDbForBulkLoading(final boolean prepareForBulkload) {
+    void toggleDbForBulkLoading(final boolean prepareForBulkload) {
 
         if (prepareForBulkload) {
             // if the store is not empty, we need to compact to get around the num.levels check
@@ -276,7 +276,7 @@ public synchronized void put(final Bytes key,
         return originalValue;
     }
 
-    private void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
+    void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
         try (final WriteBatch batch = new WriteBatch()) {
             for (final KeyValue<byte[], byte[]> record : records) {
                 if (record.value == null) {
@@ -285,7 +285,7 @@ private void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> recor
                     batch.put(record.key, record.value);
                 }
             }
-            db.write(wOptions, batch);
+            write(batch);
         } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
         }
@@ -310,6 +310,10 @@ private void putInternal(final byte[] rawKey,
         }
     }
 
+    void write(final WriteBatch batch) throws RocksDBException {
+        db.write(wOptions, batch);
+    }
+
     @Override
     public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         try (final WriteBatch batch = new WriteBatch()) {
@@ -321,7 +325,7 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
                     batch.put(entry.key.get(), entry.value);
                 }
             }
-            db.write(wOptions, batch);
+            write(batch);
         } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index dbe470e7c15..d107812295e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -39,10 +39,12 @@ public int compareTo(Segment segment) {
         return Long.compare(id, segment.id);
     }
 
+
     @Override
     public void openDB(final ProcessorContext context) {
         super.openDB(context);
         // skip the registering step
+        internalProcessorContext = context;
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 6b9e7a808ae..8e69ccbfee8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -32,12 +33,14 @@
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 import org.junit.runners.Parameterized.Parameter;
+import org.rocksdb.WriteBatch;
 
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
 
@@ -46,10 +49,12 @@
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.SimpleTimeZone;
 
@@ -291,6 +296,67 @@ public void shouldBeAbleToWriteToReInitializedStore() {
         bytesStore.put(serializeKey(new Windowed<>(key, windows[1])), serializeValue(100));
     }
 
+    @Test
+    public void shouldCreateWriteBatches() {
+        final String key = "a";
+        final Collection<KeyValue<byte[], byte[]>> records = new ArrayList<>();
+        records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[0])).get(), serializeValue(50L)));
+        records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[3])).get(), serializeValue(100L)));
+        final Map<Segment, WriteBatch> writeBatchMap = bytesStore.getWriteBatches(records);
+        assertEquals(2, writeBatchMap.size());
+        for (final WriteBatch batch: writeBatchMap.values()) {
+            assertEquals(1, batch.count());
+        }
+    }
+
+    @Test
+    public void shouldRestoreToByteStore() {
+        // 0 segments initially.
+        assertEquals(0, bytesStore.getSegments().size());
+        final String key = "a";
+        final Collection<KeyValue<byte[], byte[]>> records = new ArrayList<>();
+        records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[0])).get(), serializeValue(50L)));
+        records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[3])).get(), serializeValue(100L)));
+        bytesStore.restoreAllInternal(records);
+
+        // 2 segments are created during restoration.
+        assertEquals(2, bytesStore.getSegments().size());
+
+        // Bulk loading is enabled during recovery.
+        for (final Segment segment: bytesStore.getSegments()) {
+            Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
+        }
+
+        final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
+        expected.add(new KeyValue<>(new Windowed<>(key, windows[0]), 50L));
+        expected.add(new KeyValue<>(new Windowed<>(key, windows[3]), 100L));
+
+        final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
+        assertEquals(expected, results);
+    }
+
+    @Test
+    public void shouldRespectBulkLoadOptionsDuringInit() {
+        bytesStore.init(context, bytesStore);
+        final String key = "a";
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
+        bytesStore.put(serializeKey(new Windowed<>(key, windows[3])), serializeValue(100L));
+        assertEquals(2, bytesStore.getSegments().size());
+
+        final StateRestoreListener restoreListener = context.getRestoreListener(bytesStore.name());
+
+        restoreListener.onRestoreStart(null, bytesStore.name(), 0L, 0L);
+
+        for (final Segment segment: bytesStore.getSegments()) {
+            Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
+        }
+
+        restoreListener.onRestoreEnd(null, bytesStore.name(), 0L);
+        for (final Segment segment: bytesStore.getSegments()) {
+            Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(4));
+        }
+    }
+
     private Set<String> segmentDirs() {
         File windowDir = new File(stateDir, storeName);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Use bulkloading for RocksDBSegmentedBytesStore during init
> ----------------------------------------------------------
>
>                 Key: KAFKA-7103
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7103
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Liquan Pei
>            Assignee: Liquan Pei
>            Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> We should use bulk loading for recovering RocksDBWindowStore, same as RocksDBStore. 
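
For readers skimming the diff, the sketch below (not part of the patch; the class name, path, and keys are made up for illustration) shows the core idea of bulk loading with the plain RocksDB Java API: restored records are applied as WriteBatches while level0FileNumCompactionTrigger is raised to 1 << 30, the value the PR's tests assert during restoration, and the store is compacted and reopened with the normal trigger of 4 once restoration is done. The real toggleDbForBulkLoading in RocksDBStore adjusts a few more options, so treat this only as an outline.

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    public class BulkLoadRestoreSketch {

        public static void main(final String[] args) throws RocksDBException {
            RocksDB.loadLibrary();

            // Bulk-load style options: a huge level-0 compaction trigger effectively
            // disables compaction while the changelog is being replayed.
            final Options bulkLoadOptions = new Options()
                .setCreateIfMissing(true)
                .setLevel0FileNumCompactionTrigger(1 << 30);

            final String path = "/tmp/bulk-load-sketch";   // hypothetical store directory
            RocksDB db = RocksDB.open(bulkLoadOptions, path);

            // Apply restored changelog records as batched writes instead of single puts.
            try (final WriteBatch batch = new WriteBatch();
                 final WriteOptions writeOptions = new WriteOptions()) {
                batch.put("key-1".getBytes(), "value-1".getBytes());
                batch.put("key-2".getBytes(), "value-2".getBytes());
                db.write(writeOptions, batch);
            }

            // After restoration: compact the accumulated level-0 files once, then
            // reopen with the regular options (default trigger of 4).
            db.compactRange();
            db.close();

            final Options regularOptions = new Options().setLevel0FileNumCompactionTrigger(4);
            db = RocksDB.open(regularOptions, path);
            db.close();
        }
    }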



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)