You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/10/24 21:51:20 UTC

apex-malhar git commit: fixes two issues - pick the metadata consumer properties and setting the consumer properties from Properties.xml for kafka inputoperator not working

Repository: apex-malhar
Updated Branches:
  refs/heads/master 6ddefd02a -> c89e63621


fixes two issues - pick the metadata consumer properties and setting the consumer properties from Properties.xml for kafka inputoperator not working

added test case

Removed trailing spaces

Removed unused imports

removed static from getPropertyAsString implementation in AbstractKafkaPartitioner


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c89e6362
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c89e6362
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c89e6362

Branch: refs/heads/master
Commit: c89e636217a3416908fce7d57503f937cbdea2e9
Parents: 6ddefd0
Author: venkateshDT <ve...@datatorrent.com>
Authored: Thu Sep 22 13:27:32 2016 -0700
Committer: venkateshDT <ve...@datatorrent.com>
Committed: Mon Oct 24 13:48:59 2016 -0700

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       |  2 +-
 .../malhar/kafka/AbstractKafkaPartitioner.java  | 23 +++++-
 .../kafka/KafkaConsumerPropertiesTest.java      | 79 ++++++++++++++++++++
 3 files changed, 102 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c89e6362/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 4cf2888..6fc7693 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -131,7 +131,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
 
   private int holdingBufferSize = 1024;
 
-  private Properties consumerProps;
+  private Properties consumerProps = new Properties();
 
   /**
    * Assignment for each operator instance

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c89e6362/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index ad5c3fa..c9b40be 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -18,6 +18,8 @@
  */
 package org.apache.apex.malhar.kafka;
 
+import java.io.IOException;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -222,17 +224,36 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
     metadataRefreshClients = new ArrayList<>(clusters.length);
     int index = 0;
     for (String c : clusters) {
-      Properties prop = new Properties();
+      Properties prop = prototypeOperator.getConsumerProps();
       prop.put("group.id", META_CONSUMER_GROUP_NAME);
       prop.put("bootstrap.servers", c);
       prop.put("key.deserializer", ByteArrayDeserializer.class.getName());
       prop.put("value.deserializer", ByteArrayDeserializer.class.getName());
       prop.put("enable.auto.commit", "false");
+      if (logger.isInfoEnabled()) {
+        logger.info("Consumer Properties :  {} ", getPropertyAsString(prop));
+      }
       metadataRefreshClients.add(index++, new KafkaConsumer<byte[], byte[]>(prop));
     }
   }
 
   /**
+   * Converts the property list (key and element pairs) to String format
+   * This format is used to print to a Stream for debugging.
+   * @param prop
+   * @return String
+   */
+  private String getPropertyAsString(Properties prop) {
+    StringWriter writer = new StringWriter();
+    try {
+      prop.store(writer, "");
+    } catch (IOException e) {
+      logger.error("Cannot retrieve consumer properties for Logging : {}", e.getMessage() );
+    }
+    return writer.getBuffer().toString();
+  }
+
+  /**
    * The key object used in the assignment map for each operator
    */
   public static class PartitionMeta

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c89e6362/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
new file mode 100644
index 0000000..83e0de6
--- /dev/null
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.text.ParseException;
+import java.util.Properties;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.kafka.common.KafkaException;
+
+import com.datatorrent.api.Context;
+
+public class KafkaConsumerPropertiesTest
+{
+
+  KafkaSinglePortInputOperator kafkaInput = new KafkaSinglePortInputOperator();
+  @Rule
+  public Watcher watcher = new Watcher();
+
+  public class Watcher extends TestWatcher
+  {
+    Context.OperatorContext context;
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      kafkaInput.setClusters("localhost:8087");
+      kafkaInput.setInitialPartitionCount(1);
+      kafkaInput.setTopics("apexTest");
+      kafkaInput.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
+      Properties prop = new Properties();
+      prop.setProperty("security.protocol","SASL_PLAINTEXT");
+      prop.setProperty("sasl.kerberos.service.name","kafka");
+      kafkaInput.setConsumerProps(prop);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      super.finished(description);
+    }
+  }
+
+  @Test
+  public void TestConsumerProperties() throws ParseException
+  {
+    //Added test on this check to ensure consumer properties are set and not reset between.
+    if (null != kafkaInput.getConsumerProps().get("security.protocol")) {
+      try {
+        kafkaInput.definePartitions(null, null);
+      } catch (KafkaException e) {
+        //Ensures the  properties of the consumer are set/not reset.
+        Assert.assertEquals("java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in " +
+          "secure mode.", e.getCause().getMessage());
+      }
+    }
+  }
+}