You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/19 21:29:40 UTC

[GitHub] [kafka] philipnee opened a new pull request, #12663: [Consumer Refactor] Define event handler interface and events

philipnee opened a new pull request, #12663:
URL: https://github.com/apache/kafka/pull/12663

   https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor
   
   * We want to abstract away the Queue from the new KafkaConsumer using an EventHandler.
   - poll: Retrieve a response from the eventHandler
   - add: Add a request to the handler
   The event handling should live inside of the concrete implementation
   
   *Unit tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975641608


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;

Review Comment:
   Sounds good, I was actually thinking about the same thing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12663: KAFKA-14247: Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r976872471


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventHandler.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+/**
+ * This class interfaces with the KafkaConsumer and the background thread. It allows the caller to enqueue events via
+ * the {@code add()} method and to retrieve events via the {@code poll()} method.
+ */
+public interface EventHandler {
+    public BackgroundEvent poll();

Review Comment:
   Can we add some javadoc for this? Does this return null if there are no events available?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12663: KAFKA-14247: Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r982803945


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPrototypeAsyncConsumer.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import org.apache.kafka.common.utils.Time;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This is the prototype of the consumer that uses the {@link EventHandler} to process application events.
+ */
+public abstract class AbstractPrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
+    private final EventHandler eventHandler;
+    private final Time time;
+
+    public AbstractPrototypeAsyncConsumer(final Time time, final EventHandler eventHandler) {
+        this.eventHandler = eventHandler;
+        this.time = time;
+    }
+
+    /**
+     * poll implementation using {@link EventHandler}.
+     *  1. Poll for background events. If there's a fetch response event, process the record and return it. If it is
+     *  another type of event, process it.
+     *  2. Send fetches if needed.
+     *  If the timeout expires, return an empty ConsumerRecord.
+     * @param timeout timeout of the poll loop
+     * @return ConsumerRecord.  It can be empty if time timeout expires.
+     */
+    @Override
+    public ConsumerRecords<K, V> poll(final Duration timeout) {
+        try {
+            do {
+                if (!eventHandler.isEmpty()) {
+                    Optional<BackgroundEvent> backgroundEvent = eventHandler.poll();
+                    if (backgroundEvent.isPresent()) {
+                        if (isFetchResult(backgroundEvent.get())) {
+                            // return fetches
+                            return processFetchResult(backgroundEvent.get());
+                        }
+                        processEvent(backgroundEvent.get(), timeout); // might trigger callbacks or handle exceptions
+                    }
+                }
+
+                maybeSendFetches(); // send new fetches

Review Comment:
   the intention was to enqueue a fetch event, I think we still want the poll loop to drive the fetching, right? Maybe a better naming is needed.  `enqueueFetchEvent()` perhaps?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12663: KAFKA-14247: Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r984957644


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPrototypeAsyncConsumer.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import org.apache.kafka.common.utils.Time;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This is the prototype of the consumer that uses the {@link EventHandler} to process application events.

Review Comment:
   I think the main thing this doc should mention is that all network IO will be done in a background thread instead of being partially driven by the application calling `poll()`. Also, perhaps it would be helpful to link to the wiki so that people can understand a little better?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975863614


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRequestEvent.java:
##########
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+abstract public class ConsumerRequestEvent {

Review Comment:
   Probably not, events like assignment don't really require a response.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12663: KAFKA-14247: Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r976908482


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventHandler.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+/**
+ * This class interfaces with the KafkaConsumer and the background thread. It allows the caller to enqueue events via
+ * the {@code add()} method and to retrieve events via the {@code poll()} method.
+ */
+public interface EventHandler {
+    public BackgroundEvent poll();

Review Comment:
   Thanks @hachikuji - I'm making it to return an Optional.  Another question raised here, I think we need to handle the exceptions thrown by blockingQueue.add.  I'm looking at the RaftMessageQueue impl, the exceptions aren't being handled there, are we expecting the caller/client to handle these exceptions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
philipnee commented on PR #12663:
URL: https://github.com/apache/kafka/pull/12663#issuecomment-1251684468

   cc @guozhangwang @hachikuji @dajac for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975965989


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {
+    public K poll();
+    public boolean add(T event);

Review Comment:
   I was thinking about `ConsumerEvent` or `CallerEvent`, and `CoordinatorEvent`, just to make the naming party worse :P 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975642128


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRequestEvent.java:
##########
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+abstract public class ConsumerRequestEvent {

Review Comment:
   I'll add java docs around it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12663: KAFKA-14247: Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r982820685


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPrototypeAsyncConsumer.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import org.apache.kafka.common.utils.Time;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This is the prototype of the consumer that uses the {@link EventHandler} to process application events.
+ */
+public abstract class AbstractPrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
+    private final EventHandler eventHandler;
+    private final Time time;
+
+    public AbstractPrototypeAsyncConsumer(final Time time, final EventHandler eventHandler) {
+        this.eventHandler = eventHandler;
+        this.time = time;
+    }
+
+    /**
+     * poll implementation using {@link EventHandler}.
+     *  1. Poll for background events. If there's a fetch response event, process the record and return it. If it is
+     *  another type of event, process it.
+     *  2. Send fetches if needed.
+     *  If the timeout expires, return an empty ConsumerRecord.
+     * @param timeout timeout of the poll loop
+     * @return ConsumerRecord.  It can be empty if time timeout expires.
+     */
+    @Override
+    public ConsumerRecords<K, V> poll(final Duration timeout) {
+        try {
+            do {
+                if (!eventHandler.isEmpty()) {
+                    Optional<BackgroundEvent> backgroundEvent = eventHandler.poll();
+                    if (backgroundEvent.isPresent()) {
+                        if (isFetchResult(backgroundEvent.get())) {
+                            // return fetches
+                            return processFetchResult(backgroundEvent.get());
+                        }
+                        processEvent(backgroundEvent.get(), timeout); // might trigger callbacks or handle exceptions
+                    }
+                }
+
+                maybeSendFetches(); // send new fetches

Review Comment:
   Thanks, @hachikuji - removed this line per your comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12663: KAFKA-14247: Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r984959163


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPrototypeAsyncConsumer.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import org.apache.kafka.common.utils.Time;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This is the prototype of the consumer that uses the {@link EventHandler} to process application events.
+ */
+public abstract class AbstractPrototypeAsyncConsumer<K, V> implements Consumer<K, V> {

Review Comment:
   Thanks, let's drop the Abstract prefix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12663:
URL: https://github.com/apache/kafka/pull/12663#issuecomment-1253989824

   Thanks @philipnee , I think we can merge the PR as is since it's new interfaces and if we found anything in follow-up developments we can always come back and change them. So LGTM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975915821


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {
+    public K poll();
+    public boolean add(T event);

Review Comment:
   That could work. Maybe `ApplicationEvent` and `BackgroundEvent`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975715686


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {
+    public K poll();
+    public boolean add(T event);

Review Comment:
   From our offline discussions, I think there are two channels of communicating "responses" back: 1) the caller send a request, along with a future, and then either sync or async waiting on the future, in this case it does not expect "polling" the response; 2) the caller does not have a target response to wait in mind, and would just want to poll for any responses that may become available and react on them.
   
   For commitSync, for example, since the caller only cares about a specific response, it may be not appropriate to try polling responses since there will be a lot of other responses getting polled but the caller does not care, hence it would likely falling into the first case above.
   
   Could we add some javadocs clarifying which scenarios will leverage on each of the two?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975863343


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {
+    public K poll();
+    public boolean add(T event);

Review Comment:
   Absolutely.  I'm also thinking that the naming can be a bit misleading sometimes, because the ResponseEvents might not necessary be a "response".  As far as the implementation goes, I think they are mostly just errors and callback events.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975715686


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {
+    public K poll();
+    public boolean add(T event);

Review Comment:
   From our offline discussions, I think there are two channels of communicating "responses" back: 1) the caller send a request, along with a future, and then either sync or async waiting on the future, in this case it does not expect "polling" the response; 2) the caller send a request, and then either sync or async polling responses.
   
   For commitSync, for example, since the caller only cares about a specific response, it may be not appropriate to try polling responses since there will be a lot of other responses getting polled but the caller does not care.
   
   Could we add some javadocs clarifying which scenarios will leverage on each of the two?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRequestEvent.java:
##########
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+abstract public class ConsumerRequestEvent {

Review Comment:
   Per my other comment above: for request, do we also want to always include a future?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji merged pull request #12663: KAFKA-14247: Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
hachikuji merged PR #12663:
URL: https://github.com/apache/kafka/pull/12663


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12663: KAFKA-14247: Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r982794220


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPrototypeAsyncConsumer.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import org.apache.kafka.common.utils.Time;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This is the prototype of the consumer that uses the {@link EventHandler} to process application events.
+ */
+public abstract class AbstractPrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
+    private final EventHandler eventHandler;
+    private final Time time;
+
+    public AbstractPrototypeAsyncConsumer(final Time time, final EventHandler eventHandler) {
+        this.eventHandler = eventHandler;
+        this.time = time;
+    }
+
+    /**
+     * poll implementation using {@link EventHandler}.
+     *  1. Poll for background events. If there's a fetch response event, process the record and return it. If it is
+     *  another type of event, process it.
+     *  2. Send fetches if needed.
+     *  If the timeout expires, return an empty ConsumerRecord.
+     * @param timeout timeout of the poll loop
+     * @return ConsumerRecord.  It can be empty if time timeout expires.
+     */
+    @Override
+    public ConsumerRecords<K, V> poll(final Duration timeout) {
+        try {
+            do {
+                if (!eventHandler.isEmpty()) {
+                    Optional<BackgroundEvent> backgroundEvent = eventHandler.poll();
+                    if (backgroundEvent.isPresent()) {
+                        if (isFetchResult(backgroundEvent.get())) {
+                            // return fetches
+                            return processFetchResult(backgroundEvent.get());
+                        }
+                        processEvent(backgroundEvent.get(), timeout); // might trigger callbacks or handle exceptions
+                    }
+                }
+
+                maybeSendFetches(); // send new fetches

Review Comment:
   I would think this would be handled by the background thread. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975638603


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;

Review Comment:
   Would it make sense for the handlers to go in `internals/events` as well?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {

Review Comment:
   I wonder if the generics are needed here. Do we anticipate other types?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ConsumerRequestEvent.java:
##########
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+abstract public class ConsumerRequestEvent {

Review Comment:
   Can you clarify what qualifies as a "request event" vs "response event"? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12663: KAFKA-14247: Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r984958755


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPrototypeAsyncConsumer.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import org.apache.kafka.common.utils.Time;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This is the prototype of the consumer that uses the {@link EventHandler} to process application events.
+ */
+public abstract class AbstractPrototypeAsyncConsumer<K, V> implements Consumer<K, V> {

Review Comment:
   nit: how about we call it `PrototypeBackgroundConsumer` or something like that? Or at least drop the `Abstract` from the name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975897905


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {
+    public K poll();
+    public boolean add(T event);

Review Comment:
    Can we be more direct and call the ResponseEvent the BackgroundThreadEvent... because it really is coming from the background thread, and it's not necessary a NetworkEvent.  It can be just a trigger for the callback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
philipnee commented on PR #12663:
URL: https://github.com/apache/kafka/pull/12663#issuecomment-1253980979

   Thanks @hachikuji and @guozhangwang - I just updated the PR.  I'm settling with `ApplicationEvent` and `BackgroundEvent`.  Also, brief definitions for the abstract classes have been added.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12663: KAFKA-14247: Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r980468305


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventHandler.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+import java.util.Optional;
+
+/**
+ * This class interfaces with the KafkaConsumer and the background thread. It allows the caller to enqueue events via
+ * the {@code add()} method and to retrieve events via the {@code poll()} method.
+ */
+public interface EventHandler {

Review Comment:
   I don't think we need another handler on the background thread, what I'm thinking is to have the background thread living inside of the handler implementation, process the application events, and produce the background events.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
philipnee commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975641032


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {

Review Comment:
   That's a good point, I think I was trying to make it a generic handler but we don't really need it at this point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r975893945


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/EventHandler.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * The EventHandler interfaces the client and handler implementation.  The client should add an event for consumption,
+ * and try to poll for the responses.  Here, one could poll a response with type K, and add a request with type T.
+ * @param <T> Event request type
+ * @param <K> Event response type
+ */
+public interface EventHandler<T, K> {
+    public K poll();
+    public boolean add(T event);

Review Comment:
   I was thinking something like RequestEvent -> ApplicationEvent and ResponseEvent -> NetworkEvent. Or something like that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #12663: [Consumer Refactor] Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
philipnee commented on PR #12663:
URL: https://github.com/apache/kafka/pull/12663#issuecomment-1254001382

   Thanks @guozhangwang! Agreed, we can make incremental changes to these implementation as we make more progress.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #12663: KAFKA-14247: Define event handler interface and events

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12663:
URL: https://github.com/apache/kafka/pull/12663#discussion_r980464181


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventHandler.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+import java.util.Optional;
+
+/**
+ * This class interfaces with the KafkaConsumer and the background thread. It allows the caller to enqueue events via
+ * the {@code add()} method and to retrieve events via the {@code poll()} method.
+ */
+public interface EventHandler {

Review Comment:
   I guess this is for the application side since we add application events and poll background events. Are we going to need something similar for the background thread as well? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org