You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/25 05:06:54 UTC
[kafka] branch trunk updated: KAFKA-6180; Add a Validator for NonNull configurations and remove redundant null checks on lists (#4188)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aa42a11 KAFKA-6180; Add a Validator for NonNull configurations and remove redundant null checks on lists (#4188)
aa42a11 is described below
commit aa42a11dfd99ee9ab24d2e9a7521ef1c97ae1ff4
Author: Charly Molter <ch...@gmail.com>
AuthorDate: Thu Jan 25 05:06:44 2018 +0000
KAFKA-6180; Add a Validator for NonNull configurations and remove redundant null checks on lists (#4188)
---
.../kafka/clients/consumer/ConsumerConfig.java | 9 +++++--
.../kafka/clients/consumer/KafkaConsumer.java | 10 +++----
.../kafka/clients/producer/KafkaProducer.java | 31 +++++++---------------
.../kafka/clients/producer/ProducerConfig.java | 9 ++++---
.../apache/kafka/common/config/AbstractConfig.java | 4 +--
.../org/apache/kafka/common/config/ConfigDef.java | 29 ++++++++++++++++++++
.../kafka/common/security/ssl/SslFactory.java | 4 +--
.../java/org/apache/kafka/common/utils/Utils.java | 2 ++
.../kafka/clients/consumer/KafkaConsumerTest.java | 2 +-
.../kafka/common/config/AbstractConfigTest.java | 16 +++++++++++
.../apache/kafka/common/config/ConfigDefTest.java | 2 ++
.../kafka/connect/runtime/ConnectorConfig.java | 8 ++----
core/src/main/scala/kafka/server/KafkaConfig.scala | 4 +--
13 files changed, 84 insertions(+), 46 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 3fe58d7..72e496c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -257,6 +257,8 @@ public class ConsumerConfig extends AbstractConfig {
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
+ Collections.emptyList(),
+ new ConfigDef.NonNullValidator(),
Importance.HIGH,
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
@@ -273,6 +275,7 @@ public class ConsumerConfig extends AbstractConfig {
.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Type.LIST,
Collections.singletonList(RangeAssignor.class),
+ new ConfigDef.NonNullValidator(),
Importance.MEDIUM,
PARTITION_ASSIGNMENT_STRATEGY_DOC)
.define(METADATA_MAX_AGE_CONFIG,
@@ -382,7 +385,8 @@ public class ConsumerConfig extends AbstractConfig {
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG,
Type.LIST,
- "",
+ Collections.emptyList(),
+ new ConfigDef.NonNullValidator(),
Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(KEY_DESERIALIZER_CLASS_CONFIG,
@@ -407,7 +411,8 @@ public class ConsumerConfig extends AbstractConfig {
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(INTERCEPTOR_CLASSES_CONFIG,
Type.LIST,
- null,
+ Collections.emptyList(),
+ new ConfigDef.NonNullValidator(),
Importance.LOW,
INTERCEPTOR_CLASSES_DOC)
.define(MAX_POLL_RECORDS_CONFIG,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 08877c9..0bbbcf1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -63,6 +63,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -683,7 +684,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs, false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptor.class);
- this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
+ this.interceptors = new ConsumerInterceptors<>(interceptorList);
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
@@ -815,7 +816,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
this.fetcher = fetcher;
- this.interceptors = interceptors;
+ this.interceptors = Objects.requireNonNull(interceptors);
this.time = time;
this.client = client;
this.metrics = metrics;
@@ -1122,10 +1123,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
client.pollNoWakeup();
- if (this.interceptors == null)
- return new ConsumerRecords<>(records);
- else
- return this.interceptors.onConsume(new ConsumerRecords<>(records));
+ return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
long elapsed = time.milliseconds() - start;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 4e67fe8..5fc9a1b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -378,7 +378,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class);
- this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
+ this.interceptors = new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
@@ -780,7 +780,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
- ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
+ ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
@@ -822,7 +822,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
- Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
+ Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
@@ -842,29 +842,24 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
- if (this.interceptors != null)
- this.interceptors.onSendError(record, tp, e);
+ this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
- if (this.interceptors != null)
- this.interceptors.onSendError(record, tp, e);
+ this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
- if (this.interceptors != null)
- this.interceptors.onSendError(record, tp, e);
+ this.interceptors.onSendError(record, tp, e);
throw e;
} catch (KafkaException e) {
this.errors.record();
- if (this.interceptors != null)
- this.interceptors.onSendError(record, tp, e);
+ this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
- if (this.interceptors != null)
- this.interceptors.onSendError(record, tp, e);
+ this.interceptors.onSendError(record, tp, e);
throw e;
}
}
@@ -1198,14 +1193,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (this.interceptors != null) {
- if (metadata == null) {
- this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP,
- Long.valueOf(-1L), -1, -1), exception);
- } else {
- this.interceptors.onAcknowledgement(metadata, exception);
- }
- }
+ metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1);
+ this.interceptors.onAcknowledgement(metadata, exception);
if (this.userCallback != null)
this.userCallback.onCompletion(metadata, exception);
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 0631814..6428dc4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -221,7 +222,7 @@ public class ProducerConfig extends AbstractConfig {
"Note that transactions requires a cluster of at least three brokers by default what is the recommended setting for production; for development you can change this, by adjusting broker setting `transaction.state.log.replication.factor`.";
static {
- CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+ CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
.define(ACKS_CONFIG,
@@ -273,7 +274,8 @@ public class ProducerConfig extends AbstractConfig {
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG,
Type.LIST,
- "",
+ Collections.emptyList(),
+ new ConfigDef.NonNullValidator(),
Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
@@ -302,7 +304,8 @@ public class ProducerConfig extends AbstractConfig {
Importance.MEDIUM, PARTITIONER_CLASS_DOC)
.define(INTERCEPTOR_CLASSES_CONFIG,
Type.LIST,
- null,
+ Collections.emptyList(),
+ new ConfigDef.NonNullValidator(),
Importance.LOW,
INTERCEPTOR_CLASSES_DOC)
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 61a5798..9e32074 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -289,9 +289,7 @@ public class AbstractConfig {
*/
public <T> List<T> getConfiguredInstances(String key, Class<T> t, Map<String, Object> configOverrides) {
List<String> klasses = getList(key);
- List<T> objects = new ArrayList<T>();
- if (klasses == null)
- return objects;
+ List<T> objects = new ArrayList<>();
Map<String, Object> configPairs = originals();
configPairs.putAll(configOverrides);
for (Object klass : klasses) {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 3340ab3..3080298 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -937,6 +937,35 @@ public class ConfigDef {
}
}
+ public static class NonNullValidator implements Validator {
+ @Override
+ public void ensureValid(String name, Object value) {
+ if (value == null) {
+ // Pass in the string null to avoid the findbugs warning
+ throw new ConfigException(name, "null", "entry must be non null");
+ }
+ }
+ }
+
+ public static class CompositeValidator implements Validator {
+ private final List<Validator> validators;
+
+ private CompositeValidator(List<Validator> validators) {
+ this.validators = Collections.unmodifiableList(validators);
+ }
+
+ public static CompositeValidator of(Validator... validators) {
+ return new CompositeValidator(Arrays.asList(validators));
+ }
+
+ @Override
+ public void ensureValid(String name, Object value) {
+ for (Validator validator: validators) {
+ validator.ensureValid(name, value);
+ }
+ }
+ }
+
public static class NonEmptyString implements Validator {
@Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 285582c..0d1fbf9 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -89,12 +89,12 @@ public class SslFactory implements Reconfigurable {
@SuppressWarnings("unchecked")
List<String> cipherSuitesList = (List<String>) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
- if (cipherSuitesList != null)
+ if (cipherSuitesList != null && !cipherSuitesList.isEmpty())
this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]);
@SuppressWarnings("unchecked")
List<String> enabledProtocolsList = (List<String>) configs.get(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
- if (enabledProtocolsList != null)
+ if (enabledProtocolsList != null && !enabledProtocolsList.isEmpty())
this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);
String endpointIdentification = (String) configs.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 8d8f118..9da3822 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -54,6 +54,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
@@ -464,6 +465,7 @@ public final class Utils {
* @return The string representation.
*/
public static <T> String join(Collection<T> list, String separator) {
+ Objects.requireNonNull(list);
StringBuilder sb = new StringBuilder();
Iterator<T> iter = list.iterator();
while (iter.hasNext()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ab682d6..a827168 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1615,7 +1615,7 @@ public class KafkaConsumerTest {
OffsetResetStrategy autoResetStrategy = OffsetResetStrategy.EARLIEST;
List<PartitionAssignor> assignors = Arrays.asList(assignor);
- ConsumerInterceptors<String, String> interceptors = null;
+ ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.<ConsumerInterceptor<String, String>>emptyList());
Metrics metrics = new Metrics();
ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix);
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 9e21179..2e15715 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.security.TestSecurityConfig;
import org.junit.Test;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -51,6 +52,21 @@ public class AbstractConfigTest {
}
@Test
+ public void testEmptyList() {
+ AbstractConfig conf;
+ ConfigDef configDef = new ConfigDef().define("a", Type.LIST, "", new ConfigDef.NonNullValidator(), Importance.HIGH, "doc");
+
+ conf = new AbstractConfig(configDef, Collections.emptyMap());
+ assertEquals(Collections.emptyList(), conf.getList("a"));
+
+ conf = new AbstractConfig(configDef, Collections.singletonMap("a", ""));
+ assertEquals(Collections.emptyList(), conf.getList("a"));
+
+ conf = new AbstractConfig(configDef, Collections.singletonMap("a", "b,c,d"));
+ assertEquals(Arrays.asList("b", "c", "d"), conf.getList("a"));
+ }
+
+ @Test
public void testOriginalsWithPrefix() {
Properties props = new Properties();
props.put("foo.bar", "abc");
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index ed4997d..602147b 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -158,6 +158,8 @@ public class ConfigDefTest {
testValidators(Type.STRING, ValidString.in("good", "values", "default"), "default",
new Object[]{"good", "values", "default"}, new Object[]{"bad", "inputs", null});
testValidators(Type.LIST, ConfigDef.ValidList.in("1", "2", "3"), "1", new Object[]{"1", "2", "3"}, new Object[]{"4", "5", "6"});
+ testValidators(Type.STRING, new ConfigDef.NonNullValidator(), "a", new Object[]{"abb"}, new Object[] {null});
+ testValidators(Type.STRING, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ValidString.in("a", "b")), "a", new Object[]{"a", "b"}, new Object[] {null, -1, "c"});
}
@Test
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 0f8c390..e63d100 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -101,16 +101,15 @@ public class ConnectorConfig extends AbstractConfig {
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, 5, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
- .define(TRANSFORMS_CONFIG, Type.LIST, null, new ConfigDef.Validator() {
+ .define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), new ConfigDef.Validator() {
@Override
public void ensureValid(String name, Object value) {
- if (value == null) return;
final List<String> transformAliases = (List<String>) value;
if (transformAliases.size() > new HashSet<>(transformAliases).size()) {
throw new ConfigException(name, value, "Duplicate alias provided.");
}
}
- }, Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY);
+ }), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY);
}
public ConnectorConfig(Plugins plugins) {
@@ -139,9 +138,6 @@ public class ConnectorConfig extends AbstractConfig {
*/
public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
- if (transformAliases == null || transformAliases.isEmpty()) {
- return Collections.emptyList();
- }
final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
for (String alias : transformAliases) {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 47f13f6..fba186c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -18,7 +18,7 @@
package kafka.server
import java.util
-import java.util.Properties
+import java.util.{Collections, Properties}
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
import kafka.cluster.EndPoint
@@ -894,7 +894,7 @@ object KafkaConfig {
.define(SslEndpointIdentificationAlgorithmProp, STRING, null, LOW, SslEndpointIdentificationAlgorithmDoc)
.define(SslSecureRandomImplementationProp, STRING, null, LOW, SslSecureRandomImplementationDoc)
.define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc)
- .define(SslCipherSuitesProp, LIST, null, MEDIUM, SslCipherSuitesDoc)
+ .define(SslCipherSuitesProp, LIST, Collections.emptyList(), MEDIUM, SslCipherSuitesDoc)
/** ********* Sasl Configuration ****************/
.define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc)
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.