Posted to commits@kafka.apache.org by gu...@apache.org on 2018/02/14 00:39:30 UTC
[kafka] branch trunk updated: MINOR: Misc improvements on runtime / storage / metrics / config parts (#4525)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2693e9b MINOR: Misc improvements on runtime / storage / metrics / config parts (#4525)
2693e9b is described below
commit 2693e9be7412ec03173c8942e9ccfcc24cfbbce1
Author: Benedict Jin <15...@qq.com>
AuthorDate: Wed Feb 14 08:39:21 2018 +0800
MINOR: Misc improvements on runtime / storage / metrics / config parts (#4525)
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../org/apache/kafka/common/config/ConfigDef.java | 4 ++-
.../org/apache/kafka/common/metrics/Metrics.java | 2 +-
.../kafka/connect/cli/ConnectDistributed.java | 6 ++--
.../connect/runtime/WorkerSinkTaskContext.java | 3 +-
.../connect/storage/KafkaConfigBackingStore.java | 10 +++----
.../connect/storage/KafkaOffsetBackingStore.java | 9 +++---
.../connect/storage/KafkaStatusBackingStore.java | 32 +++++++++++-----------
7 files changed, 35 insertions(+), 31 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index bb199dd..bbe69a0 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -877,7 +877,9 @@ public class ConfigDef {
}
public String toString() {
- if (min == null)
+ if (min == null && max == null)
+ return "[...]";
+ else if (min == null)
return "[...," + max + "]";
else if (max == null)
return "[" + min + ",...]";
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 8868ee7..ea18cd3 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -461,7 +461,7 @@ public class Metrics implements Closeable {
* This is a way to expose existing values as metrics.
*
* This method is kept for binary compatibility purposes, it has the same behaviour as
- * {@link #addMetric(MetricName, MetricValue)}.
+ * {@link #addMetric(MetricName, MetricValueProvider)}.
*
* @param metricName The name of the metric
* @param measurable The measurable that will be measured by this metric
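The old javadoc pointed at a non-existent addMetric(MetricName, MetricValue) overload; the actual delegate is addMetric(MetricName, MetricValueProvider). Since Measurable extends MetricValueProvider<Double> in this API, the two registrations behave the same. A minimal usage sketch (the metric and group names here are made up for illustration):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;

    public class QueueGauge {
        public static void main(String[] args) {
            Deque<String> queue = new ArrayDeque<>();  // stand-in for real shared state
            Metrics metrics = new Metrics();
            MetricName name = metrics.metricName("queue-size", "example-group",
                    "Current depth of the work queue");
            // The lambda targets Measurable, which extends MetricValueProvider<Double>,
            // so this registration behaves the same as the MetricValueProvider overload.
            metrics.addMetric(name, (config, now) -> queue.size());
            metrics.close();
        }
    }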
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 98a77ed..4afa47d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
@@ -83,10 +84,11 @@ public class ConnectDistributed {
Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
- StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
+ Converter internalValueConverter = worker.getInternalValueConverter();
+ StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
statusBackingStore.configure(config);
- ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter(), config);
+ ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, config);
DistributedHerder herder = new DistributedHerder(config, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 8076403..0878949 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -104,8 +104,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized");
}
try {
- for (TopicPartition partition : partitions)
- pausedPartitions.remove(partition);
+ pausedPartitions.removeAll(Arrays.asList(partitions));
consumer.resume(Arrays.asList(partitions));
} catch (IllegalStateException e) {
throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 5c2c72c..b34e483 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -228,7 +228,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
this.offset = -1;
this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
- if (this.topic.equals(""))
+ if (this.topic == null || this.topic.trim().length() == 0)
throw new ConfigException("Must specify topic for connector configuration.");
configLog = setupAndCreateKafkaBasedLog(this.topic, config);
@@ -406,16 +406,16 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
// package private for testing
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
- Map<String, Object> producerProps = new HashMap<>(config.originals());
+ Map<String, Object> originals = config.originals();
+ Map<String, Object> producerProps = new HashMap<>(originals);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
-
- Map<String, Object> consumerProps = new HashMap<>(config.originals());
+ Map<String, Object> consumerProps = new HashMap<>(originals);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
- Map<String, Object> adminProps = new HashMap<>(config.originals());
+ Map<String, Object> adminProps = new HashMap<>(originals);
NewTopic topicDescription = TopicAdmin.defineTopic(topic).
compacted().
partitions(1).
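Two patterns in this file recur in KafkaOffsetBackingStore and KafkaStatusBackingStore below: the topic check now rejects null and whitespace-only names rather than only the empty string, and config.originals() is called once and reused, since each call builds a fresh copy of the map. A standalone sketch of both patterns (requireTopic is a hypothetical helper; the real code throws ConfigException inline):

    import java.util.HashMap;
    import java.util.Map;

    public class ConfigPatterns {
        // Mirrors the patched check: topic == null || topic.trim().length() == 0.
        static void requireTopic(String topic, String message) {
            if (topic == null || topic.trim().isEmpty())
                throw new IllegalArgumentException(message);  // ConfigException in Kafka itself
        }

        public static void main(String[] args) {
            requireTopic("connect-configs", "Must specify topic for connector configuration.");

            // Hoist the originals map once instead of rebuilding it for each client;
            // the three per-client maps then copy from the same source.
            Map<String, Object> originals = new HashMap<>();  // stands in for config.originals()
            Map<String, Object> producerProps = new HashMap<>(originals);
            Map<String, Object> consumerProps = new HashMap<>(originals);
            Map<String, Object> adminProps = new HashMap<>(originals);
        }
    }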
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 2e4e523..f29f3c2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -62,21 +62,22 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
@Override
public void configure(final WorkerConfig config) {
String topic = config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
- if (topic.equals(""))
+ if (topic == null || topic.trim().length() == 0)
throw new ConfigException("Offset storage topic must be specified");
data = new HashMap<>();
- Map<String, Object> producerProps = new HashMap<>(config.originals());
+ Map<String, Object> originals = config.originals();
+ Map<String, Object> producerProps = new HashMap<>(originals);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
- Map<String, Object> consumerProps = new HashMap<>(config.originals());
+ Map<String, Object> consumerProps = new HashMap<>(originals);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
- Map<String, Object> adminProps = new HashMap<>(config.originals());
+ Map<String, Object> adminProps = new HashMap<>(originals);
NewTopic topicDescription = TopicAdmin.defineTopic(topic).
compacted().
partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)).
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index ca74432..8ca21eb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -121,19 +121,20 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
@Override
public void configure(final WorkerConfig config) {
this.topic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
- if (topic.equals(""))
+ if (this.topic == null || this.topic.trim().length() == 0)
throw new ConfigException("Must specify topic for connector status.");
- Map<String, Object> producerProps = new HashMap<>(config.originals());
+ Map<String, Object> originals = config.originals();
+ Map<String, Object> producerProps = new HashMap<>(originals);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
- Map<String, Object> consumerProps = new HashMap<>(config.originals());
+ Map<String, Object> consumerProps = new HashMap<>(originals);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
- Map<String, Object> adminProps = new HashMap<>(config.originals());
+ Map<String, Object> adminProps = new HashMap<>(originals);
NewTopic topicDescription = TopicAdmin.defineTopic(topic).
compacted().
partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)).
@@ -203,7 +204,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
}
private void sendConnectorStatus(final ConnectorStatus status, boolean safeWrite) {
- String connector = status.id();
+ String connector = status.id();
CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
String key = CONNECTOR_STATUS_PREFIX + connector;
send(key, status, entry, safeWrite);
@@ -233,18 +234,17 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
kafkaLog.send(key, value, new org.apache.kafka.clients.producer.Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception != null) {
- if (exception instanceof RetriableException) {
- synchronized (KafkaStatusBackingStore.this) {
- if (entry.isDeleted()
- || status.generation() != generation
- || (safeWrite && !entry.canWriteSafely(status, sequence)))
- return;
- }
- kafkaLog.send(key, value, this);
- } else {
- log.error("Failed to write status update", exception);
+ if (exception == null) return;
+ if (exception instanceof RetriableException) {
+ synchronized (KafkaStatusBackingStore.this) {
+ if (entry.isDeleted()
+ || status.generation() != generation
+ || (safeWrite && !entry.canWriteSafely(status, sequence)))
+ return;
}
+ kafkaLog.send(key, value, this);
+ } else {
+ log.error("Failed to write status update", exception);
}
}
});
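Read as a whole, the patched callback is the same logic with one less level of nesting: a guard clause handles the success case up front, then the retriable/non-retriable split follows. An annotated restatement of the method body after the patch (the names are those from the hunk above, not standalone code):

    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) return;              // success: nothing to do
        if (exception instanceof RetriableException) {
            synchronized (KafkaStatusBackingStore.this) {
                if (entry.isDeleted()
                    || status.generation() != generation
                    || (safeWrite && !entry.canWriteSafely(status, sequence)))
                    return;                         // status is stale: drop the retry
            }
            kafkaLog.send(key, value, this);        // still current: retry the send
        } else {
            log.error("Failed to write status update", exception);
        }
    }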