Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/21 19:22:18 UTC

[GitHub] [kafka] kirktrue commented on a diff in pull request #12672: KAFKA-14252: Consumer refactor background thread

kirktrue commented on code in PR #12672:
URL: https://github.com/apache/kafka/pull/12672#discussion_r976889155


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerBackgroundThread.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRequestEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerResponseEvent;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * The background thread runs in the background and consumes the {@code ApplicationEvent} from the {@link org.apache.kafka.clients.consumer.KafkaConsumer} APIs. This class uses an event loop to drive the following important tasks:
+ * <ul>
+ *  <li>Consuming and executing the {@code ApplicationEvent}.</li>
+ *  <li>Maintaining the connection to the coordinator.</li>
+ *  <li>Sending heartbeat.</li>
+ *  <li>Autocommitting.</li>
+ *  <li>Executing the rebalance flow.</li>
+ * </ul>
+ */
+public class ConsumerBackgroundThread extends KafkaThread implements AutoCloseable {
+    private static final String CONSUMER_BACKGROUND_THREAD_PREFIX = "consumer_background_thread";
+
+    private final BlockingQueue<ConsumerRequestEvent> consumerRequestEvent;
+    private final BlockingQueue<ConsumerResponseEvent> consumerResponseEvent;
+    private Time time;

Review Comment:
   Nit-picky, but this can and probably should be `final` too.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerBackgroundThread.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRequestEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerResponseEvent;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * The background thread runs in the background and consumes the {@code ApplicationEvent} from the {@link org.apache.kafka.clients.consumer.KafkaConsumer} APIs. This class uses an event loop to drive the following important tasks:
+ * <ul>
+ *  <li>Consuming and executing the {@code ApplicationEvent}.</li>
+ *  <li>Maintaining the connection to the coordinator.</li>
+ *  <li>Sending heartbeat.</li>
+ *  <li>Autocommitting.</li>
+ *  <li>Executing the rebalance flow.</li>
+ * </ul>
+ */
+public class ConsumerBackgroundThread extends KafkaThread implements AutoCloseable {

Review Comment:
   IIRC, `AutoCloseable` is usually for cases of "try-with-resources." Is that how it will be used?
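
For context, a minimal sketch of the try-with-resources pattern the question refers to. Everything here is hypothetical: `ClosableWorker` is a stand-in for the background thread, and a plain `Thread` replaces `KafkaThread` so the example compiles on its own. It only shows that `close()` is invoked automatically when the `try` block exits; whether the PR intends that usage is exactly the open question.

```java
// Hypothetical stand-in for the background thread (plain Thread instead of KafkaThread).
class ClosableWorker extends Thread implements AutoCloseable {
    private volatile boolean closed = false;

    ClosableWorker() {
        super("closable-worker");
        setDaemon(true);
    }

    @Override
    public void run() {
        while (!closed) {
            try {
                Thread.sleep(10); // placeholder for the real event-loop work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true; // ask the loop to exit
        interrupt();   // wake the thread if it is sleeping
    }
}

class TryWithResourcesDemo {
    // Starts the worker inside try-with-resources and returns it after it has been closed.
    static ClosableWorker runOnce() {
        ClosableWorker worker = new ClosableWorker();
        try (ClosableWorker w = worker) {
            w.start();
        } // close() is called automatically here
        try {
            worker.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return worker;
    }

    public static void main(String[] args) {
        ClosableWorker w = runOnce();
        System.out.println("closed=" + w.isClosed() + ", alive=" + w.isAlive());
    }
}
```

If the thread is instead held as a long-lived field and shut down explicitly, `java.io.Closeable` or a plain `close()` method may express the intent equally well.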



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerBackgroundThread.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRequestEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerResponseEvent;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * The background thread runs in the background and consumes the {@code ApplicationEvent} from the {@link org.apache.kafka.clients.consumer.KafkaConsumer} APIs. This class uses an event loop to drive the following important tasks:
+ * <ul>
+ *  <li>Consuming and executing the {@code ApplicationEvent}.</li>
+ *  <li>Maintaining the connection to the coordinator.</li>
+ *  <li>Sending heartbeat.</li>
+ *  <li>Autocommitting.</li>
+ *  <li>Executing the rebalance flow.</li>
+ * </ul>
+ */
+public class ConsumerBackgroundThread extends KafkaThread implements AutoCloseable {
+    private static final String CONSUMER_BACKGROUND_THREAD_PREFIX = "consumer_background_thread";
+
+    private final BlockingQueue<ConsumerRequestEvent> consumerRequestEvent;
+    private final BlockingQueue<ConsumerResponseEvent> consumerResponseEvent;
+    private Time time;
+
+    // control variables
+    private volatile boolean closed = false;
+
+    public ConsumerBackgroundThread(BlockingQueue<ConsumerRequestEvent> consumerRequestEvents,

Review Comment:
   The constructor logic seems a bit reversed from what I'd expect.
   
   I would imagine the three-arg version would be:
   
   ```
   super(CONSUMER_BACKGROUND_THREAD_PREFIX, true);
   this.time = time;
   this.consumerRequestEvent = consumerRequestEvents;
   this.consumerResponseEvent = consumerResponseEvents;
   ```
   
   And then the two-arg version would simply be:
   
   ```
   this(Time.SYSTEM, consumerRequestEvents, consumerResponseEvents);
   ```
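
Put together, the suggestion could look roughly like the sketch below. This is not the PR's code: to keep it self-contained, `Thread` stands in for `KafkaThread`, `java.time.Clock` for Kafka's `Time`, and `String` for the event types, and the class name is made up. The point is that once the convenience constructor delegates to the full one, every field is assigned exactly once and can be `final`.

```java
import java.time.Clock;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in: Thread replaces KafkaThread, Clock replaces Time,
// and String replaces the request/response event types.
class BackgroundThreadSketch extends Thread {
    private static final String THREAD_NAME = "consumer_background_thread";

    private final Clock clock;
    private final BlockingQueue<String> requests;
    private final BlockingQueue<String> responses;

    // Two-arg convenience constructor: only supplies the default clock.
    BackgroundThreadSketch(BlockingQueue<String> requests, BlockingQueue<String> responses) {
        this(Clock.systemUTC(), requests, responses);
    }

    // Three-arg constructor does all the real initialization, so fields can be final.
    BackgroundThreadSketch(Clock clock,
                           BlockingQueue<String> requests,
                           BlockingQueue<String> responses) {
        super(THREAD_NAME);
        setDaemon(true);
        this.clock = clock;
        this.requests = requests;
        this.responses = responses;
    }

    Clock clock() {
        return clock;
    }

    public static void main(String[] args) {
        BackgroundThreadSketch t =
            new BackgroundThreadSketch(new LinkedBlockingQueue<>(), new LinkedBlockingQueue<>());
        System.out.println(t.getName() + " daemon=" + t.isDaemon());
    }
}
```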



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class interfaces the KafkaConsumer and the background thread.  It allows the caller to enqueue {@link ApplicationEvent}
+ * to be consumed by the background thread and poll {@linkBackgroundEvent} produced by the background thread.
+ */
+public class DefaultEventHandler implements EventHandler {
+    private BlockingQueue<ApplicationEvent> applicationEvents;
+    private BlockingQueue<BackgroundEvent> backgroundEvents;
+
+    public DefaultEventHandler() {
+        this.applicationEvents = new LinkedBlockingQueue<>();
+        this.backgroundEvents = new LinkedBlockingQueue<>();
+        // TODO: a concreted implementation of how requests are being consumed, and how responses are being produced.
+    }
+
+    @Override
+    public BackgroundEvent poll() {
+        return backgroundEvents.poll();

Review Comment:
   This can return `null`, right? 
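
To make the concern concrete: the non-blocking `BlockingQueue.poll()` returns `null` when the queue is empty, so callers of the handler's `poll()` would need a null check. The sketch below demonstrates that, plus one possible way to make the empty case explicit with `Optional`. The `pollOptional` helper and the class name are hypothetical and not part of the PR.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PollNullDemo {
    // Hypothetical helper: wraps the non-blocking poll() so the empty case is explicit.
    static <T> Optional<T> pollOptional(BlockingQueue<T> queue) {
        return Optional.ofNullable(queue.poll());
    }

    public static void main(String[] args) {
        BlockingQueue<String> events = new LinkedBlockingQueue<>();

        // Empty queue: poll() does not block and returns null.
        System.out.println("raw poll on empty queue: " + events.poll());

        events.add("event");
        System.out.println("optional present: " + pollOptional(events).isPresent());
    }
}
```

Alternatively, the return value could stay a plain reference as long as the JavaDoc states that `null` means "no event available."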



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class interfaces the KafkaConsumer and the background thread.  It allows the caller to enqueue {@link ApplicationEvent}
+ * to be consumed by the background thread and poll {@linkBackgroundEvent} produced by the background thread.
+ */
+public class DefaultEventHandler implements EventHandler {
+    private BlockingQueue<ApplicationEvent> applicationEvents;
+    private BlockingQueue<BackgroundEvent> backgroundEvents;
+
+    public DefaultEventHandler() {
+        this.applicationEvents = new LinkedBlockingQueue<>();
+        this.backgroundEvents = new LinkedBlockingQueue<>();
+        // TODO: a concreted implementation of how requests are being consumed, and how responses are being produced.
+    }
+
+    @Override
+    public BackgroundEvent poll() {
+        return backgroundEvents.poll();
+    }
+
+    @Override
+    public boolean add(ApplicationEvent event) {
+        return applicationEvents.add(event);

Review Comment:
   According to the [JavaDoc for `AbstractQueue`](https://docs.oracle.com/javase/8/docs/api/java/util/AbstractQueue.html#add-E-):
   
   > This implementation returns `true` if `offer` (sic) succeeds, else throws an `IllegalStateException`.
   
   Do we want an exception thrown if the buffer is full? Or do we want the [`offer`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingQueue.html#offer-E-) method which returns `false`?
   
   > Inserts the specified element at the tail of this queue if it is possible to do so immediately without exceeding the queue's capacity, returning `true` upon success and `false` if this queue is full. When using a capacity-restricted queue, this method is generally preferable to method [`add`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html#add-E-), which can fail to insert an element only by throwing an exception.
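
A small sketch of the difference, using a deliberately tiny capacity (the class and helper names are made up for illustration). Note that the no-arg `LinkedBlockingQueue` constructor used in the PR creates a queue with capacity `Integer.MAX_VALUE`, so in practice the difference only matters if the queue is later given a bound.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class AddVsOfferDemo {
    // Returns true if add() rejected the element by throwing IllegalStateException.
    static boolean addThrows(BlockingQueue<String> queue, String element) {
        try {
            queue.add(element);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Capacity of 1 chosen only to make the queue easy to fill.
        BlockingQueue<String> bounded = new LinkedBlockingQueue<>(1);
        bounded.add("first");

        // offer() reports failure through its return value.
        System.out.println("offer on full queue: " + bounded.offer("second"));

        // add() reports failure by throwing.
        System.out.println("add throws on full queue: " + addThrows(bounded, "second"));
    }
}
```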



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerBackgroundThread.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ConsumerRequestEvent;
+import org.apache.kafka.clients.consumer.internals.events.ConsumerResponseEvent;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * The background thread runs in the background and consumes the {@code ApplicationEvent} from the {@link org.apache.kafka.clients.consumer.KafkaConsumer} APIs. This class uses an event loop to drive the following important tasks:
+ * <ul>
+ *  <li>Consuming and executing the {@code ApplicationEvent}.</li>
+ *  <li>Maintaining the connection to the coordinator.</li>
+ *  <li>Sending heartbeat.</li>
+ *  <li>Autocommitting.</li>
+ *  <li>Executing the rebalance flow.</li>
+ * </ul>
+ */
+public class ConsumerBackgroundThread extends KafkaThread implements AutoCloseable {
+    private static final String CONSUMER_BACKGROUND_THREAD_PREFIX = "consumer_background_thread";
+
+    private final BlockingQueue<ConsumerRequestEvent> consumerRequestEvent;
+    private final BlockingQueue<ConsumerResponseEvent> consumerResponseEvent;
+    private Time time;
+
+    // control variables
+    private volatile boolean closed = false;
+
+    public ConsumerBackgroundThread(BlockingQueue<ConsumerRequestEvent> consumerRequestEvents,
+                                    BlockingQueue<ConsumerResponseEvent> consumerResponseEvents) {
+        super(CONSUMER_BACKGROUND_THREAD_PREFIX, true);
+        this.time = Time.SYSTEM;
+        this.consumerRequestEvent = consumerRequestEvents;
+        this.consumerResponseEvent = consumerResponseEvents;
+    }
+
+    public ConsumerBackgroundThread(Time time,
+                                    BlockingQueue<ConsumerRequestEvent> consumerRequestEvents,
+                                    BlockingQueue<ConsumerResponseEvent> consumerResponseEvents) {
+        this(consumerRequestEvents, consumerResponseEvents);
+        this.time = time;
+    }
+
+    @Override
+    public void run() {
+        try {
+            while(!closed) {
+                // TODO: implementation will be added here

Review Comment:
   Magic happens here... 😄 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This class interfaces the KafkaConsumer and the background thread.  It allows the caller to enqueue {@link ApplicationEvent}
+ * to be consumed by the background thread and poll {@linkBackgroundEvent} produced by the background thread.
+ */
+public class DefaultEventHandler implements EventHandler {
+    private BlockingQueue<ApplicationEvent> applicationEvents;

Review Comment:
   These two `BlockingQueue`s can be `final`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/EventHandler.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals.events;
+
+/**
+ * This class interfaces with the KafkaConsumer and the background thread. It allows the caller to enqueue events via
+ * the {@code add()} method and to retrieve events via the {@code poll()} method.
+ */
+public interface EventHandler {
+    public BackgroundEvent poll();
+    public boolean add(ApplicationEvent event);

Review Comment:
   Can you describe what the `boolean` that is returned from `add` represents?
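
As one possible shape for that documentation, here is a sketch with the contract spelled out in JavaDoc. The semantics shown (return `false` when the queue is full, i.e. `offer()` behavior) are an assumption for illustration, not what the PR currently does; the interface and class names are hypothetical, and `String` stands in for the event types.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical interface; String stands in for ApplicationEvent and BackgroundEvent.
interface SketchEventHandler {
    /**
     * Retrieves the next background event without blocking.
     *
     * @return the next event, or {@code null} if none is currently available
     */
    String poll();

    /**
     * Enqueues an application event for the background thread.
     *
     * @param event the event to enqueue
     * @return {@code true} if the event was enqueued; {@code false} if the queue
     *         was full and the event was not accepted (assumed contract)
     */
    boolean add(String event);
}

// Hypothetical implementation backing the assumed contract with offer().
class BoundedSketchEventHandler implements SketchEventHandler {
    private final BlockingQueue<String> applicationEvents;
    private final BlockingQueue<String> backgroundEvents = new LinkedBlockingQueue<>();

    BoundedSketchEventHandler(int capacity) {
        this.applicationEvents = new LinkedBlockingQueue<>(capacity);
    }

    @Override
    public String poll() {
        return backgroundEvents.poll();
    }

    @Override
    public boolean add(String event) {
        return applicationEvents.offer(event);
    }
}
```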


