You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:12:49 UTC

[32/50] [abbrv] incubator-apex-malhar git commit: APEXMALHAR-1904 #comment Add new KafkaInputOperator using kafka 0.9.x consumer API Features includes: Out-of-box One-to-one and one-to-many partition scheme support plus customizable partition s

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
new file mode 100644
index 0000000..e6256f1
--- /dev/null
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+
+import kafka.utils.VerifiableProperties;
+
+/**
+ * A simple partitioner class for test purposes.
+ * Keys are integers encoded as strings; a record is sent to partition
+ * (key mod number of partitions), so with two partitions the even keys
+ * go to one partition and the odd keys to the other.
+ * Records with a null or empty key are sent to partition 0.
+ */
+public class KafkaTestPartitioner implements Partitioner
+{
+  /**
+   * @param props accepted but not used
+   */
+  public KafkaTestPartitioner(VerifiableProperties props)
+  {
+  }
+
+  public KafkaTestPartitioner()
+  {
+  }
+
+  /**
+   * Derives the partition from the record key; only topic, key and cluster are used.
+   *
+   * @return a partition index in [0, number of partitions of the topic)
+   * @throws ClassCastException if the key is not a String
+   * @throws NumberFormatException if the key is non-empty and not a valid integer
+   */
+  @Override
+  public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster)
+  {
+    String keyStr = (String)key;
+    if (keyStr == null || keyStr.isEmpty()) {
+      // nothing to derive a partition from, and Integer.parseInt would reject such a key
+      return 0;
+    }
+    int numPartitions = cluster.partitionsForTopic(topic).size();
+    // % keeps the sign of the dividend, so move a negative remainder back into range
+    int remainder = Integer.parseInt(keyStr) % numPartitions;
+    return remainder < 0 ? remainder + numPartitions : remainder;
+  }
+
+  @Override
+  public void close()
+  {
+  }
+
+  @Override
+  public void configure(Map<String, ?> map)
+  {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
new file mode 100644
index 0000000..36130ce
--- /dev/null
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+/**
+ * A kafka producer for testing. {@link #run()} sends either the messages supplied through
+ * {@link #setMessages(List)} or, if none were supplied, generated dummy messages followed by
+ * END_TUPLE markers, and closes the producer(s) when it is done.
+ */
+public class KafkaTestProducer implements Runnable
+{
+  private final Producer<String, String> producer;
+  // producer for the second test cluster; null unless hasMultiCluster is set
+  private final Producer<String, String> producer1;
+  private final String topic;
+  private int sendCount = 20;
+  // to generate a random int as a key for partition
+  private final Random rand = new Random();
+  private final boolean hasPartition;
+  private final boolean hasMultiCluster;
+  private List<String> messages;
+
+  // http://kafka.apache.org/documentation.html#producerconfigs
+  private String ackType = "1";
+
+  public int getSendCount()
+  {
+    return sendCount;
+  }
+
+  public void setSendCount(int sendCount)
+  {
+    this.sendCount = sendCount;
+  }
+
+  public void setMessages(List<String> messages)
+  {
+    this.messages = messages;
+  }
+
+  /**
+   * Builds the producer configuration for the test cluster with index cid. The second broker
+   * of that cluster is added to the bootstrap servers only when hasPartition is set.
+   */
+  private Properties createProducerConfig(int cid)
+  {
+    Properties props = new Properties();
+    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaTestPartitioner.class.getName());
+    String brokerList = "localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][0];
+    if (hasPartition) {
+      brokerList += ",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][1];
+    }
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+    props.setProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG, "20000");
+    props.setProperty(ProducerConfig.ACKS_CONFIG, getAckType());
+
+    return props;
+  }
+
+  public KafkaTestProducer(String topic)
+  {
+    this(topic, false);
+  }
+
+  public KafkaTestProducer(String topic, boolean hasPartition)
+  {
+    this(topic, hasPartition, false);
+  }
+
+  /**
+   * Creates the producer for test cluster 0 and, if requested, a second one for cluster 1.
+   *
+   * @param topic topic every message is sent to
+   * @param hasPartition whether the second broker of a cluster is used and END_TUPLE is also sent with key 1
+   * @param hasMultiCluster whether messages are also produced to test cluster 1
+   */
+  public KafkaTestProducer(String topic, boolean hasPartition, boolean hasMultiCluster)
+  {
+    // Keys and messages are both Strings; the key is what the configured KafkaTestPartitioner partitions on.
+    this.topic = topic;
+    this.hasPartition = hasPartition;
+    this.hasMultiCluster = hasMultiCluster;
+    producer = new KafkaProducer<>(createProducerConfig(0));
+    if (hasMultiCluster) {
+      producer1 = new KafkaProducer<>(createProducerConfig(1));
+    } else {
+      producer1 = null;
+    }
+  }
+
+  /**
+   * Sends dummy messages with random keys in [0, 100) until the message counter exceeds sendCount,
+   * then the END_TUPLE markers. With hasMultiCluster every iteration also sends to the second cluster.
+   */
+  private void generateMessages()
+  {
+    // Create dummy message
+    int messageNo = 1;
+    while (messageNo <= sendCount) {
+      String messageStr = "Message_" + messageNo;
+      int k = rand.nextInt(100);
+      producer.send(new ProducerRecord<>(topic, "" + k, "c1" + messageStr));
+      if (hasMultiCluster) {
+        // the copy for the second cluster is counted as a message of its own
+        messageNo++;
+        producer1.send(new ProducerRecord<>(topic, "" + k, "c2" + messageStr));
+      }
+      messageNo++;
+    }
+    // produce the end tuple to let the test input operator know it's done produce messages
+    producer.send(new ProducerRecord<>(topic, "" + 0, KafkaOperatorTestBase.END_TUPLE));
+    if (hasMultiCluster) {
+      producer1.send(new ProducerRecord<>(topic, "" + 0, KafkaOperatorTestBase.END_TUPLE));
+    }
+    if (hasPartition) {
+      // Send end_tuple to other partition if it exist
+      producer.send(new ProducerRecord<>(topic, "" + 1, KafkaOperatorTestBase.END_TUPLE));
+      if (hasMultiCluster) {
+        producer1.send(new ProducerRecord<>(topic, "" + 1, KafkaOperatorTestBase.END_TUPLE));
+      }
+    }
+  }
+
+  /**
+   * Sends the messages and then closes the producers, also when sending fails. Messages set through
+   * setMessages are sent one by one, each waiting up to 30 seconds for its send to complete.
+   */
+  @Override
+  public void run()
+  {
+    try {
+      if (messages == null) {
+        generateMessages();
+      } else {
+        for (String msg : messages) {
+          Future<?> f = producer.send(new ProducerRecord<>(topic, "", msg));
+          try {
+            f.get(30, TimeUnit.SECONDS);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    } finally {
+      close();
+    }
+  }
+
+  /**
+   * Closes the producer and, if it was created, the producer for the second cluster.
+   */
+  public void close()
+  {
+    producer.close();
+    if (producer1 != null) {
+      producer1.close();
+    }
+  }
+
+  public String getAckType()
+  {
+    return ackType;
+  }
+
+  /**
+   * Note that the ack type is read when the producers are created in the constructor, so a value
+   * set here does not affect the producers of this instance.
+   */
+  public void setAckType(String ackType)
+  {
+    this.ackType = ackType;
+  }
+} // End of KafkaTestProducer
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 874f30f..cea6658 100644
--- a/pom.xml
+++ b/pom.xml
@@ -173,6 +173,7 @@
         <module>benchmark</module>
         <module>apps</module>
         <module>samples</module>
+        <module>kafka</module>
       </modules>
     </profile>
   </profiles>