You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/05/10 06:50:12 UTC

[kafka] branch trunk updated: MINOR: Small cleanups in connect/mirror (#12113)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 989d3ce07f MINOR: Small cleanups in connect/mirror (#12113)
989d3ce07f is described below

commit 989d3ce07f1848d4c0b9fbb116ff0cf9b3b382d7
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Tue May 10 08:49:56 2022 +0200

    MINOR: Small cleanups in connect/mirror (#12113)
    
    Reviewers: Luke Chen <sh...@gmail.com>, Divij Vaidya <di...@gmail.com>
---
 .../kafka/connect/mirror/MirrorCheckpointTask.java |  2 +-
 .../apache/kafka/connect/mirror/OffsetSync.java    |  6 +++---
 .../kafka/connect/mirror/OffsetSyncStore.java      |  6 +++---
 .../org/apache/kafka/connect/mirror/Scheduler.java | 22 +++++++++++-----------
 .../MirrorConnectorsIntegrationBaseTest.java       |  2 +-
 5 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 47631998fb..30fb695d92 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -105,7 +105,7 @@ public class MirrorCheckpointTask extends SourceTask {
     }
 
     @Override
-    public void commit() throws InterruptedException {
+    public void commit() {
         // nop
     }
 
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
index 68e6441f18..e1ecb1e1db 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
@@ -39,9 +39,9 @@ public class OffsetSync {
             new Field(TOPIC_KEY, Type.STRING),
             new Field(PARTITION_KEY, Type.INT32));
 
-    private TopicPartition topicPartition;
-    private long upstreamOffset;
-    private long downstreamOffset;
+    private final TopicPartition topicPartition;
+    private final long upstreamOffset;
+    private final long downstreamOffset;
 
     public OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
         this.topicPartition = topicPartition;
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
index 600dda46f3..9152cd5aa0 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -30,9 +30,9 @@ import java.time.Duration;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private KafkaConsumer<byte[], byte[]> consumer;
-    private Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private TopicPartition offsetSyncTopicPartition;
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
+    private final TopicPartition offsetSyncTopicPartition;
 
     OffsetSyncStore(MirrorConnectorConfig config) {
         consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
index 20f2ca7e2c..0644d6a6c6 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class Scheduler implements AutoCloseable {
-    private static Logger log = LoggerFactory.getLogger(Scheduler.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
 
     private final String name;
     private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
@@ -62,11 +62,11 @@ class Scheduler implements AutoCloseable {
         try {
             executor.submit(() -> executeThread(task, description)).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
-            log.warn("{} was interrupted running task: {}", name, description);
+            LOG.warn("{} was interrupted running task: {}", name, description);
         } catch (TimeoutException e) {
-            log.error("{} timed out running task: {}", name, description);
+            LOG.error("{} timed out running task: {}", name, description);
         } catch (Throwable e) {
-            log.error("{} caught exception in task: {}", name, description, e);
+            LOG.error("{} caught exception in task: {}", name, description, e);
         }
     } 
 
@@ -76,10 +76,10 @@ class Scheduler implements AutoCloseable {
         try {
             boolean terminated = executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
             if (!terminated) {
-                log.error("{} timed out during shutdown of internal scheduler.", name);
+                LOG.error("{} timed out during shutdown of internal scheduler.", name);
             }
         } catch (InterruptedException e) {
-            log.warn("{} was interrupted during shutdown of internal scheduler.", name);
+            LOG.warn("{} was interrupted during shutdown of internal scheduler.", name);
         }
     }
 
@@ -92,21 +92,21 @@ class Scheduler implements AutoCloseable {
             long start = System.currentTimeMillis();
             task.run();
             long elapsed = System.currentTimeMillis() - start;
-            log.info("{} took {} ms", description, elapsed);
+            LOG.info("{} took {} ms", description, elapsed);
             if (elapsed > timeout.toMillis()) {
-                log.warn("{} took too long ({} ms) running task: {}", name, elapsed, description);
+                LOG.warn("{} took too long ({} ms) running task: {}", name, elapsed, description);
             }
         } catch (InterruptedException e) {
-            log.warn("{} was interrupted running task: {}", name, description);
+            LOG.warn("{} was interrupted running task: {}", name, description);
         } catch (Throwable e) {
-            log.error("{} caught exception in scheduled task: {}", name, description, e);
+            LOG.error("{} caught exception in scheduled task: {}", name, description, e);
         }
     }
 
     private void executeThread(Task task, String description) {
         Thread.currentThread().setName(name + "-" + description);
         if (closed) {
-            log.info("{} skipping task due to shutdown: {}", name, description);
+            LOG.info("{} skipping task due to shutdown: {}", name, description);
             return;
         }
         run(task, description);
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 8f692ca911..f325e15695 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -729,7 +729,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
     /*
      * Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
      */
-    protected void warmUpConsumer(Map<String, Object> consumerProps) throws InterruptedException {
+    protected void warmUpConsumer(Map<String, Object> consumerProps) {
         Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
         dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
         dummyConsumer.commitSync();