You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/09/24 08:18:52 UTC

git commit: FLUME-2470. Kafka Sink and Source config updates. Missed a couple files in the last commit, adding them.

Repository: flume
Updated Branches:
  refs/heads/trunk bde2c2821 -> 26444fd7a


FLUME-2470. Kafka Sink and Source config updates. Missed a couple files in the last commit, adding them.

(Gwen Shapira via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/26444fd7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/26444fd7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/26444fd7

Branch: refs/heads/trunk
Commit: 26444fd7a8c804b6f2507ee3bf54e1c811ee5168
Parents: bde2c28
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Sep 23 23:17:40 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Sep 23 23:18:44 2014 -0700

----------------------------------------------------------------------
 .../apache/flume/sink/kafka/KafkaSinkUtil.java  | 103 +++++++++++++++++++
 .../flume/sink/kafka/KafkaSinkUtilTest.java     |  55 ++++++++++
 2 files changed, 158 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/26444fd7/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
new file mode 100644
index 0000000..66bde85
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.kafka;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.util.PropertiesTrait;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class KafkaSinkUtil {
+
+  private static final Logger log =
+          LoggerFactory.getLogger(KafkaSinkUtil.class);
+
+  public static Properties getKafkaProperties(Context context) {
+    log.info("context={}",context.toString());
+    Properties props =  generateDefaultKafkaProps();
+    setKafkaProps(context, props);
+    addDocumentedKafkaProps(context, props);
+    return props;
+  }
+
+  /**
+   * Some of the producer properties are especially important
+   * We documented them and gave them a camel-case name to match Flume config
+   * If user set these, we will override any existing parameters with these
+   * settings.
+   * Knowledge of which properties are documented is maintained here for now.
+   * If this will become a maintenance issue we'll set a proper data structure.
+   */
+  private static void addDocumentedKafkaProps(Context context,
+                                              Properties kafkaProps)
+          throws ConfigurationException {
+    String brokerList = context.getString(KafkaSinkConstants
+            .BROKER_LIST_FLUME_KEY);
+    if (brokerList == null) {
+      throw new ConfigurationException("brokerList must contain at least " +
+              "one Kafka broker");
+    }
+    kafkaProps.put(KafkaSinkConstants.BROKER_LIST_KEY, brokerList);
+
+    String requiredKey = context.getString(
+            KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
+
+    if (requiredKey != null ) {
+      kafkaProps.put(KafkaSinkConstants.REQUIRED_ACKS_KEY, requiredKey);
+    }
+  }
+
+
+  /**
+   * Generate producer properties object with some defaults
+   * @return
+   */
+  private static Properties generateDefaultKafkaProps() {
+    Properties props = new Properties();
+    props.put(KafkaSinkConstants.MESSAGE_SERIALIZER_KEY,
+            KafkaSinkConstants.DEFAULT_MESSAGE_SERIALIZER);
+    props.put(KafkaSinkConstants.KEY_SERIALIZER_KEY,
+            KafkaSinkConstants.DEFAULT_KEY_SERIALIZER);
+    props.put(KafkaSinkConstants.REQUIRED_ACKS_KEY,
+            KafkaSinkConstants.DEFAULT_REQUIRED_ACKS);
+    return props;
+  }
+
+
+  /**
+   * Add all configuration parameters starting with "kafka"
+   * to producer properties
+   */
+  private static void setKafkaProps(Context context, Properties kafkaProps) {
+
+    Map<String,String> kafkaProperties =
+            context.getSubProperties(KafkaSinkConstants.PROPERTY_PREFIX);
+
+    for (Map.Entry<String,String> prop : kafkaProperties.entrySet()) {
+
+      kafkaProps.put(prop.getKey(), prop.getValue());
+      if (log.isDebugEnabled()) {
+        log.debug("Reading a Kafka Producer Property: key: "
+                + prop.getKey() + ", value: " + prop.getValue());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/26444fd7/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
new file mode 100644
index 0000000..84d213c
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka;
+
+import junit.framework.TestCase;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurables;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class KafkaSinkUtilTest extends TestCase {
+
+  @Test
+  public void testGetKafkaProperties() {
+    Context context = new Context();
+    context.put("kafka.serializer.class", "override.default.serializer");
+    context.put("kafka.fake.property", "kafka.property.value");
+    context.put("kafka.metadata.broker.list","bad-broker-list");
+    context.put("brokerList","real-broker-list");
+    Properties kafkaProps = KafkaSinkUtil.getKafkaProperties(context);
+
+    //check that we have defaults set
+    assertEquals(
+            kafkaProps.getProperty(KafkaSinkConstants.KEY_SERIALIZER_KEY),
+            KafkaSinkConstants.DEFAULT_KEY_SERIALIZER);
+    //check that kafka properties override the default and get correct name
+    assertEquals(
+            kafkaProps.getProperty(KafkaSinkConstants.MESSAGE_SERIALIZER_KEY),
+            "override.default.serializer");
+    //check that any kafka property gets in
+    assertEquals(kafkaProps.getProperty("fake.property"),
+            "kafka.property.value");
+    //check that documented property overrides defaults
+    assertEquals(kafkaProps.getProperty("metadata.broker.list")
+            ,"real-broker-list");
+  }
+}
\ No newline at end of file