You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by cs...@apache.org on 2017/07/12 06:54:36 UTC
karaf-decanter git commit: [KARAF-5244] Refactor kafka appender config handling
Repository: karaf-decanter
Updated Branches:
refs/heads/master 94f5f6a81 -> 79d1154f2
[KARAF-5244] Refactor kafka appender config handling
Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/79d1154f
Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/79d1154f
Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/79d1154f
Branch: refs/heads/master
Commit: 79d1154f2992addc90d4bc59eaaa829d6de878d2
Parents: 94f5f6a
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Wed Jul 12 08:54:27 2017 +0200
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Wed Jul 12 08:54:27 2017 +0200
----------------------------------------------------------------------
.../decanter/appender/kafka/ConfigMapper.java | 72 ++++++++++++++++
.../decanter/appender/kafka/KafkaAppender.java | 90 +-------------------
2 files changed, 75 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/79d1154f/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/ConfigMapper.java
----------------------------------------------------------------------
diff --git a/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/ConfigMapper.java b/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/ConfigMapper.java
new file mode 100644
index 0000000..f62cac0
--- /dev/null
+++ b/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/ConfigMapper.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.kafka;
+
+import java.util.Dictionary;
+import java.util.Properties;
+
/**
 * Builds the {@link Properties} used by the Kafka appender from its
 * configuration dictionary.
 *
 * <p>Only the keys listed in this class are copied; any other entry of the
 * source dictionary is ignored. Keys in {@link #DEFAULTS} are always present
 * in the result (falling back to the listed default), keys in
 * {@link #OPTIONAL_KEYS} are present only when the source supplies a value.
 *
 * <p>Every value is stored as a {@code String}. Non-String source values
 * (for example an {@code Integer} for {@code retries}) are converted with
 * {@code toString()} instead of being cast, so they no longer trigger a
 * {@code ClassCastException}. Multi-valued entries (arrays, collections) are
 * not flattened.
 */
public class ConfigMapper {

    /** Keys that are always emitted, each paired with its fallback value. */
    private static final String[][] DEFAULTS = {
        {"bootstrap.servers", "localhost:9092"},
        {"client.id", ""},
        {"compression.type", "none"},
        {"acks", "all"},
        {"retries", "0"},
        {"batch.size", "16384"},
        {"buffer.memory", "33554432"},
        {"key.serializer", "org.apache.kafka.common.serialization.StringSerializer"},
        {"request.timeout.ms", "5000"},
        {"max.request.size", "2097152"},
        {"value.serializer", "org.apache.kafka.common.serialization.StringSerializer"},
        {"topic", "decanter"},
    };

    /** Keys that are emitted only when the source dictionary defines them. */
    private static final String[] OPTIONAL_KEYS = {
        "security.protocol",
        "ssl.truststore.location",
        "ssl.truststore.password",
        "ssl.keystore.location",
        "ssl.keystore.password",
        "ssl.key.password",
        "ssl.provider",
        "ssl.cipher.suites",
        "ssl.enabled.protocols",
        "ssl.truststore.type",
        "ssl.keystore.type",
    };

    /** Static utility; not meant to be instantiated. */
    private ConfigMapper() {
    }

    /**
     * Maps the given configuration to a fresh {@link Properties} instance.
     *
     * @param conf source configuration; {@code null} is treated as an empty
     *             configuration, so only the defaults are returned
     * @return a new, caller-owned {@code Properties} holding String values
     *         for every defaulted key and for each optional key that
     *         {@code conf} defines
     */
    public static Properties map(Dictionary<String, Object> conf) {
        Properties config = new Properties();
        for (String[] entry : DEFAULTS) {
            copy(conf, config, entry[0], entry[1]);
        }
        for (String key : OPTIONAL_KEYS) {
            copy(conf, config, key, null);
        }
        return config;
    }

    /**
     * Copies one key from {@code source} to {@code target}, using
     * {@code defaultValue} when the source has no value for it. Nothing is
     * stored when neither a value nor a default is available.
     */
    private static void copy(Dictionary<String, Object> source, Properties target,
                             String key, String defaultValue) {
        Object raw = (source != null) ? source.get(key) : null;
        // Convert rather than cast: the dictionary is typed <String, Object>, and
        // Properties.getProperty() only returns values that are actual Strings.
        String usedValue = (raw != null) ? raw.toString() : defaultValue;
        if (usedValue != null) {
            target.put(key, usedValue);
        }
    }

}
http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/79d1154f/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java b/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
index f0ece64..a404a53 100644
--- a/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
+++ b/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
@@ -16,7 +16,6 @@
*/
package org.apache.karaf.decanter.appender.kafka;
-import java.util.Dictionary;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
@@ -54,87 +53,9 @@ public class KafkaAppender implements EventHandler {
@Activate
@SuppressWarnings("unchecked")
public void activate(ComponentContext context) {
- Dictionary<String, Object> config = context.getProperties();
- this.properties = new Properties();
-
- String bootstrapServers = getValue(config, "bootstrap.servers", "localhost:9092");
- properties.put("bootstrap.servers", bootstrapServers);
-
- String clientId = getValue(config, "client.id", "");
- properties.put("client.id", clientId);
-
- String compressionType = getValue(config, "compression.type", "none");
- properties.put("compression.type", compressionType);
-
- String acks = getValue(config, "acks", "all");
- properties.put("acks", acks);
-
- String retries = getValue(config, "retries", "0");
- properties.put("retries", Integer.parseInt(retries));
-
- String batchSize = getValue(config, "batch.size", "16384");
- properties.put("batch.size", Integer.parseInt(batchSize));
-
- String bufferMemory = getValue(config, "buffer.memory", "33554432");
- properties.put("buffer.memory", Long.parseLong(bufferMemory));
-
- String keySerializer = getValue(config, "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.put("key.serializer", keySerializer);
-
- String requestTimeoutMs = getValue(config, "request.timeout.ms", "5000");
- properties.put("request.timeout.ms", requestTimeoutMs);
-
- String maxRequestSize = getValue(config, "max.request.size", "2097152");
- properties.put("max.request.size", maxRequestSize);
-
- String valueSerializer = getValue(config, "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.put("value.serializer", valueSerializer);
-
- String securityProtocol = getValue(config, "security.protocol", null);
- if (securityProtocol != null)
- properties.put("security.protocol", securityProtocol);
-
- String sslTruststoreLocation = getValue(config, "ssl.truststore.location", null);
- if (sslTruststoreLocation != null)
- properties.put("ssl.truststore.location", sslTruststoreLocation);
-
- String sslTruststorePassword = getValue(config, "ssl.truststore.password", null);
- if (sslTruststorePassword != null)
- properties.put("ssl.truststore.password", sslTruststorePassword);
-
- String sslKeystoreLocation = getValue(config, "ssl.keystore.location", null);
- if (sslKeystoreLocation != null)
- properties.put("ssl.keystore.location", sslKeystoreLocation);
-
- String sslKeystorePassword = getValue(config, "ssl.keystore.password", null);
- if (sslKeystorePassword != null)
- properties.put("ssl.keystore.password", sslKeystorePassword);
-
- String sslKeyPassword = getValue(config, "ssl.key.password", null);
- if (sslKeyPassword != null)
- properties.put("ssl.key.password", sslKeyPassword);
-
- String sslProvider = getValue(config, "ssl.provider", null);
- if (sslProvider != null)
- properties.put("ssl.provider", sslProvider);
-
- String sslCipherSuites = getValue(config, "ssl.cipher.suites", null);
- if (sslCipherSuites != null)
- properties.put("ssl.cipher.suites", sslCipherSuites);
-
- String sslEnabledProtocols = getValue(config, "ssl.enabled.protocols", null);
- if (sslEnabledProtocols != null)
- properties.put("ssl.enabled.protocols", sslEnabledProtocols);
-
- String sslTruststoreType = getValue(config, "ssl.truststore.type", null);
- if (sslTruststoreType != null)
- properties.put("ssl.truststore.type", sslTruststoreType);
-
- String sslKeystoreType = getValue(config, "ssl.keystore.type", null);
- if (sslKeystoreType != null)
- properties.put("ssl.keystore.type", sslKeystoreType);
-
- this.topic = getValue(config, "topic", "decanter");
+ this.properties = ConfigMapper.map(context.getProperties());
+ this.topic = properties.getProperty("topic");
+ properties.remove("topic");
// workaround for KAFKA-3218
ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();
@@ -145,11 +66,6 @@ public class KafkaAppender implements EventHandler {
Thread.currentThread().setContextClassLoader(originClassLoader);
}
}
-
- private String getValue(Dictionary<String, Object> config, String key, String defaultValue) {
- String value = (String)config.get(key);
- return (value != null) ? value : defaultValue;
- }
@Override
public void handleEvent(Event event) {