You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/01/06 08:20:55 UTC

[1/2] incubator-apex-malhar git commit: MLHR-1956 Added POJO Kafka Output operator with autometrics and batch processing

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 5efa71d99 -> ba8396f78


MLHR-1956 Added POJO Kafka Output operator with autometrics and batch processing


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/24fdbdfc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/24fdbdfc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/24fdbdfc

Branch: refs/heads/devel-3
Commit: 24fdbdfcfe37ef211563173952a676a73110dcd6
Parents: 6ee61fc
Author: Chaitanya <ch...@datatorrent.com>
Authored: Wed Jan 6 10:52:04 2016 +0530
Committer: Chaitanya <ch...@datatorrent.com>
Committed: Wed Jan 6 10:52:04 2016 +0530

----------------------------------------------------------------------
 .../kafka/AbstractKafkaOutputOperator.java      |  19 ++
 .../contrib/kafka/POJOKafkaOutputOperator.java  | 261 +++++++++++++++++++
 .../contrib/kafka/KafkaOutputOperatorTest.java  |  67 +++++
 3 files changed, 347 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/24fdbdfc/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
index 77bf42b..f0835c4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
@@ -151,4 +151,23 @@ public abstract class AbstractKafkaOutputOperator<K, V> implements Operator
   public void endWindow()
   {
+    // No per-window work in the base class; subclasses may override (and call super).
   }
+
+  /**
+   * Sets additional Kafka producer properties, given as a comma-separated string of
+   * key=value pairs, e.g. {@code key1=value1,key2=value2,key3=value3}.
+   * @param producerProperties the properties string
+   */
+  public void setProducerProperties(String producerProperties)
+  {
+    this.producerProperties = producerProperties;
+  }
+
+  /**
+   * Returns the additional Kafka producer properties as the comma-separated
+   * key=value string that was configured.
+   * @return the configured producer properties string
+   */
+  public String getProducerProperties()
+  {
+    return this.producerProperties;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/24fdbdfc/contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java
new file mode 100644
index 0000000..5c1f695
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.kafka;
+
+import java.lang.reflect.Field;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import org.apache.commons.lang3.ClassUtils;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.lib.util.PojoUtils;
+
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+/**
+ * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
+ * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
+ * <p>
+ * <br>
+ * Ports:<br>
+ * <b>Input</b>: Have only one input port<br>
+ * <b>Output</b>: No Output Port <br>
+ * <br>
+ * Properties:<br>
+ * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
+ * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
+ * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
+ *                           the value is true <br>
+ * <b>batchSize</b>: Specifies the batch size.<br>
+ * <br>
+ * <br>
+ * </p>
+ *
+ * @displayName POJO Kafka Output
+ * @category Messaging
+ * @tags Output operator
+ *
+ */
+public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
+{
+  @AutoMetric
+  private long outputMessagesPerSec;
+  @AutoMetric
+  private long outputBytesPerSec;
+  protected final String BROKER_KEY = "metadata.broker.list";
+  protected final String BATCH_NUM_KEY = "batch.num.messages";
+  protected final String PRODUCER_KEY = "producer.type";
+  protected final String QUEUE_BUFFER_KEY = "queue.buffering.max.ms";
+  protected final String ASYNC_PRODUCER_TYPE = "async";
+  private long messageCount;
+  private long byteCount;
+  private String brokerList;
+  private double windowTimeSec;
+  private String keyField = "";
+  protected boolean isBatchProcessing = true;
+  @Min(2)
+  protected int batchSize;
+  protected transient PojoUtils.Getter keyMethod;
+  protected transient Class<?> pojoClass;
+
+  public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
+    @Override
+    public void setup(Context.PortContext context)
+    {
+      if (context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
+        pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
+      }
+    }
+
+    @Override
+    public void process(Object tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * setup producer configuration.
+   * @return ProducerConfig
+   */
+  @Override
+  protected ProducerConfig createKafkaProducerConfig()
+  {
+    if (brokerList != null) {
+      getConfigProperties().setProperty(BROKER_KEY, brokerList);
+    }
+    if (isBatchProcessing) {
+      if (batchSize != 0) {
+        getConfigProperties().setProperty(BATCH_NUM_KEY, String.valueOf(batchSize));
+      }
+      getConfigProperties().setProperty(PRODUCER_KEY, ASYNC_PRODUCER_TYPE);
+    }
+    return super.createKafkaProducerConfig();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    if (isBatchProcessing) {
+      getConfigProperties().setProperty(QUEUE_BUFFER_KEY, String.valueOf(context.getValue(
+          Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)));
+    }
+    super.setup(context);
+    windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)
+      * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0;
+    if (pojoClass != null && keyField != "") {
+      try {
+        keyMethod = generateGetterForKeyField();
+      } catch (NoSuchFieldException e) {
+        throw new RuntimeException("Field " + keyField + " is invalid: " + e);
+      }
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    super.beginWindow(windowId);
+    outputMessagesPerSec = 0;
+    outputBytesPerSec = 0;
+    messageCount = 0;
+    byteCount = 0;
+  }
+
+  /**
+   * Write the incoming tuple to Kafka
+   * @param tuple incoming tuple
+   */
+  protected void processTuple(Object tuple)
+  {
+    // Get the getter method from the keyField
+    if (keyMethod == null && keyField != "") {
+      pojoClass = tuple.getClass();
+      try {
+        keyMethod = generateGetterForKeyField();
+      } catch (NoSuchFieldException e) {
+        throw new RuntimeException("Field " + keyField + " is invalid: " + e);
+      }
+    }
+
+    // Convert the given tuple to KeyedMessage
+    KeyedMessage msg;
+    if (keyMethod != null) {
+      msg = new KeyedMessage(getTopic(), keyMethod.get(tuple), tuple);
+    } else {
+      msg = new KeyedMessage(getTopic(), tuple, tuple);
+    }
+
+    getProducer().send(msg);
+    messageCount++;
+    if (tuple instanceof byte[]) {
+      byteCount += ((byte[])tuple).length;
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    super.endWindow();
+    outputBytesPerSec = (long)(byteCount / windowTimeSec);
+    outputMessagesPerSec = (long)(messageCount / windowTimeSec);
+  }
+
+  private PojoUtils.Getter generateGetterForKeyField() throws NoSuchFieldException, SecurityException
+  {
+    Field f = pojoClass.getDeclaredField(keyField);
+    Class c = ClassUtils.primitiveToWrapper(f.getType());
+    PojoUtils.Getter classGetter = PojoUtils.createGetter(pojoClass, keyField, c);
+    return classGetter;
+  }
+
+  /**
+   * Returns the broker list of kafka clusters
+   * @return the broker list
+   */
+  public String getBrokerList()
+  {
+    return brokerList;
+  }
+
+  /**
+   * Sets the broker list with the given list
+   * @param brokerList
+   */
+  public void setBrokerList(@NotNull String brokerList)
+  {
+    this.brokerList = brokerList;
+  }
+
+  /**
+   * Specifies whether want to write in batch or not.
+   * @return isBatchProcessing
+   */
+  public boolean isBatchProcessing()
+  {
+    return isBatchProcessing;
+  }
+
+  /**
+   * Specifies whether want to write in batch or not.
+   * @param batchProcessing given batchProcessing
+   */
+  public void setBatchProcessing(boolean batchProcessing)
+  {
+    isBatchProcessing = batchProcessing;
+  }
+
+  /**
+   * Returns the batch size
+   * @return batch size
+   */
+  public int getBatchSize()
+  {
+    return batchSize;
+  }
+
+  /**
+   * Sets the batch size
+   * @param batchSize batch size
+   */
+  public void setBatchSize(int batchSize)
+  {
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Returns the key field
+   * @return the key field
+   */
+  public String getKeyField()
+  {
+    return keyField;
+  }
+
+  /**
+   * Sets the key field which specifies the messages writes to Kafka based on this key.
+   * @param keyField the key field
+   */
+  public void setKeyField(String keyField)
+  {
+    this.keyField = keyField;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/24fdbdfc/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
index a09fdc5..cc1f267 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
@@ -182,5 +182,72 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     listener.close();
   }
 
+  /**
+   * Test AbstractKafkaOutputOperator (i.e. an output adapter for Kafka, aka producer).
+   * This module sends data into kafka message bus.
+   *
+   * [Generate tuple] ==> [send tuple through Kafka output adapter(i.e. producer) into Kafka message bus]
+   * ==> [receive data in outside Kaka listener (i.e consumer)]
+   *
+   * @throws Exception
+   */
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testPOJOKafkaOutputOperator() throws Exception
+  {
+    tupleCount = 0;
+    //initialize the latch to synchronize the threads
+    latch = new CountDownLatch(maxTuple);
+    // Setup a message listener to receive the message
+    KafkaTestConsumer listener = new KafkaTestConsumer("topic1");
+    listener.setLatch(latch);
+    new Thread(listener).start();
+
+    // Create DAG for testing.
+    LocalMode lma = LocalMode.newInstance();
+
+    StreamingApplication app = new StreamingApplication() {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+      }
+    };
+
+    DAG dag = lma.getDAG();
 
+    StringGeneratorInputOperator generator = dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class);
+    POJOKafkaOutputOperator node = dag.addOperator("KafkaMessageProducer", POJOKafkaOutputOperator.class);
+
+    Properties props = new Properties();
+    props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
+    props.setProperty("producer.type", "async");
+    props.setProperty("queue.buffering.max.ms", "200");
+    props.setProperty("queue.buffering.max.messages", "10");
+
+    node.setConfigProperties(props);
+    node.setTopic("topic1");
+    node.setBrokerList("localhost:9092");
+    node.setBatchSize(5);
+
+    // Connect ports
+    dag.addStream("Kafka message", generator.outputPort, node.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
+
+    Configuration conf = new Configuration(false);
+    lma.prepareDAG(app, conf);
+
+    // Create local cluster
+    final LocalMode.Controller lc = lma.getController();
+    lc.runAsync();
+
+    // Immediately return unless latch timeout in 20 seconds
+    latch.await(20, TimeUnit.SECONDS);
+    lc.shutdown();
+
+    // Check values send vs received
+    Assert.assertEquals("Number of emitted tuples", maxTuple, listener.holdingBuffer.size());
+    logger.debug(String.format("Number of emitted tuples: %d", listener.holdingBuffer.size()));
+    Assert.assertEquals("First tuple", "testString 1", listener.getMessage(listener.holdingBuffer.peek()));
+
+    listener.close();
+  }
 }


[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1956-KafkaOutput-Batch' of https://github.com/chaithu14/incubator-apex-malhar into devel-3

Posted by hs...@apache.org.
Merge branch 'MLHR-1956-KafkaOutput-Batch' of https://github.com/chaithu14/incubator-apex-malhar into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ba8396f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ba8396f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ba8396f7

Branch: refs/heads/devel-3
Commit: ba8396f78fba72d5cf294a789661fd508d539062
Parents: 5efa71d 24fdbdf
Author: Siyuan Hua <hs...@apache.org>
Authored: Tue Jan 5 23:19:51 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Tue Jan 5 23:19:51 2016 -0800

----------------------------------------------------------------------
 .../kafka/AbstractKafkaOutputOperator.java      |  19 ++
 .../contrib/kafka/POJOKafkaOutputOperator.java  | 261 +++++++++++++++++++
 .../contrib/kafka/KafkaOutputOperatorTest.java  |  67 +++++
 3 files changed, 347 insertions(+)
----------------------------------------------------------------------