You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:12:49 UTC
[32/50] [abbrv] incubator-apex-malhar git commit: APEXMALHAR-1904
#comment Add new KafkaInputOperator using kafka 0.9.x consumer API Features
includes: Out-of-box One-to-one and one-to-many partition scheme support plus
customizable partition s
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
new file mode 100644
index 0000000..e6256f1
--- /dev/null
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+
+import kafka.utils.VerifiableProperties;
+
+/**
+ * A simple partitioner for test purposes.
+ * The record key is expected to be an integer in string form; records are
+ * spread over all partitions of the topic by taking the key modulo the
+ * partition count (with two partitions: one for even keys, one for odd keys).
+ * Keys that are null or not numeric are tolerated instead of failing the send.
+ */
+public class KafkaTestPartitioner implements Partitioner
+{
+  /**
+   * Constructor taking {@link VerifiableProperties}; the properties are ignored.
+   * NOTE(review): presumably kept so the class can also be instantiated by the
+   * old (Scala) producer API - confirm before removing.
+   *
+   * @param props ignored
+   */
+  public KafkaTestPartitioner(VerifiableProperties props)
+  {
+
+  }
+
+  /**
+   * No-arg constructor.
+   */
+  public KafkaTestPartitioner()
+  {
+
+  }
+
+  /**
+   * Chooses the partition for a record from its key.
+   *
+   * @param topic   topic the record is sent to; used to look up the partition count
+   * @param key     record key, normally a String holding an int
+   * @param bytes   unused
+   * @param o1      unused
+   * @param bytes1  unused
+   * @param cluster cluster metadata used to look up the partitions of {@code topic}
+   * @return an index in the range {@code [0, number of partitions)}
+   */
+  @Override
+  public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster)
+  {
+    int numPartitions = cluster.partitionsForTopic(topic).size();
+    int numericKey = keyToInt(key);
+    // Java's % keeps the sign of the dividend, so a negative key would yield a
+    // negative (invalid) partition; shift the remainder back into [0, numPartitions).
+    return ((numericKey % numPartitions) + numPartitions) % numPartitions;
+  }
+
+  /**
+   * Converts a record key to an int: null maps to 0, a numeric string to its
+   * value, anything else to the hash code of its string form (so "" maps to 0).
+   */
+  private static int keyToInt(Object key)
+  {
+    if (key == null) {
+      return 0;
+    }
+    String text = key.toString();
+    try {
+      return Integer.parseInt(text);
+    } catch (NumberFormatException e) {
+      // Not an int string (for example the empty key); fall back to a stable hash.
+      return text.hashCode();
+    }
+  }
+
+  /**
+   * Nothing to release.
+   */
+  @Override
+  public void close()
+  {
+
+  }
+
+  /**
+   * No configuration is read.
+   *
+   * @param map ignored
+   */
+  @Override
+  public void configure(Map<String, ?> map)
+  {
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
new file mode 100644
index 0000000..36130ce
--- /dev/null
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+/**
+ * A Kafka producer for testing.
+ * When run, it either sends the explicitly supplied messages (see
+ * {@link #setMessages(List)}) or generates {@code sendCount} dummy messages
+ * followed by end tuples, and then closes its underlying producers.
+ */
+public class KafkaTestProducer implements Runnable
+{
+  private final Producer<String, String> producer;
+  // Producer for the second cluster; null unless hasMultiCluster is set.
+  private final Producer<String, String> producer1;
+  private final String topic;
+  private int sendCount = 20;
+  // to generate a random int as a key for partition
+  private final Random rand = new Random();
+  private final boolean hasPartition;
+  private final boolean hasMultiCluster;
+  private List<String> messages;
+
+  // http://kafka.apache.org/documentation.html#producerconfigs
+  private String ackType = "1";
+
+  public int getSendCount()
+  {
+    return sendCount;
+  }
+
+  public void setSendCount(int sendCount)
+  {
+    this.sendCount = sendCount;
+  }
+
+  /**
+   * Sets the messages to send instead of the generated dummy messages.
+   *
+   * @param messages messages to send, or null to generate dummy messages
+   */
+  public void setMessages(List<String> messages)
+  {
+    this.messages = messages;
+  }
+
+  /**
+   * Builds the producer configuration for one test cluster.
+   *
+   * @param cid first index into KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT
+   *            (presumably the cluster id - confirm against the test base)
+   * @return the producer properties
+   */
+  private Properties createProducerConfig(int cid)
+  {
+    Properties props = new Properties();
+    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaTestPartitioner.class.getName());
+    String brokerList = "localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][0];
+    if (hasPartition) {
+      // a second broker port is added only when the topic is partitioned
+      brokerList += ",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][1];
+    }
+    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+    props.setProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG, "20000");
+    props.setProperty(ProducerConfig.ACKS_CONFIG, getAckType());
+
+    return props;
+  }
+
+  /**
+   * Creates a producer for a single cluster without partitioning.
+   *
+   * @param topic topic to send to
+   */
+  public KafkaTestProducer(String topic)
+  {
+    this(topic, false);
+  }
+
+  /**
+   * Creates the producer(s). The producer configuration, including the ack
+   * type, is read here, so {@link #setAckType(String)} called afterwards does
+   * not affect the producers built by this constructor.
+   *
+   * @param topic           topic to send to
+   * @param hasPartition    whether a second broker and a second end tuple are used
+   * @param hasMultiCluster whether a second producer for cluster index 1 is created
+   */
+  public KafkaTestProducer(String topic, boolean hasPartition, boolean hasMultiCluster)
+  {
+    // Keys and values are both Strings; the partition is picked from the key
+    // by KafkaTestPartitioner (see createProducerConfig).
+    this.topic = topic;
+    this.hasPartition = hasPartition;
+    this.hasMultiCluster = hasMultiCluster;
+    producer = new KafkaProducer<>(createProducerConfig(0));
+    if (hasMultiCluster) {
+      producer1 = new KafkaProducer<>(createProducerConfig(1));
+    } else {
+      producer1 = null;
+    }
+  }
+
+  /**
+   * Creates a producer for a single cluster.
+   *
+   * @param topic        topic to send to
+   * @param hasPartition whether a second broker and a second end tuple are used
+   */
+  public KafkaTestProducer(String topic, boolean hasPartition)
+  {
+    this(topic, hasPartition, false);
+  }
+
+  /**
+   * Sends dummy messages with a random int key in [0, 100), then the end tuples.
+   * In multi-cluster mode each loop iteration sends the same message text to
+   * both clusters (prefixed "c1" / "c2") and counts as two messages.
+   */
+  private void generateMessages()
+  {
+    // Create dummy message
+    int messageNo = 1;
+    while (messageNo <= sendCount) {
+      String messageStr = "Message_" + messageNo;
+      int k = rand.nextInt(100);
+      producer.send(new ProducerRecord<>(topic, "" + k, "c1" + messageStr));
+      if (hasMultiCluster) {
+        messageNo++;
+        producer1.send(new ProducerRecord<>(topic, "" + k, "c2" + messageStr));
+      }
+      messageNo++;
+    }
+    // produce the end tuple to let the test input operator know it's done produce messages
+    producer.send(new ProducerRecord<>(topic, "" + 0, KafkaOperatorTestBase.END_TUPLE));
+    if (hasMultiCluster) {
+      producer1.send(new ProducerRecord<>(topic, "" + 0, KafkaOperatorTestBase.END_TUPLE));
+    }
+    if (hasPartition) {
+      // Send end_tuple to other partition if it exist
+      producer.send(new ProducerRecord<>(topic, "" + 1, KafkaOperatorTestBase.END_TUPLE));
+      if (hasMultiCluster) {
+        producer1.send(new ProducerRecord<>(topic, "" + 1, KafkaOperatorTestBase.END_TUPLE));
+      }
+    }
+  }
+
+  /**
+   * Sends the configured or generated messages and then closes the producers,
+   * also when sending fails.
+   *
+   * @throws RuntimeException if an explicitly supplied message is not
+   *         acknowledged within 30 seconds or its send fails
+   */
+  @Override
+  public void run()
+  {
+    try {
+      if (messages == null) {
+        generateMessages();
+      } else {
+        for (String msg : messages) {
+          // NOTE(review): the empty key is passed to the configured
+          // KafkaTestPartitioner; confirm it accepts non-numeric keys.
+          Future<?> f = producer.send(new ProducerRecord<>(topic, "", msg));
+          try {
+            f.get(30, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            // keep the interrupt visible to whoever owns this thread
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    } finally {
+      close();
+    }
+  }
+
+  /**
+   * Closes every producer created by this instance, including the one for the
+   * second cluster when present.
+   */
+  public void close()
+  {
+    try {
+      producer.close();
+    } finally {
+      if (producer1 != null) {
+        producer1.close();
+      }
+    }
+  }
+
+  public String getAckType()
+  {
+    return ackType;
+  }
+
+  public void setAckType(String ackType)
+  {
+    this.ackType = ackType;
+  }
+} // End of KafkaTestProducer
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/7ee2c7e3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 874f30f..cea6658 100644
--- a/pom.xml
+++ b/pom.xml
@@ -173,6 +173,7 @@
<module>benchmark</module>
<module>apps</module>
<module>samples</module>
+ <module>kafka</module>
</modules>
</profile>
</profiles>