You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "philipnee (via GitHub)" <gi...@apache.org> on 2023/02/24 23:31:54 UTC

[GitHub] [kafka] philipnee opened a new pull request, #13303: KAFKA-14761 Adding integration test for the prototype consumer

philipnee opened a new pull request, #13303:
URL: https://github.com/apache/kafka/pull/13303

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1125607022


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -226,11 +246,20 @@ public void commitAsync(OffsetCommitCallback callback) {
 
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-        final CommitApplicationEvent commitEvent = new CommitApplicationEvent(offsets);
-        commitEvent.future().whenComplete((r, t) -> {
-            callback.onComplete(offsets, new RuntimeException(t));
+        CompletableFuture<Void> future = commit(offsets);
+        future.whenComplete((r, t) -> {
+            if (t != null) {
+                callback.onComplete(offsets, new RuntimeException(t));
+            } else {
+                callback.onComplete(offsets, null);
+            }
         });
+    }
+
+    private CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {

Review Comment:
   I have a KIP proposal here: 
   1. Can we introduce a commit interface to return a CompletableFuture?
   2. Do we want to introduce a poll interface returning a CompletableFuture? (Like do we really need it to block?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127252457


##########
core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.TestUtils.waitUntilTrue
+import org.junit.jupiter.api.Test
+
+class BaseAsyncConsumerTest extends AbstractConsumerTest {
+  @Test
+  def testCommitAPI(): Unit = {
+    val consumer = createAsyncConsumer()
+    val producer = createProducer()
+    val numRecords = 10000
+    val startingTimestamp = System.currentTimeMillis()
+    val cb = new CountConsumerCommitCallback
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    consumer.commitAsync(cb)
+    waitUntilTrue(() => {
+      cb.successCount == 1
+    }, "wait until commit is completed successfully", 5000)
+    consumer.commitSync();

Review Comment:
   +1 : although i don't know how to verify that but let me look into it. thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13303:
URL: https://github.com/apache/kafka/pull/13303#issuecomment-1455139054

   Tests appear to be flaky:
   ```
   Build / JDK 11 and Scala 2.13 / testMultiWorkerRestartOnlyConnector – org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest
   2m 24s
   Build / JDK 11 and Scala 2.13 / testElectUncleanLeadersForManyPartitions(String).quorum=kraft – kafka.api.PlaintextAdminIntegrationTest
   22s
   Build / JDK 17 and Scala 2.13 / shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions() – org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest
   1m 43s
   Build / JDK 17 and Scala 2.13 / shouldRestoreState() – org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest
   1m 4s
   Fixed 126
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13303:
URL: https://github.com/apache/kafka/pull/13303#issuecomment-1458974950

   @guozhangwang - Regarding testing the commit results, can we defer this to the subsequent PR, i.e. verify it once committed() is implemented.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13303:
URL: https://github.com/apache/kafka/pull/13303#issuecomment-1457356989

   re. @guozhangwang - sorry, I think originally I added 2 tests, then I reduced to 1. Thanks for catching this.
   
   For testing poll() - The fetcher isn't available but I could try to test poll?
   
   Thanks! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang merged pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang merged PR #13303:
URL: https://github.com/apache/kafka/pull/13303


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1128444212


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -226,11 +246,20 @@ public void commitAsync(OffsetCommitCallback callback) {
 
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-        final CommitApplicationEvent commitEvent = new CommitApplicationEvent(offsets);
-        commitEvent.future().whenComplete((r, t) -> {
-            callback.onComplete(offsets, new RuntimeException(t));
+        CompletableFuture<Void> future = commit(offsets);
+        future.whenComplete((r, t) -> {
+            if (t != null) {
+                callback.onComplete(offsets, new RuntimeException(t));
+            } else {
+                callback.onComplete(offsets, null);
+            }
         });
+    }
+
+    private CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {

Review Comment:
   I thought we have those methods on the existing Consumer interface. Anyway, that doesn't impact this PR.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##########
@@ -16,90 +16,77 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
-import static java.util.Collections.singleton;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class PrototypeAsyncConsumerTest {
-    private Map<String, Object> properties;
-    private SubscriptionState subscriptionState;
-    private MockTime time;
-    private LogContext logContext;
-    private Metrics metrics;
-    private ClusterResourceListeners clusterResourceListeners;
-    private Optional<String> groupId;
-    private String clientId;
-    private EventHandler eventHandler;
+
+    private Consumer<?, ?> consumer;
+    private Map<String, Object> consumerProps = new HashMap<>();
+
+    private final Time time = new MockTime();
 
     @BeforeEach
     public void setup() {
-        this.subscriptionState = Mockito.mock(SubscriptionState.class);
-        this.eventHandler = Mockito.mock(DefaultEventHandler.class);
-        this.logContext = new LogContext();
-        this.time = new MockTime();
-        this.metrics = new Metrics(time);
-        this.groupId = Optional.empty();
-        this.clientId = "client-1";
-        this.clusterResourceListeners = new ClusterResourceListeners();
-        this.properties = new HashMap<>();
-        this.properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                "localhost" +
-                ":9999");
-        this.properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        this.properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        this.properties.put(CLIENT_ID_CONFIG, "test-client");
+        injectConsumerConfigs();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        if (consumer != null) {
+            consumer.close(Duration.ZERO);
+        }
     }
+
     @Test
-    public void testSubscription() {
-        this.subscriptionState =
-                new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
-        PrototypeAsyncConsumer<String, String> consumer =
-                setupConsumerWithDefault();
-        subscriptionState.subscribe(singleton("t1"),
-                new NoOpConsumerRebalanceListener());
-        assertEquals(1, consumer.subscription().size());
+    public void testBackgroundThreadRunning() {
+        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
     }
 
     @Test
     public void testUnimplementedException() {
-        PrototypeAsyncConsumer<String, String> consumer =
-                setupConsumerWithDefault();
+        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
         assertThrows(KafkaException.class, consumer::assignment, "not implemented exception");
     }
 
-    public PrototypeAsyncConsumer<String, String> setupConsumerWithDefault() {
-        ConsumerConfig config = new ConsumerConfig(properties);
-        return new PrototypeAsyncConsumer<>(
-                this.time,
-                this.logContext,
-                config,
-                this.subscriptionState,
-                this.eventHandler,
-                this.metrics,
-                this.clusterResourceListeners,
-                this.groupId,
-                this.clientId,
-                0);
+    private ConsumerMetadata createMetadata(SubscriptionState subscription) {
+        return new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
+                subscription, new LogContext(), new ClusterResourceListeners());
+    }
+
+    private void injectConsumerConfigs() {
+        consumerProps.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");

Review Comment:
   For the longer term, it will be good to use a MockClient rather than one that attempts to connect and fills the log with exceptions. For now, this seems reasonable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13303:
URL: https://github.com/apache/kafka/pull/13303#issuecomment-1458731526

   Hey @rajinisivaram - Thanks! To your comments
   `I thought we have those methods on the existing Consumer interface. Anyway, that doesn't impact this PR.` : The current consumer uses kafka specific future, but in the new re-write, we are kind of migrating to java CompletableFuture
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127185285


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java:
##########
@@ -213,7 +215,7 @@ public UnsentRequest(
             Objects.requireNonNull(requestBuilder);
             this.requestBuilder = requestBuilder;
             this.node = node;
-            this.callback = new FutureCompletionHandler();

Review Comment:
   nit: just call the member field `handler` as well?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##########
@@ -16,90 +16,77 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
-import static java.util.Collections.singleton;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class PrototypeAsyncConsumerTest {
-    private Map<String, Object> properties;
-    private SubscriptionState subscriptionState;
-    private MockTime time;
-    private LogContext logContext;
-    private Metrics metrics;
-    private ClusterResourceListeners clusterResourceListeners;
-    private Optional<String> groupId;
-    private String clientId;
-    private EventHandler eventHandler;
+
+    private Consumer<?, ?> consumer;
+    private Map<String, Object> consumerProps = new HashMap<>();
+
+    private final Time time = new MockTime();
 
     @BeforeEach
     public void setup() {
-        this.subscriptionState = Mockito.mock(SubscriptionState.class);
-        this.eventHandler = Mockito.mock(DefaultEventHandler.class);
-        this.logContext = new LogContext();
-        this.time = new MockTime();
-        this.metrics = new Metrics(time);
-        this.groupId = Optional.empty();
-        this.clientId = "client-1";
-        this.clusterResourceListeners = new ClusterResourceListeners();
-        this.properties = new HashMap<>();
-        this.properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                "localhost" +
-                ":9999");
-        this.properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        this.properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        this.properties.put(CLIENT_ID_CONFIG, "test-client");
+        injectConsumerConfigs();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        if (consumer != null) {
+            consumer.close(Duration.ZERO);
+        }
     }
+
     @Test
-    public void testSubscription() {
-        this.subscriptionState =
-                new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
-        PrototypeAsyncConsumer<String, String> consumer =
-                setupConsumerWithDefault();
-        subscriptionState.subscribe(singleton("t1"),
-                new NoOpConsumerRebalanceListener());
-        assertEquals(1, consumer.subscription().size());
+    public void testBackgroundThreadRunning() {
+        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());

Review Comment:
   +1



##########
core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.TestUtils.waitUntilTrue
+import org.junit.jupiter.api.Test
+
+class BaseAsyncConsumerTest extends AbstractConsumerTest {
+  @Test
+  def testCommitAPI(): Unit = {
+    val consumer = createAsyncConsumer()
+    val producer = createProducer()
+    val numRecords = 10000
+    val startingTimestamp = System.currentTimeMillis()
+    val cb = new CountConsumerCommitCallback
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    consumer.commitAsync(cb)
+    waitUntilTrue(() => {
+      cb.successCount == 1
+    }, "wait until commit is completed successfully", 5000)
+    consumer.commitSync();

Review Comment:
   Why call `commitSync` here again? If the goal is to make sure the second commit also goes through shall we verify that on the broker's side?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127043244


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -226,11 +246,20 @@ public void commitAsync(OffsetCommitCallback callback) {
 
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-        final CommitApplicationEvent commitEvent = new CommitApplicationEvent(offsets);
-        commitEvent.future().whenComplete((r, t) -> {
-            callback.onComplete(offsets, new RuntimeException(t));
+        CompletableFuture<Void> future = commit(offsets);
+        future.whenComplete((r, t) -> {
+            if (t != null) {
+                callback.onComplete(offsets, new RuntimeException(t));

Review Comment:
   You are right, it should've been a KafkaException.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127253922


##########
core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.TestUtils.waitUntilTrue
+import org.junit.jupiter.api.Test
+
+class BaseAsyncConsumerTest extends AbstractConsumerTest {
+  @Test
+  def testCommitAPI(): Unit = {
+    val consumer = createAsyncConsumer()
+    val producer = createProducer()
+    val numRecords = 10000
+    val startingTimestamp = System.currentTimeMillis()
+    val cb = new CountConsumerCommitCallback
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    consumer.commitAsync(cb)
+    waitUntilTrue(() => {
+      cb.successCount == 1
+    }, "wait until commit is completed successfully", 5000)
+    consumer.commitSync();

Review Comment:
   Maybe I'll break it up into two tests there.  It doesn't really make sense to call commit twice in a row.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127253699


##########
core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.TestUtils.waitUntilTrue
+import org.junit.jupiter.api.Test
+
+class BaseAsyncConsumerTest extends AbstractConsumerTest {
+  @Test
+  def testCommitAPI(): Unit = {
+    val consumer = createAsyncConsumer()
+    val producer = createProducer()
+    val numRecords = 10000
+    val startingTimestamp = System.currentTimeMillis()
+    val cb = new CountConsumerCommitCallback
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    consumer.commitAsync(cb)
+    waitUntilTrue(() => {
+      cb.successCount == 1
+    }, "wait until commit is completed successfully", 5000)
+    consumer.commitSync();

Review Comment:
   I think the idea was to test both commitSync and commitAsync.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1125606496


##########
core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala:
##########
@@ -97,7 +97,7 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
     }
   }
 
-  protected def createConsumerWithGroupId(groupId: String): KafkaConsumer[Array[Byte], Array[Byte]] = {

Review Comment:
   The only thing here was to change the type of Consumer<K, V>



##########
core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala:
##########
@@ -306,7 +306,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
     val partitions = createTopicPartitions(topic, numPartitions = partitionCount, replicationFactor = brokerCount)
 
-    addConsumersToGroupAndWaitForGroupAssignment(consumerCount, mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](),

Review Comment:
   Same, change to Consumer<K, V>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1125606614


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -993,7 +993,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // subscribe all consumers to all topics and validate the assignment
 
-    val consumersInGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()

Review Comment:
   Same, make it Consumer<K, V>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127046532


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -83,12 +92,23 @@
     private final Metrics metrics;
     private final long defaultApiTimeoutMs;
 
+    public PrototypeAsyncConsumer(Properties properties,
+                         Deserializer<K> keyDeserializer,
+                         Deserializer<V> valueDeserializer) {
+        this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
+    }
+
+    public PrototypeAsyncConsumer(final Map<String, Object> configs,
+                                  final Deserializer<K> keyDeser,
+                                  final Deserializer<V> valDeser) {
+        this(new ConsumerConfig(appendDeserializerToConfig(configs, keyDeser, valDeser)), keyDeser,
+                valDeser);
+    }
     @SuppressWarnings("unchecked")
-    public PrototypeAsyncConsumer(final Time time,
-                                  final ConsumerConfig config,
+    public PrototypeAsyncConsumer(final ConsumerConfig config,
                                   final Deserializer<K> keyDeserializer,
                                   final Deserializer<V> valueDeserializer) {
-        this.time = time;
+        this.time = Time.SYSTEM;

Review Comment:
   Thanks for spotting this. I think the unit test consumer is meant to be the one below this. I think I should remove the time and make the constructor param list the same as the current one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13303:
URL: https://github.com/apache/kafka/pull/13303#issuecomment-1457022169

   Thanks for reviewing this, @rajinisivaram - fixes are on the way.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127264709


##########
core/src/test/scala/integration/kafka/api/BaseAsyncConsumerTest.scala:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.TestUtils.waitUntilTrue
+import org.junit.jupiter.api.Test
+
+class BaseAsyncConsumerTest extends AbstractConsumerTest {
+  @Test
+  def testCommitAPI(): Unit = {
+    val consumer = createAsyncConsumer()
+    val producer = createProducer()
+    val numRecords = 10000
+    val startingTimestamp = System.currentTimeMillis()
+    val cb = new CountConsumerCommitCallback
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    consumer.commitAsync(cb)
+    waitUntilTrue(() => {
+      cb.successCount == 1
+    }, "wait until commit is completed successfully", 5000)
+    consumer.commitSync();

Review Comment:
   Other integration tests test the commit with committed method call. I'll leave this as TODO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1125607022


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -226,11 +246,20 @@ public void commitAsync(OffsetCommitCallback callback) {
 
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-        final CommitApplicationEvent commitEvent = new CommitApplicationEvent(offsets);
-        commitEvent.future().whenComplete((r, t) -> {
-            callback.onComplete(offsets, new RuntimeException(t));
+        CompletableFuture<Void> future = commit(offsets);
+        future.whenComplete((r, t) -> {
+            if (t != null) {
+                callback.onComplete(offsets, new RuntimeException(t));
+            } else {
+                callback.onComplete(offsets, null);
+            }
         });
+    }
+
+    private CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {

Review Comment:
   I have a KIP proposal here: 
   1. WDYT about asyncCommit returning a CompletableFuture?
   2. WDYT about poll returning CompletableFuture?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127256908


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -226,11 +246,20 @@ public void commitAsync(OffsetCommitCallback callback) {
 
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-        final CommitApplicationEvent commitEvent = new CommitApplicationEvent(offsets);
-        commitEvent.future().whenComplete((r, t) -> {
-            callback.onComplete(offsets, new RuntimeException(t));
+        CompletableFuture<Void> future = commit(offsets);
+        future.whenComplete((r, t) -> {
+            if (t != null) {
+                callback.onComplete(offsets, new RuntimeException(t));
+            } else {
+                callback.onComplete(offsets, null);
+            }
         });
+    }
+
+    private CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {

Review Comment:
   Thanks, for 1 - I think I actually wanted to say commitSync(), sorry about that.
   
   I think they aren't part of the existing interface, right? Currently, both commitSync and poll don't return a future, and I was wondering if it makes sense for us to do that.  Also, it makes the interface feels more modern.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1128780569


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -83,12 +92,23 @@
     private final Metrics metrics;
     private final long defaultApiTimeoutMs;
 
+    public PrototypeAsyncConsumer(Properties properties,
+                         Deserializer<K> keyDeserializer,
+                         Deserializer<V> valueDeserializer) {
+        this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
+    }
+
+    public PrototypeAsyncConsumer(final Map<String, Object> configs,
+                                  final Deserializer<K> keyDeser,
+                                  final Deserializer<V> valDeser) {
+        this(new ConsumerConfig(appendDeserializerToConfig(configs, keyDeser, valDeser)), keyDeser,
+                valDeser);
+    }
     @SuppressWarnings("unchecked")
-    public PrototypeAsyncConsumer(final Time time,
-                                  final ConsumerConfig config,
+    public PrototypeAsyncConsumer(final ConsumerConfig config,
                                   final Deserializer<K> keyDeserializer,
                                   final Deserializer<V> valueDeserializer) {
-        this.time = time;
+        this.time = Time.SYSTEM;

Review Comment:
   I think as we go along with this, we would need to add this param back to the constructor :) ANyways that's for future PRs then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] rajinisivaram commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "rajinisivaram (via GitHub)" <gi...@apache.org>.
rajinisivaram commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1126983650


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -83,12 +92,23 @@
     private final Metrics metrics;
     private final long defaultApiTimeoutMs;
 
+    public PrototypeAsyncConsumer(Properties properties,
+                         Deserializer<K> keyDeserializer,
+                         Deserializer<V> valueDeserializer) {
+        this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
+    }
+
+    public PrototypeAsyncConsumer(final Map<String, Object> configs,
+                                  final Deserializer<K> keyDeser,
+                                  final Deserializer<V> valDeser) {
+        this(new ConsumerConfig(appendDeserializerToConfig(configs, keyDeser, valDeser)), keyDeser,
+                valDeser);
+    }
     @SuppressWarnings("unchecked")
-    public PrototypeAsyncConsumer(final Time time,
-                                  final ConsumerConfig config,
+    public PrototypeAsyncConsumer(final ConsumerConfig config,
                                   final Deserializer<K> keyDeserializer,
                                   final Deserializer<V> valueDeserializer) {
-        this.time = time;
+        this.time = Time.SYSTEM;

Review Comment:
   Why did we remove `time`? Could be useful for unit testing?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##########
@@ -16,90 +16,77 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
-import static java.util.Collections.singleton;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class PrototypeAsyncConsumerTest {
-    private Map<String, Object> properties;
-    private SubscriptionState subscriptionState;
-    private MockTime time;
-    private LogContext logContext;
-    private Metrics metrics;
-    private ClusterResourceListeners clusterResourceListeners;
-    private Optional<String> groupId;
-    private String clientId;
-    private EventHandler eventHandler;
+
+    private Consumer<?, ?> consumer;
+    private Map<String, Object> consumerProps = new HashMap<>();
+
+    private final Time time = new MockTime();
 
     @BeforeEach
     public void setup() {
-        this.subscriptionState = Mockito.mock(SubscriptionState.class);
-        this.eventHandler = Mockito.mock(DefaultEventHandler.class);
-        this.logContext = new LogContext();
-        this.time = new MockTime();
-        this.metrics = new Metrics(time);
-        this.groupId = Optional.empty();
-        this.clientId = "client-1";
-        this.clusterResourceListeners = new ClusterResourceListeners();
-        this.properties = new HashMap<>();
-        this.properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                "localhost" +
-                ":9999");
-        this.properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        this.properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        this.properties.put(CLIENT_ID_CONFIG, "test-client");
+        injectConsumerConfigs();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        if (consumer != null) {
+            consumer.close(Duration.ZERO);
+        }
     }
+
     @Test
-    public void testSubscription() {
-        this.subscriptionState =
-                new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
-        PrototypeAsyncConsumer<String, String> consumer =
-                setupConsumerWithDefault();
-        subscriptionState.subscribe(singleton("t1"),
-                new NoOpConsumerRebalanceListener());
-        assertEquals(1, consumer.subscription().size());
+    public void testBackgroundThreadRunning() {
+        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
     }
 
     @Test
     public void testUnimplementedException() {
-        PrototypeAsyncConsumer<String, String> consumer =
-                setupConsumerWithDefault();
+        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
         assertThrows(KafkaException.class, consumer::assignment, "not implemented exception");
     }
 
-    public PrototypeAsyncConsumer<String, String> setupConsumerWithDefault() {
-        ConsumerConfig config = new ConsumerConfig(properties);
-        return new PrototypeAsyncConsumer<>(
-                this.time,
-                this.logContext,
-                config,
-                this.subscriptionState,
-                this.eventHandler,
-                this.metrics,
-                this.clusterResourceListeners,
-                this.groupId,
-                this.clientId,
-                0);
+    private ConsumerMetadata createMetadata(SubscriptionState subscription) {
+        return new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
+                subscription, new LogContext(), new ClusterResourceListeners());
+    }
+
+    private void injectConsumerConfigs() {
+        consumerProps.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");

Review Comment:
   I guess this test currently attempts to connect to this? Is the intention to convert this into a unit test with mocks or only use it for tests where successful connections are not required?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##########
@@ -16,90 +16,77 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
-import static java.util.Collections.singleton;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class PrototypeAsyncConsumerTest {
-    private Map<String, Object> properties;
-    private SubscriptionState subscriptionState;
-    private MockTime time;
-    private LogContext logContext;
-    private Metrics metrics;
-    private ClusterResourceListeners clusterResourceListeners;
-    private Optional<String> groupId;
-    private String clientId;
-    private EventHandler eventHandler;
+
+    private Consumer<?, ?> consumer;
+    private Map<String, Object> consumerProps = new HashMap<>();
+
+    private final Time time = new MockTime();
 
     @BeforeEach
     public void setup() {
-        this.subscriptionState = Mockito.mock(SubscriptionState.class);
-        this.eventHandler = Mockito.mock(DefaultEventHandler.class);
-        this.logContext = new LogContext();
-        this.time = new MockTime();
-        this.metrics = new Metrics(time);
-        this.groupId = Optional.empty();
-        this.clientId = "client-1";
-        this.clusterResourceListeners = new ClusterResourceListeners();
-        this.properties = new HashMap<>();
-        this.properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                "localhost" +
-                ":9999");
-        this.properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        this.properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        this.properties.put(CLIENT_ID_CONFIG, "test-client");
+        injectConsumerConfigs();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        if (consumer != null) {
+            consumer.close(Duration.ZERO);
+        }
     }
+
     @Test
-    public void testSubscription() {
-        this.subscriptionState =
-                new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
-        PrototypeAsyncConsumer<String, String> consumer =
-                setupConsumerWithDefault();
-        subscriptionState.subscribe(singleton("t1"),
-                new NoOpConsumerRebalanceListener());
-        assertEquals(1, consumer.subscription().size());
+    public void testBackgroundThreadRunning() {
+        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());

Review Comment:
   Don't we need to add some verification since the test name suggests we are testing if the background thread was started?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -226,11 +246,20 @@ public void commitAsync(OffsetCommitCallback callback) {
 
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-        final CommitApplicationEvent commitEvent = new CommitApplicationEvent(offsets);
-        commitEvent.future().whenComplete((r, t) -> {
-            callback.onComplete(offsets, new RuntimeException(t));
+        CompletableFuture<Void> future = commit(offsets);
+        future.whenComplete((r, t) -> {
+            if (t != null) {
+                callback.onComplete(offsets, new RuntimeException(t));

Review Comment:
   Any reason why this is a RuntimeException rather than a KafkaException?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -226,11 +246,20 @@ public void commitAsync(OffsetCommitCallback callback) {
 
     @Override
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-        final CommitApplicationEvent commitEvent = new CommitApplicationEvent(offsets);
-        commitEvent.future().whenComplete((r, t) -> {
-            callback.onComplete(offsets, new RuntimeException(t));
+        CompletableFuture<Void> future = commit(offsets);
+        future.whenComplete((r, t) -> {
+            if (t != null) {
+                callback.onComplete(offsets, new RuntimeException(t));
+            } else {
+                callback.onComplete(offsets, null);
+            }
         });
+    }
+
+    private CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {

Review Comment:
   Aren't these methods part of the existing Consumer interface? Won't they be breaking changes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127250227


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##########
@@ -16,90 +16,77 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
-import static java.util.Collections.singleton;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class PrototypeAsyncConsumerTest {
-    private Map<String, Object> properties;
-    private SubscriptionState subscriptionState;
-    private MockTime time;
-    private LogContext logContext;
-    private Metrics metrics;
-    private ClusterResourceListeners clusterResourceListeners;
-    private Optional<String> groupId;
-    private String clientId;
-    private EventHandler eventHandler;
+
+    private Consumer<?, ?> consumer;
+    private Map<String, Object> consumerProps = new HashMap<>();
+
+    private final Time time = new MockTime();
 
     @BeforeEach
     public void setup() {
-        this.subscriptionState = Mockito.mock(SubscriptionState.class);
-        this.eventHandler = Mockito.mock(DefaultEventHandler.class);
-        this.logContext = new LogContext();
-        this.time = new MockTime();
-        this.metrics = new Metrics(time);
-        this.groupId = Optional.empty();
-        this.clientId = "client-1";
-        this.clusterResourceListeners = new ClusterResourceListeners();
-        this.properties = new HashMap<>();
-        this.properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                "localhost" +
-                ":9999");
-        this.properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        this.properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        this.properties.put(CLIENT_ID_CONFIG, "test-client");
+        injectConsumerConfigs();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        if (consumer != null) {
+            consumer.close(Duration.ZERO);
+        }
     }
+
     @Test
-    public void testSubscription() {
-        this.subscriptionState =
-                new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
-        PrototypeAsyncConsumer<String, String> consumer =
-                setupConsumerWithDefault();
-        subscriptionState.subscribe(singleton("t1"),
-                new NoOpConsumerRebalanceListener());
-        assertEquals(1, consumer.subscription().size());
+    public void testBackgroundThreadRunning() {
+        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());

Review Comment:
   I'm thinking what's the best way to test that. As we don't have direct access to the background thread from the consumer.  I guess we could do that passively by trying to catch the exception, or maybe, actively, I should add a `public State state()` to the eventHandler?  to allow user to prob the background thread stat... I feel we need both, but WDYT? @rajinisivaram @guozhangwang 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127251663


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##########
@@ -16,90 +16,77 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
-import static java.util.Collections.singleton;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class PrototypeAsyncConsumerTest {
-    private Map<String, Object> properties;
-    private SubscriptionState subscriptionState;
-    private MockTime time;
-    private LogContext logContext;
-    private Metrics metrics;
-    private ClusterResourceListeners clusterResourceListeners;
-    private Optional<String> groupId;
-    private String clientId;
-    private EventHandler eventHandler;
+
+    private Consumer<?, ?> consumer;
+    private Map<String, Object> consumerProps = new HashMap<>();
+
+    private final Time time = new MockTime();
 
     @BeforeEach
     public void setup() {
-        this.subscriptionState = Mockito.mock(SubscriptionState.class);
-        this.eventHandler = Mockito.mock(DefaultEventHandler.class);
-        this.logContext = new LogContext();
-        this.time = new MockTime();
-        this.metrics = new Metrics(time);
-        this.groupId = Optional.empty();
-        this.clientId = "client-1";
-        this.clusterResourceListeners = new ClusterResourceListeners();
-        this.properties = new HashMap<>();
-        this.properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                "localhost" +
-                ":9999");
-        this.properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        this.properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        this.properties.put(CLIENT_ID_CONFIG, "test-client");
+        injectConsumerConfigs();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        if (consumer != null) {
+            consumer.close(Duration.ZERO);
+        }
     }
+
     @Test
-    public void testSubscription() {
-        this.subscriptionState =
-                new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
-        PrototypeAsyncConsumer<String, String> consumer =
-                setupConsumerWithDefault();
-        subscriptionState.subscribe(singleton("t1"),
-                new NoOpConsumerRebalanceListener());
-        assertEquals(1, consumer.subscription().size());
+    public void testBackgroundThreadRunning() {
+        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());

Review Comment:
   maybe we don't need this afterall actually. I feel for the purpose of unit test, we should just try to test the API and method calls. The integration test in this PR is already kind of testing this (otherwise request won't come through)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13303:
URL: https://github.com/apache/kafka/pull/13303#discussion_r1127257845


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java:
##########
@@ -16,90 +16,77 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.consumer.internals.events.EventHandler;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
-import static java.util.Collections.singleton;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class PrototypeAsyncConsumerTest {
-    private Map<String, Object> properties;
-    private SubscriptionState subscriptionState;
-    private MockTime time;
-    private LogContext logContext;
-    private Metrics metrics;
-    private ClusterResourceListeners clusterResourceListeners;
-    private Optional<String> groupId;
-    private String clientId;
-    private EventHandler eventHandler;
+
+    private Consumer<?, ?> consumer;
+    private Map<String, Object> consumerProps = new HashMap<>();
+
+    private final Time time = new MockTime();
 
     @BeforeEach
     public void setup() {
-        this.subscriptionState = Mockito.mock(SubscriptionState.class);
-        this.eventHandler = Mockito.mock(DefaultEventHandler.class);
-        this.logContext = new LogContext();
-        this.time = new MockTime();
-        this.metrics = new Metrics(time);
-        this.groupId = Optional.empty();
-        this.clientId = "client-1";
-        this.clusterResourceListeners = new ClusterResourceListeners();
-        this.properties = new HashMap<>();
-        this.properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                "localhost" +
-                ":9999");
-        this.properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        this.properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        this.properties.put(CLIENT_ID_CONFIG, "test-client");
+        injectConsumerConfigs();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        if (consumer != null) {
+            consumer.close(Duration.ZERO);
+        }
     }
+
     @Test
-    public void testSubscription() {
-        this.subscriptionState =
-                new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
-        PrototypeAsyncConsumer<String, String> consumer =
-                setupConsumerWithDefault();
-        subscriptionState.subscribe(singleton("t1"),
-                new NoOpConsumerRebalanceListener());
-        assertEquals(1, consumer.subscription().size());
+    public void testBackgroundThreadRunning() {
+        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
     }
 
     @Test
     public void testUnimplementedException() {
-        PrototypeAsyncConsumer<String, String> consumer =
-                setupConsumerWithDefault();
+        consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
         assertThrows(KafkaException.class, consumer::assignment, "not implemented exception");
     }
 
-    public PrototypeAsyncConsumer<String, String> setupConsumerWithDefault() {
-        ConsumerConfig config = new ConsumerConfig(properties);
-        return new PrototypeAsyncConsumer<>(
-                this.time,
-                this.logContext,
-                config,
-                this.subscriptionState,
-                this.eventHandler,
-                this.metrics,
-                this.clusterResourceListeners,
-                this.groupId,
-                this.clientId,
-                0);
+    private ConsumerMetadata createMetadata(SubscriptionState subscription) {
+        return new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
+                subscription, new LogContext(), new ClusterResourceListeners());
+    }
+
+    private void injectConsumerConfigs() {
+        consumerProps.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");

Review Comment:
   I think that's the intent: I don't think it is necessary to make the actual connection to the broker on the level of unit test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on pull request #13303: KAFKA-14761 Adding integration test for the prototype consumer

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13303:
URL: https://github.com/apache/kafka/pull/13303#issuecomment-1459067328

   > @guozhangwang - Regarding testing the commit results, can we defer this to the subsequent PR, i.e. verify it once committed() is implemented.
   
   SGTM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org