Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/22 19:01:05 UTC

[GitHub] [kafka] rhauch opened a new pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

rhauch opened a new pull request #9780:
URL: https://github.com/apache/kafka/pull/9780


   The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic.
   
   However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns, either after a new record is written (unlikely) or after the consumer’s `fetch.max.wait.ms` interval (defaults to 500ms) elapses and the consumer returns no more records. In other words, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end.
   
   Instead, we want `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `Kafka*BackingStore` classes use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity.
   
   Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, so all three `KafkaBasedLog` instances should share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change the `Kafka*BackingStore` classes to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance.
   
   The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes.
   
   These changes are implemented as follows:
   1. Add a `KafkaBasedLog` constructor that accepts a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to one that accepts an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (as in the deprecated constructor, which is no longer used in Apache Kafka), the consumer is still used to get latest offsets.
   2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When each class instantiates its `KafkaBasedLog` instance, it passes the admin supplier and an init function that takes an admin instance.
   3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed.
   4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic.
   5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, pass the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore` objects, and finally always close the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.)
   6. Change `MirrorMaker` similarly to `ConnectDistributed`.
   7. Change existing unit tests to no longer use deprecated constructors.
   8. Add unit tests for new functionality.
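
   The lazy, shared supplier described in steps 3 and 5 can be sketched roughly as follows. This is only an illustration under stated assumptions, not the PR's `SharedTopicAdmin`: `FakeAdmin` and `SharedFakeAdmin` are hypothetical names, `FakeAdmin` stands in for the real admin object, and the closed-state handling is simplified.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical stand-in for an expensive client such as an admin client.
class FakeAdmin implements AutoCloseable {
    static int instancesCreated = 0; // counts constructions, for illustration only
    boolean closed = false;

    FakeAdmin() {
        instancesCreated++;
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical holder: creates one FakeAdmin on first use and returns that
// same instance to every caller until the holder is closed.
class SharedFakeAdmin implements Supplier<FakeAdmin>, AutoCloseable {
    private final AtomicReference<FakeAdmin> admin = new AtomicReference<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @Override
    public FakeAdmin get() {
        // Keep the existing instance if there is one; otherwise create it now.
        return admin.updateAndGet(existing -> {
            if (closed.get()) {
                throw new IllegalStateException("already closed");
            }
            return existing != null ? existing : new FakeAdmin();
        });
    }

    @Override
    public void close() {
        // Only the first close() call releases the underlying instance.
        if (closed.compareAndSet(false, true)) {
            FakeAdmin existing = admin.getAndSet(null);
            if (existing != null) {
                existing.close();
            }
        }
    }
}
```

   In this sketch nothing is constructed until the first `get()`, every later `get()` returns the same object, and closing the holder closes the shared object exactly once, which is the behavior the three backing stores would rely on.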
   
   This change should be backported to fix the bug in recent releases.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch merged pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
rhauch merged pull request #9780:
URL: https://github.com/apache/kafka/pull/9780


   





[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570655008



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+    private static final Map<String, Object> CONFIG = Collections.emptyMap();
+
+    @Mock private TopicAdmin mockTopicAdmin;
+    private SharedTopicAdmin sharedAdmin;
+    private int created = 0;

Review comment:
       Really I'm just using that to be able to test that the new `topicAdmin()` method is returning the correct instance, even after repeated calls. It was an easy way to verify that the `TopicAdmin` matches what the factory function returned.







[GitHub] [kafka] kkonstantine commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570584036



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -209,7 +229,8 @@ public DistributedHerder(DistributedConfig config,
                       String restUrl,
                       ConnectMetrics metrics,
                       Time time,
-                      ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                      ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                      AutoCloseable... uponShutdown) {

Review comment:
       see comment above

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -185,16 +188,33 @@
 
     private final DistributedConfig config;
 
+    /**
+     * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs)
+     * that have the same group ID.
+     *
+     * @param config             the configuration for the worker; may not be null
+     * @param time               the clock to use; may not be null
+     * @param worker             the {@link Worker} instance to use; may not be null
+     * @param kafkaClusterId     the identifier of the Kafka cluster to use for internal topics; may not be null
+     * @param statusBackingStore the backing store for statuses; may not be null
+     * @param configBackingStore the backing store for connector configurations; may not be null
+     * @param restUrl            the URL of this herder's REST API; may not be null
+     * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
+     *                                            in connector configurations; may not be null
+     * @param uponShutdown       any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
+     *                           after all services and resources owned by this herder are stopped
+     */
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
                              String kafkaClusterId,
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
                              String restUrl,
-                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                             AutoCloseable... uponShutdown) {

Review comment:
       I think it's better to avoid a variadic argument here. 
   Parameters tend to get added with new features in such constructors. And if a new parameter is required that is also a list, then we'll have a mix of list args with a variadic in the end. 
   
   Since we transform it to a list anyway, I'd suggest using that type here and passing the single argument with `Collections.singletonList` in the caller. 
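
   As a rough illustration of that suggestion, a constructor that takes a `List<AutoCloseable>` could look like the sketch below. `ExampleHerder` is a hypothetical class, not the real `DistributedHerder`, and its parameters are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical herder-like class used only to show the parameter shape.
class ExampleHerder {
    private final String restUrl;
    private final List<AutoCloseable> uponShutdown;

    // A List parameter can sit anywhere in the signature, so later
    // parameters (including other lists) can still be appended after it.
    ExampleHerder(String restUrl, List<AutoCloseable> uponShutdown) {
        this.restUrl = restUrl;
        this.uponShutdown = new ArrayList<>(uponShutdown); // defensive copy
    }

    String restUrl() {
        return restUrl;
    }

    void stop() {
        for (AutoCloseable closeable : uponShutdown) {
            try {
                closeable.close();
            } catch (Exception e) {
                // Ignored in this sketch so the remaining resources still get closed.
            }
        }
    }
}
```

   A caller with a single resource would then write `new ExampleHerder(url, Collections.singletonList(resource))`.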

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+    private static final Map<String, Object> CONFIG = Collections.emptyMap();

Review comment:
       should we call it `EMPTY_CONFIG` since it won't change?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers.
+ * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin}
+ * instance until this SharedAdmin is closed via {@link #close()} or {@link #close(Duration)}.
+ *
+ * <p>The owner of this object is responsible for ensuring that either {@link #close()} or {@link #close(Duration)}
+ * is called when the {@link TopicAdmin} instance is no longer needed. Consequently, once this
+ * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and {@link #topicAdmin()} methods,
+ * nor any previously returned {@link TopicAdmin} instances may be used.
+ *
+ * <p>This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object,
+ * until this object is closed, at which point it cannot be used anymore
+ */
+public class SharedTopicAdmin implements AutoCloseable, Supplier<TopicAdmin> {
+
+    // Visible for testing
+    static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
+
+    private final Map<String, Object> adminProps;
+    private final AtomicReference<TopicAdmin> admin = new AtomicReference<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Function<Map<String, Object>, TopicAdmin> factory;
+
+    public SharedTopicAdmin(Map<String, Object> adminProps) {
+        this(adminProps, TopicAdmin::new);
+    }
+
+    // Visible for testing
+    SharedTopicAdmin(Map<String, Object> adminProps, Function<Map<String, Object>, TopicAdmin> factory) {
+        this.adminProps = Objects.requireNonNull(adminProps);
+        this.factory = Objects.requireNonNull(factory);
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    @Override
+    public TopicAdmin get() {
+        return topicAdmin();
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    public TopicAdmin topicAdmin() {
+        return admin.updateAndGet(this::createAdmin);

Review comment:
       It's not immediately obvious to me what's the advantage compared to a synchronized `topicAdmin` or even `get`. 
   
   I see that the value can go back to null. But we already have the guard with the `closed` to atomically decide whether this is closed or not. Lmk if I'm missing something, but I feel the points of indirection might be a few more than they have to be here (we have no gains in locking, since we'll need to atomically `updateAndGet` in every get and the advantage vs `synchronized` should be negligible here). 
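
   For comparison, a `synchronized` variant might look like the sketch below. `SynchronizedLazyHolder` is a hypothetical name and the generic `T` stands in for the shared admin object. One point in its favor: the JDK documents that the function passed to `AtomicReference.updateAndGet` may be re-applied under contention and should be side-effect-free, whereas a lock runs the factory at most once.

```java
import java.util.function.Supplier;

// Hypothetical holder that guards lazy creation and closing with one lock.
class SynchronizedLazyHolder<T extends AutoCloseable> implements Supplier<T>, AutoCloseable {
    private final Supplier<T> factory;
    private T instance;     // guarded by "this"
    private boolean closed; // guarded by "this"

    SynchronizedLazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    @Override
    public synchronized T get() {
        if (closed) {
            throw new IllegalStateException("holder is closed");
        }
        if (instance == null) {
            instance = factory.get(); // runs at most once while the lock is held
        }
        return instance;
    }

    @Override
    public synchronized void close() {
        if (closed) {
            return; // closing twice is a no-op
        }
        closed = true;
        if (instance != null) {
            try {
                instance.close();
            } catch (Exception e) {
                throw new IllegalStateException("failed to close shared instance", e);
            } finally {
                instance = null;
            }
        }
    }
}
```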

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -676,6 +698,16 @@ public void halt() {
         }
     }
 
+    @Override
+    protected void stopServices() {
+        try {
+            super.stopServices();
+        } finally {
+            this.uponShutdown.stream().forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "<unknown>"));
+        }
+

Review comment:
       nit: extra blank line

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -676,6 +698,16 @@ public void halt() {
         }
     }
 
+    @Override
+    protected void stopServices() {
+        try {
+            super.stopServices();
+        } finally {
+            this.uponShutdown.stream().forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "<unknown>"));

Review comment:
       I'd guess `stream().forEach` can be simplified with `forEach` only
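
   A small sketch of that simplification, using `Iterable.forEach` directly on the list. `ForEachExample` and `closeAll` are hypothetical names, and the try/catch is only a stand-in for a close-quietly helper.

```java
import java.util.ArrayList;
import java.util.List;

class ForEachExample {
    // Closes every non-null resource and returns the label used for each entry,
    // falling back to "<unknown>" for null entries.
    static List<String> closeAll(List<AutoCloseable> resources) {
        List<String> names = new ArrayList<>();
        // List inherits forEach from Iterable, so no intermediate stream is needed.
        resources.forEach(closeable -> {
            names.add(closeable != null ? closeable.toString() : "<unknown>");
            if (closeable == null) {
                return;
            }
            try {
                closeable.close();
            } catch (Exception e) {
                // Ignored on purpose in this sketch.
            }
        });
        return names;
    }
}
```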

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
##########
@@ -207,14 +209,16 @@ public void setUp() throws Exception {
         metrics = new MockConnectMetrics(time);
         worker = PowerMock.createMock(Worker.class);
         EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE);
+        AutoCloseable uponShutdown = () -> shutdownCalled.countDown();
 
         // Default to the old protocol unless specified otherwise
         connectProtocolVersion = CONNECT_PROTOCOL_V0;
 
         herder = PowerMock.createPartialMock(DistributedHerder.class,
                 new String[]{"connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
-                statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy);
+                statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy,
+                new AutoCloseable[]{uponShutdown});

Review comment:
       see comment above. We'll avoid the static array initialization too. These are tests, but still, that's where array lists come in handy.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
##########
@@ -457,17 +465,273 @@ public void verifyingGettingTopicCleanupPolicies() {
         }
     }
 
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Not authorized to get the end offsets"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("is unsupported on brokers"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            RetriableException e = assertThrows(RetriableException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Timed out while waiting"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithUnknownError(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Error while getting end offsets for topic"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsNull() {
+        String topicName = "myTopic";
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(Collections.emptySet());
+            assertTrue(offsets.isEmpty());
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnOffsetsForOnePartition() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        long offset = 1000L;
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
+            assertEquals(1, offsets.size());
+            assertEquals(Long.valueOf(offset), offsets.get(tp1));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnOffsetsForMultiplePartitions() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        TopicPartition tp2 = new TopicPartition(topicName, 1);
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2));
+        long offset1 = 1001;
+        long offset2 = 1002;
+        Cluster cluster = createCluster(1, topicName, 2);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset1, tp2, offset2));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
+            assertEquals(2, offsets.size());
+            assertEquals(Long.valueOf(offset1), offsets.get(tp1));
+            assertEquals(Long.valueOf(offset2), offsets.get(tp2));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWhenAnyTopicPartitionHasError() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        long offset = 1000;
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, null));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Not authorized to get the end offsets"));
+        }
+    }
+
     private Cluster createCluster(int numNodes) {
+        return createCluster(numNodes, "unused", 0);
+    }
+
+    private Cluster createCluster(int numNodes, String topicName, int partitions) {
+        Node[] nodeArray = new Node[numNodes];

Review comment:
       curious, what is the array symbolizing here now?
   We used to have 1 value. Is this ISR nodes? Do we even need to add or remove any? 

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+    private static final Map<String, Object> CONFIG = Collections.emptyMap();
+
+    @Mock private TopicAdmin mockTopicAdmin;
+    private SharedTopicAdmin sharedAdmin;
+    private int created = 0;

Review comment:
       Is there a specific action on the mock that we want to, or can, verify here instead of implicitly using an auxiliary variable for that?
   Replay, expectation and verify should help us verify the action or its absence. I'd have to check more closely what such an action could be, if there is one. Maybe you can see that more easily. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r572421771



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+    private static final Map<String, Object> CONFIG = Collections.emptyMap();
+
+    @Mock private TopicAdmin mockTopicAdmin;
+    private SharedTopicAdmin sharedAdmin;
+    private int created = 0;

Review comment:
       Okay, Mockito FTW! I've rewritten the `SharedTopicAdminTest` class to use Mockito instead of PowerMock and EasyMock, and was able to use mocks to assert the correct number of times an admin instance was created and closed.







[GitHub] [kafka] kkonstantine commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570747258



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+    private static final Map<String, Object> CONFIG = Collections.emptyMap();
+
+    @Mock private TopicAdmin mockTopicAdmin;
+    private SharedTopicAdmin sharedAdmin;
+    private int created = 0;

Review comment:
       I know. It's just that we already use a mocking framework and we could use something like: 
   
   `EasyMock.expect(factory.apply(EasyMock.anyObject())).andReturn(mockTopicAdmin).anyTimes();`
   
   if we defined `factory` to be a mock as well. That could allow us to evaluate expectations on the mock more accurately (e.g. with a capture if we had to). But sure, if we need something quick and easy we can go with that. It's just that I noticed a mixed use of mocks with this variable that simulates what the mocking framework already offers.







[GitHub] [kafka] rhauch commented on pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#issuecomment-775505877


   Added another commit that uses Mockito in the new `SharedTopicAdminTest` class to do what @kkonstantine had suggested and which was not possible with EasyMock. 
   
   Mockito FTW!





[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r572256392



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+    private static final Map<String, Object> CONFIG = Collections.emptyMap();
+
+    @Mock private TopicAdmin mockTopicAdmin;
+    private SharedTopicAdmin sharedAdmin;
+    private int created = 0;

Review comment:
       I vaguely remember struggling with getting EasyMock to work properly when I originally created this PR, but I tried it again just to be sure. 
   
   Unfortunately, the `SharedTopicAdmin` constructor takes a _method reference_, and apparently EasyMock provides no way to effectively mock the behavior of a method passed via a method reference. See [this still-open EasyMock issue](https://github.com/easymock/easymock/issues/213) for details.
   
   So, while I agree with using mocks to verify the number of times the factory is invoked, EasyMock can't really mock method references, which means we're stuck with the current code using the counter.
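   
   For illustration only, here is a minimal plain-Java sketch of the counter approach described above. `LazyHolder` and `CountingFactory` are hypothetical stand-ins (they are not the PR's `SharedTopicAdmin` or `TopicAdmin`), and a `String` plays the role of the admin instance. The factory is handed over as a method reference, and the counter shows that it was invoked only once.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

public class CountingFactoryDemo {

    // Hypothetical stand-in for a shared holder: lazily creates one value via the factory.
    static final class LazyHolder<T> {
        private final Map<String, Object> props;
        private final Function<Map<String, Object>, T> factory;
        private final AtomicReference<T> value = new AtomicReference<>();

        LazyHolder(Map<String, Object> props, Function<Map<String, Object>, T> factory) {
            this.props = props;
            this.factory = factory;
        }

        T get() {
            // Reuse the existing value if there is one; otherwise ask the factory.
            // Note: updateAndGet may re-apply the function under thread contention.
            return value.updateAndGet(existing -> existing != null ? existing : factory.apply(props));
        }
    }

    // Hand-written test double: counts how often it is asked to create a value.
    static final class CountingFactory implements Function<Map<String, Object>, String> {
        private final AtomicInteger created = new AtomicInteger();

        @Override
        public String apply(Map<String, Object> props) {
            return "admin-" + created.incrementAndGet();
        }

        int count() {
            return created.get();
        }
    }

    public static void main(String[] args) {
        CountingFactory factory = new CountingFactory();
        // The factory is passed as a method reference, as in the constructor under test.
        LazyHolder<String> holder = new LazyHolder<>(Collections.emptyMap(), factory::apply);
        String first = holder.get();
        String second = holder.get();
        System.out.println(first == second);   // prints true: both calls share one instance
        System.out.println(factory.count());   // prints 1: the factory ran once
    }
}
```

   The trade-off is the one discussed in this thread: the counter is easy to write, but it duplicates bookkeeping that a mocking framework would normally provide.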







[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570648556



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -185,16 +188,33 @@
 
     private final DistributedConfig config;
 
+    /**
+     * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs)
+     * that have the same group ID.
+     *
+     * @param config             the configuration for the worker; may not be null
+     * @param time               the clock to use; may not be null
+     * @param worker             the {@link Worker} instance to use; may not be null
+     * @param kafkaClusterId     the identifier of the Kafka cluster to use for internal topics; may not be null
+     * @param statusBackingStore the backing store for statuses; may not be null
+     * @param configBackingStore the backing store for connector configurations; may not be null
+     * @param restUrl            the URL of this herder's REST API; may not be null
+     * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
+     *                                            in connector configurations; may not be null
+     * @param uponShutdown       any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
+     *                           after all services and resources owned by this herder are stopped
+     */
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
                              String kafkaClusterId,
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
                              String restUrl,
-                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                             AutoCloseable... uponShutdown) {

Review comment:
       The reason I used a variadic array here was to avoid having to create a new constructor when no `AutoCloseable` instances are supplied. If we use a List, then we can change the usage in the Connect runtime and in MirrorMaker 2, but any other callers will break unless we keep the old signature. WDYT?
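   
   To make the compatibility argument concrete, here is a minimal sketch under stated assumptions: `Service` is a hypothetical, much-simplified stand-in for the herder, and its `stop()` method is invented for the example. It shows that a trailing `AutoCloseable...` parameter lets existing callers compile unchanged, because the varargs array is simply empty for them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class VarargsShutdownDemo {

    // Hypothetical, simplified stand-in for a herder-like service.
    static final class Service {
        private final String name;
        private final List<AutoCloseable> uponShutdown;

        // Callers that pass only `name` still compile: the varargs array is empty.
        Service(String name, AutoCloseable... uponShutdown) {
            this.name = name;
            this.uponShutdown = Arrays.asList(uponShutdown);
        }

        // Closes each supplied resource in order and returns how many closed cleanly.
        int stop() {
            int closed = 0;
            for (AutoCloseable resource : uponShutdown) {
                try {
                    resource.close();
                    closed++;
                } catch (Exception e) {
                    System.err.println("Failed to close a resource for " + name + ": " + e);
                }
            }
            return closed;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Service oldStyle = new Service("old-caller");
        Service newStyle = new Service("new-caller", () -> log.add("admin closed"));
        System.out.println(oldStyle.stop()); // prints 0
        System.out.println(newStyle.stop()); // prints 1
        System.out.println(log);             // prints [admin closed]
    }
}
```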







[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570652791



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
##########
@@ -457,17 +465,273 @@ public void verifyingGettingTopicCleanupPolicies() {
         }
     }
 
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Not authorized to get the end offsets"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("is unsupported on brokers"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            RetriableException e = assertThrows(RetriableException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Timed out while waiting"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithUnknownError(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Error while getting end offsets for topic"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsNull() {
+        String topicName = "myTopic";
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(Collections.emptySet());
+            assertTrue(offsets.isEmpty());
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnOffsetsForOnePartition() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        long offset = 1000L;
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
+            assertEquals(1, offsets.size());
+            assertEquals(Long.valueOf(offset), offsets.get(tp1));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnOffsetsForMultiplePartitions() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        TopicPartition tp2 = new TopicPartition(topicName, 1);
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2));
+        long offset1 = 1001;
+        long offset2 = 1002;
+        Cluster cluster = createCluster(1, topicName, 2);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset1, tp2, offset2));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
+            assertEquals(2, offsets.size());
+            assertEquals(Long.valueOf(offset1), offsets.get(tp1));
+            assertEquals(Long.valueOf(offset2), offsets.get(tp2));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWhenAnyTopicPartitionHasError() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        long offset = 1000;
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, null));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Not authorized to get the end offsets"));
+        }
+    }
+
     private Cluster createCluster(int numNodes) {
+        return createCluster(numNodes, "unused", 0);
+    }
+
+    private Cluster createCluster(int numNodes, String topicName, int partitions) {
+        Node[] nodeArray = new Node[numNodes];

Review comment:
       Yes, these are just the ISRs for the one partition that we set up the cluster with.
   
   The utility method did allow multiple nodes, but we don't really use that much in this class. I think this changed because we now have to define the `PartitionInfo` instances rather than an empty map. Not sure why that's now different, but supplying the empty infos definitely caused problems in these new tests.







[GitHub] [kafka] rhauch commented on pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#issuecomment-749845098


   Rebased on `trunk`, and then had to correct tests based on recent changes in Admin API related classes.





[GitHub] [kafka] rhauch commented on pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#issuecomment-775330201


   Added a commit to incorporate @kkonstantine's suggestions (per thumbs-up above), and rebased to correct a merge conflict from `trunk`.





[GitHub] [kafka] rhauch commented on pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#issuecomment-773485918


   Rebased on `trunk` to correct conflicts.





[GitHub] [kafka] kkonstantine commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570584036



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -209,7 +229,8 @@ public DistributedHerder(DistributedConfig config,
                       String restUrl,
                       ConnectMetrics metrics,
                       Time time,
-                      ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                      ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                      AutoCloseable... uponShutdown) {

Review comment:
       see comment above

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -185,16 +188,33 @@
 
     private final DistributedConfig config;
 
+    /**
+     * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs)
+     * that have the same group ID.
+     *
+     * @param config             the configuration for the worker; may not be null
+     * @param time               the clock to use; may not be null
+     * @param worker             the {@link Worker} instance to use; may not be null
+     * @param kafkaClusterId     the identifier of the Kafka cluster to use for internal topics; may not be null
+     * @param statusBackingStore the backing store for statuses; may not be null
+     * @param configBackingStore the backing store for connector configurations; may not be null
+     * @param restUrl            the URL of this herder's REST API; may not be null
+     * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
+     *                                            in connector configurations; may not be null
+     * @param uponShutdown       any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
+     *                           after all services and resources owned by this herder are stopped
+     */
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
                              String kafkaClusterId,
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
                              String restUrl,
-                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                             AutoCloseable... uponShutdown) {

Review comment:
       I think it's better to avoid a variadic argument here. 
   Parameters tend to get added to such constructors with new features. And if a new parameter is required that is also a list, then we'll have a mix of list args with a variadic at the end. 
   
   Since we transform it to a list anyway, I'd suggest using that type here and passing the single argument with `Collections.singletonList` in the caller. 
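   
   A rough sketch of the list-based alternative, again with a hypothetical `Service` class standing in for the herder (none of these names come from the PR). It assumes the old signature is kept as a delegating overload, so callers outside the code base would not break.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ListShutdownDemo {

    // Hypothetical, simplified stand-in for a herder-like service.
    static final class Service {
        private final String name;
        private final List<AutoCloseable> uponShutdown;

        // Old signature, kept so that existing callers keep compiling.
        Service(String name) {
            this(name, Collections.emptyList());
        }

        // New signature: an explicit list leaves room for more parameters after it.
        Service(String name, List<AutoCloseable> uponShutdown) {
            this.name = name;
            this.uponShutdown = new ArrayList<>(uponShutdown); // defensive copy
        }

        int resourceCount() {
            return uponShutdown.size();
        }

        String name() {
            return name;
        }
    }

    public static void main(String[] args) {
        AutoCloseable sharedAdmin = () -> System.out.println("shared admin closed");
        // The caller wraps its single resource, as suggested in the comment above.
        Service withAdmin = new Service("worker-1", Collections.singletonList(sharedAdmin));
        Service legacy = new Service("worker-2");
        System.out.println(withAdmin.name() + ": " + withAdmin.resourceCount()); // prints worker-1: 1
        System.out.println(legacy.name() + ": " + legacy.resourceCount());       // prints worker-2: 0
    }
}
```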

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+    private static final Map<String, Object> CONFIG = Collections.emptyMap();

Review comment:
       Should we call it `EMPTY_CONFIG`, since it won't change?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers.
+ * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin}
+ * instance until this SharedAdmin is closed via {@link #close()} or {@link #close(Duration)}.
+ *
+ * <p>The owner of this object is responsible for ensuring that either {@link #close()} or {@link #close(Duration)}
+ * is called when the {@link TopicAdmin} instance is no longer needed. Consequently, once this
+ * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and {@link #topicAdmin()} methods,
+ * nor any previously returned {@link TopicAdmin} instances may be used.
+ *
+ * <p>This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object,
+ * until this object is closed, at which point it cannot be used anymore
+ */
+public class SharedTopicAdmin implements AutoCloseable, Supplier<TopicAdmin> {
+
+    // Visible for testing
+    static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
+
+    private final Map<String, Object> adminProps;
+    private final AtomicReference<TopicAdmin> admin = new AtomicReference<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Function<Map<String, Object>, TopicAdmin> factory;
+
+    public SharedTopicAdmin(Map<String, Object> adminProps) {
+        this(adminProps, TopicAdmin::new);
+    }
+
+    // Visible for testing
+    SharedTopicAdmin(Map<String, Object> adminProps, Function<Map<String, Object>, TopicAdmin> factory) {
+        this.adminProps = Objects.requireNonNull(adminProps);
+        this.factory = Objects.requireNonNull(factory);
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    @Override
+    public TopicAdmin get() {
+        return topicAdmin();
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    public TopicAdmin topicAdmin() {
+        return admin.updateAndGet(this::createAdmin);

Review comment:
       It's not immediately obvious to me what the advantage is compared to a synchronized `topicAdmin` or even `get`.
   
   I see that the value can go back to null. But we already have the `closed` guard to atomically decide whether this is closed or not. Let me know if I'm missing something, but I feel there are a few more points of indirection here than there need to be (we gain nothing in locking, since we need to atomically `updateAndGet` on every get, and the advantage over `synchronized` should be negligible here).
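
   A minimal sketch of the synchronized alternative mentioned above, for comparison only. `SynchronizedHolder` and its members are hypothetical names and not code from this PR; the generic resource type stands in for `TopicAdmin` so the block compiles without Kafka on the classpath:

```java
import java.util.function.Supplier;

// Hypothetical stand-in for SharedTopicAdmin: lazily creates one shared
// resource and refuses to hand it out once the holder has been closed.
final class SynchronizedHolder<T extends AutoCloseable> implements AutoCloseable, Supplier<T> {

    private final Supplier<T> factory;
    private T instance;      // guarded by "this"
    private boolean closed;  // guarded by "this"

    SynchronizedHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    // Returns the shared instance, creating it on first use.
    @Override
    public synchronized T get() {
        if (closed) {
            throw new IllegalStateException("holder is already closed");
        }
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    // Closes the shared instance at most once; later calls are no-ops.
    @Override
    public synchronized void close() throws Exception {
        if (closed) {
            return;
        }
        closed = true;
        if (instance != null) {
            instance.close();
        }
    }
}
```

   Here both the lazy creation and the closed check sit behind a single monitor, so the factory runs at most once. The trade-off is that every `get()` takes the lock.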

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -676,6 +698,16 @@ public void halt() {
         }
     }
 
+    @Override
+    protected void stopServices() {
+        try {
+            super.stopServices();
+        } finally {
+            this.uponShutdown.stream().forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "<unknown>"));
+        }
+

Review comment:
       nit: extra blank line

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -676,6 +698,16 @@ public void halt() {
         }
     }
 
+    @Override
+    protected void stopServices() {
+        try {
+            super.stopServices();
+        } finally {
+            this.uponShutdown.stream().forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "<unknown>"));

Review comment:
       I'd guess `stream().forEach` can be simplified to just `forEach`.
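
   A small sketch of that simplification, assuming the collection is a `List` (or any `Iterable`). `ForEachSketch` and `closeAllQuietly` are hypothetical names, and the try/catch is only a hand-written stand-in for `Utils.closeQuietly` so the block compiles on its own:

```java
import java.util.List;

final class ForEachSketch {

    // Closes every resource and ignores failures, similar in spirit to the
    // loop in stopServices() but without an intermediate stream.
    static void closeAllQuietly(List<AutoCloseable> closeables) {
        // Iterable.forEach is enough here; stream() adds nothing for a plain traversal.
        closeables.forEach(closeable -> {
            try {
                if (closeable != null) {
                    closeable.close();
                }
            } catch (Exception e) {
                // Deliberately swallowed: shutdown should continue with the next resource.
            }
        });
    }
}
```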

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
##########
@@ -207,14 +209,16 @@ public void setUp() throws Exception {
         metrics = new MockConnectMetrics(time);
         worker = PowerMock.createMock(Worker.class);
         EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE);
+        AutoCloseable uponShutdown = () -> shutdownCalled.countDown();
 
         // Default to the old protocol unless specified otherwise
         connectProtocolVersion = CONNECT_PROTOCOL_V0;
 
         herder = PowerMock.createPartialMock(DistributedHerder.class,
                 new String[]{"connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
-                statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy);
+                statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy,
+                new AutoCloseable[]{uponShutdown});

Review comment:
       See comment above. We'd avoid the static array initialization too. This is test code, but still, that's where array lists come in handy.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
##########
@@ -457,17 +465,273 @@ public void verifyingGettingTopicCleanupPolicies() {
         }
     }
 
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Not authorized to get the end offsets"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("is unsupported on brokers"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            RetriableException e = assertThrows(RetriableException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Timed out while waiting"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithUnknownError(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Error while getting end offsets for topic"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsNull() {
+        String topicName = "myTopic";
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(Collections.emptySet());
+            assertTrue(offsets.isEmpty());
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnOffsetsForOnePartition() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        long offset = 1000L;
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
+            assertEquals(1, offsets.size());
+            assertEquals(Long.valueOf(offset), offsets.get(tp1));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnOffsetsForMultiplePartitions() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        TopicPartition tp2 = new TopicPartition(topicName, 1);
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2));
+        long offset1 = 1001;
+        long offset2 = 1002;
+        Cluster cluster = createCluster(1, topicName, 2);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset1, tp2, offset2));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
+            assertEquals(2, offsets.size());
+            assertEquals(Long.valueOf(offset1), offsets.get(tp1));
+            assertEquals(Long.valueOf(offset2), offsets.get(tp2));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWhenAnyTopicPartitionHasError() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        long offset = 1000;
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, null));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Not authorized to get the end offsets"));
+        }
+    }
+
     private Cluster createCluster(int numNodes) {
+        return createCluster(numNodes, "unused", 0);
+    }
+
+    private Cluster createCluster(int numNodes, String topicName, int partitions) {
+        Node[] nodeArray = new Node[numNodes];

Review comment:
       Curious: what does the array represent here now?
   We used to have one value. Are these the ISR nodes? Do we even need to add or remove any?

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+    private static final Map<String, Object> CONFIG = Collections.emptyMap();
+
+    @Mock private TopicAdmin mockTopicAdmin;
+    private SharedTopicAdmin sharedAdmin;
+    private int created = 0;

Review comment:
       Is there a specific action on the mock that we want to, or can, verify here instead of implicitly using an auxiliary variable for that?
   Replay, expectation and verify should help us verify the action or its absence. I'd have to look more closely at what such an action could be, if there is one. Maybe you can see that more easily.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -185,16 +188,33 @@
 
     private final DistributedConfig config;
 
+    /**
+     * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs)
+     * that have the same group ID.
+     *
+     * @param config             the configuration for the worker; may not be null
+     * @param time               the clock to use; may not be null
+     * @param worker             the {@link Worker} instance to use; may not be null
+     * @param kafkaClusterId     the identifier of the Kafka cluster to use for internal topics; may not be null
+     * @param statusBackingStore the backing store for statuses; may not be null
+     * @param configBackingStore the backing store for connector configurations; may not be null
+     * @param restUrl            the URL of this herder's REST API; may not be null
+     * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
+     *                                            in connector configurations; may not be null
+     * @param uponShutdown       any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
+     *                           after all services and resources owned by this herder are stopped
+     */
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
                              String kafkaClusterId,
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
                              String restUrl,
-                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                             AutoCloseable... uponShutdown) {

Review comment:
       We could always keep a constructor with the old signature alongside the new one if we didn't want to break classes that use `DistributedHerder`. I'm fine with the change here as a short-term workaround. I guess it saves us one constructor, but we can use it only once.
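
   A reduced sketch of keeping both signatures, under the assumption that the old constructor simply delegates with no shutdown hooks. `HerderSketch` is a hypothetical stand-in with a single `restUrl` parameter in place of the real `DistributedHerder` argument list:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, heavily reduced stand-in for DistributedHerder.
final class HerderSketch {

    private final String restUrl;
    private final List<AutoCloseable> uponShutdown;

    // Old signature, kept so that callers resolving this exact constructor
    // (including reflective ones) keep working.
    HerderSketch(String restUrl) {
        this(restUrl, new AutoCloseable[0]);
    }

    // New signature with the resources to close on shutdown.
    HerderSketch(String restUrl, AutoCloseable... uponShutdown) {
        this.restUrl = restUrl;
        // Defensive copy so later changes to the caller's array are not visible here.
        this.uponShutdown = Collections.unmodifiableList(Arrays.asList(uponShutdown.clone()));
    }

    String restUrl() {
        return restUrl;
    }

    List<AutoCloseable> uponShutdown() {
        return uponShutdown;
    }
}
```

   With varargs alone, source-level callers that pass no hooks still compile; the explicit overload mainly matters for code that looks up the constructor by its exact parameter types.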

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers.
+ * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin}
+ * instance until this SharedAdmin is closed via {@link #close()} or {@link #close(Duration)}.
+ *
+ * <p>The owner of this object is responsible for ensuring that either {@link #close()} or {@link #close(Duration)}
+ * is called when the {@link TopicAdmin} instance is no longer needed. Consequently, once this
+ * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and {@link #topicAdmin()} methods,
+ * nor any previously returned {@link TopicAdmin} instances may be used.
+ *
+ * <p>This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object,
+ * until this object is closed, at which point it cannot be used anymore
+ */
+public class SharedTopicAdmin implements AutoCloseable, Supplier<TopicAdmin> {
+
+    // Visible for testing
+    static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
+
+    private final Map<String, Object> adminProps;
+    private final AtomicReference<TopicAdmin> admin = new AtomicReference<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Function<Map<String, Object>, TopicAdmin> factory;
+
+    public SharedTopicAdmin(Map<String, Object> adminProps) {
+        this(adminProps, TopicAdmin::new);
+    }
+
+    // Visible for testing
+    SharedTopicAdmin(Map<String, Object> adminProps, Function<Map<String, Object>, TopicAdmin> factory) {
+        this.adminProps = Objects.requireNonNull(adminProps);
+        this.factory = Objects.requireNonNull(factory);
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    @Override
+    public TopicAdmin get() {
+        return topicAdmin();
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    public TopicAdmin topicAdmin() {
+        return admin.updateAndGet(this::createAdmin);

Review comment:
       I'm happy to leave it as an example of the pattern that demonstrates how to apply `updateAndGet`.
   
   I just didn't feel that writing the singleton pattern differently was worth the three or so levels of indirection.
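
   For reference, a stripped-down sketch of the `updateAndGet` pattern under discussion. `AtomicLazyHolder` and `createIfNeeded` are hypothetical names, not the PR's code, and the resource type is generic so the block has no Kafka dependency:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical holder that lazily creates a shared value via updateAndGet.
final class AtomicLazyHolder<T> implements Supplier<T> {

    private final AtomicReference<T> ref = new AtomicReference<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Supplier<T> factory;

    AtomicLazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    // Every call goes through updateAndGet, which keeps the existing value
    // if there is one and otherwise installs a newly created one.
    @Override
    public T get() {
        return ref.updateAndGet(this::createIfNeeded);
    }

    private T createIfNeeded(T existing) {
        if (closed.get()) {
            throw new IllegalStateException("holder is already closed");
        }
        return existing != null ? existing : factory.get();
    }

    // Marks the holder closed and returns the value it held (or null),
    // leaving the actual cleanup to the caller.
    T close() {
        if (closed.compareAndSet(false, true)) {
            return ref.getAndSet(null);
        }
        return null;
    }
}
```

   One caveat that is independent of this PR: the `AtomicReference.updateAndGet` documentation says the update function should be side-effect-free because it may be re-applied under contention, so in a sketch like this the factory could in principle run more than once when several threads race on the first call.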

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+    private static final Map<String, Object> CONFIG = Collections.emptyMap();
+
+    @Mock private TopicAdmin mockTopicAdmin;
+    private SharedTopicAdmin sharedAdmin;
+    private int created = 0;

Review comment:
       I know. It's just that we already use a mocking framework and we could use something like: 
   
   `EasyMock.expect(factory.apply(EasyMock.anyObject())).andReturn(mockTopicAdmin).anyTimes();`
   
   if we defined `factory` as a mock as well. That would let us evaluate expectations on the mock more accurately (e.g. with a capture if we had to). But sure, if we need something quick and easy we can go with that. It's just that I noticed mocks being mixed with a variable that simulates what the mocking framework already offers.







[GitHub] [kafka] kkonstantine commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570732565



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -185,16 +188,33 @@
 
     private final DistributedConfig config;
 
+    /**
+     * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs)
+     * that have the same group ID.
+     *
+     * @param config             the configuration for the worker; may not be null
+     * @param time               the clock to use; may not be null
+     * @param worker             the {@link Worker} instance to use; may not be null
+     * @param kafkaClusterId     the identifier of the Kafka cluster to use for internal topics; may not be null
+     * @param statusBackingStore the backing store for statuses; may not be null
+     * @param configBackingStore the backing store for connector configurations; may not be null
+     * @param restUrl            the URL of this herder's REST API; may not be null
+     * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
+     *                                            in connector configurations; may not be null
+     * @param uponShutdown       any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
+     *                           after all services and resources owned by this herder are stopped
+     */
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
                              String kafkaClusterId,
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
                              String restUrl,
-                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                             AutoCloseable... uponShutdown) {

Review comment:
       We can always keep a constructor with the old signature alongside the new one if we don't want to break classes that use `DistributedHerder`. I'm fine with the change here as a short-term workaround. I guess it saves us one constructor, but we can use it only once.
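
For illustration, keeping the old signature next to a new one could look roughly like the sketch below. `OverloadedHerder`, its single `restUrl` parameter, and `closeableCount()` are hypothetical stand-ins chosen to keep the example small; this is not the actual `DistributedHerder` code.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a class like DistributedHerder; not the real Connect code.
class OverloadedHerder {
    private final String restUrl;
    private final List<AutoCloseable> uponShutdown;

    // Old signature, kept so that existing callers are unaffected.
    OverloadedHerder(String restUrl) {
        this(restUrl, Collections.<AutoCloseable>emptyList());
    }

    // New signature that also accepts resources to close on stop().
    OverloadedHerder(String restUrl, List<AutoCloseable> uponShutdown) {
        this.restUrl = restUrl;
        this.uponShutdown = uponShutdown;
    }

    int closeableCount() {
        return uponShutdown.size();
    }

    // Closes the supplied resources; checked exceptions are wrapped for brevity.
    void stop() {
        for (AutoCloseable closeable : uponShutdown) {
            try {
                closeable.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
```

Existing callers keep using the one-argument constructor, while new callers pass the resources that should be closed on `stop()`.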

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers.
+ * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin}
+ * instance until this SharedAdmin is closed via {@link #close()} or {@link #close(Duration)}.
+ *
+ * <p>The owner of this object is responsible for ensuring that either {@link #close()} or {@link #close(Duration)}
+ * is called when the {@link TopicAdmin} instance is no longer needed. Consequently, once this
+ * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and {@link #topicAdmin()} methods,
+ * nor any previously returned {@link TopicAdmin} instances may be used.
+ *
+ * <p>This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object,
+ * until this object is closed, at which point it cannot be used anymore
+ */
+public class SharedTopicAdmin implements AutoCloseable, Supplier<TopicAdmin> {
+
+    // Visible for testing
+    static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
+
+    private final Map<String, Object> adminProps;
+    private final AtomicReference<TopicAdmin> admin = new AtomicReference<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Function<Map<String, Object>, TopicAdmin> factory;
+
+    public SharedTopicAdmin(Map<String, Object> adminProps) {
+        this(adminProps, TopicAdmin::new);
+    }
+
+    // Visible for testing
+    SharedTopicAdmin(Map<String, Object> adminProps, Function<Map<String, Object>, TopicAdmin> factory) {
+        this.adminProps = Objects.requireNonNull(adminProps);
+        this.factory = Objects.requireNonNull(factory);
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    @Override
+    public TopicAdmin get() {
+        return topicAdmin();
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    public TopicAdmin topicAdmin() {
+        return admin.updateAndGet(this::createAdmin);

Review comment:
       I'm happy to leave it as an example of the pattern that demonstrates how to apply `updateAndGet`. 
   
   I just didn't feel that the three or so levels of indirection were worth it just to write the singleton pattern differently.
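
As a rough sketch of the pattern under discussion, lazy creation through `AtomicReference.updateAndGet` might look like the following. `LazyShared` and `createIfNeeded` are hypothetical names, and the real `SharedTopicAdmin` may differ in its details.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical holder that creates its value on first use and then shares it.
class LazyShared<T> implements Supplier<T> {
    private final AtomicReference<T> ref = new AtomicReference<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Supplier<T> factory;

    LazyShared(Supplier<T> factory) {
        this.factory = factory;
    }

    @Override
    public T get() {
        // updateAndGet applies the function to the current value and stores the result.
        return ref.updateAndGet(this::createIfNeeded);
    }

    // Keeps the existing instance if there is one; otherwise asks the factory for a new one.
    private T createIfNeeded(T existing) {
        if (closed.get()) {
            throw new IllegalStateException("already closed");
        }
        return existing != null ? existing : factory.get();
    }

    void close() {
        closed.set(true);
    }
}
```

One caveat worth keeping in mind: the `AtomicReference` documentation notes that the update function may be re-applied when threads contend, so under contention the factory could be invoked more than once even though only one result is ultimately stored.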







[GitHub] [kafka] kkonstantine commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570728906



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers.
+ * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin}
+ * instance until this SharedAdmin is closed via {@link #close()} or {@link #close(Duration)}.
+ *
+ * <p>The owner of this object is responsible for ensuring that either {@link #close()} or {@link #close(Duration)}
+ * is called when the {@link TopicAdmin} instance is no longer needed. Consequently, once this
+ * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and {@link #topicAdmin()} methods,
+ * nor any previously returned {@link TopicAdmin} instances may be used.
+ *
+ * <p>This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object,
+ * until this object is closed, at which point it cannot be used anymore
+ */
+public class SharedTopicAdmin implements AutoCloseable, Supplier<TopicAdmin> {
+
+    // Visible for testing
+    static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
+
+    private final Map<String, Object> adminProps;
+    private final AtomicReference<TopicAdmin> admin = new AtomicReference<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Function<Map<String, Object>, TopicAdmin> factory;
+
+    public SharedTopicAdmin(Map<String, Object> adminProps) {
+        this(adminProps, TopicAdmin::new);
+    }
+
+    // Visible for testing
+    SharedTopicAdmin(Map<String, Object> adminProps, Function<Map<String, Object>, TopicAdmin> factory) {
+        this.adminProps = Objects.requireNonNull(adminProps);
+        this.factory = Objects.requireNonNull(factory);
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    @Override
+    public TopicAdmin get() {
+        return topicAdmin();
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    public TopicAdmin topicAdmin() {
+        return admin.updateAndGet(this::createAdmin);

Review comment:
       I'm happy to leave it as an example of the pattern that demonstrates how to apply `updateAndGet`. 
   
   I just didn't feel that the two or three levels of indirection were worth it just to write the singleton pattern differently.







[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570648556



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -185,16 +188,33 @@
 
     private final DistributedConfig config;
 
+    /**
+     * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs)
+     * that have the same group ID.
+     *
+     * @param config             the configuration for the worker; may not be null
+     * @param time               the clock to use; may not be null
+     * @param worker             the {@link Worker} instance to use; may not be null
+     * @param kafkaClusterId     the identifier of the Kafka cluster to use for internal topics; may not be null
+     * @param statusBackingStore the backing store for statuses; may not be null
+     * @param configBackingStore the backing store for connector configurations; may not be null
+     * @param restUrl            the URL of this herder's REST API; may not be null
+     * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
+     *                                            in connector configurations; may not be null
+     * @param uponShutdown       any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
+     *                           after all services and resources owned by this herder are stopped
+     */
     public DistributedHerder(DistributedConfig config,
                              Time time,
                              Worker worker,
                              String kafkaClusterId,
                              StatusBackingStore statusBackingStore,
                              ConfigBackingStore configBackingStore,
                              String restUrl,
-                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
+                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
+                             AutoCloseable... uponShutdown) {

Review comment:
       The reason I used a variadic array here was to avoid having to create a new constructor when no `AutoCloseable` instances are supplied. If we use a `List`, then we can change the usages in the Connect runtime and in MirrorMaker 2, but any other callers will break unless we keep the old signature. WDYT?
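
A minimal sketch of the varargs approach described above, using a hypothetical `VarargsHerder` class rather than the real `DistributedHerder`:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in showing a trailing varargs parameter on a constructor.
class VarargsHerder {
    private final String restUrl;
    private final List<AutoCloseable> uponShutdown;

    // Callers may pass zero or more AutoCloseable instances after restUrl.
    VarargsHerder(String restUrl, AutoCloseable... uponShutdown) {
        this.restUrl = restUrl;
        this.uponShutdown = Arrays.asList(uponShutdown);
    }

    int closeableCount() {
        return uponShutdown.size();
    }
}
```

With this shape, a call such as `new VarargsHerder("http://localhost:8083")` still compiles from source with no extra arguments. Note that this is source compatibility only: the compiled constructor takes an `AutoCloseable[]`, so code compiled against the old signature would need to be recompiled.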

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
##########
@@ -457,17 +465,273 @@ public void verifyingGettingTopicCleanupPolicies() {
         }
     }
 
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Not authorized to get the end offsets"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("is unsupported on brokers"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithTimeout(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            RetriableException e = assertThrows(RetriableException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Timed out while waiting"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = null; // response should use error
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithUnknownError(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Error while getting end offsets for topic"));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsNull() {
+        String topicName = "myTopic";
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(Collections.emptySet());
+            assertTrue(offsets.isEmpty());
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnOffsetsForOnePartition() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        long offset = 1000L;
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
+            assertEquals(1, offsets.size());
+            assertEquals(Long.valueOf(offset), offsets.get(tp1));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldReturnOffsetsForMultiplePartitions() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        TopicPartition tp2 = new TopicPartition(topicName, 1);
+        Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2));
+        long offset1 = 1001;
+        long offset2 = 1002;
+        Cluster cluster = createCluster(1, topicName, 2);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset1, tp2, offset2));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
+            assertEquals(2, offsets.size());
+            assertEquals(Long.valueOf(offset1), offsets.get(tp1));
+            assertEquals(Long.valueOf(offset2), offsets.get(tp2));
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldFailWhenAnyTopicPartitionHasError() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        long offset = 1000;
+        Cluster cluster = createCluster(1, topicName, 1);
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResultWithClusterAuthorizationException(tp1, null));
+            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+            ConnectException e = assertThrows(ConnectException.class, () -> {
+                admin.endOffsets(tps);
+            });
+            assertTrue(e.getMessage().contains("Not authorized to get the end offsets"));
+        }
+    }
+
     private Cluster createCluster(int numNodes) {
+        return createCluster(numNodes, "unused", 0);
+    }
+
+    private Cluster createCluster(int numNodes, String topicName, int partitions) {
+        Node[] nodeArray = new Node[numNodes];

Review comment:
       Yes, these are just the ISRs for the one partition that we set up the cluster with.
   
   The utility method did allow multiple nodes, but we don't really use that much in this class. I think this changed because we now have to define the `PartitionInfo` instances rather than an empty map. Not sure why that's now different, but supplying the empty infos definitely caused problems in these new tests.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.easymock.EasyMock;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class SharedTopicAdminTest {
+
+    private static final Map<String, Object> CONFIG = Collections.emptyMap();
+
+    @Mock private TopicAdmin mockTopicAdmin;
+    private SharedTopicAdmin sharedAdmin;
+    private int created = 0;

Review comment:
       Really I'm just using that to be able to test that the new `topicAdmin()` method is returning the correct instance, even after repeated calls. It was an easy way to verify that the `TopicAdmin` matches what the factory function returned.
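
The idea can be sketched without any mocking framework: a factory that counts its invocations lets a test confirm that repeated calls return the instance the factory produced. `CountingFactory` and `SimpleHolder` below are hypothetical helpers, not classes from the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical factory that records how many times it was asked for an instance.
class CountingFactory implements Supplier<Object> {
    final AtomicInteger created = new AtomicInteger();
    final Object instance = new Object();

    @Override
    public Object get() {
        created.incrementAndGet();
        return instance;
    }
}

// Hypothetical minimal holder standing in for the class under test.
class SimpleHolder {
    private final Supplier<Object> factory;
    private Object value;

    SimpleHolder(Supplier<Object> factory) {
        this.factory = factory;
    }

    // Creates the value on first use and returns the same one afterwards.
    synchronized Object get() {
        if (value == null) {
            value = factory.get();
        }
        return value;
    }
}
```

A test would then call `get()` several times and check both that every result is the factory's `instance` and that `created` is 1.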

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers.
+ * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin}
+ * instance until this SharedAdmin is closed via {@link #close()} or {@link #close(Duration)}.
+ *
+ * <p>The owner of this object is responsible for ensuring that either {@link #close()} or {@link #close(Duration)}
+ * is called when the {@link TopicAdmin} instance is no longer needed. Consequently, once this
+ * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and {@link #topicAdmin()} methods,
+ * nor any previously returned {@link TopicAdmin} instances may be used.
+ *
+ * <p>This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object,
+ * until this object is closed, at which point it cannot be used anymore
+ */
+public class SharedTopicAdmin implements AutoCloseable, Supplier<TopicAdmin> {
+
+    // Visible for testing
+    static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
+
+    private final Map<String, Object> adminProps;
+    private final AtomicReference<TopicAdmin> admin = new AtomicReference<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Function<Map<String, Object>, TopicAdmin> factory;
+
+    public SharedTopicAdmin(Map<String, Object> adminProps) {
+        this(adminProps, TopicAdmin::new);
+    }
+
+    // Visible for testing
+    SharedTopicAdmin(Map<String, Object> adminProps, Function<Map<String, Object>, TopicAdmin> factory) {
+        this.adminProps = Objects.requireNonNull(adminProps);
+        this.factory = Objects.requireNonNull(factory);
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    @Override
+    public TopicAdmin get() {
+        return topicAdmin();
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    public TopicAdmin topicAdmin() {
+        return admin.updateAndGet(this::createAdmin);

Review comment:
       I'm not sure there is much advantage either way, considering these methods are not called frequently and `synchronized` would indeed work. I personally like the simplicity of using `AtomicReference`, which to me seemed natural and straightforward, avoided having to synchronize the entire methods, and needed no if-checks in this method.
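
For comparison, the `synchronized` alternative mentioned above might be sketched as follows. `SynchronizedShared` is a hypothetical name, and this is only one way such a holder could be written.

```java
import java.util.function.Supplier;

// Hypothetical holder that guards lazy creation with synchronized methods.
class SynchronizedShared<T> {
    private final Supplier<T> factory;
    private T instance;
    private boolean closed;

    SynchronizedShared(Supplier<T> factory) {
        this.factory = factory;
    }

    // The whole method holds the lock, and it needs explicit if-checks.
    synchronized T get() {
        if (closed) {
            throw new IllegalStateException("already closed");
        }
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    synchronized void close() {
        closed = true;
        instance = null;
    }
}
```

Here the factory runs at most once because creation happens while the lock is held, at the cost of locking on every `get()` call.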







[GitHub] [kafka] rhauch commented on a change in pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#discussion_r570657028



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.connect.errors.ConnectException;
+
+/**
+ * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers.
+ * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin}
+ * instance until this SharedAdmin is closed via {@link #close()} or {@link #close(Duration)}.
+ *
+ * <p>The owner of this object is responsible for ensuring that either {@link #close()} or {@link #close(Duration)}
+ * is called when the {@link TopicAdmin} instance is no longer needed. Consequently, once this
+ * {@link SharedTopicAdmin} instance has been closed, the {@link #get()} and {@link #topicAdmin()} methods,
+ * nor any previously returned {@link TopicAdmin} instances may be used.
+ *
+ * <p>This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object,
+ * until this object is closed, at which point it cannot be used anymore
+ */
+public class SharedTopicAdmin implements AutoCloseable, Supplier<TopicAdmin> {
+
+    // Visible for testing
+    static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
+
+    private final Map<String, Object> adminProps;
+    private final AtomicReference<TopicAdmin> admin = new AtomicReference<>();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final Function<Map<String, Object>, TopicAdmin> factory;
+
+    public SharedTopicAdmin(Map<String, Object> adminProps) {
+        this(adminProps, TopicAdmin::new);
+    }
+
+    // Visible for testing
+    SharedTopicAdmin(Map<String, Object> adminProps, Function<Map<String, Object>, TopicAdmin> factory) {
+        this.adminProps = Objects.requireNonNull(adminProps);
+        this.factory = Objects.requireNonNull(factory);
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    @Override
+    public TopicAdmin get() {
+        return topicAdmin();
+    }
+
+    /**
+     * Get the shared {@link TopicAdmin} instance.
+     *
+     * @return the shared instance; never null
+     * @throws ConnectException if this object has already been closed
+     */
+    public TopicAdmin topicAdmin() {
+        return admin.updateAndGet(this::createAdmin);

Review comment:
       I'm not sure there is much advantage either way, considering these methods are not called frequently and `synchronized` would indeed work. I personally like the simplicity of using `AtomicReference`, which to me seemed natural and straightforward, avoided having to synchronize the entire methods, and needed no if-checks in this method.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on pull request #9780: KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #9780:
URL: https://github.com/apache/kafka/pull/9780#issuecomment-773485918


   Rebased on `trunk` to correct conflicts.

