You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by cs...@apache.org on 2017/07/12 06:54:36 UTC

karaf-decanter git commit: [KARAF-5244] Refactor kafka appender config handling

Repository: karaf-decanter
Updated Branches:
  refs/heads/master 94f5f6a81 -> 79d1154f2


[KARAF-5244] Refactor kafka appender config handling


Project: http://git-wip-us.apache.org/repos/asf/karaf-decanter/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf-decanter/commit/79d1154f
Tree: http://git-wip-us.apache.org/repos/asf/karaf-decanter/tree/79d1154f
Diff: http://git-wip-us.apache.org/repos/asf/karaf-decanter/diff/79d1154f

Branch: refs/heads/master
Commit: 79d1154f2992addc90d4bc59eaaa829d6de878d2
Parents: 94f5f6a
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Wed Jul 12 08:54:27 2017 +0200
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Wed Jul 12 08:54:27 2017 +0200

----------------------------------------------------------------------
 .../decanter/appender/kafka/ConfigMapper.java   | 72 ++++++++++++++++
 .../decanter/appender/kafka/KafkaAppender.java  | 90 +-------------------
 2 files changed, 75 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/79d1154f/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/ConfigMapper.java
----------------------------------------------------------------------
diff --git a/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/ConfigMapper.java b/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/ConfigMapper.java
new file mode 100644
index 0000000..f62cac0
--- /dev/null
+++ b/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/ConfigMapper.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.appender.kafka;
+
+import java.util.Dictionary;
+import java.util.Properties;
+
+/** Maps the Kafka appender's configuration to the {@link Properties} used by {@code KafkaAppender}. */
+public class ConfigMapper {
+    /** Keys without a default; they are copied only when the configuration defines them. */
+    private static final String[] OPTIONAL_KEYS = {
+        "security.protocol", "ssl.truststore.location", "ssl.truststore.password",
+        "ssl.keystore.location", "ssl.keystore.password", "ssl.key.password",
+        "ssl.provider", "ssl.cipher.suites", "ssl.enabled.protocols",
+        "ssl.truststore.type", "ssl.keystore.type"
+    };
+
+    private final Properties config = new Properties();
+    private final Dictionary<String, Object> confSource;
+
+    /**
+     * Builds the mapped properties for the given configuration.
+     * @param conf configuration to read; values of any type are accepted
+     * @return new properties holding the known keys, including {@code topic}
+     */
+    public static Properties map(Dictionary<String, Object> conf) {
+        return new ConfigMapper(conf).config;
+    }
+
+    private ConfigMapper(Dictionary<String, Object> conf) {
+        this.confSource = conf;
+        process("bootstrap.servers", "localhost:9092");
+        process("client.id", "");
+        process("compression.type", "none");
+        process("acks", "all");
+        process("retries", "0");
+        process("batch.size", "16384");
+        process("buffer.memory", "33554432");
+        process("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        process("request.timeout.ms", "5000");
+        process("max.request.size", "2097152");
+        process("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        for (String key : OPTIONAL_KEYS) {
+            process(key, null);
+        }
+        process("topic", "decanter");
+    }
+
+    /** Stores the configured value of {@code key}, else the default; stores nothing if both are null. */
+    private void process(String key, String defaultValue) {
+        // Values are declared as Object and need not be Strings, so convert instead of casting.
+        Object raw = confSource.get(key);
+        String usedValue = (raw != null) ? raw.toString() : defaultValue;
+        if (usedValue != null) {
+            config.setProperty(key, usedValue);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/karaf-decanter/blob/79d1154f/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
----------------------------------------------------------------------
diff --git a/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java b/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
index f0ece64..a404a53 100644
--- a/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
+++ b/appender/kafka/src/main/java/org/apache/karaf/decanter/appender/kafka/KafkaAppender.java
@@ -16,7 +16,6 @@
  */
 package org.apache.karaf.decanter.appender.kafka;
 
-import java.util.Dictionary;
 import java.util.Properties;
 
 import org.apache.kafka.clients.producer.Callback;
@@ -54,87 +53,9 @@ public class KafkaAppender implements EventHandler {
     @Activate
     @SuppressWarnings("unchecked")
     public void activate(ComponentContext context) {
-        Dictionary<String, Object> config = context.getProperties();
-        this.properties = new Properties();
-
-        String bootstrapServers = getValue(config, "bootstrap.servers", "localhost:9092");
-        properties.put("bootstrap.servers", bootstrapServers);
-
-        String clientId = getValue(config, "client.id", "");
-        properties.put("client.id", clientId);
-
-        String compressionType = getValue(config, "compression.type", "none");
-        properties.put("compression.type", compressionType);
-
-        String acks = getValue(config, "acks", "all");
-        properties.put("acks", acks);
-
-        String retries = getValue(config, "retries", "0");
-        properties.put("retries", Integer.parseInt(retries));
-
-        String batchSize = getValue(config, "batch.size", "16384");
-        properties.put("batch.size", Integer.parseInt(batchSize));
-
-        String bufferMemory = getValue(config, "buffer.memory", "33554432");
-        properties.put("buffer.memory", Long.parseLong(bufferMemory));
-
-        String keySerializer = getValue(config, "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        properties.put("key.serializer", keySerializer);
-
-        String requestTimeoutMs = getValue(config, "request.timeout.ms", "5000");
-        properties.put("request.timeout.ms", requestTimeoutMs);
-
-        String maxRequestSize = getValue(config, "max.request.size", "2097152");
-        properties.put("max.request.size", maxRequestSize);
-
-        String valueSerializer = getValue(config, "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        properties.put("value.serializer", valueSerializer);
-
-        String securityProtocol = getValue(config, "security.protocol", null);
-        if (securityProtocol != null)
-            properties.put("security.protocol", securityProtocol);
-
-        String sslTruststoreLocation = getValue(config, "ssl.truststore.location", null);
-        if (sslTruststoreLocation != null)
-            properties.put("ssl.truststore.location", sslTruststoreLocation);
-
-        String sslTruststorePassword = getValue(config, "ssl.truststore.password", null);
-        if (sslTruststorePassword != null)
-            properties.put("ssl.truststore.password", sslTruststorePassword);
-
-        String sslKeystoreLocation = getValue(config, "ssl.keystore.location", null);
-        if (sslKeystoreLocation != null)
-            properties.put("ssl.keystore.location", sslKeystoreLocation);
-
-        String sslKeystorePassword = getValue(config, "ssl.keystore.password", null);
-        if (sslKeystorePassword != null)
-            properties.put("ssl.keystore.password", sslKeystorePassword);
-
-        String sslKeyPassword = getValue(config, "ssl.key.password", null);
-        if (sslKeyPassword != null)
-            properties.put("ssl.key.password", sslKeyPassword);
-
-        String sslProvider = getValue(config, "ssl.provider", null);
-        if (sslProvider != null)
-            properties.put("ssl.provider", sslProvider);
-
-        String sslCipherSuites = getValue(config, "ssl.cipher.suites", null);
-        if (sslCipherSuites != null)
-            properties.put("ssl.cipher.suites", sslCipherSuites);
-
-        String sslEnabledProtocols = getValue(config, "ssl.enabled.protocols", null);
-        if (sslEnabledProtocols != null)
-            properties.put("ssl.enabled.protocols", sslEnabledProtocols);
-
-        String sslTruststoreType = getValue(config, "ssl.truststore.type", null);
-        if (sslTruststoreType != null)
-            properties.put("ssl.truststore.type", sslTruststoreType);
-
-        String sslKeystoreType = getValue(config, "ssl.keystore.type", null);
-        if (sslKeystoreType != null)
-            properties.put("ssl.keystore.type", sslKeystoreType);
-
-        this.topic = getValue(config, "topic", "decanter");
+        this.properties = ConfigMapper.map(context.getProperties());
+        this.topic = properties.getProperty("topic");
+        properties.remove("topic");
 
         // workaround for KAFKA-3218
         ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();
@@ -145,11 +66,6 @@ public class KafkaAppender implements EventHandler {
             Thread.currentThread().setContextClassLoader(originClassLoader);
         }
     }
-    
-    private String getValue(Dictionary<String, Object> config, String key, String defaultValue) {
-        String value = (String)config.get(key);
-        return (value != null) ? value :  defaultValue;
-    }
 
     @Override
     public void handleEvent(Event event) {