You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/05/10 06:50:12 UTC
[kafka] branch trunk updated: MINOR: Small cleanups in connect/mirror (#12113)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 989d3ce07f MINOR: Small cleanups in connect/mirror (#12113)
989d3ce07f is described below
commit 989d3ce07f1848d4c0b9fbb116ff0cf9b3b382d7
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Tue May 10 08:49:56 2022 +0200
MINOR: Small cleanups in connect/mirror (#12113)
Reviewers: Luke Chen <sh...@gmail.com>, Divij Vaidya <di...@gmail.com>
---
.../kafka/connect/mirror/MirrorCheckpointTask.java | 2 +-
.../apache/kafka/connect/mirror/OffsetSync.java | 6 +++---
.../kafka/connect/mirror/OffsetSyncStore.java | 6 +++---
.../org/apache/kafka/connect/mirror/Scheduler.java | 22 +++++++++++-----------
.../MirrorConnectorsIntegrationBaseTest.java | 2 +-
5 files changed, 19 insertions(+), 19 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 47631998fb..30fb695d92 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -105,7 +105,7 @@ public class MirrorCheckpointTask extends SourceTask {
}
@Override
- public void commit() throws InterruptedException {
+ public void commit() {
// nop
}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
index 68e6441f18..e1ecb1e1db 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
@@ -39,9 +39,9 @@ public class OffsetSync {
new Field(TOPIC_KEY, Type.STRING),
new Field(PARTITION_KEY, Type.INT32));
- private TopicPartition topicPartition;
- private long upstreamOffset;
- private long downstreamOffset;
+ private final TopicPartition topicPartition;
+ private final long upstreamOffset;
+ private final long downstreamOffset;
public OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
this.topicPartition = topicPartition;
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
index 600dda46f3..9152cd5aa0 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -30,9 +30,9 @@ import java.time.Duration;
/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
class OffsetSyncStore implements AutoCloseable {
- private KafkaConsumer<byte[], byte[]> consumer;
- private Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
- private TopicPartition offsetSyncTopicPartition;
+ private final KafkaConsumer<byte[], byte[]> consumer;
+ private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
+ private final TopicPartition offsetSyncTopicPartition;
OffsetSyncStore(MirrorConnectorConfig config) {
consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
index 20f2ca7e2c..0644d6a6c6 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class Scheduler implements AutoCloseable {
- private static Logger log = LoggerFactory.getLogger(Scheduler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
private final String name;
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
@@ -62,11 +62,11 @@ class Scheduler implements AutoCloseable {
try {
executor.submit(() -> executeThread(task, description)).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- log.warn("{} was interrupted running task: {}", name, description);
+ LOG.warn("{} was interrupted running task: {}", name, description);
} catch (TimeoutException e) {
- log.error("{} timed out running task: {}", name, description);
+ LOG.error("{} timed out running task: {}", name, description);
} catch (Throwable e) {
- log.error("{} caught exception in task: {}", name, description, e);
+ LOG.error("{} caught exception in task: {}", name, description, e);
}
}
@@ -76,10 +76,10 @@ class Scheduler implements AutoCloseable {
try {
boolean terminated = executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
if (!terminated) {
- log.error("{} timed out during shutdown of internal scheduler.", name);
+ LOG.error("{} timed out during shutdown of internal scheduler.", name);
}
} catch (InterruptedException e) {
- log.warn("{} was interrupted during shutdown of internal scheduler.", name);
+ LOG.warn("{} was interrupted during shutdown of internal scheduler.", name);
}
}
@@ -92,21 +92,21 @@ class Scheduler implements AutoCloseable {
long start = System.currentTimeMillis();
task.run();
long elapsed = System.currentTimeMillis() - start;
- log.info("{} took {} ms", description, elapsed);
+ LOG.info("{} took {} ms", description, elapsed);
if (elapsed > timeout.toMillis()) {
- log.warn("{} took too long ({} ms) running task: {}", name, elapsed, description);
+ LOG.warn("{} took too long ({} ms) running task: {}", name, elapsed, description);
}
} catch (InterruptedException e) {
- log.warn("{} was interrupted running task: {}", name, description);
+ LOG.warn("{} was interrupted running task: {}", name, description);
} catch (Throwable e) {
- log.error("{} caught exception in scheduled task: {}", name, description, e);
+ LOG.error("{} caught exception in scheduled task: {}", name, description, e);
}
}
private void executeThread(Task task, String description) {
Thread.currentThread().setName(name + "-" + description);
if (closed) {
- log.info("{} skipping task due to shutdown: {}", name, description);
+ LOG.info("{} skipping task due to shutdown: {}", name, description);
return;
}
run(task, description);
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 8f692ca911..f325e15695 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -729,7 +729,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
/*
* Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
*/
- protected void warmUpConsumer(Map<String, Object> consumerProps) throws InterruptedException {
+ protected void warmUpConsumer(Map<String, Object> consumerProps) {
Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
dummyConsumer.commitSync();