You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2020/03/26 11:59:47 UTC
[kafka] branch trunk updated: MINOR: Fix a number of warnings in mirror/mirror-client (#8074)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3df5464 MINOR: Fix a number of warnings in mirror/mirror-client (#8074)
3df5464 is described below
commit 3df5464fca06d36b806e50c9d6db047d9d612651
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Thu Mar 26 11:59:21 2020 +0000
MINOR: Fix a number of warnings in mirror/mirror-client (#8074)
Reviewers: Ismael Juma <is...@juma.me.uk>, Ryanne Dolan <ry...@gmail.com>, Andrew Choi <a2...@edu.uwaterloo.ca>
---
.../java/org/apache/kafka/connect/mirror/MirrorClientConfig.java | 2 +-
.../java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java | 5 +----
.../org/apache/kafka/connect/mirror/MirrorCheckpointTask.java | 2 --
.../org/apache/kafka/connect/mirror/MirrorConnectorConfig.java | 8 ++++----
.../main/java/org/apache/kafka/connect/mirror/MirrorMaker.java | 4 ++--
.../java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java | 3 +--
.../main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java | 2 --
.../src/main/java/org/apache/kafka/connect/mirror/Scheduler.java | 2 +-
8 files changed, 10 insertions(+), 18 deletions(-)
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index 0c163d8..9292198 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -47,7 +47,7 @@ import java.util.HashMap;
public class MirrorClientConfig extends AbstractConfig {
public static final String REPLICATION_POLICY_CLASS = "replication.policy.class";
private static final String REPLICATION_POLICY_CLASS_DOC = "Class which defines the remote topic naming convention.";
- public static final Class REPLICATION_POLICY_CLASS_DEFAULT = DefaultReplicationPolicy.class;
+ public static final Class<?> REPLICATION_POLICY_CLASS_DEFAULT = DefaultReplicationPolicy.class;
public static final String REPLICATION_POLICY_SEPARATOR = "replication.policy.separator";
private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
index f934319..49da62d 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
@@ -25,9 +25,7 @@ import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+
/** Convenience methods for multi-cluster environments. Wraps MirrorClient (@see MirrorClient).
* <p>
* Properties passed to these methods are used to construct internal Admin and Consumer clients.
@@ -42,7 +40,6 @@ import org.slf4j.LoggerFactory;
* </p>
*/
public final class RemoteClusterUtils {
- private static final Logger log = LoggerFactory.getLogger(RemoteClusterUtils.class);
// utility class
private RemoteClusterUtils() {}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index ac2c7ad..ab55cda 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -47,7 +47,6 @@ public class MirrorCheckpointTask extends SourceTask {
private String checkpointsTopic;
private Duration interval;
private Duration pollTimeout;
- private Duration adminTimeout;
private TopicFilter topicFilter;
private Set<String> consumerGroups;
private ReplicationPolicy replicationPolicy;
@@ -78,7 +77,6 @@ public class MirrorCheckpointTask extends SourceTask {
replicationPolicy = config.replicationPolicy();
interval = config.emitCheckpointsInterval();
pollTimeout = config.consumerPollTimeout();
- adminTimeout = config.adminTimeout();
offsetSyncStore = new OffsetSyncStore(config);
sourceAdminClient = AdminClient.create(config.sourceAdminConfig());
metrics = config.metrics();
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index 527c1eb..72dd435 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -76,7 +76,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
public static final String TARGET_CLUSTER_ALIAS_DEFAULT = "target";
private static final String TARGET_CLUSTER_ALIAS_DOC = "Alias of target cluster. Used in metrics reporting.";
public static final String REPLICATION_POLICY_CLASS = MirrorClientConfig.REPLICATION_POLICY_CLASS;
- public static final Class REPLICATION_POLICY_CLASS_DEFAULT = MirrorClientConfig.REPLICATION_POLICY_CLASS_DEFAULT;
+ public static final Class<?> REPLICATION_POLICY_CLASS_DEFAULT = MirrorClientConfig.REPLICATION_POLICY_CLASS_DEFAULT;
private static final String REPLICATION_POLICY_CLASS_DOC = "Class which defines the remote topic naming convention.";
public static final String REPLICATION_POLICY_SEPARATOR = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR;
private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
@@ -171,14 +171,14 @@ public class MirrorConnectorConfig extends AbstractConfig {
public static final String TOPIC_FILTER_CLASS = "topic.filter.class";
private static final String TOPIC_FILTER_CLASS_DOC = "TopicFilter to use. Selects topics to replicate.";
- public static final Class TOPIC_FILTER_CLASS_DEFAULT = DefaultTopicFilter.class;
+ public static final Class<?> TOPIC_FILTER_CLASS_DEFAULT = DefaultTopicFilter.class;
public static final String GROUP_FILTER_CLASS = "group.filter.class";
private static final String GROUP_FILTER_CLASS_DOC = "GroupFilter to use. Selects consumer groups to replicate.";
- public static final Class GROUP_FILTER_CLASS_DEFAULT = DefaultGroupFilter.class;
+ public static final Class<?> GROUP_FILTER_CLASS_DEFAULT = DefaultGroupFilter.class;
public static final String CONFIG_PROPERTY_FILTER_CLASS = "config.property.filter.class";
private static final String CONFIG_PROPERTY_FILTER_CLASS_DOC = "ConfigPropertyFilter to use. Selects topic config "
+ " properties to replicate.";
- public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
+ public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
public static final String OFFSET_LAG_MAX = "offset.lag.max";
private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index 497e691..fa73f8d 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -92,7 +92,7 @@ public class MirrorMaker {
private static final ConnectorClientConfigOverridePolicy CLIENT_CONFIG_OVERRIDE_POLICY =
new AllConnectorClientConfigOverridePolicy();
- private static final List<Class> CONNECTOR_CLASSES = Arrays.asList(
+ private static final List<Class<?>> CONNECTOR_CLASSES = Arrays.asList(
MirrorSourceConnector.class,
MirrorHeartbeatConnector.class,
MirrorCheckpointConnector.class);
@@ -200,7 +200,7 @@ public class MirrorMaker {
}
}
- private void configureConnector(SourceAndTarget sourceAndTarget, Class connectorClass) {
+ private void configureConnector(SourceAndTarget sourceAndTarget, Class<?> connectorClass) {
checkHerder(sourceAndTarget);
Map<String, String> connectorProps = config.connectorBaseConfig(sourceAndTarget, connectorClass);
herders.get(sourceAndTarget)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index df5d38f..059ab78 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -69,7 +69,6 @@ public class MirrorMakerConfig extends AbstractConfig {
private static final String HEADER_CONVERTER_CLASS_CONFIG = "header.converter";
private static final String BYTE_ARRAY_CONVERTER_CLASS =
"org.apache.kafka.connect.converters.ByteArrayConverter";
- private static final String REPLICATION_FACTOR = "replication.factor";
static final String SOURCE_CLUSTER_PREFIX = "source.cluster.";
static final String TARGET_CLUSTER_PREFIX = "target.cluster.";
@@ -175,7 +174,7 @@ public class MirrorMakerConfig extends AbstractConfig {
}
// loads properties of the form cluster.x.y.z and source->target.x.y.z
- Map<String, String> connectorBaseConfig(SourceAndTarget sourceAndTarget, Class connectorClass) {
+ Map<String, String> connectorBaseConfig(SourceAndTarget sourceAndTarget, Class<?> connectorClass) {
Map<String, String> props = new HashMap<>();
props.putAll(originalsStrings());
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
index ea9d2f7..04a92b9 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
@@ -100,12 +100,10 @@ class MirrorMetrics implements AutoCloseable {
private final Map<String, GroupMetrics> groupMetrics = new HashMap<>();
private final String source;
private final String target;
- private final Set<String> groups;
MirrorMetrics(MirrorTaskConfig taskConfig) {
this.target = taskConfig.targetClusterAlias();
this.source = taskConfig.sourceClusterAlias();
- this.groups = taskConfig.taskConsumerGroups();
this.metrics = new Metrics();
// for side-effect
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
index 203c01d..20f2ca7 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
@@ -39,7 +39,7 @@ class Scheduler implements AutoCloseable {
this.timeout = timeout;
}
- Scheduler(Class clazz, Duration timeout) {
+ Scheduler(Class<?> clazz, Duration timeout) {
this("Scheduler for " + clazz.getSimpleName(), timeout);
}