You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2021/02/10 01:59:16 UTC
[kafka] branch 2.6 updated: Revert "KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics (#9780)"
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new a1b9e2b Revert "KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics (#9780)"
a1b9e2b is described below
commit a1b9e2baba14ec70b300f30a7649691700097f19
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Tue Feb 9 19:56:21 2021 -0600
Revert "KAFKA-10021: Changed Kafka backing stores to use shared admin client to get end offsets and create topics (#9780)"
This reverts commit 36ebed01bb5c104e7b07e384cdab5f480e811d2c.
---
.../apache/kafka/connect/mirror/MirrorMaker.java | 17 +-
.../kafka/connect/cli/ConnectDistributed.java | 18 +-
.../runtime/distributed/DistributedHerder.java | 37 +----
.../connect/storage/KafkaConfigBackingStore.java | 38 ++---
.../connect/storage/KafkaOffsetBackingStore.java | 42 ++---
.../connect/storage/KafkaStatusBackingStore.java | 38 ++---
.../apache/kafka/connect/util/KafkaBasedLog.java | 77 +--------
.../kafka/connect/util/SharedTopicAdmin.java | 145 ----------------
.../org/apache/kafka/connect/util/TopicAdmin.java | 67 --------
.../runtime/distributed/DistributedHerderTest.java | 13 +-
.../storage/KafkaConfigBackingStoreTest.java | 6 +-
.../storage/KafkaOffsetBackingStoreTest.java | 6 +-
.../kafka/connect/util/SharedTopicAdminTest.java | 112 -------------
.../apache/kafka/connect/util/TopicAdminTest.java | 183 ---------------------
14 files changed, 75 insertions(+), 724 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index 839f5dc..fa73f8d 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -36,7 +36,6 @@ import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
-import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -234,28 +233,20 @@ public class MirrorMaker {
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig distributedConfig = new DistributedConfig(workerProps);
String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
- // Create the admin client to be shared by all backing stores for this herder
- Map<String, Object> adminProps = new HashMap<>(config.originals());
- ConnectUtils.addMetricsContextProperties(adminProps, distributedConfig, kafkaClusterId);
- SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
- KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin);
+ KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
offsetBackingStore.configure(distributedConfig);
Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
WorkerConfigTransformer configTransformer = worker.configTransformer();
Converter internalValueConverter = worker.getInternalValueConverter();
- StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
+ StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
statusBackingStore.configure(distributedConfig);
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
internalValueConverter,
distributedConfig,
- configTransformer,
- sharedAdmin);
- // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
- // herder is stopped. MirrorMaker has multiple herders, and having the herder own the close responsibility is much easier than
- // tracking the various shared admin objects in this class.
+ configTransformer);
Herder herder = new DistributedHerder(distributedConfig, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
- advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);
+ advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY);
herders.put(sourceAndTarget, herder);
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 8d93e79..22c1ad8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -36,14 +36,12 @@ import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
-import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
/**
@@ -103,12 +101,7 @@ public class ConnectDistributed {
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
- // Create the admin client to be shared by all backing stores.
- Map<String, Object> adminProps = new HashMap<>(config.originals());
- ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
- SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
-
- KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin);
+ KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
offsetBackingStore.configure(config);
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
@@ -119,20 +112,17 @@ public class ConnectDistributed {
WorkerConfigTransformer configTransformer = worker.configTransformer();
Converter internalValueConverter = worker.getInternalValueConverter();
- StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
+ StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
statusBackingStore.configure(config);
ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
internalValueConverter,
config,
- configTransformer,
- sharedAdmin);
+ configTransformer);
- // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
- // herder is stopped. This is easier than having to track and own the lifecycle ourselves.
DistributedHerder herder = new DistributedHerder(config, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
- advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin);
+ advertisedUrl.toString(), connectorClientConfigOverridePolicy);
final Connect connect = new Connect(herder, rest);
log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index b6dab25..4879a1a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
@@ -68,7 +67,6 @@ import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -141,7 +139,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private final Time time;
private final HerderMetrics herderMetrics;
- private final List<AutoCloseable> uponShutdown;
private final String workerGroupId;
private final int workerSyncTimeoutMs;
@@ -189,22 +186,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private final DistributedConfig config;
- /**
- * Create a herder that will form a Connect cluster with other {@link DistributedHerder} instances (in this or other JVMs)
- * that have the same group ID.
- *
- * @param config the configuration for the worker; may not be null
- * @param time the clock to use; may not be null
- * @param worker the {@link Worker} instance to use; may not be null
- * @param kafkaClusterId the identifier of the Kafka cluster to use for internal topics; may not be null
- * @param statusBackingStore the backing store for statuses; may not be null
- * @param configBackingStore the backing store for connector configurations; may not be null
- * @param restUrl the URL of this herder's REST API; may not be null
- * @param connectorClientConfigOverridePolicy the policy specifying the client configuration properties that may be overridden
- * in connector configurations; may not be null
- * @param uponShutdown any {@link AutoCloseable} objects that should be closed when this herder is {@link #stop() stopped},
- * after all services and resources owned by this herder are stopped
- */
public DistributedHerder(DistributedConfig config,
Time time,
Worker worker,
@@ -212,10 +193,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
StatusBackingStore statusBackingStore,
ConfigBackingStore configBackingStore,
String restUrl,
- ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
- AutoCloseable... uponShutdown) {
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
this(config, worker, worker.workerId(), kafkaClusterId, statusBackingStore, configBackingStore, null, restUrl, worker.metrics(),
- time, connectorClientConfigOverridePolicy, uponShutdown);
+ time, connectorClientConfigOverridePolicy);
configBackingStore.setUpdateListener(new ConfigUpdateListener());
}
@@ -230,8 +210,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
String restUrl,
ConnectMetrics metrics,
Time time,
- ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
- AutoCloseable... uponShutdown) {
+ ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);
this.time = time;
@@ -245,7 +224,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
this.keySignatureVerificationAlgorithms = config.getList(DistributedConfig.INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
this.keyGenerator = config.getInternalRequestKeyGenerator();
this.isTopicTrackingEnabled = config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
- this.uponShutdown = Arrays.asList(uponShutdown);
String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
@@ -700,15 +678,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
@Override
- protected void stopServices() {
- try {
- super.stopServices();
- } finally {
- this.uponShutdown.forEach(closeable -> Utils.closeQuietly(closeable, closeable != null ? closeable.toString() : "<unknown>"));
- }
- }
-
- @Override
public void stop() {
log.info("Herder stopping");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index d4e6358..fbcc35b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -62,7 +62,6 @@ import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.function.Supplier;
/**
* <p>
@@ -225,7 +224,6 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
// Connector and task configs: name or id -> config map
private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
- private final Supplier<TopicAdmin> topicAdminSupplier;
// Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
// is in an inconsistent state and we cannot safely use them until they have been refreshed.
@@ -243,17 +241,11 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
private final WorkerConfigTransformer configTransformer;
- @Deprecated
public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer) {
- this(converter, config, configTransformer, null);
- }
-
- public KafkaConfigBackingStore(Converter converter, WorkerConfig config, WorkerConfigTransformer configTransformer, Supplier<TopicAdmin> adminSupplier) {
this.lock = new Object();
this.started = false;
this.converter = converter;
this.offset = -1;
- this.topicAdminSupplier = adminSupplier;
this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
if (this.topic == null || this.topic.trim().length() == 0)
@@ -479,7 +471,6 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
Map<String, Object> adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
- Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).configStorageTopicSettings()
: Collections.emptyMap();
@@ -490,25 +481,30 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
.replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG))
.build();
- return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
+ return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
}
private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps,
Callback<ConsumerRecord<String, byte[]>> consumedCallback,
- final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
- java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
- log.debug("Creating admin client to manage Connect internal config topic");
- // Create the topic if it doesn't exist
- Set<String> newTopics = admin.createTopics(topicDescription);
- if (!newTopics.contains(topic)) {
- // It already existed, so check that the topic cleanup policy is compact only and not delete
- log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
- admin.verifyTopicCleanupPolicyOnlyCompact(topic,
- DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations");
+ final NewTopic topicDescription, final Map<String, Object> adminProps) {
+ Runnable createTopics = new Runnable() {
+ @Override
+ public void run() {
+ log.debug("Creating admin client to manage Connect internal config topic");
+ try (TopicAdmin admin = new TopicAdmin(adminProps)) {
+ // Create the topic if it doesn't exist
+ Set<String> newTopics = admin.createTopics(topicDescription);
+ if (!newTopics.contains(topic)) {
+ // It already existed, so check that the topic cleanup policy is compact only and not delete
+ log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+ admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+ DistributedConfig.CONFIG_TOPIC_CONFIG, "connector configurations");
+ }
+ }
}
};
- return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
+ return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
}
@SuppressWarnings("unchecked")
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 26b47f9..8408f99 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -41,13 +41,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.function.Supplier;
/**
* <p>
@@ -64,16 +62,6 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
private KafkaBasedLog<byte[], byte[]> offsetLog;
private HashMap<ByteBuffer, ByteBuffer> data;
- private final Supplier<TopicAdmin> topicAdminSupplier;
-
- @Deprecated
- public KafkaOffsetBackingStore() {
- this.topicAdminSupplier = null;
- }
-
- public KafkaOffsetBackingStore(Supplier<TopicAdmin> topicAdmin) {
- this.topicAdminSupplier = Objects.requireNonNull(topicAdmin);
- }
@Override
public void configure(final WorkerConfig config) {
@@ -98,7 +86,6 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
Map<String, Object> adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
- Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).offsetStorageTopicSettings()
: Collections.emptyMap();
@@ -109,25 +96,30 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
.replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG))
.build();
- offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier);
+ offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminProps);
}
private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps,
Callback<ConsumerRecord<byte[], byte[]>> consumedCallback,
- final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
- java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
- log.debug("Creating admin client to manage Connect internal offset topic");
- // Create the topic if it doesn't exist
- Set<String> newTopics = admin.createTopics(topicDescription);
- if (!newTopics.contains(topic)) {
- // It already existed, so check that the topic cleanup policy is compact only and not delete
- log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
- admin.verifyTopicCleanupPolicyOnlyCompact(topic,
- DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
+ final NewTopic topicDescription, final Map<String, Object> adminProps) {
+ Runnable createTopics = new Runnable() {
+ @Override
+ public void run() {
+ log.debug("Creating admin client to manage Connect internal offset topic");
+ try (TopicAdmin admin = new TopicAdmin(adminProps)) {
+ // Create the topic if it doesn't exist
+ Set<String> newTopics = admin.createTopics(topicDescription);
+ if (!newTopics.contains(topic)) {
+ // It already existed, so check that the topic cleanup policy is compact only and not delete
+ log.debug("Using admin client to check cleanup policy for '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+ admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+ DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
+ }
+ }
}
};
- return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
+ return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, Time.SYSTEM, createTopics);
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index abdbba8..5d6057d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -61,7 +61,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.function.Supplier;
/**
* StatusBackingStore implementation which uses a compacted topic for storage
@@ -129,24 +128,17 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
protected final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
protected final Map<String, CacheEntry<ConnectorStatus>> connectors;
protected final ConcurrentMap<String, ConcurrentMap<String, TopicStatus>> topics;
- private final Supplier<TopicAdmin> topicAdminSupplier;
private String statusTopic;
private KafkaBasedLog<String, byte[]> kafkaLog;
private int generation;
- @Deprecated
public KafkaStatusBackingStore(Time time, Converter converter) {
- this(time, converter, null);
- }
-
- public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdmin> topicAdminSupplier) {
this.time = time;
this.converter = converter;
this.tasks = new Table<>();
this.connectors = new HashMap<>();
this.topics = new ConcurrentHashMap<>();
- this.topicAdminSupplier = topicAdminSupplier;
}
// visible for testing
@@ -177,7 +169,6 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
Map<String, Object> adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
- Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).statusStorageTopicSettings()
@@ -195,25 +186,30 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
read(record);
}
};
- this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminSupplier);
+ this.kafkaLog = createKafkaBasedLog(statusTopic, producerProps, consumerProps, readCallback, topicDescription, adminProps);
}
private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps,
Callback<ConsumerRecord<String, byte[]>> consumedCallback,
- final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
- java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
- log.debug("Creating admin client to manage Connect internal status topic");
- // Create the topic if it doesn't exist
- Set<String> newTopics = admin.createTopics(topicDescription);
- if (!newTopics.contains(topic)) {
- // It already existed, so check that the topic cleanup policy is compact only and not delete
- log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
- admin.verifyTopicCleanupPolicyOnlyCompact(topic,
- DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connector and task statuses");
+ final NewTopic topicDescription, final Map<String, Object> adminProps) {
+ Runnable createTopics = new Runnable() {
+ @Override
+ public void run() {
+ log.debug("Creating admin client to manage Connect internal status topic");
+ try (TopicAdmin admin = new TopicAdmin(adminProps)) {
+ // Create the topic if it doesn't exist
+ Set<String> newTopics = admin.createTopics(topicDescription);
+ if (!newTopics.contains(topic)) {
+ // It already existed, so check that the topic cleanup policy is compact only and not delete
+ log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+ admin.verifyTopicCleanupPolicyOnlyCompact(topic,
+ DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connector and task statuses");
+ }
+ }
}
};
- return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, time, createTopics);
+ return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, time, createTopics);
}
@Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 6a2a787..5248715 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -41,12 +41,10 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
/**
@@ -81,15 +79,13 @@ public class KafkaBasedLog<K, V> {
private final Map<String, Object> producerConfigs;
private final Map<String, Object> consumerConfigs;
private final Callback<ConsumerRecord<K, V>> consumedCallback;
- private final Supplier<TopicAdmin> topicAdminSupplier;
private Consumer<K, V> consumer;
private Producer<K, V> producer;
- private TopicAdmin admin;
private Thread thread;
private boolean stopRequested;
private Queue<Callback<Void>> readLogEndOffsetCallbacks;
- private java.util.function.Consumer<TopicAdmin> initializer;
+ private Runnable initializer;
/**
* Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until
@@ -107,63 +103,31 @@ public class KafkaBasedLog<K, V> {
* @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
* @param time Time interface
* @param initializer the component that should be run when this log is {@link #start() started}; may be null
- * @deprecated Replaced by {@link #KafkaBasedLog(String, Map, Map, Supplier, Callback, Time, java.util.function.Consumer)}
*/
- @Deprecated
public KafkaBasedLog(String topic,
Map<String, Object> producerConfigs,
Map<String, Object> consumerConfigs,
Callback<ConsumerRecord<K, V>> consumedCallback,
Time time,
Runnable initializer) {
- this(topic, producerConfigs, consumerConfigs, () -> null, consumedCallback, time, initializer != null ? admin -> initializer.run() : null);
- }
-
- /**
- * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until
- * {@link #start()} is invoked.
- *
- * @param topic the topic to treat as a log
- * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must
- * contain compatible serializer settings for the generic types used on this class. Some
- * setting, such as the number of acks, will be overridden to ensure correct behavior of this
- * class.
- * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must
- * contain compatible serializer settings for the generic types used on this class. Some
- * setting, such as the auto offset reset policy, will be overridden to ensure correct
- * behavior of this class.
- * @param topicAdminSupplier supplier function for an admin client, the lifecycle of which is expected to be controlled
- * by the calling component; may not be null
- * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
- * @param time Time interface
- * @param initializer the function that should be run when this log is {@link #start() started}; may be null
- */
- public KafkaBasedLog(String topic,
- Map<String, Object> producerConfigs,
- Map<String, Object> consumerConfigs,
- Supplier<TopicAdmin> topicAdminSupplier,
- Callback<ConsumerRecord<K, V>> consumedCallback,
- Time time,
- java.util.function.Consumer<TopicAdmin> initializer) {
this.topic = topic;
this.producerConfigs = producerConfigs;
this.consumerConfigs = consumerConfigs;
- this.topicAdminSupplier = Objects.requireNonNull(topicAdminSupplier);
this.consumedCallback = consumedCallback;
this.stopRequested = false;
this.readLogEndOffsetCallbacks = new ArrayDeque<>();
this.time = time;
- this.initializer = initializer != null ? initializer : admin -> { };
+ this.initializer = initializer != null ? initializer : new Runnable() {
+ @Override
+ public void run() {
+ }
+ };
}
public void start() {
log.info("Starting KafkaBasedLog with topic " + topic);
- // Create the topic admin client and initialize the topic ...
- admin = topicAdminSupplier.get(); // may be null
- initializer.accept(admin);
-
- // Then create the producer and consumer
+ initializer.run();
producer = createProducer();
consumer = createConsumer();
@@ -229,9 +193,6 @@ public class KafkaBasedLog<K, V> {
log.error("Failed to stop KafkaBasedLog consumer", e);
}
- // do not close the admin client, since we don't own it
- admin = null;
-
log.info("Stopped KafkaBasedLog for topic " + topic);
}
@@ -321,29 +282,7 @@ public class KafkaBasedLog<K, V> {
log.trace("Reading to end of offset log");
Set<TopicPartition> assignment = consumer.assignment();
- Map<TopicPartition, Long> endOffsets;
- // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
- // That is because it's possible that the consumer is already blocked waiting for new records to appear, when
- // the consumer is already at the end. In such cases, using 'consumer.endOffsets(...)' will block until at least
- // one more record becomes available, meaning we can't even check whether we're at the end offset.
- // Since all we're trying to do here is get the end offset, we should use the supplied admin client
- // (if available)
- // (which prevents 'consumer.endOffsets(...)'
- // from
-
- // Deprecated constructors do not provide an admin supplier, so the admin is potentially null.
- if (admin != null) {
- // Use the admin client to immediately find the end offsets for the assigned topic partitions.
- // Unlike using the consumer
- endOffsets = admin.endOffsets(assignment);
- } else {
- // The admin may be null if older deprecated constructor is used, though AK Connect currently always provides an admin client.
- // Using the consumer is not ideal, because when the topic has low volume, the 'poll(...)' method called from the
- // work thread may have blocked the consumer while waiting for more records (even when there are none).
- // In such cases, this call to the consumer to simply find the end offsets will block even though we might already be
- // at the end offset.
- endOffsets = consumer.endOffsets(assignment);
- }
+ Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
log.trace("Reading to end of log offsets {}", endOffsets);
while (!endOffsets.isEmpty()) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
deleted file mode 100644
index a99514e..0000000
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SharedTopicAdmin.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.util;
-
-import java.time.Duration;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.function.UnaryOperator;
-
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.connect.errors.ConnectException;
-
-/**
- * A holder of a {@link TopicAdmin} object that is lazily and atomically created when needed by multiple callers.
- * As soon as one of the getters is called, all getters will return the same shared {@link TopicAdmin}
- * instance until this {@link SharedTopicAdmin} is closed via {@link #close()} or {@link #close(Duration)}.
- *
- * <p>The owner of this object is responsible for ensuring that either {@link #close()} or {@link #close(Duration)}
- * is called when the {@link TopicAdmin} instance is no longer needed. Consequently, once this
- * {@link SharedTopicAdmin} instance has been closed, neither the {@link #get()} and {@link #topicAdmin()} methods
- * nor any previously returned {@link TopicAdmin} instances may be used.
- *
- * <p>This class is thread-safe. It also appears as immutable to callers that obtain the {@link TopicAdmin} object,
- * until this object is closed, at which point it cannot be used anymore.
- */
-public class SharedTopicAdmin implements AutoCloseable, Supplier<TopicAdmin> {
-
- // Visible for testing
- static final Duration DEFAULT_CLOSE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
-
- private final Map<String, Object> adminProps;
- private final AtomicReference<TopicAdmin> admin = new AtomicReference<>();
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private final Function<Map<String, Object>, TopicAdmin> factory;
-
- public SharedTopicAdmin(Map<String, Object> adminProps) {
- this(adminProps, TopicAdmin::new);
- }
-
- // Visible for testing
- SharedTopicAdmin(Map<String, Object> adminProps, Function<Map<String, Object>, TopicAdmin> factory) {
- this.adminProps = Objects.requireNonNull(adminProps);
- this.factory = Objects.requireNonNull(factory);
- }
-
- /**
- * Get the shared {@link TopicAdmin} instance.
- *
- * @return the shared instance; never null
- * @throws ConnectException if this object has already been closed
- */
- @Override
- public TopicAdmin get() {
- return topicAdmin();
- }
-
- /**
- * Get the shared {@link TopicAdmin} instance.
- *
- * @return the shared instance; never null
- * @throws ConnectException if this object has already been closed
- */
- public TopicAdmin topicAdmin() {
- return admin.updateAndGet(this::createAdmin);
- }
-
- /**
- * Get the string containing the list of bootstrap server addresses to the Kafka broker(s) to which
- * the admin client connects.
- *
- * @return the bootstrap servers as a string; never null
- */
- public String bootstrapServers() {
- return adminProps.getOrDefault(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "<unknown>").toString();
- }
-
- /**
- * Close the underlying {@link TopicAdmin} instance, if one has been created, and prevent new ones from being created.
- *
- * <p>Once this method is called, neither the {@link #get()} and {@link #topicAdmin()} methods
- * nor any previously returned {@link TopicAdmin} instances may be used.
- */
- @Override
- public void close() {
- close(DEFAULT_CLOSE_DURATION);
- }
-
- /**
- * Close the underlying {@link TopicAdmin} instance, if one has been created, and prevent new ones from being created.
- *
- * <p>Once this method is called, neither the {@link #get()} and {@link #topicAdmin()} methods
- * nor any previously returned {@link TopicAdmin} instances may be used.
- *
- * @param timeout the maximum time to wait while the underlying admin client is closed; may not be null
- */
- public void close(Duration timeout) {
- Objects.requireNonNull(timeout);
- if (this.closed.compareAndSet(false, true)) {
- TopicAdmin admin = this.admin.getAndSet(null);
- if (admin != null) {
- admin.close(timeout);
- }
- }
- }
-
- @Override
- public String toString() {
- return "admin client for brokers at " + bootstrapServers();
- }
-
- /**
- * Method used to create a {@link TopicAdmin} instance. This method must be side-effect free, since it is called from within
- * the {@link AtomicReference#updateAndGet(UnaryOperator)}.
- *
- * @param existing the existing instance; may be null
- * @return the existing instance if there is one, otherwise a newly created {@link TopicAdmin}
- */
- protected TopicAdmin createAdmin(TopicAdmin existing) {
- if (closed.get()) {
- throw new ConnectException("The " + this + " has already been closed and cannot be used.");
- }
- if (existing != null) {
- return existing;
- }
- return factory.apply(adminProps);
- }
-}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 8b7e456..428343b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -23,20 +23,14 @@ import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
@@ -59,7 +53,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -290,14 +283,6 @@ public class TopicAdmin implements AutoCloseable {
}
/**
- * Get the {@link Admin} client used by this topic admin object.
- * @return the Kafka admin instance; never null
- */
- public Admin admin() {
- return admin;
- }
-
- /**
* Attempt to create the topic described by the given definition, returning true if the topic was created or false
* if the topic already existed.
*
@@ -645,58 +630,6 @@ public class TopicAdmin implements AutoCloseable {
return result;
}
- /**
- * Fetch the most recent offset for each of the supplied {@link TopicPartition} objects.
- *
- * @param partitions the topic partitions
- * @return the map of offset for each topic partition, or an empty map if the supplied partitions
- * are null or empty
- * @throws RetriableException if a retriable error occurs, the operation takes too long, or the
- * thread is interrupted while attempting to perform this operation
- * @throws ConnectException if a non retriable error occurs
- */
- public Map<TopicPartition, Long> endOffsets(Set<TopicPartition> partitions) {
- if (partitions == null || partitions.isEmpty()) {
- return Collections.emptyMap();
- }
- Map<TopicPartition, OffsetSpec> offsetSpecMap = partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()));
- ListOffsetsResult resultFuture = admin.listOffsets(offsetSpecMap);
- // Get the individual result for each topic partition so we have better error messages
- Map<TopicPartition, Long> result = new HashMap<>();
- for (TopicPartition partition : partitions) {
- try {
- ListOffsetsResultInfo info = resultFuture.partitionResult(partition).get();
- result.put(partition, info.offset());
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- String topic = partition.topic();
- if (cause instanceof AuthorizationException) {
- String msg = String.format("Not authorized to get the end offsets for topic '%s' on brokers at %s", topic, bootstrapServers());
- throw new ConnectException(msg, e);
- } else if (cause instanceof UnsupportedVersionException) {
- // Should theoretically never happen, because this method is the same as what the consumer uses and therefore
- // should exist in the broker since before the admin client was added
- String msg = String.format("API to get the get the end offsets for topic '%s' is unsupported on brokers at %s", topic, bootstrapServers());
- throw new ConnectException(msg, e);
- } else if (cause instanceof TimeoutException) {
- String msg = String.format("Timed out while waiting to get end offsets for topic '%s' on brokers at %s", topic, bootstrapServers());
- throw new RetriableException(msg, e);
- } else if (cause instanceof LeaderNotAvailableException) {
- String msg = String.format("Unable to get end offsets during leader election for topic '%s' on brokers at %s", topic, bootstrapServers());
- throw new RetriableException(msg, e);
- } else {
- String msg = String.format("Error while getting end offsets for topic '%s' on brokers at %s", topic, bootstrapServers());
- throw new ConnectException(msg, e);
- }
- } catch (InterruptedException e) {
- Thread.interrupted();
- String msg = String.format("Interrupted while attempting to read end offsets for topic '%s' on brokers at %s", partition.topic(), bootstrapServers());
- throw new RetriableException(msg, e);
- }
- }
- return result;
- }
-
@Override
public void close() {
admin.close();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index bed46a3..bea062e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -77,7 +77,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -192,7 +191,6 @@ public class DistributedHerderTest {
@Mock private Plugins plugins;
@Mock private PluginClassLoader pluginLoader;
@Mock private DelegatingClassLoader delegatingLoader;
- private CountDownLatch shutdownCalled = new CountDownLatch(1);
private ConfigBackingStore.UpdateListener configUpdateListener;
private WorkerRebalanceListener rebalanceListener;
@@ -210,7 +208,6 @@ public class DistributedHerderTest {
metrics = new MockConnectMetrics(time);
worker = PowerMock.createMock(Worker.class);
EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE);
- AutoCloseable uponShutdown = () -> shutdownCalled.countDown();
// Default to the old protocol unless specified otherwise
connectProtocolVersion = CONNECT_PROTOCOL_V0;
@@ -218,8 +215,7 @@ public class DistributedHerderTest {
herder = PowerMock.createPartialMock(DistributedHerder.class,
new String[]{"connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus", "validateConnectorConfig"},
new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
- statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy,
- new AutoCloseable[]{uponShutdown});
+ statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy);
configUpdateListener = herder.new ConfigUpdateListener();
rebalanceListener = herder.new RebalanceListener(time);
@@ -2309,13 +2305,6 @@ public class DistributedHerderTest {
getThreadFactory().newThread(EMPTY_RUNNABLE).getName().startsWith("StartAndStopExecutor"));
}
- @Test
- public void testHerderStopServicesClosesUponShutdown() {
- assertEquals(1, shutdownCalled.getCount());
- herder.stopServices();
- assertEquals(0, shutdownCalled.getCount());
- }
-
private void expectRebalance(final long offset,
final List<String> assignedConnectors,
final List<ConnectorTaskId> assignedTasks) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 7d301ca..a46f6fe 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -35,7 +35,6 @@ import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TestFuture;
-import org.apache.kafka.connect.util.TopicAdmin;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
@@ -58,7 +57,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -144,7 +142,7 @@ public class KafkaConfigBackingStoreTest {
private Capture<String> capturedTopic = EasyMock.newCapture();
private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
- private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
+ private Capture<Map<String, Object>> capturedAdminProps = EasyMock.newCapture();
private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
@@ -897,7 +895,7 @@ public class KafkaConfigBackingStoreTest {
PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback),
- EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier))
+ EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminProps))
.andReturn(storeLog);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index ebcf208..0d0194f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.KafkaBasedLog;
-import org.apache.kafka.connect.util.TopicAdmin;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
@@ -51,7 +50,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -108,7 +106,7 @@ public class KafkaOffsetBackingStoreTest {
private Capture<String> capturedTopic = EasyMock.newCapture();
private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
- private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
+ private Capture<Map<String, Object>> capturedAdminProps = EasyMock.newCapture();
private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
@@ -392,7 +390,7 @@ public class KafkaOffsetBackingStoreTest {
private void expectConfigure() throws Exception {
PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback),
- EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier))
+ EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminProps))
.andReturn(storeLog);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
deleted file mode 100644
index f5ac6a7..0000000
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/SharedTopicAdminTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.util;
-
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Map;
-import java.util.function.Function;
-
-import org.apache.kafka.connect.errors.ConnectException;
-import org.junit.Rule;
-import org.mockito.Mock;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
-
-import static org.apache.kafka.connect.util.SharedTopicAdmin.DEFAULT_CLOSE_DURATION;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThrows;
-import static org.mockito.ArgumentMatchers.anyMap;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class SharedTopicAdminTest {
-
- private static final Map<String, Object> EMPTY_CONFIG = Collections.emptyMap();
-
- @Rule
- public MockitoRule rule = MockitoJUnit.rule();
-
- @Mock private TopicAdmin mockTopicAdmin;
- @Mock private Function<Map<String, Object>, TopicAdmin> factory;
- private SharedTopicAdmin sharedAdmin;
-
- @Before
- public void beforeEach() {
- when(factory.apply(anyMap())).thenReturn(mockTopicAdmin);
- sharedAdmin = new SharedTopicAdmin(EMPTY_CONFIG, factory::apply);
- }
-
- @Test
- public void shouldCloseWithoutBeingUsed() {
- // When closed before being used
- sharedAdmin.close();
- // Then should not create or close admin
- verifyTopicAdminCreatesAndCloses(0);
- }
-
- @Test
- public void shouldCloseAfterTopicAdminUsed() {
- // When used and then closed
- assertSame(mockTopicAdmin, sharedAdmin.topicAdmin());
- sharedAdmin.close();
- // Then should have created and closed just one admin
- verifyTopicAdminCreatesAndCloses(1);
- }
-
- @Test
- public void shouldCloseAfterTopicAdminUsedMultipleTimes() {
- // When used many times and then closed
- for (int i = 0; i != 10; ++i) {
- assertSame(mockTopicAdmin, sharedAdmin.topicAdmin());
- }
- sharedAdmin.close();
- // Then should have created and closed just one admin
- verifyTopicAdminCreatesAndCloses(1);
- }
-
- @Test
- public void shouldCloseWithDurationAfterTopicAdminUsed() {
- // When used and then closed with a custom timeout
- Duration timeout = Duration.ofSeconds(1);
- assertSame(mockTopicAdmin, sharedAdmin.topicAdmin());
- sharedAdmin.close(timeout);
- // Then should have created and closed just one admin using the supplied timeout
- verifyTopicAdminCreatesAndCloses(1, timeout);
- }
-
- @Test
- public void shouldFailToGetTopicAdminAfterClose() {
- // When closed
- sharedAdmin.close();
- // Then using the admin should fail
- assertThrows(ConnectException.class, () -> sharedAdmin.topicAdmin());
- }
-
- private void verifyTopicAdminCreatesAndCloses(int count) {
- verifyTopicAdminCreatesAndCloses(count, DEFAULT_CLOSE_DURATION);
- }
-
- private void verifyTopicAdminCreatesAndCloses(int count, Duration expectedDuration) {
- verify(factory, times(count)).apply(anyMap());
- verify(mockTopicAdmin, times(count)).close(eq(expectedDuration));
- }
-}
\ No newline at end of file
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index dbe2910..e4d069d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -17,12 +17,9 @@
package org.apache.kafka.connect.util;
import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
@@ -30,14 +27,11 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
-import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.CreateTopicsResponseData;
@@ -53,13 +47,10 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.junit.Test;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -71,11 +62,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyMap;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class TopicAdminTest {
@@ -474,175 +460,6 @@ public class TopicAdminTest {
}
}
- @SuppressWarnings("unchecked")
- private <T> KafkaFuture<T> mockFuture() {
- return (KafkaFuture<T>) mock(KafkaFuture.class);
- }
-
- private Admin expectAdminListOffsetsFailure(Throwable expected) throws Exception {
- // When the admin client lists offsets
- Admin mockAdmin = mock(Admin.class);
- ListOffsetsResult results = mock(ListOffsetsResult.class);
- when(mockAdmin.listOffsets(anyMap())).thenReturn(results);
- // and throws an exception via the future.get()
- ExecutionException execException = new ExecutionException(expected);
- KafkaFuture<ListOffsetsResultInfo> future = mockFuture();
- when(future.get()).thenThrow(execException);
- when(results.partitionResult(any(TopicPartition.class))).thenReturn(future);
- return mockAdmin;
- }
-
- private void expectOffsets(ListOffsetsResult results, TopicPartition tp, long offset) throws Exception {
- // Then return the offset for the given topic partition
- ListOffsetsResultInfo resultsInfo = new ListOffsetsResultInfo(offset, 0L, Optional.empty());
- KafkaFuture<ListOffsetsResultInfo> future = mockFuture();
- when(future.get()).thenReturn(resultsInfo);
- when(results.partitionResult(eq(tp))).thenReturn(future);
-
- }
-
- @Test
- public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() throws Exception {
- String topicName = "myTopic";
- TopicPartition tp1 = new TopicPartition(topicName, 0);
- Set<TopicPartition> tps = Collections.singleton(tp1);
-
- // When the admin client lists offsets throws an exception
- Admin mockAdmin = expectAdminListOffsetsFailure(new AuthorizationException("failed"));
-
- // Then the topic admin should throw exception
- TopicAdmin admin = new TopicAdmin(null, mockAdmin);
- ConnectException e = assertThrows(ConnectException.class, () -> {
- admin.endOffsets(tps);
- });
- assertTrue(e.getMessage().contains("Not authorized to get the end offsets"));
- }
-
- @Test
- public void endOffsetsShouldFailWithNonRetriableWhenVersionUnsupportedErrorOccurs() throws Exception {
- String topicName = "myTopic";
- TopicPartition tp1 = new TopicPartition(topicName, 0);
- Set<TopicPartition> tps = Collections.singleton(tp1);
-
- // When the admin client lists offsets throws an exception
- Admin mockAdmin = expectAdminListOffsetsFailure(new UnsupportedVersionException("failed"));
-
- // Then the topic admin should throw exception
- TopicAdmin admin = new TopicAdmin(null, mockAdmin);
- ConnectException e = assertThrows(ConnectException.class, () -> {
- admin.endOffsets(tps);
- });
- assertTrue(e.getMessage().contains("is unsupported on brokers"));
- }
-
- @Test
- public void endOffsetsShouldFailWithRetriableWhenTimeoutErrorOccurs() throws Exception {
- String topicName = "myTopic";
- TopicPartition tp1 = new TopicPartition(topicName, 0);
- Set<TopicPartition> tps = Collections.singleton(tp1);
-
- // When the admin client lists offsets throws an exception
- Admin mockAdmin = expectAdminListOffsetsFailure(new TimeoutException("failed"));
-
- // Then the topic admin should throw exception
- TopicAdmin admin = new TopicAdmin(null, mockAdmin);
- ConnectException e = assertThrows(ConnectException.class, () -> {
- admin.endOffsets(tps);
- });
- assertTrue(e.getMessage().contains("Timed out while waiting"));
- }
-
- @Test
- public void endOffsetsShouldFailWithNonRetriableWhenUnknownErrorOccurs() throws Exception {
- String topicName = "myTopic";
- TopicPartition tp1 = new TopicPartition(topicName, 0);
- Set<TopicPartition> tps = Collections.singleton(tp1);
-
- // When the admin client lists offsets throws an exception
- Admin mockAdmin = expectAdminListOffsetsFailure(new RuntimeException("failed"));
-
- // Then the topic admin should throw exception
- TopicAdmin admin = new TopicAdmin(null, mockAdmin);
- ConnectException e = assertThrows(ConnectException.class, () -> {
- admin.endOffsets(tps);
- });
- assertTrue(e.getMessage().contains("Error while getting end offsets for topic"));
- }
-
- @Test
- public void endOffsetsShouldReturnEmptyMapWhenPartitionsSetIsNull() throws Exception {
- String topicName = "myTopic";
- TopicPartition tp1 = new TopicPartition(topicName, 0);
- Set<TopicPartition> tps = Collections.singleton(tp1);
-
- // Then the topic admin should return immediately
- Admin mockAdmin = mock(Admin.class);
- TopicAdmin admin = new TopicAdmin(null, mockAdmin);
- Map<TopicPartition, Long> offsets = admin.endOffsets(Collections.emptySet());
- assertTrue(offsets.isEmpty());
- }
-
- @Test
- public void endOffsetsShouldReturnOffsetsForOnePartition() throws Exception {
- String topicName = "myTopic";
- long offset = 1L;
- TopicPartition tp1 = new TopicPartition(topicName, 0);
- Set<TopicPartition> tps = Collections.singleton(tp1);
-
- // When the admin client lists offsets is called with one topic partition
- Admin mockAdmin = mock(Admin.class);
- ListOffsetsResult results = mock(ListOffsetsResult.class);
- when(mockAdmin.listOffsets(anyMap())).thenReturn(results);
- expectOffsets(results, tp1, offset);
-
- // Then the topic admin should return offsets
- TopicAdmin admin = new TopicAdmin(null, mockAdmin);
- Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
- assertEquals(1, offsets.size());
- assertEquals(Long.valueOf(offset), offsets.get(tp1));
- }
-
- @Test
- public void endOffsetsShouldReturnOffsetsForMultiplePartitions() throws Exception {
- long offset1 = 1L;
- long offset2 = 2L;
- String topicName = "myTopic";
- TopicPartition tp1 = new TopicPartition(topicName, 0);
- TopicPartition tp2 = new TopicPartition(topicName, 1);
- Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2));
-
- // When the admin client lists offsets is called with two topic partitions
- Admin mockAdmin = mock(Admin.class);
- ListOffsetsResult results = mock(ListOffsetsResult.class);
- when(mockAdmin.listOffsets(anyMap())).thenReturn(results);
- expectOffsets(results, tp1, offset1);
- expectOffsets(results, tp2, offset2);
-
- // Then the topic admin should return offsets
- TopicAdmin admin = new TopicAdmin(null, mockAdmin);
- Map<TopicPartition, Long> offsets = admin.endOffsets(tps);
- assertEquals(2, offsets.size());
- assertEquals(Long.valueOf(offset1), offsets.get(tp1));
- assertEquals(Long.valueOf(offset2), offsets.get(tp2));
- }
-
- @Test
- public void endOffsetsShouldFailWhenAnyTopicPartitionHasError() throws Exception {
- String topicName = "myTopic";
- TopicPartition tp1 = new TopicPartition(topicName, 0);
- Set<TopicPartition> tps = Collections.singleton(tp1);
-
- // When the admin client lists offsets throws an exception
- Admin mockAdmin = expectAdminListOffsetsFailure(new AuthorizationException("failed"));
-
- // Then the topic admin should throw exception
- TopicAdmin admin = new TopicAdmin(null, mockAdmin);
- ConnectException e = assertThrows(ConnectException.class, () -> {
- admin.endOffsets(tps);
- });
- assertTrue(e.getMessage().contains("Not authorized to get the end offsets"));
- }
-
private Cluster createCluster(int numNodes) {
HashMap<Integer, Node> nodes = new HashMap<>();
for (int i = 0; i < numNodes; ++i) {