You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/14 00:39:30 UTC

[kafka] branch trunk updated: MINOR: Misc improvements on runtime / storage / metrics / config parts (#4525)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2693e9b  MINOR: Misc improvements on runtime / storage / metrics / config parts (#4525)
2693e9b is described below

commit 2693e9be7412ec03173c8942e9ccfcc24cfbbce1
Author: Benedict Jin <15...@qq.com>
AuthorDate: Wed Feb 14 08:39:21 2018 +0800

    MINOR: Misc improvements on runtime / storage / metrics / config parts (#4525)
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../org/apache/kafka/common/config/ConfigDef.java  |  4 ++-
 .../org/apache/kafka/common/metrics/Metrics.java   |  2 +-
 .../kafka/connect/cli/ConnectDistributed.java      |  6 ++--
 .../connect/runtime/WorkerSinkTaskContext.java     |  3 +-
 .../connect/storage/KafkaConfigBackingStore.java   | 10 +++----
 .../connect/storage/KafkaOffsetBackingStore.java   |  9 +++---
 .../connect/storage/KafkaStatusBackingStore.java   | 32 +++++++++++-----------
 7 files changed, 35 insertions(+), 31 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index bb199dd..bbe69a0 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -877,7 +877,9 @@ public class ConfigDef {
         }
 
         public String toString() {
-            if (min == null)
+            if (min == null && max == null)
+                return "[...]";
+            else if (min == null)
                 return "[...," + max + "]";
             else if (max == null)
                 return "[" + min + ",...]";
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 8868ee7..ea18cd3 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -461,7 +461,7 @@ public class Metrics implements Closeable {
      * This is a way to expose existing values as metrics.
      *
      * This method is kept for binary compatibility purposes, it has the same behaviour as
-     * {@link #addMetric(MetricName, MetricValue)}.
+     * {@link #addMetric(MetricName, MetricValueProvider)}.
      *
      * @param metricName The name of the metric
      * @param measurable The measurable that will be measured by this metric
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 98a77ed..4afa47d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
 import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
@@ -83,10 +84,11 @@ public class ConnectDistributed {
 
         Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
 
-        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
+        Converter internalValueConverter = worker.getInternalValueConverter();
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
         statusBackingStore.configure(config);
 
-        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter(), config);
+        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, config);
 
         DistributedHerder herder = new DistributedHerder(config, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 8076403..0878949 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -104,8 +104,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
             throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized");
         }
         try {
-            for (TopicPartition partition : partitions)
-                pausedPartitions.remove(partition);
+            pausedPartitions.removeAll(Arrays.asList(partitions));
             consumer.resume(Arrays.asList(partitions));
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 5c2c72c..b34e483 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -228,7 +228,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         this.offset = -1;
 
         this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
-        if (this.topic.equals(""))
+        if (this.topic == null || this.topic.trim().length() == 0)
             throw new ConfigException("Must specify topic for connector configuration.");
 
         configLog = setupAndCreateKafkaBasedLog(this.topic, config);
@@ -406,16 +406,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
     // package private for testing
     KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
-        Map<String, Object> producerProps = new HashMap<>(config.originals());
+        Map<String, Object> originals = config.originals();
+        Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
-
-        Map<String, Object> consumerProps = new HashMap<>(config.originals());
+        Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 
-        Map<String, Object> adminProps = new HashMap<>(config.originals());
+        Map<String, Object> adminProps = new HashMap<>(originals);
         NewTopic topicDescription = TopicAdmin.defineTopic(topic).
                 compacted().
                 partitions(1).
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 2e4e523..f29f3c2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -62,21 +62,22 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
     @Override
     public void configure(final WorkerConfig config) {
         String topic = config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
-        if (topic.equals(""))
+        if (topic == null || topic.trim().length() == 0)
             throw new ConfigException("Offset storage topic must be specified");
 
         data = new HashMap<>();
 
-        Map<String, Object> producerProps = new HashMap<>(config.originals());
+        Map<String, Object> originals = config.originals();
+        Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
 
-        Map<String, Object> consumerProps = new HashMap<>(config.originals());
+        Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 
-        Map<String, Object> adminProps = new HashMap<>(config.originals());
+        Map<String, Object> adminProps = new HashMap<>(originals);
         NewTopic topicDescription = TopicAdmin.defineTopic(topic).
                 compacted().
                 partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)).
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index ca74432..8ca21eb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -121,19 +121,20 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
     @Override
     public void configure(final WorkerConfig config) {
         this.topic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
-        if (topic.equals(""))
+        if (this.topic == null || this.topic.trim().length() == 0)
             throw new ConfigException("Must specify topic for connector status.");
 
-        Map<String, Object> producerProps = new HashMap<>(config.originals());
+        Map<String, Object> originals = config.originals();
+        Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
 
-        Map<String, Object> consumerProps = new HashMap<>(config.originals());
+        Map<String, Object> consumerProps = new HashMap<>(originals);
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 
-        Map<String, Object> adminProps = new HashMap<>(config.originals());
+        Map<String, Object> adminProps = new HashMap<>(originals);
         NewTopic topicDescription = TopicAdmin.defineTopic(topic).
                 compacted().
                 partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)).
@@ -203,7 +204,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
     }
 
     private void sendConnectorStatus(final ConnectorStatus status, boolean safeWrite) {
-        String connector  = status.id();
+        String connector = status.id();
         CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
         String key = CONNECTOR_STATUS_PREFIX + connector;
         send(key, status, entry, safeWrite);
@@ -233,18 +234,17 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
         kafkaLog.send(key, value, new org.apache.kafka.clients.producer.Callback() {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
-                if (exception != null) {
-                    if (exception instanceof RetriableException) {
-                        synchronized (KafkaStatusBackingStore.this) {
-                            if (entry.isDeleted()
-                                    || status.generation() != generation
-                                    || (safeWrite && !entry.canWriteSafely(status, sequence)))
-                                return;
-                        }
-                        kafkaLog.send(key, value, this);
-                    } else {
-                        log.error("Failed to write status update", exception);
+                if (exception == null) return;
+                if (exception instanceof RetriableException) {
+                    synchronized (KafkaStatusBackingStore.this) {
+                        if (entry.isDeleted()
+                            || status.generation() != generation
+                            || (safeWrite && !entry.canWriteSafely(status, sequence)))
+                            return;
                     }
+                    kafkaLog.send(key, value, this);
+                } else {
+                    log.error("Failed to write status update", exception);
                 }
             }
         });

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.