You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/10/24 21:51:20 UTC
apex-malhar git commit: Fixes two issues - the metadata consumer did not pick
up the configured consumer properties, and setting the consumer properties from
Properties.xml did not work for the Kafka input operator
Repository: apex-malhar
Updated Branches:
refs/heads/master 6ddefd02a -> c89e63621
Fixes two issues: the metadata consumer did not pick up the configured consumer properties, and setting the consumer properties from Properties.xml did not work for the Kafka input operator
added test case
Removed trailing spaces
Removed unused imports
removed static from getPropertyAsString implementation in AbstractKafkaPartitioner
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c89e6362
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c89e6362
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c89e6362
Branch: refs/heads/master
Commit: c89e636217a3416908fce7d57503f937cbdea2e9
Parents: 6ddefd0
Author: venkateshDT <ve...@datatorrent.com>
Authored: Thu Sep 22 13:27:32 2016 -0700
Committer: venkateshDT <ve...@datatorrent.com>
Committed: Mon Oct 24 13:48:59 2016 -0700
----------------------------------------------------------------------
.../kafka/AbstractKafkaInputOperator.java | 2 +-
.../malhar/kafka/AbstractKafkaPartitioner.java | 23 +++++-
.../kafka/KafkaConsumerPropertiesTest.java | 79 ++++++++++++++++++++
3 files changed, 102 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c89e6362/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 4cf2888..6fc7693 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -131,7 +131,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
private int holdingBufferSize = 1024;
- private Properties consumerProps;
+ private Properties consumerProps = new Properties();
/**
* Assignment for each operator instance
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c89e6362/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index ad5c3fa..c9b40be 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -18,6 +18,8 @@
*/
package org.apache.apex.malhar.kafka;
+import java.io.IOException;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -222,17 +224,36 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
metadataRefreshClients = new ArrayList<>(clusters.length);
int index = 0;
for (String c : clusters) {
- Properties prop = new Properties();
+ Properties prop = prototypeOperator.getConsumerProps();
prop.put("group.id", META_CONSUMER_GROUP_NAME);
prop.put("bootstrap.servers", c);
prop.put("key.deserializer", ByteArrayDeserializer.class.getName());
prop.put("value.deserializer", ByteArrayDeserializer.class.getName());
prop.put("enable.auto.commit", "false");
+ if (logger.isInfoEnabled()) {
+ logger.info("Consumer Properties : {} ", getPropertyAsString(prop));
+ }
metadataRefreshClients.add(index++, new KafkaConsumer<byte[], byte[]>(prop));
}
}
/**
+ * Renders the given properties (key/value pairs) as text via Properties.store, for debug logging.
+ * NOTE(review): values are written verbatim, so any secret held in prop would end up in the log.
+ * @param prop properties to render; handed directly to store()
+ * @return the text written to the buffer; an IOException from store() is logged, not rethrown
+ */
+ private String getPropertyAsString(Properties prop) {
+ StringWriter writer = new StringWriter();
+ try {
+ prop.store(writer, ""); // second argument is the header comment passed to store()
+ } catch (IOException e) {
+ logger.error("Cannot retrieve consumer properties for Logging : {}", e.getMessage() ); // NOTE(review): only the message is logged, not the stack trace
+ }
+ return writer.getBuffer().toString();
+ }
+
+ /**
* The key object used in the assignment map for each operator
*/
public static class PartitionMeta
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c89e6362/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
new file mode 100644
index 0000000..83e0de6
--- /dev/null
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.text.ParseException;
+import java.util.Properties;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.kafka.common.KafkaException;
+
+import com.datatorrent.api.Context;
+
+public class KafkaConsumerPropertiesTest
+{
+ // Checks that consumer properties set on the operator are still present when definePartitions runs.
+ KafkaSinglePortInputOperator kafkaInput = new KafkaSinglePortInputOperator();
+ @Rule
+ public Watcher watcher = new Watcher();
+ // Rule that configures kafkaInput (cluster, topic, initial offset, SASL consumer properties) in starting().
+ public class Watcher extends TestWatcher
+ {
+ Context.OperatorContext context;
+ @Override
+ protected void starting(Description description)
+ {
+ super.starting(description);
+ kafkaInput.setClusters("localhost:8087");
+ kafkaInput.setInitialPartitionCount(1);
+ kafkaInput.setTopics("apexTest");
+ kafkaInput.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
+ Properties prop = new Properties();
+ prop.setProperty("security.protocol","SASL_PLAINTEXT");
+ prop.setProperty("sasl.kerberos.service.name","kafka");
+ kafkaInput.setConsumerProps(prop);
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ super.finished(description);
+ }
+ }
+
+ @Test
+ public void TestConsumerProperties() throws ParseException
+ {
+ //Only runs the check when security.protocol is set; NOTE(review): if it is missing, the test passes without asserting anything.
+ if (null != kafkaInput.getConsumerProps().get("security.protocol")) {
+ try {
+ kafkaInput.definePartitions(null, null);
+ } catch (KafkaException e) {
+ //The secure-mode login-config error indicates the properties were applied; NOTE(review): nothing fails if no exception is thrown.
+ Assert.assertEquals("java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in " +
+ "secure mode.", e.getCause().getMessage());
+ }
+ }
+ }
+}