You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/16 12:28:08 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #8876: KAFKA-10167: use the admin client to read end-offset 【WIP】

cadonna commented on a change in pull request #8876:
URL: https://github.com/apache/kafka/pull/8876#discussion_r440801026



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -369,6 +369,7 @@ public static StreamThread create(final InternalTopologyBuilder builder,
 
         final Consumer<byte[], byte[]> mainConsumer = clientSupplier.getConsumer(consumerConfigs);
         changelogReader.setMainConsumer(mainConsumer);
+        changelogReader.setAdminClient(adminClient);

Review comment:
       Q: Why do you not pass the admin client in the constructor? 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -273,13 +273,13 @@ public boolean isRunning() {
     private volatile ThreadMetadata threadMetadata;
     private StreamThread.StateListener stateListener;
 
-    private final Admin adminClient;
     private final ChangelogReader changelogReader;
 
     // package-private for testing
     final ConsumerRebalanceListener rebalanceListener;
     final Consumer<byte[], byte[]> mainConsumer;
     final Consumer<byte[], byte[]> restoreConsumer;
+    final Admin adminClient;

Review comment:
       You mean `private`, right? It is not `private` anymore.
   Instead of making the `adminClient` field package private for testing, I would either add a setter `setAdmin()` to `MockClientSupplier` or instantiate the admin in a private field of the `MockClientSupplier` and use the existing getter to set up the admin, or instantiate the admin in a public field of the `MockClientSupplier` and accessing it directly to set it up (similarly to the producer and consumer).   

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -564,8 +576,15 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
             return Collections.emptyMap();
 
         try {
-            return restoreConsumer.endOffsets(partitions);
-        } catch (final TimeoutException e) {
+            if (adminClient != null) {

Review comment:
       I also do not understand the distinction. When would we want to call `restoreConsumer.endOffsets(partitions)`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -564,8 +576,15 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
             return Collections.emptyMap();
 
         try {
-            return restoreConsumer.endOffsets(partitions);
-        } catch (final TimeoutException e) {
+            if (adminClient != null) {

Review comment:
       req: Could you add a test (or adapt an existing one) and verify whether during `restore()` a call to `adminClient.listOffsets()` with isolation level `READ_UNCOMMITTED` is done? 

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##########
@@ -1634,6 +1634,7 @@ public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() t
         final StreamThread thread = createStreamThread("clientId", config, false);
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer;
         final MockConsumer<byte[], byte[]> mockRestoreConsumer = (MockConsumer<byte[], byte[]>) thread.restoreConsumer;
+        final MockAdminClient mockAdminClient = (MockAdminClient) thread.adminClient;

Review comment:
       See my comment in `StreamThread`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org