You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2020/03/26 11:59:47 UTC

[kafka] branch trunk updated: MINOR: Fix a number of warnings in mirror/mirror-client (#8074)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3df5464  MINOR: Fix a number of warnings in mirror/mirror-client (#8074)
3df5464 is described below

commit 3df5464fca06d36b806e50c9d6db047d9d612651
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Thu Mar 26 11:59:21 2020 +0000

    MINOR: Fix a number of warnings in mirror/mirror-client (#8074)
    
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Ryanne Dolan <ry...@gmail.com>, Andrew Choi <a2...@edu.uwaterloo.ca>
---
 .../java/org/apache/kafka/connect/mirror/MirrorClientConfig.java  | 2 +-
 .../java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java  | 5 +----
 .../org/apache/kafka/connect/mirror/MirrorCheckpointTask.java     | 2 --
 .../org/apache/kafka/connect/mirror/MirrorConnectorConfig.java    | 8 ++++----
 .../main/java/org/apache/kafka/connect/mirror/MirrorMaker.java    | 4 ++--
 .../java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java   | 3 +--
 .../main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java  | 2 --
 .../src/main/java/org/apache/kafka/connect/mirror/Scheduler.java  | 2 +-
 8 files changed, 10 insertions(+), 18 deletions(-)

diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index 0c163d8..9292198 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -47,7 +47,7 @@ import java.util.HashMap;
 public class MirrorClientConfig extends AbstractConfig {
     public static final String REPLICATION_POLICY_CLASS = "replication.policy.class";
     private static final String REPLICATION_POLICY_CLASS_DOC = "Class which defines the remote topic naming convention.";
-    public static final Class REPLICATION_POLICY_CLASS_DEFAULT = DefaultReplicationPolicy.class;
+    public static final Class<?> REPLICATION_POLICY_CLASS_DEFAULT = DefaultReplicationPolicy.class;
     public static final String REPLICATION_POLICY_SEPARATOR = "replication.policy.separator";
     private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
     public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
index f934319..49da62d 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
@@ -25,9 +25,7 @@ import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.time.Duration;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
- 
+
 /** Convenience methods for multi-cluster environments. Wraps MirrorClient (@see MirrorClient).
  *  <p>
  *  Properties passed to these methods are used to construct internal Admin and Consumer clients.
@@ -42,7 +40,6 @@ import org.slf4j.LoggerFactory;
  *  </p>
  */
 public final class RemoteClusterUtils {
-    private static final Logger log = LoggerFactory.getLogger(RemoteClusterUtils.class);
 
     // utility class
     private RemoteClusterUtils() {}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index ac2c7ad..ab55cda 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -47,7 +47,6 @@ public class MirrorCheckpointTask extends SourceTask {
     private String checkpointsTopic;
     private Duration interval;
     private Duration pollTimeout;
-    private Duration adminTimeout;
     private TopicFilter topicFilter;
     private Set<String> consumerGroups;
     private ReplicationPolicy replicationPolicy;
@@ -78,7 +77,6 @@ public class MirrorCheckpointTask extends SourceTask {
         replicationPolicy = config.replicationPolicy();
         interval = config.emitCheckpointsInterval();
         pollTimeout = config.consumerPollTimeout();
-        adminTimeout = config.adminTimeout();
         offsetSyncStore = new OffsetSyncStore(config);
         sourceAdminClient = AdminClient.create(config.sourceAdminConfig());
         metrics = config.metrics();
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index 527c1eb..72dd435 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -76,7 +76,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
     public static final String TARGET_CLUSTER_ALIAS_DEFAULT = "target";
     private static final String TARGET_CLUSTER_ALIAS_DOC = "Alias of target cluster. Used in metrics reporting.";
     public static final String REPLICATION_POLICY_CLASS = MirrorClientConfig.REPLICATION_POLICY_CLASS;
-    public static final Class REPLICATION_POLICY_CLASS_DEFAULT = MirrorClientConfig.REPLICATION_POLICY_CLASS_DEFAULT;
+    public static final Class<?> REPLICATION_POLICY_CLASS_DEFAULT = MirrorClientConfig.REPLICATION_POLICY_CLASS_DEFAULT;
     private static final String REPLICATION_POLICY_CLASS_DOC = "Class which defines the remote topic naming convention.";
     public static final String REPLICATION_POLICY_SEPARATOR = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR;
     private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
@@ -171,14 +171,14 @@ public class MirrorConnectorConfig extends AbstractConfig {
 
     public static final String TOPIC_FILTER_CLASS = "topic.filter.class";
     private static final String TOPIC_FILTER_CLASS_DOC = "TopicFilter to use. Selects topics to replicate.";
-    public static final Class TOPIC_FILTER_CLASS_DEFAULT = DefaultTopicFilter.class;
+    public static final Class<?> TOPIC_FILTER_CLASS_DEFAULT = DefaultTopicFilter.class;
     public static final String GROUP_FILTER_CLASS = "group.filter.class";
     private static final String GROUP_FILTER_CLASS_DOC = "GroupFilter to use. Selects consumer groups to replicate.";
-    public static final Class GROUP_FILTER_CLASS_DEFAULT = DefaultGroupFilter.class;
+    public static final Class<?> GROUP_FILTER_CLASS_DEFAULT = DefaultGroupFilter.class;
     public static final String CONFIG_PROPERTY_FILTER_CLASS = "config.property.filter.class";
     private static final String CONFIG_PROPERTY_FILTER_CLASS_DOC = "ConfigPropertyFilter to use. Selects topic config "
             + " properties to replicate.";
-    public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
+    public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
 
     public static final String OFFSET_LAG_MAX = "offset.lag.max";
     private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index 497e691..fa73f8d 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -92,7 +92,7 @@ public class MirrorMaker {
     private static final ConnectorClientConfigOverridePolicy CLIENT_CONFIG_OVERRIDE_POLICY =
             new AllConnectorClientConfigOverridePolicy();
 
-    private static final List<Class> CONNECTOR_CLASSES = Arrays.asList(
+    private static final List<Class<?>> CONNECTOR_CLASSES = Arrays.asList(
         MirrorSourceConnector.class,
         MirrorHeartbeatConnector.class,
         MirrorCheckpointConnector.class);
@@ -200,7 +200,7 @@ public class MirrorMaker {
         }
     }
 
-    private void configureConnector(SourceAndTarget sourceAndTarget, Class connectorClass) {
+    private void configureConnector(SourceAndTarget sourceAndTarget, Class<?> connectorClass) {
         checkHerder(sourceAndTarget);
         Map<String, String> connectorProps = config.connectorBaseConfig(sourceAndTarget, connectorClass);
         herders.get(sourceAndTarget)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index df5d38f..059ab78 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -69,7 +69,6 @@ public class MirrorMakerConfig extends AbstractConfig {
     private static final String HEADER_CONVERTER_CLASS_CONFIG = "header.converter";
     private static final String BYTE_ARRAY_CONVERTER_CLASS =
         "org.apache.kafka.connect.converters.ByteArrayConverter";
-    private static final String REPLICATION_FACTOR = "replication.factor";
 
     static final String SOURCE_CLUSTER_PREFIX = "source.cluster.";
     static final String TARGET_CLUSTER_PREFIX = "target.cluster.";
@@ -175,7 +174,7 @@ public class MirrorMakerConfig extends AbstractConfig {
     }
 
     // loads properties of the form cluster.x.y.z and source->target.x.y.z
-    Map<String, String> connectorBaseConfig(SourceAndTarget sourceAndTarget, Class connectorClass) {
+    Map<String, String> connectorBaseConfig(SourceAndTarget sourceAndTarget, Class<?> connectorClass) {
         Map<String, String> props = new HashMap<>();
 
         props.putAll(originalsStrings());
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
index ea9d2f7..04a92b9 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
@@ -100,12 +100,10 @@ class MirrorMetrics implements AutoCloseable {
     private final Map<String, GroupMetrics> groupMetrics = new HashMap<>();
     private final String source;
     private final String target;
-    private final Set<String> groups;
 
     MirrorMetrics(MirrorTaskConfig taskConfig) {
         this.target = taskConfig.targetClusterAlias();
         this.source = taskConfig.sourceClusterAlias();
-        this.groups = taskConfig.taskConsumerGroups();
         this.metrics = new Metrics();
 
         // for side-effect
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
index 203c01d..20f2ca7 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
@@ -39,7 +39,7 @@ class Scheduler implements AutoCloseable {
         this.timeout = timeout;
     }
 
-    Scheduler(Class clazz, Duration timeout) {
+    Scheduler(Class<?> clazz, Duration timeout) {
         this("Scheduler for " + clazz.getSimpleName(), timeout);
     }