You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/09/24 08:18:52 UTC
git commit: FLUME-2470. Kafka Sink and Source config updates. Missed
a couple files in the last commit, adding them.
Repository: flume
Updated Branches:
refs/heads/trunk bde2c2821 -> 26444fd7a
FLUME-2470. Kafka Sink and Source config updates. Missed a couple files in the last commit, adding them.
(Gwen Shapira via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/26444fd7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/26444fd7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/26444fd7
Branch: refs/heads/trunk
Commit: 26444fd7a8c804b6f2507ee3bf54e1c811ee5168
Parents: bde2c28
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Sep 23 23:17:40 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Sep 23 23:18:44 2014 -0700
----------------------------------------------------------------------
.../apache/flume/sink/kafka/KafkaSinkUtil.java | 103 +++++++++++++++++++
.../flume/sink/kafka/KafkaSinkUtilTest.java | 55 ++++++++++
2 files changed, 158 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/26444fd7/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
new file mode 100644
index 0000000..66bde85
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.kafka;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.util.PropertiesTrait;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class KafkaSinkUtil {
+
+  private static final Logger log =
+      LoggerFactory.getLogger(KafkaSinkUtil.class);
+
+  /**
+   * Builds the Kafka producer properties for the sink. Settings are applied
+   * in increasing precedence: built-in defaults, then every property under
+   * the Kafka prefix, then the documented Flume keys.
+   * @param context the sink configuration
+   * @return the merged producer properties
+   * @throws ConfigurationException if the broker list is missing or blank
+   */
+  public static Properties getKafkaProperties(Context context) {
+    log.info("context={}", context);
+    Properties props = generateDefaultKafkaProps();
+    setKafkaProps(context, props);
+    addDocumentedKafkaProps(context, props);
+    return props;
+  }
+
+  /**
+   * Some of the producer properties are especially important.
+   * We documented them and gave them a camel-case name to match Flume config.
+   * If the user set these, we override any existing parameters with them.
+   * Knowledge of which properties are documented is maintained here for now.
+   * If this becomes a maintenance issue we'll set up a proper data structure.
+   * @throws ConfigurationException if the broker list is missing or blank
+   */
+  private static void addDocumentedKafkaProps(Context context,
+                                              Properties kafkaProps)
+      throws ConfigurationException {
+    String brokerList =
+        context.getString(KafkaSinkConstants.BROKER_LIST_FLUME_KEY);
+    // A blank value names no broker, so reject it along with a missing one.
+    if (brokerList == null || brokerList.trim().isEmpty()) {
+      throw new ConfigurationException("brokerList must contain at least " +
+          "one Kafka broker");
+    }
+    kafkaProps.put(KafkaSinkConstants.BROKER_LIST_KEY, brokerList);
+
+    String requiredAcks =
+        context.getString(KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
+    if (requiredAcks != null) {
+      kafkaProps.put(KafkaSinkConstants.REQUIRED_ACKS_KEY, requiredAcks);
+    }
+  }
+
+  /**
+   * Generate producer properties object with some defaults.
+   * @return properties holding the default serializers and required acks
+   */
+  private static Properties generateDefaultKafkaProps() {
+    Properties props = new Properties();
+    props.put(KafkaSinkConstants.MESSAGE_SERIALIZER_KEY,
+        KafkaSinkConstants.DEFAULT_MESSAGE_SERIALIZER);
+    props.put(KafkaSinkConstants.KEY_SERIALIZER_KEY,
+        KafkaSinkConstants.DEFAULT_KEY_SERIALIZER);
+    props.put(KafkaSinkConstants.REQUIRED_ACKS_KEY,
+        KafkaSinkConstants.DEFAULT_REQUIRED_ACKS);
+    return props;
+  }
+
+  /**
+   * Add all configuration under the Kafka prefix to the producer properties.
+   */
+  private static void setKafkaProps(Context context, Properties kafkaProps) {
+    Map<String, String> kafkaProperties =
+        context.getSubProperties(KafkaSinkConstants.PROPERTY_PREFIX);
+    for (Map.Entry<String, String> prop : kafkaProperties.entrySet()) {
+      kafkaProps.put(prop.getKey(), prop.getValue());
+      log.debug("Reading a Kafka Producer Property: key: {}, value: {}",
+          prop.getKey(), prop.getValue());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/26444fd7/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
new file mode 100644
index 0000000..84d213c
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/KafkaSinkUtilTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.kafka;
+
+import junit.framework.TestCase;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurables;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class KafkaSinkUtilTest extends TestCase {
+
+  /** Checks the precedence of defaults, kafka.* properties and brokerList. */
+  @Test
+  public void testGetKafkaProperties() {
+    Context context = new Context();
+    context.put("kafka.serializer.class", "override.default.serializer");
+    context.put("kafka.fake.property", "kafka.property.value");
+    context.put("kafka.metadata.broker.list", "bad-broker-list");
+    context.put("brokerList", "real-broker-list");
+    Properties kafkaProps = KafkaSinkUtil.getKafkaProperties(context);
+
+    // Expected value goes first so failure messages are labelled correctly.
+    // check that we have defaults set
+    assertEquals(KafkaSinkConstants.DEFAULT_KEY_SERIALIZER,
+        kafkaProps.getProperty(KafkaSinkConstants.KEY_SERIALIZER_KEY));
+    // check that kafka properties override the default and get correct name
+    assertEquals("override.default.serializer",
+        kafkaProps.getProperty(KafkaSinkConstants.MESSAGE_SERIALIZER_KEY));
+    // check that any kafka property gets in
+    assertEquals("kafka.property.value",
+        kafkaProps.getProperty("fake.property"));
+    // check that the documented property overrides the kafka.* property
+    assertEquals("real-broker-list",
+        kafkaProps.getProperty("metadata.broker.list"));
+  }
+}
\ No newline at end of file