You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/01/06 08:20:55 UTC
[1/2] incubator-apex-malhar git commit: MLHR-1956 Added POJO Kafka
Output operator with autometrics and batch processing
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 5efa71d99 -> ba8396f78
MLHR-1956 Added POJO Kafka Output operator with autometrics and batch processing
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/24fdbdfc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/24fdbdfc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/24fdbdfc
Branch: refs/heads/devel-3
Commit: 24fdbdfcfe37ef211563173952a676a73110dcd6
Parents: 6ee61fc
Author: Chaitanya <ch...@datatorrent.com>
Authored: Wed Jan 6 10:52:04 2016 +0530
Committer: Chaitanya <ch...@datatorrent.com>
Committed: Wed Jan 6 10:52:04 2016 +0530
----------------------------------------------------------------------
.../kafka/AbstractKafkaOutputOperator.java | 19 ++
.../contrib/kafka/POJOKafkaOutputOperator.java | 261 +++++++++++++++++++
.../contrib/kafka/KafkaOutputOperatorTest.java | 67 +++++
3 files changed, 347 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/24fdbdfc/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
index 77bf42b..f0835c4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
@@ -151,4 +151,23 @@ public abstract class AbstractKafkaOutputOperator<K, V> implements Operator
public void endWindow()
{
}
+
+  /**
+   * Gets the additional producer properties string (key1=value1,key2=value2,...).
+   * @return the configured producer properties, as set through setProducerProperties
+   */
+  public String getProducerProperties()
+  {
+    return this.producerProperties;
+  }
+
+  /**
+   * Sets extra producer properties as a single comma-separated string of the form
+   * key1=value1,key2=value2,key3=value3,...
+   * @param properties the properties string
+   */
+  public void setProducerProperties(String properties)
+  {
+    producerProperties = properties;
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/24fdbdfc/contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java
new file mode 100644
index 0000000..5c1f695
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.kafka;
+
+import java.lang.reflect.Field;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import org.apache.commons.lang3.ClassUtils;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.lib.util.PojoUtils;
+
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+/**
+ * POJOKafkaOutputOperator extends AbstractKafkaOutputOperator. It receives POJOs
+ * from upstream, converts them to Kafka messages and writes them to a Kafka topic.
+ * <p>
+ * <br>
+ * Ports:<br>
+ * <b>Input</b>: Has only one input port<br>
+ * <b>Output</b>: No output port <br>
+ * <br>
+ * Properties:<br>
+ * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:Port3,...<br>
+ * <b>keyField</b>: Name of the POJO field whose value is used as the Kafka message key, which
+ * determines the Kafka partition of each tuple. When empty, the tuple itself is used as the
+ * key.<br>
+ * <b>batchProcessing</b>: Specifies whether to write messages in batches through an async
+ * producer. By default, the value is true <br>
+ * <b>batchSize</b>: Number of messages per batch when batch processing is enabled; must be
+ * at least 2. Defaults to 200.<br>
+ * <br>
+ * <br>
+ * </p>
+ *
+ * @displayName POJO Kafka Output
+ * @category Messaging
+ * @tags Output operator
+ *
+ */
+public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object, Object>
+{
+  protected static final String BROKER_KEY = "metadata.broker.list";
+  protected static final String BATCH_NUM_KEY = "batch.num.messages";
+  protected static final String PRODUCER_KEY = "producer.type";
+  protected static final String QUEUE_BUFFER_KEY = "queue.buffering.max.ms";
+  protected static final String ASYNC_PRODUCER_TYPE = "async";
+  /** Default batch size; equal to the Kafka 0.8 producer default for batch.num.messages. */
+  protected static final int DEFAULT_BATCH_SIZE = 200;
+
+  /** Messages handed to the producer per second during the last window. */
+  @AutoMetric
+  private long outputMessagesPerSec;
+  /** Bytes handed to the producer per second during the last window (byte[] tuples only). */
+  @AutoMetric
+  private long outputBytesPerSec;
+  private long messageCount;
+  private long byteCount;
+  private String brokerList;
+  /** Length of one application window in seconds, used to turn counts into rates. */
+  private double windowTimeSec;
+  private String keyField = "";
+  protected boolean isBatchProcessing = true;
+  // The default must itself satisfy @Min(2): a default of 0 violates the constraint whenever
+  // the operator's properties are bean-validated, even with batch processing disabled.
+  @Min(2)
+  protected int batchSize = DEFAULT_BATCH_SIZE;
+  protected transient PojoUtils.Getter keyMethod;
+  protected transient Class<?> pojoClass;
+
+  public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>()
+  {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      // Resolve the POJO class up front when the TUPLE_CLASS attribute is set on this port.
+      if (context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
+        pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
+      }
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * Sets up the producer configuration: the broker list and, for batch processing, the async
+   * producer type and the batch size. The ProducerConfig itself is created by the base class.
+   *
+   * @return ProducerConfig
+   */
+  @Override
+  protected ProducerConfig createKafkaProducerConfig()
+  {
+    if (brokerList != null) {
+      getConfigProperties().setProperty(BROKER_KEY, brokerList);
+    }
+    if (isBatchProcessing) {
+      // Non-positive sizes (possible only if bean validation is bypassed) keep Kafka's default.
+      if (batchSize > 0) {
+        getConfigProperties().setProperty(BATCH_NUM_KEY, String.valueOf(batchSize));
+      }
+      getConfigProperties().setProperty(PRODUCER_KEY, ASYNC_PRODUCER_TYPE);
+    }
+    return super.createKafkaProducerConfig();
+  }
+
+  /**
+   * Configures async buffering, sets up the base operator, computes the application window
+   * length and, when the tuple class is already known, creates the key getter.
+   *
+   * @param context operator context
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    // In batch mode, bound the async producer's buffering time by the streaming window size
+    // unless queue.buffering.max.ms was configured explicitly. This is done before
+    // super.setup(), which presumably builds the producer from these properties.
+    if (isBatchProcessing && getConfigProperties().getProperty(QUEUE_BUFFER_KEY) == null) {
+      getConfigProperties().setProperty(QUEUE_BUFFER_KEY,
+          String.valueOf(context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)));
+    }
+    super.setup(context);
+    windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)
+        * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0;
+    if (pojoClass != null && hasKeyField()) {
+      keyMethod = generateGetterForKeyField();
+    }
+  }
+
+  /**
+   * Resets the per-window counters and metrics.
+   *
+   * @param windowId id of the window being started
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    outputMessagesPerSec = 0;
+    outputBytesPerSec = 0;
+    messageCount = 0;
+    byteCount = 0;
+  }
+
+  /**
+   * Writes the incoming tuple to Kafka. The message key is the value of the key field, or the
+   * tuple itself when no key field is configured.
+   *
+   * @param tuple incoming tuple
+   */
+  protected void processTuple(Object tuple)
+  {
+    // Resolve the key getter lazily from the first tuple's class when it was not known in setup.
+    if (keyMethod == null && hasKeyField()) {
+      pojoClass = tuple.getClass();
+      keyMethod = generateGetterForKeyField();
+    }
+
+    Object key = (keyMethod != null) ? keyMethod.get(tuple) : tuple;
+    getProducer().send(new KeyedMessage<Object, Object>(getTopic(), key, tuple));
+    messageCount++;
+    // Only byte[] payloads have a size known here; other tuple types are serialized by the
+    // producer's configured serializer and do not contribute to outputBytesPerSec.
+    if (tuple instanceof byte[]) {
+      byteCount += ((byte[])tuple).length;
+    }
+  }
+
+  /**
+   * Converts the window's message and byte counts into per-second rates for the auto-metrics.
+   */
+  @Override
+  public void endWindow()
+  {
+    super.endWindow();
+    outputBytesPerSec = (long)(byteCount / windowTimeSec);
+    outputMessagesPerSec = (long)(messageCount / windowTimeSec);
+  }
+
+  /**
+   * @return true when a non-empty key field name is configured
+   */
+  private boolean hasKeyField()
+  {
+    // Compare by content, not identity: an empty name supplied through configuration is not
+    // necessarily the interned "" literal, and a null name must not reach the reflection code.
+    return keyField != null && !keyField.isEmpty();
+  }
+
+  /**
+   * Creates a getter for {@link #keyField} on {@link #pojoClass}. Primitive field types are
+   * boxed so that the getter returns an Object.
+   *
+   * @return getter for the key field
+   * @throws RuntimeException if neither pojoClass nor any of its superclasses declares keyField
+   */
+  private PojoUtils.Getter generateGetterForKeyField()
+  {
+    Field field;
+    try {
+      field = findField(pojoClass, keyField);
+    } catch (NoSuchFieldException e) {
+      throw new RuntimeException("Field " + keyField + " is invalid: " + e, e);
+    }
+    Class<?> fieldType = ClassUtils.primitiveToWrapper(field.getType());
+    return PojoUtils.createGetter(pojoClass, keyField, fieldType);
+  }
+
+  /**
+   * Finds a field declared on the given class or on one of its superclasses.
+   *
+   * @param clazz class to start the search from
+   * @param name field name
+   * @return the first matching field, searching from clazz upwards
+   * @throws NoSuchFieldException if no class in the hierarchy declares the field
+   */
+  private static Field findField(Class<?> clazz, String name) throws NoSuchFieldException
+  {
+    for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
+      for (Field f : c.getDeclaredFields()) {
+        if (f.getName().equals(name)) {
+          return f;
+        }
+      }
+    }
+    throw new NoSuchFieldException(name);
+  }
+
+  /**
+   * Returns the broker list of the Kafka cluster.
+   *
+   * @return the broker list
+   */
+  public String getBrokerList()
+  {
+    return brokerList;
+  }
+
+  /**
+   * Sets the broker list, in the form Host1:Port1,Host2:Port2,...
+   *
+   * @param brokerList the broker list
+   */
+  public void setBrokerList(@NotNull String brokerList)
+  {
+    this.brokerList = brokerList;
+  }
+
+  /**
+   * Returns whether messages are written in batches through an async producer.
+   *
+   * @return true if batch processing is enabled
+   */
+  public boolean isBatchProcessing()
+  {
+    return isBatchProcessing;
+  }
+
+  /**
+   * Specifies whether messages are written in batches through an async producer.
+   *
+   * @param batchProcessing true to enable batch processing
+   */
+  public void setBatchProcessing(boolean batchProcessing)
+  {
+    isBatchProcessing = batchProcessing;
+  }
+
+  /**
+   * Returns the batch size.
+   *
+   * @return number of messages per batch
+   */
+  public int getBatchSize()
+  {
+    return batchSize;
+  }
+
+  /**
+   * Sets the number of messages per batch used when batch processing is enabled.
+   *
+   * @param batchSize batch size; must be at least 2
+   */
+  public void setBatchSize(int batchSize)
+  {
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Returns the key field.
+   *
+   * @return the name of the POJO field used as the message key
+   */
+  public String getKeyField()
+  {
+    return keyField;
+  }
+
+  /**
+   * Sets the name of the POJO field whose value is used as the Kafka message key.
+   *
+   * @param keyField the key field; empty means the tuple itself is used as the key
+   */
+  public void setKeyField(String keyField)
+  {
+    this.keyField = keyField;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/24fdbdfc/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
index a09fdc5..cc1f267 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
@@ -182,5 +182,72 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
listener.close();
}
+  /**
+   * Test POJOKafkaOutputOperator (i.e. an output adapter for Kafka, aka producer).
+   * This module sends data into the Kafka message bus.
+   *
+   * [Generate tuple] ==> [send tuple through Kafka output adapter (i.e. producer) into Kafka message bus]
+   * ==> [receive data in outside Kafka listener (i.e. consumer)]
+   *
+   * @throws Exception
+   */
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testPOJOKafkaOutputOperator() throws Exception
+  {
+    tupleCount = 0;
+    // Initialize the latch to synchronize the threads
+    latch = new CountDownLatch(maxTuple);
+    // Set up a message listener to receive the messages
+    KafkaTestConsumer listener = new KafkaTestConsumer("topic1");
+    listener.setLatch(latch);
+    new Thread(listener).start();
+    // Always close the listener, even if an assertion below fails
+    try {
+      // Create DAG for testing.
+      LocalMode lma = LocalMode.newInstance();
+      StreamingApplication app = new StreamingApplication() {
+        @Override
+        public void populateDAG(DAG dag, Configuration conf)
+        {
+        }
+      };
+
+      DAG dag = lma.getDAG();
+      StringGeneratorInputOperator generator = dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class);
+      POJOKafkaOutputOperator node = dag.addOperator("KafkaMessageProducer", POJOKafkaOutputOperator.class);
+
+      Properties props = new Properties();
+      props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
+      props.setProperty("producer.type", "async");
+      props.setProperty("queue.buffering.max.ms", "200");
+      props.setProperty("queue.buffering.max.messages", "10");
+      node.setConfigProperties(props);
+      node.setTopic("topic1");
+      node.setBrokerList("localhost:9092");
+      node.setBatchSize(5);
+
+      // Connect ports
+      dag.addStream("Kafka message", generator.outputPort, node.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
+
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(app, conf);
+
+      // Create local cluster
+      final LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      // Wait for the latch to reach zero, for at most 20 seconds
+      latch.await(20, TimeUnit.SECONDS);
+      lc.shutdown();
+
+      // Check values sent vs received
+      Assert.assertEquals("Number of emitted tuples", maxTuple, listener.holdingBuffer.size());
+      logger.debug(String.format("Number of emitted tuples: %d", listener.holdingBuffer.size()));
+      Assert.assertEquals("First tuple", "testString 1", listener.getMessage(listener.holdingBuffer.peek()));
+    } finally {
+      listener.close();
+    }
+  }
}
[2/2] incubator-apex-malhar git commit: Merge branch
'MLHR-1956-KafkaOutput-Batch' of
https://github.com/chaithu14/incubator-apex-malhar into devel-3
Posted by hs...@apache.org.
Merge branch 'MLHR-1956-KafkaOutput-Batch' of https://github.com/chaithu14/incubator-apex-malhar into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ba8396f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ba8396f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ba8396f7
Branch: refs/heads/devel-3
Commit: ba8396f78fba72d5cf294a789661fd508d539062
Parents: 5efa71d 24fdbdf
Author: Siyuan Hua <hs...@apache.org>
Authored: Tue Jan 5 23:19:51 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Tue Jan 5 23:19:51 2016 -0800
----------------------------------------------------------------------
.../kafka/AbstractKafkaOutputOperator.java | 19 ++
.../contrib/kafka/POJOKafkaOutputOperator.java | 261 +++++++++++++++++++
.../contrib/kafka/KafkaOutputOperatorTest.java | 67 +++++
3 files changed, 347 insertions(+)
----------------------------------------------------------------------