Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/15 19:38:49 UTC

[GitHub] [kafka] guozhangwang opened a new pull request #8676: KAFKA-10005 [DO NO MERGE]: Decouple RestoreListener from RestoreCallback

guozhangwang opened a new pull request #8676:
URL: https://github.com/apache/kafka/pull/8676


   ...and remove the bulk loading mechanism inside RocksDB.
   
   We need to validate with benchmarks that removing bulk loading does not cause a large performance regression; if it does, we should consider other optimizations, such as polling in a separate thread and parallel writes, before merging this PR.
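For readers new to the two interfaces, the coupling this PR removes can be sketched roughly as below. This is a simplified illustration, not Kafka code: `RestoreCallback`, `RestoreListener`, `CoupledStore`, and `DecoupledStore` are hypothetical stand-ins (the real `StateRestoreListener` also has `onBatchRestored` and takes a `TopicPartition`), and the comments about compaction settings only gesture at what RocksDB bulk loading adjusted during restoration.

```java
// Hypothetical stand-ins, trimmed from the Kafka Streams interfaces so the
// sketch compiles on its own (the real listener methods take a TopicPartition).
interface RestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface RestoreListener {
    void onRestoreStart(String partition, String storeName, long startingOffset, long endingOffset);
    void onRestoreEnd(String partition, String storeName, long totalRestored);
}

// Coupled pattern: one object is both the per-store restore callback and a
// restore listener, so it receives lifecycle hooks it can use to switch a
// bulk-loading mode on and off around restoration.
class CoupledStore implements RestoreCallback, RestoreListener {
    boolean bulkLoading = false;
    long restoredCount = 0;

    @Override
    public void onRestoreStart(String partition, String storeName, long startingOffset, long endingOffset) {
        bulkLoading = true; // e.g. relax compaction settings while restoring
    }

    @Override
    public void restore(byte[] key, byte[] value) {
        restoredCount++;
    }

    @Override
    public void onRestoreEnd(String partition, String storeName, long totalRestored) {
        bulkLoading = false; // e.g. return to normal settings once caught up
    }
}

// Decoupled pattern: the store only implements the callback; monitoring
// listeners are configured separately and never touch store internals.
class DecoupledStore implements RestoreCallback {
    long restoredCount = 0;

    @Override
    public void restore(byte[] key, byte[] value) {
        restoredCount++;
    }
}
```

In the decoupled form there is no hook left for toggling a bulk-loading mode, which is why the PR removes that mechanism and asks for benchmarks.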
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang merged pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #8676:
URL: https://github.com/apache/kafka/pull/8676


   





[GitHub] [kafka] guozhangwang commented on pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8676:
URL: https://github.com/apache/kafka/pull/8676#issuecomment-639898184


   test this please





[GitHub] [kafka] guozhangwang commented on pull request #8676: KAFKA-10005 [DO NO MERGE]: Decouple RestoreListener from RestoreCallback

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8676:
URL: https://github.com/apache/kafka/pull/8676#issuecomment-629444892









[GitHub] [kafka] vvcephei commented on a change in pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8676:
URL: https://github.com/apache/kafka/pull/8676#discussion_r436212253



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
##########
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more

Review comment:
       _awesome_

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -38,7 +38,6 @@
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

Review comment:
       weird that the import became unused, even though there were no other code changes...

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##########
@@ -210,30 +207,6 @@ public void shouldFindSingleStoreForChangelog() {
         );
     }
 
-    @Test
-    public void shouldRestoreStoreWithRestoreCallback() {

Review comment:
       It doesn't look like this was related to the bulk loading, but I'm guessing it was.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -237,25 +236,6 @@ public void shouldNotRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenUserP
         verify(metricsRecorder);
     }
 
-    @Test
-    public void shouldRespectBulkloadOptionsDuringInit() {
-        rocksDBStore.init(context, rocksDBStore);
-
-        final StateRestoreListener restoreListener = context.getRestoreListener(rocksDBStore.name());
-
-        restoreListener.onRestoreStart(null, rocksDBStore.name(), 0L, 0L);
-
-        assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));

Review comment:
       Were the bulk loading options a public interface? It doesn't seem like ignoring them would be a "breakage", so I don't think it hinders the removal, but it does seem worth documenting, and maybe trying to detect and warn?







[GitHub] [kafka] guozhangwang commented on a change in pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8676:
URL: https://github.com/apache/kafka/pull/8676#discussion_r436240662



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -38,7 +38,6 @@
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

Review comment:
       They are used, but only in javadocs. When I rebased from trunk, `checkstyle` failed complaining about them, but when I rebased again it stopped complaining, so I guess some rules changed here. I will revert them.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##########
@@ -210,30 +207,6 @@ public void shouldFindSingleStoreForChangelog() {
         );
     }
 
-    @Test
-    public void shouldRestoreStoreWithRestoreCallback() {

Review comment:
       Ah, good catch. I should not remove this test; instead I'll just replace the removed MockBatchingStateRestoreListener with MockStateStoreCallback.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -237,25 +236,6 @@ public void shouldNotRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenUserP
         verify(metricsRecorder);
     }
 
-    @Test
-    public void shouldRespectBulkloadOptionsDuringInit() {
-        rocksDBStore.init(context, rocksDBStore);
-
-        final StateRestoreListener restoreListener = context.getRestoreListener(rocksDBStore.name());
-
-        restoreListener.onRestoreStart(null, rocksDBStore.name(), 0L, 0L);
-
-        assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));

Review comment:
       Bulk loading was not a public interface; however, we did previously document combining the listener and the callback. Given that, I will add a warning when the registered callback is detected to also be a listener.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -303,6 +300,11 @@ public void registerStore(final StateStore store, final StateRestoreCallback sta
             throw new IllegalArgumentException(format("%sStore %s has already been registered.", logPrefix, storeName));
         }
 
+        if (stateRestoreCallback instanceof StateRestoreListener) {

Review comment:
       This is the warn log I added, lmk wdyt @vvcephei 
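A minimal sketch of the kind of check discussed here, assuming simplified stand-in types: `RestoreCallback`, `RestoreListener`, `CallbackAndListener`, `StoreRegistry`, and the warning text are all hypothetical, and warnings are collected in a list rather than sent to a logger so the behavior is easy to observe. The real check sits in `ProcessorStateManager#registerStore`, as the diff above shows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for StateRestoreCallback and StateRestoreListener.
interface RestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface RestoreListener {
    void onRestoreStart(String partition, String storeName, long startingOffset, long endingOffset);
}

// The discouraged pattern: a per-store callback that is also a listener.
class CallbackAndListener implements RestoreCallback, RestoreListener {
    @Override
    public void restore(byte[] key, byte[] value) { }

    @Override
    public void onRestoreStart(String partition, String storeName, long startingOffset, long endingOffset) { }
}

class StoreRegistry {
    final Map<String, RestoreCallback> callbacks = new HashMap<>();
    final List<String> warnings = new ArrayList<>();

    void registerStore(String storeName, RestoreCallback callback) {
        if (callbacks.containsKey(storeName)) {
            throw new IllegalArgumentException("Store " + storeName + " has already been registered.");
        }
        // This registry never calls listener hooks on a callback, so a
        // combined object is flagged with a warning instead of being rejected.
        if (callback instanceof RestoreListener) {
            warnings.add("Restore callback for store " + storeName
                + " also implements the listener interface; its listener methods will not be invoked.");
        }
        callbacks.put(storeName, callback);
    }
}
```

Warning rather than throwing keeps existing applications that followed the older documentation running, while still surfacing that their listener hooks are now ignored.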

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
##########
@@ -29,9 +29,12 @@
  * Users desiring stateful operations will need to provide synchronization internally in
  * the {@code StateRestorerListener} implementation.
  *
- * When used for monitoring a single {@link StateStore} using either {@link AbstractNotifyingRestoreCallback} or
- * {@link AbstractNotifyingBatchingRestoreCallback} no synchronization is necessary
- * as each StreamThread has its own StateStore instance.
+ * Note that this listener is only registered at the per-client level and users can base on the {@code storeName}

Review comment:
       Updated the javadoc to emphasize that one should not combine the two interfaces, cc @vvcephei.
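Because a single listener instance is registered per client and shared by all of its stream threads, an implementation typically keys its state by `storeName` and uses thread-safe structures. A minimal sketch, assuming a hypothetical `RestoreListener` stand-in that mirrors the three `StateRestoreListener` methods but takes a `String` partition instead of a `TopicPartition`; `PerStoreRestoreProgress` is likewise illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in mirroring the shape of StateRestoreListener.
interface RestoreListener {
    void onRestoreStart(String partition, String storeName, long startingOffset, long endingOffset);
    void onBatchRestored(String partition, String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String partition, String storeName, long totalRestored);
}

// One instance serves every stream thread of a client, so progress is kept in
// concurrent maps keyed by store name and summed across changelog partitions.
class PerStoreRestoreProgress implements RestoreListener {
    private final Map<String, AtomicLong> restoredSoFar = new ConcurrentHashMap<>();
    private final Map<String, Long> finishedTotals = new ConcurrentHashMap<>();

    @Override
    public void onRestoreStart(String partition, String storeName, long startingOffset, long endingOffset) {
        restoredSoFar.putIfAbsent(storeName, new AtomicLong());
    }

    @Override
    public void onBatchRestored(String partition, String storeName, long batchEndOffset, long numRestored) {
        restoredSoFar.computeIfAbsent(storeName, k -> new AtomicLong()).addAndGet(numRestored);
    }

    @Override
    public void onRestoreEnd(String partition, String storeName, long totalRestored) {
        finishedTotals.merge(storeName, totalRestored, Long::sum);
    }

    long restored(String storeName) {
        AtomicLong count = restoredSoFar.get(storeName);
        return count == null ? 0L : count.get();
    }

    Long finishedTotal(String storeName) {
        return finishedTotals.get(storeName);
    }
}
```

With the real API, such a listener would be passed to `KafkaStreams#setGlobalStateRestoreListener` before the client is started.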







[GitHub] [kafka] guozhangwang commented on pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8676:
URL: https://github.com/apache/kafka/pull/8676#issuecomment-640892231


   Merged to trunk and cherry-picked to 2.6





[GitHub] [kafka] ableegoldman commented on a change in pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8676:
URL: https://github.com/apache/kafka/pull/8676#discussion_r436858916



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java
##########
@@ -34,7 +34,6 @@
     /**
      * Unregisters and removes the passed in partitions from the set of changelogs
      * @param removedPartitions the set of partitions to remove
-     * @param triggerOnRestoreEnd whether to trigger the onRestoreEnd callback
      */
-    void unregister(final Collection<TopicPartition> removedPartitions, final boolean triggerOnRestoreEnd);
+    void unregister(final Collection<TopicPartition> removedPartitions);

Review comment:
       Nice, this was kind of an ugly hack to begin with, so I'm happy to see it go.







[GitHub] [kafka] guozhangwang commented on pull request #8676: KAFKA-10005 [DO NO MERGE]: Decouple RestoreListener from RestoreCallback

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8676:
URL: https://github.com/apache/kafka/pull/8676#issuecomment-629490272


   test this please





[GitHub] [kafka] guozhangwang commented on pull request #8676: KAFKA-10005 [DO NO MERGE]: Decouple RestoreListener from RestoreCallback

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8676:
URL: https://github.com/apache/kafka/pull/8676#issuecomment-629444775


   retest this pleas





[GitHub] [kafka] guozhangwang removed a comment on pull request #8676: KAFKA-10005 [DO NO MERGE]: Decouple RestoreListener from RestoreCallback

Posted by GitBox <gi...@apache.org>.
guozhangwang removed a comment on pull request #8676:
URL: https://github.com/apache/kafka/pull/8676#issuecomment-629444775


   retest this pleas





[GitHub] [kafka] guozhangwang commented on a change in pull request #8676: KAFKA-10005 [DO NO MERGE]: Decouple RestoreListener from RestoreCallback

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8676:
URL: https://github.com/apache/kafka/pull/8676#discussion_r426011578



##########
File path: streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
##########
@@ -18,19 +18,15 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.AbstractNotifyingRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-public class MockStateRestoreListener extends AbstractNotifyingRestoreCallback {
+public class MockStateRestoreListener implements StateRestoreListener {
 
     // verifies store name called for each state
     public final Map<String, String> storeNameCalledStates = new HashMap<>();
-    public final List<KeyValue<byte[], byte[]>> restored = new ArrayList<>();

Review comment:
       This field is not used anywhere in tests.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
##########
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more

Review comment:
       This class is actually not used anywhere except unit tests: it is only a by-product of the coupling.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockBatchingStateRestoreListener.java
##########
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
-
-import java.util.ArrayList;
-import java.util.Collection;
-
-public class MockBatchingStateRestoreListener extends MockStateRestoreListener implements BatchingStateRestoreCallback {

Review comment:
       This mock can now be replaced by other existing ones.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/BatchingStateRestoreCallback.java
##########
@@ -38,4 +38,8 @@
      */
     void restoreAll(Collection<KeyValue<byte[], byte[]>> records);
 
+    @Override
+    default void restore(byte[] key, byte[] value) {

Review comment:
       Minor improvement so that implementations of the batching interface do not have to implement the per-entry `restore` function.
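The hunk above shows only the signature of the new default method, not its body. A minimal sketch, assuming the default simply rejects per-record calls (an assumption; delegating to `restoreAll` with a one-element batch would be an equally plausible choice). `RestoreCallback`, `BatchingRestoreCallback`, and `RecordingBatchCallback` are hypothetical stand-ins, and `Map.Entry` replaces Kafka's `KeyValue` so the sketch is self-contained.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for StateRestoreCallback.
interface RestoreCallback {
    void restore(byte[] key, byte[] value);
}

// Hypothetical stand-in for BatchingStateRestoreCallback.
interface BatchingRestoreCallback extends RestoreCallback {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);

    // Assumed default: batching implementations are fed through restoreAll,
    // so the per-record entry point is not expected to be called.
    @Override
    default void restore(byte[] key, byte[] value) {
        throw new UnsupportedOperationException("batching callbacks are restored via restoreAll");
    }
}

// With the default in place, an implementation only needs restoreAll.
class RecordingBatchCallback implements BatchingRestoreCallback {
    final List<Map.Entry<byte[], byte[]>> restored = new ArrayList<>();

    @Override
    public void restoreAll(Collection<Map.Entry<byte[], byte[]>> records) {
        restored.addAll(records);
    }
}
```

The practical effect is the one described in the comment: `RecordingBatchCallback` compiles while overriding only `restoreAll`.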







[GitHub] [kafka] vvcephei commented on a change in pull request #8676: KAFKA-10005: Decouple RestoreListener from RestoreCallback

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8676:
URL: https://github.com/apache/kafka/pull/8676#discussion_r436755332



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -237,25 +236,6 @@ public void shouldNotRemoveStatisticsFromInjectedMetricsRecorderOnCloseWhenUserP
         verify(metricsRecorder);
     }
 
-    @Test
-    public void shouldRespectBulkloadOptionsDuringInit() {
-        rocksDBStore.init(context, rocksDBStore);
-
-        final StateRestoreListener restoreListener = context.getRestoreListener(rocksDBStore.name());
-
-        restoreListener.onRestoreStart(null, rocksDBStore.name(), 0L, 0L);
-
-        assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));

Review comment:
       Sounds good. I was thrown off by `userSpecifiedOptions.prepareForBulkLoad()` because it looks like the user could have specified some bulk loading options, but I took another look, and I see that that's just the variable name. The actual method is internal, and there's no way to configure bulk-loading options specifically as a user. So, I'm satisfied. Thanks!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
##########
@@ -29,9 +29,12 @@
  * Users desiring stateful operations will need to provide synchronization internally in
  * the {@code StateRestorerListener} implementation.
  *
- * When used for monitoring a single {@link StateStore} using either {@link AbstractNotifyingRestoreCallback} or
- * {@link AbstractNotifyingBatchingRestoreCallback} no synchronization is necessary
- * as each StreamThread has its own StateStore instance.
+ * Note that this listener is only registered at the per-client level and users can base on the {@code storeName}

Review comment:
       Thanks!

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##########
@@ -210,30 +207,6 @@ public void shouldFindSingleStoreForChangelog() {
         );
     }
 
-    @Test
-    public void shouldRestoreStoreWithRestoreCallback() {

Review comment:
       Thanks!



