Posted to dev@apex.apache.org by chaithu14 <gi...@git.apache.org> on 2015/12/28 12:44:00 UTC

[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

GitHub user chaithu14 opened a pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148

    MLHR-1956 Added POJO Kafka Output operator with auto-metrics and batchprocessing

    @siyuanh : Please review.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar MLHR-1956-KafkaOutput-Batch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-malhar/pull/148.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #148
    
----
commit aadb61ac8b40fb93aa94c471895eb78351da2af7
Author: Chaitanya <ch...@datatorrent.com>
Date:   2015-12-28T11:24:49Z

    MLHR-1956 Added POJO Kafka Output operator with autometrics and batch processing

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48557887
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,272 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    --- End diff --
    
    Sorry, I was wrong; please ignore this.



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48875794
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,266 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    +  protected final Integer DEFAULT_BATCH_SIZE = 200;
    --- End diff --
    
    We don't need an extra static value here, right?



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-malhar/pull/148



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48799042
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,272 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    +  protected final Integer DEFAULT_BATCH_SIZE = 200;
    +  protected final String BROKER_KEY = "metadata.broker.list";
    +  protected final String BATCH_NUM_KEY = "batch.num.messages";
    +  private long messageCount;
    +  private long byteCount;
    +  private String brokerList;
    +  private double windowTimeSec;
    +  private String keyField = "";
    +  protected boolean isBatchProcessing = true;
    +  @Min(2)
    +  protected int batchSize = DEFAULT_BATCH_SIZE;
    +  protected transient List<KeyedMessage<Object,Object>> messageList = new ArrayList<>();
    +  protected transient PojoUtils.Getter keyMethod;
    +  protected transient Class<?> pojoClass;
    +  protected transient ProducerConfig producerConfig;
    +
    +  public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      if(context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
    +        pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
    +      }
    +    }
    +
    +    @Override
    +    public void process(Object tuple)
    +    {
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  /**
    +   * setup producer configuration.
    +   * @return ProducerConfig
    +   */
    +  @Override
    +  protected ProducerConfig createKafkaProducerConfig(){
    +    if(brokerList != null) {
    +      getConfigProperties().setProperty(BROKER_KEY, brokerList);
    +    }
    +    getConfigProperties().setProperty(BATCH_NUM_KEY, String.valueOf(batchSize));
    +    producerConfig = super.createKafkaProducerConfig();
    +    return producerConfig;
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    super.setup(context);
    +    windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)
    +      * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0;
    +    if(pojoClass != null && keyField != "") {
    +      try {
    +        keyMethod = generateGetterForKeyField();
    +      } catch (NoSuchFieldException e) {
    +        throw new RuntimeException("Field " + keyField + " is invalid: " + e);
    +      }
    +    }
    +    batchSize = producerConfig.batchNumMessages();
    +  }
    +
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    super.beginWindow(windowId);
    +    outputMessagesPerSec = 0;
    +    outputBytesPerSec = 0;
    +    messageCount = 0;
    +    byteCount = 0;
    +  }
    +
    +  /**
    +   * Write the incoming tuple to Kafka
    +   * @param tuple incoming tuple
    +   */
    +  protected void processTuple(Object tuple)
    +  {
    +    // Get the getter method from the keyField
    +    if(keyMethod == null && keyField != "") {
    +      pojoClass = tuple.getClass();
    +      try {
    +        keyMethod = generateGetterForKeyField();
    +      } catch (NoSuchFieldException e) {
    +        throw new RuntimeException("Field " + keyField + " is invalid: " + e);
    +      }
    +    }
    +
    +    // Convert the given tuple to KeyedMessage
    +    KeyedMessage msg;
    +    if(keyMethod != null) {
    +      msg = new KeyedMessage(getTopic(), keyMethod.get(tuple), tuple);
    +    } else {
    +      msg = new KeyedMessage(getTopic(), tuple, tuple);
    +    }
    +
    +    if (isBatchProcessing) {
    +      if (messageList.size() == batchSize) {
    --- End diff --
    
    @chaithu14 What is the purpose of this batch producer? If you want to enforce batch processing, you have to set the batch property anyway, and once you set it, you can't be sure the messages will be sent out by the end of the window. You can't guarantee both.
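
The trade-off raised in this comment can be illustrated with a small, Kafka-free sketch of manual batching: messages are buffered until the batch size is reached, and whatever remains is flushed when the window ends. `WindowedBatcher`, its `sink` callback, and the `endWindow()` hook are hypothetical names used only for illustration; this is not the code from the pull request. Note that handing a batch to the sink only means it left this buffer. If the sink is an async producer with its own queue, the messages may still be waiting there when the window closes, which is the reviewer's point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: buffers messages and hands them to a sink in batches.
final class WindowedBatcher<T> {
  private final int batchSize;
  private final Consumer<List<T>> sink; // assumption: stands in for a producer's send(List) call
  private final List<T> buffer = new ArrayList<>();

  WindowedBatcher(int batchSize, Consumer<List<T>> sink) {
    if (batchSize < 1) {
      throw new IllegalArgumentException("batchSize must be at least 1");
    }
    this.batchSize = batchSize;
    this.sink = sink;
  }

  // Size-triggered flush: emit as soon as the buffer holds a full batch.
  void add(T message) {
    buffer.add(message);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  // Window-triggered flush: emit the partial batch so nothing is carried
  // over into the next window.
  void endWindow() {
    flush();
  }

  private void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    sink.accept(new ArrayList<>(buffer)); // copy, so the sink owns its own list
    buffer.clear();
  }
}
```

With a batch size of 2 and three incoming messages, the sink would receive one full batch during the window and one single-message batch from `endWindow()`.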



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48557321
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,272 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    +  protected final Integer DEFAULT_BATCH_SIZE = 200;
    +  protected final String BROKER_KEY = "metadata.broker.list";
    +  protected final String BATCH_NUM_KEY = "batch.num.messages";
    +  private long messageCount;
    +  private long byteCount;
    +  private String brokerList;
    +  private double windowTimeSec;
    +  private String keyField = "";
    +  protected boolean isBatchProcessing = true;
    +  @Min(2)
    +  protected int batchSize = DEFAULT_BATCH_SIZE;
    +  protected transient List<KeyedMessage<Object,Object>> messageList = new ArrayList<>();
    +  protected transient PojoUtils.Getter keyMethod;
    +  protected transient Class<?> pojoClass;
    +  protected transient ProducerConfig producerConfig;
    +
    +  public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      if(context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
    +        pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
    +      }
    +    }
    +
    +    @Override
    +    public void process(Object tuple)
    +    {
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  /**
    +   * setup producer configuration.
    +   * @return ProducerConfig
    +   */
    +  @Override
    +  protected ProducerConfig createKafkaProducerConfig(){
    +    if(brokerList != null) {
    +      getConfigProperties().setProperty(BROKER_KEY, brokerList);
    +    }
    +    getConfigProperties().setProperty(BATCH_NUM_KEY, String.valueOf(batchSize));
    +    producerConfig = super.createKafkaProducerConfig();
    +    return producerConfig;
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    super.setup(context);
    +    windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)
    +      * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0;
    +    if(pojoClass != null && keyField != "") {
    +      try {
    +        keyMethod = generateGetterForKeyField();
    +      } catch (NoSuchFieldException e) {
    +        throw new RuntimeException("Field " + keyField + " is invalid: " + e);
    +      }
    +    }
    +    batchSize = producerConfig.batchNumMessages();
    +  }
    +
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    super.beginWindow(windowId);
    +    outputMessagesPerSec = 0;
    +    outputBytesPerSec = 0;
    +    messageCount = 0;
    +    byteCount = 0;
    +  }
    +
    +  /**
    +   * Write the incoming tuple to Kafka
    +   * @param tuple incoming tuple
    +   */
    +  protected void processTuple(Object tuple)
    +  {
    +    // Get the getter method from the keyField
    +    if(keyMethod == null && keyField != "") {
    +      pojoClass = tuple.getClass();
    +      try {
    +        keyMethod = generateGetterForKeyField();
    +      } catch (NoSuchFieldException e) {
    +        throw new RuntimeException("Field " + keyField + " is invalid: " + e);
    +      }
    +    }
    +
    +    // Convert the given tuple to KeyedMessage
    +    KeyedMessage msg;
    +    if(keyMethod != null) {
    +      msg = new KeyedMessage(getTopic(), keyMethod.get(tuple), tuple);
    +    } else {
    +      msg = new KeyedMessage(getTopic(), tuple, tuple);
    +    }
    +
    +    if (isBatchProcessing) {
    +      if (messageList.size() == batchSize) {
    --- End diff --
    
    I don't think you have to batch manually. You can set batch.num.messages to the batch size and also set queue.buffering.max.ms to the streaming window size. It's not strictly the same as what you are doing here, but the logic and performance are similar.
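
A minimal sketch of the configuration suggested in this comment, assuming the old Scala producer whose keys appear in the diff (`metadata.broker.list`, `batch.num.messages`) and elsewhere in this thread (`queue.buffering.max.ms`, `producer.type`). The class `AsyncProducerProps` and its `build` method are hypothetical; the sketch only assembles a `java.util.Properties` object, so it runs without Kafka on the classpath. In the operator, properties like these would presumably be what `createKafkaProducerConfig()` hands to `ProducerConfig`.

```java
import java.util.Properties;

// Hypothetical helper: builds producer properties that let Kafka do the batching.
final class AsyncProducerProps {
  private AsyncProducerProps() {}

  static Properties build(String brokerList, int batchSize, long streamingWindowMillis) {
    Properties props = new Properties();
    props.setProperty("metadata.broker.list", brokerList);
    // Batching settings only take effect when the producer runs in async mode.
    props.setProperty("producer.type", "async");
    // Send once this many messages have been queued...
    props.setProperty("batch.num.messages", String.valueOf(batchSize));
    // ...or once messages have been buffered for this long, whichever comes first.
    props.setProperty("queue.buffering.max.ms", String.valueOf(streamingWindowMillis));
    return props;
  }
}
```

For example, `AsyncProducerProps.build("host1:9092", 200, 500L)` would cap buffering at 200 messages or 500 ms. As the comment notes, this is not strictly equivalent to flushing at the window boundary, because the producer's buffering timer is not aligned with the operator's windows.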



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48601469
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,272 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    +  protected final Integer DEFAULT_BATCH_SIZE = 200;
    +  protected final String BROKER_KEY = "metadata.broker.list";
    +  protected final String BATCH_NUM_KEY = "batch.num.messages";
    +  private long messageCount;
    +  private long byteCount;
    +  private String brokerList;
    +  private double windowTimeSec;
    +  private String keyField = "";
    +  protected boolean isBatchProcessing = true;
    +  @Min(2)
    +  protected int batchSize = DEFAULT_BATCH_SIZE;
    +  protected transient List<KeyedMessage<Object,Object>> messageList = new ArrayList<>();
    +  protected transient PojoUtils.Getter keyMethod;
    +  protected transient Class<?> pojoClass;
    +  protected transient ProducerConfig producerConfig;
    +
    +  public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      if(context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
    +        pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
    +      }
    +    }
    +
    +    @Override
    +    public void process(Object tuple)
    +    {
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  /**
    +   * setup producer configuration.
    +   * @return ProducerConfig
    +   */
    +  @Override
    +  protected ProducerConfig createKafkaProducerConfig(){
    +    if(brokerList != null) {
    +      getConfigProperties().setProperty(BROKER_KEY, brokerList);
    +    }
    +    getConfigProperties().setProperty(BATCH_NUM_KEY, String.valueOf(batchSize));
    --- End diff --
    
    Yes. In batch mode, I have added producer.type=async to the config properties.



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48929077
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,266 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    +  protected final Integer DEFAULT_BATCH_SIZE = 200;
    --- End diff --
    
    Yes. The static DEFAULT_BATCH_SIZE value is not needed.
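
    A minimal sketch of what dropping the constant could look like, assuming the default of 200 from the diff is simply written inline. The class name BatchSettings and the setter check are hypothetical; the check only stands in for the @Min(2) constraint so that the sketch compiles without javax.validation on the classpath.

```java
// Hypothetical holder class, for illustration only; not part of the PR.
class BatchSettings
{
  // Default written inline instead of going through a DEFAULT_BATCH_SIZE field.
  private int batchSize = 200;

  int getBatchSize()
  {
    return batchSize;
  }

  void setBatchSize(int batchSize)
  {
    // Stand-in for the @Min(2) bean-validation constraint used in the diff.
    if (batchSize < 2) {
      throw new IllegalArgumentException("batchSize must be at least 2, got " + batchSize);
    }
    this.batchSize = batchSize;
  }

  public static void main(String[] args)
  {
    BatchSettings settings = new BatchSettings();
    System.out.println(settings.getBatchSize());
  }
}
```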



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48817232
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,272 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    +  protected final Integer DEFAULT_BATCH_SIZE = 200;
    +  protected final String BROKER_KEY = "metadata.broker.list";
    +  protected final String BATCH_NUM_KEY = "batch.num.messages";
    +  private long messageCount;
    +  private long byteCount;
    +  private String brokerList;
    +  private double windowTimeSec;
    +  private String keyField = "";
    +  protected boolean isBatchProcessing = true;
    +  @Min(2)
    +  protected int batchSize = DEFAULT_BATCH_SIZE;
    +  protected transient List<KeyedMessage<Object,Object>> messageList = new ArrayList<>();
    +  protected transient PojoUtils.Getter keyMethod;
    +  protected transient Class<?> pojoClass;
    +  protected transient ProducerConfig producerConfig;
    +
    +  public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      if(context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
    +        pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
    +      }
    +    }
    +
    +    @Override
    +    public void process(Object tuple)
    +    {
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  /**
    +   * setup producer configuration.
    +   * @return ProducerConfig
    +   */
    +  @Override
    +  protected ProducerConfig createKafkaProducerConfig(){
    +    if(brokerList != null) {
    +      getConfigProperties().setProperty(BROKER_KEY, brokerList);
    +    }
    +    getConfigProperties().setProperty(BATCH_NUM_KEY, String.valueOf(batchSize));
    +    producerConfig = super.createKafkaProducerConfig();
    +    return producerConfig;
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    super.setup(context);
    +    windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)
    +      * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0;
    +    if(pojoClass != null && keyField != "") {
    +      try {
    +        keyMethod = generateGetterForKeyField();
    +      } catch (NoSuchFieldException e) {
    +        throw new RuntimeException("Field " + keyField + " is invalid: " + e);
    +      }
    +    }
    +    batchSize = producerConfig.batchNumMessages();
    +  }
    +
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    super.beginWindow(windowId);
    +    outputMessagesPerSec = 0;
    +    outputBytesPerSec = 0;
    +    messageCount = 0;
    +    byteCount = 0;
    +  }
    +
    +  /**
    +   * Write the incoming tuple to Kafka
    +   * @param tuple incoming tuple
    +   */
    +  protected void processTuple(Object tuple)
    +  {
    +    // Get the getter method from the keyField
    +    if(keyMethod == null && keyField != "") {
    +      pojoClass = tuple.getClass();
    +      try {
    +        keyMethod = generateGetterForKeyField();
    +      } catch (NoSuchFieldException e) {
    +        throw new RuntimeException("Field " + keyField + " is invalid: " + e);
    +      }
    +    }
    +
    +    // Convert the given tuple to KeyedMessage
    +    KeyedMessage msg;
    +    if(keyMethod != null) {
    +      msg = new KeyedMessage(getTopic(), keyMethod.get(tuple), tuple);
    +    } else {
    +      msg = new KeyedMessage(getTopic(), tuple, tuple);
    +    }
    +
    +    if (isBatchProcessing) {
    +      if (messageList.size() == batchSize) {
    --- End diff --
    
    The purpose is to process messages in batches instead of one at a time. However, the producer already takes care of this. I will update it as per the previous review comment.
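
    For context, the operator-level batching under discussion can be sketched roughly as below. This is an illustration only: the diff is cut off right after the size check, so the flush logic here is an assumption, and SizeTriggeredBuffer and its in-memory "sent" list are hypothetical stand-ins for messageList and the producer's send call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the messageList handling in processTuple().
class SizeTriggeredBuffer<T>
{
  private final int batchSize;
  private final List<T> pending = new ArrayList<>();
  // Records each flushed batch; a real operator would hand the list to the producer.
  private final List<List<T>> sent = new ArrayList<>();

  SizeTriggeredBuffer(int batchSize)
  {
    this.batchSize = batchSize;
  }

  void add(T msg)
  {
    pending.add(msg);
    // Flush as soon as the buffer reaches the configured batch size.
    if (pending.size() == batchSize) {
      flush();
    }
  }

  // Must also be called at a window boundary, otherwise a partial batch is never written.
  void flush()
  {
    if (!pending.isEmpty()) {
      sent.add(new ArrayList<>(pending));
      pending.clear();
    }
  }

  int batchesSent()
  {
    return sent.size();
  }

  int pendingCount()
  {
    return pending.size();
  }
}
```

    A buffer like this needs an explicit flush for the leftover tuples, which is extra state the operator has to manage. An async producer with batch.num.messages set does the same grouping internally, which is why the manual list can go.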



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48929079
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,266 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    +  protected final Integer DEFAULT_BATCH_SIZE = 200;
    +  protected final String BROKER_KEY = "metadata.broker.list";
    +  protected final String BATCH_NUM_KEY = "batch.num.messages";
    +  protected final String PRODUCER_KEY = "producer.type";
    +  protected final String QUEUE_BUFFER_KEY = "queue.buffering.max.ms";
    +  protected final String ASYNC_PRODUCER_TYPE = "async";
    +  private long messageCount;
    +  private long byteCount;
    +  private String brokerList;
    +  private double windowTimeSec;
    +  private String keyField = "";
    +  protected boolean isBatchProcessing = true;
    +  @Min(2)
    +  protected int batchSize = DEFAULT_BATCH_SIZE;
    +  protected transient PojoUtils.Getter keyMethod;
    +  protected transient Class<?> pojoClass;
    +
    +  public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      if(context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
    +        pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
    +      }
    +    }
    +
    +    @Override
    +    public void process(Object tuple)
    +    {
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  /**
    +   * setup producer configuration.
    +   * @return ProducerConfig
    +   */
    +  @Override
    +  protected ProducerConfig createKafkaProducerConfig(){
    +    if(brokerList != null) {
    +      getConfigProperties().setProperty(BROKER_KEY, brokerList);
    +    }
    +    if(isBatchProcessing) {
    +      getConfigProperties().setProperty(BATCH_NUM_KEY, String.valueOf(batchSize));
    --- End diff --
    
    Done



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48875491
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,266 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    +  protected final Integer DEFAULT_BATCH_SIZE = 200;
    +  protected final String BROKER_KEY = "metadata.broker.list";
    +  protected final String BATCH_NUM_KEY = "batch.num.messages";
    +  protected final String PRODUCER_KEY = "producer.type";
    +  protected final String QUEUE_BUFFER_KEY = "queue.buffering.max.ms";
    +  protected final String ASYNC_PRODUCER_TYPE = "async";
    +  private long messageCount;
    +  private long byteCount;
    +  private String brokerList;
    +  private double windowTimeSec;
    +  private String keyField = "";
    +  protected boolean isBatchProcessing = true;
    +  @Min(2)
    +  protected int batchSize = DEFAULT_BATCH_SIZE;
    +  protected transient PojoUtils.Getter keyMethod;
    +  protected transient Class<?> pojoClass;
    +
    +  public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      if(context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
    +        pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
    +      }
    +    }
    +
    +    @Override
    +    public void process(Object tuple)
    +    {
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  /**
    +   * setup producer configuration.
    +   * @return ProducerConfig
    +   */
    +  @Override
    +  protected ProducerConfig createKafkaProducerConfig(){
    --- End diff --
    
    Put the opening { on a new line.



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48875875
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,266 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    +  protected final Integer DEFAULT_BATCH_SIZE = 200;
    +  protected final String BROKER_KEY = "metadata.broker.list";
    +  protected final String BATCH_NUM_KEY = "batch.num.messages";
    +  protected final String PRODUCER_KEY = "producer.type";
    +  protected final String QUEUE_BUFFER_KEY = "queue.buffering.max.ms";
    +  protected final String ASYNC_PRODUCER_TYPE = "async";
    +  private long messageCount;
    +  private long byteCount;
    +  private String brokerList;
    +  private double windowTimeSec;
    +  private String keyField = "";
    +  protected boolean isBatchProcessing = true;
    +  @Min(2)
    +  protected int batchSize = DEFAULT_BATCH_SIZE;
    +  protected transient PojoUtils.Getter keyMethod;
    +  protected transient Class<?> pojoClass;
    +
    +  public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      if(context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
    +        pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
    +      }
    +    }
    +
    +    @Override
    +    public void process(Object tuple)
    +    {
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  /**
    +   * setup producer configuration.
    +   * @return ProducerConfig
    +   */
    +  @Override
    +  protected ProducerConfig createKafkaProducerConfig(){
    +    if(brokerList != null) {
    +      getConfigProperties().setProperty(BROKER_KEY, brokerList);
    +    }
    +    if(isBatchProcessing) {
    +      getConfigProperties().setProperty(BATCH_NUM_KEY, String.valueOf(batchSize));
    --- End diff --
    
    Also, the producer type should be async.



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48557083
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,272 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    +  protected final Integer DEFAULT_BATCH_SIZE = 200;
    +  protected final String BROKER_KEY = "metadata.broker.list";
    +  protected final String BATCH_NUM_KEY = "batch.num.messages";
    +  private long messageCount;
    +  private long byteCount;
    +  private String brokerList;
    +  private double windowTimeSec;
    +  private String keyField = "";
    +  protected boolean isBatchProcessing = true;
    +  @Min(2)
    +  protected int batchSize = DEFAULT_BATCH_SIZE;
    +  protected transient List<KeyedMessage<Object,Object>> messageList = new ArrayList<>();
    +  protected transient PojoUtils.Getter keyMethod;
    +  protected transient Class<?> pojoClass;
    +  protected transient ProducerConfig producerConfig;
    +
    +  public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      if(context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
    +        pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
    +      }
    +    }
    +
    +    @Override
    +    public void process(Object tuple)
    +    {
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  /**
    +   * setup producer configuration.
    +   * @return ProducerConfig
    +   */
    +  @Override
    +  protected ProducerConfig createKafkaProducerConfig(){
    +    if(brokerList != null) {
    +      getConfigProperties().setProperty(BROKER_KEY, brokerList);
    +    }
    +    getConfigProperties().setProperty(BATCH_NUM_KEY, String.valueOf(batchSize));
    --- End diff --
    
    Enforce this only in batch mode, right? Also, you should set producer.type to async to make batch mode really work.
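
    Something along these lines, as a rough sketch rather than the final code. It uses only java.util.Properties, so it runs without Kafka on the classpath. The class name BatchProducerProps and the PRODUCER_TYPE_KEY constant are made up for the example. The three key strings are the old Scala producer's config names, where batch.num.messages only takes effect when producer.type is async.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only; the operator would do this
// inside createKafkaProducerConfig() via getConfigProperties().
class BatchProducerProps
{
  static final String BROKER_KEY = "metadata.broker.list";
  static final String BATCH_NUM_KEY = "batch.num.messages";
  static final String PRODUCER_TYPE_KEY = "producer.type";

  static Properties build(String brokerList, boolean isBatchProcessing, int batchSize)
  {
    Properties props = new Properties();
    if (brokerList != null) {
      props.setProperty(BROKER_KEY, brokerList);
    }
    // Only touch the batching keys when batch mode is requested.
    if (isBatchProcessing) {
      props.setProperty(PRODUCER_TYPE_KEY, "async");
      props.setProperty(BATCH_NUM_KEY, String.valueOf(batchSize));
    }
    return props;
  }

  public static void main(String[] args)
  {
    // Broker addresses are placeholders.
    System.out.println(build("host1:9092,host2:9092", true, 200));
    System.out.println(build("host1:9092,host2:9092", false, 200));
  }
}
```

    With isBatchProcessing set to false, neither batching key is written, so whatever the user supplied (or the producer defaults) stays in effect.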



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48601581
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,272 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    +  protected final Integer DEFAULT_BATCH_SIZE = 200;
    +  protected final String BROKER_KEY = "metadata.broker.list";
    +  protected final String BATCH_NUM_KEY = "batch.num.messages";
    +  private long messageCount;
    +  private long byteCount;
    +  private String brokerList;
    +  private double windowTimeSec;
    +  private String keyField = "";
    +  protected boolean isBatchProcessing = true;
    +  @Min(2)
    +  protected int batchSize = DEFAULT_BATCH_SIZE;
    +  protected transient List<KeyedMessage<Object,Object>> messageList = new ArrayList<>();
    +  protected transient PojoUtils.Getter keyMethod;
    +  protected transient Class<?> pojoClass;
    +  protected transient ProducerConfig producerConfig;
    +
    +  public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      if(context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
    +        pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
    +      }
    +    }
    +
    +    @Override
    +    public void process(Object tuple)
    +    {
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  /**
    +   * setup producer configuration.
    +   * @return ProducerConfig
    +   */
    +  @Override
    +  protected ProducerConfig createKafkaProducerConfig(){
    +    if(brokerList != null) {
    +      getConfigProperties().setProperty(BROKER_KEY, brokerList);
    +    }
    +    getConfigProperties().setProperty(BATCH_NUM_KEY, String.valueOf(batchSize));
    +    producerConfig = super.createKafkaProducerConfig();
    +    return producerConfig;
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    super.setup(context);
    +    windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)
    +      * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0;
    +    if(pojoClass != null && keyField != "") {
    +      try {
    +        keyMethod = generateGetterForKeyField();
    +      } catch (NoSuchFieldException e) {
    +        throw new RuntimeException("Field " + keyField + " is invalid: " + e);
    +      }
    +    }
    +    batchSize = producerConfig.batchNumMessages();
    +  }
    +
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    super.beginWindow(windowId);
    +    outputMessagesPerSec = 0;
    +    outputBytesPerSec = 0;
    +    messageCount = 0;
    +    byteCount = 0;
    +  }
    +
    +  /**
    +   * Write the incoming tuple to Kafka
    +   * @param tuple incoming tuple
    +   */
    +  protected void processTuple(Object tuple)
    +  {
    +    // Get the getter method from the keyField
    +    if(keyMethod == null && keyField != "") {
    +      pojoClass = tuple.getClass();
    +      try {
    +        keyMethod = generateGetterForKeyField();
    +      } catch (NoSuchFieldException e) {
    +        throw new RuntimeException("Field " + keyField + " is invalid: " + e);
    +      }
    +    }
    +
    +    // Convert the given tuple to KeyedMessage
    +    KeyedMessage msg;
    +    if(keyMethod != null) {
    +      msg = new KeyedMessage(getTopic(), keyMethod.get(tuple), tuple);
    +    } else {
    +      msg = new KeyedMessage(getTopic(), tuple, tuple);
    +    }
    +
    +    if (isBatchProcessing) {
    +      if (messageList.size() == batchSize) {
    --- End diff --
    
    Not changing this, because I want to send the remaining batched messages to Kafka at end window.
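
A minimal sketch of the behavior described in this comment, not the code from the pull request: messages are buffered, a full batch is sent as soon as the batch size is reached, and whatever is left over is sent at end window. The class name `WindowedBatcher`, the `String` message type, and the in-memory `sent` list standing in for the Kafka producer are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an output operator that batches messages.
public class WindowedBatcher {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    // Records each batch that would have been handed to the producer.
    private final List<List<String>> sent = new ArrayList<>();

    public WindowedBatcher(int batchSize) {
        this.batchSize = batchSize;
    }

    // Called once per incoming tuple.
    public void process(String message) {
        buffer.add(message);
        // Using >= rather than == so an oversized buffer can never be skipped.
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Called at the end of each window: send any partial batch.
    public void endWindow() {
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sent.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    public List<List<String>> getSent() {
        return sent;
    }
}
```

With a batch size of 2 and three incoming messages, the first two go out as one batch during the window and the third goes out only when `endWindow()` is called, so nothing stays buffered across window boundaries.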



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48822067
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,272 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    +  protected final Integer DEFAULT_BATCH_SIZE = 200;
    +  protected final String BROKER_KEY = "metadata.broker.list";
    +  protected final String BATCH_NUM_KEY = "batch.num.messages";
    +  private long messageCount;
    +  private long byteCount;
    +  private String brokerList;
    +  private double windowTimeSec;
    +  private String keyField = "";
    +  protected boolean isBatchProcessing = true;
    +  @Min(2)
    +  protected int batchSize = DEFAULT_BATCH_SIZE;
    +  protected transient List<KeyedMessage<Object,Object>> messageList = new ArrayList<>();
    +  protected transient PojoUtils.Getter keyMethod;
    +  protected transient Class<?> pojoClass;
    +  protected transient ProducerConfig producerConfig;
    +
    +  public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      if(context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
    +        pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
    +      }
    +    }
    +
    +    @Override
    +    public void process(Object tuple)
    +    {
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  /**
    +   * setup producer configuration.
    +   * @return ProducerConfig
    +   */
    +  @Override
    +  protected ProducerConfig createKafkaProducerConfig(){
    +    if(brokerList != null) {
    +      getConfigProperties().setProperty(BROKER_KEY, brokerList);
    +    }
    +    getConfigProperties().setProperty(BATCH_NUM_KEY, String.valueOf(batchSize));
    +    producerConfig = super.createKafkaProducerConfig();
    +    return producerConfig;
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    super.setup(context);
    +    windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)
    +      * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0;
    +    if(pojoClass != null && keyField != "") {
    +      try {
    +        keyMethod = generateGetterForKeyField();
    +      } catch (NoSuchFieldException e) {
    +        throw new RuntimeException("Field " + keyField + " is invalid: " + e);
    +      }
    +    }
    +    batchSize = producerConfig.batchNumMessages();
    +  }
    +
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    super.beginWindow(windowId);
    +    outputMessagesPerSec = 0;
    +    outputBytesPerSec = 0;
    +    messageCount = 0;
    +    byteCount = 0;
    +  }
    +
    +  /**
    +   * Write the incoming tuple to Kafka
    +   * @param tuple incoming tuple
    +   */
    +  protected void processTuple(Object tuple)
    +  {
    +    // Get the getter method from the keyField
    +    if(keyMethod == null && keyField != "") {
    +      pojoClass = tuple.getClass();
    +      try {
    +        keyMethod = generateGetterForKeyField();
    +      } catch (NoSuchFieldException e) {
    +        throw new RuntimeException("Field " + keyField + " is invalid: " + e);
    +      }
    +    }
    +
    +    // Convert the given tuple to KeyedMessage
    +    KeyedMessage msg;
    +    if(keyMethod != null) {
    +      msg = new KeyedMessage(getTopic(), keyMethod.get(tuple), tuple);
    +    } else {
    +      msg = new KeyedMessage(getTopic(), tuple, tuple);
    +    }
    +
    +    if (isBatchProcessing) {
    +      if (messageList.size() == batchSize) {
    --- End diff --
    
    Incorporated the changes.



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48557730
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,272 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    --- End diff --
    
    If I remember correctly, AutoMetric only works when the field has a setter/getter method. @chandnisingh, am I correct?



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48875436
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,266 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    +  protected final Integer DEFAULT_BATCH_SIZE = 200;
    +  protected final String BROKER_KEY = "metadata.broker.list";
    +  protected final String BATCH_NUM_KEY = "batch.num.messages";
    +  protected final String PRODUCER_KEY = "producer.type";
    +  protected final String QUEUE_BUFFER_KEY = "queue.buffering.max.ms";
    +  protected final String ASYNC_PRODUCER_TYPE = "async";
    +  private long messageCount;
    +  private long byteCount;
    +  private String brokerList;
    +  private double windowTimeSec;
    +  private String keyField = "";
    +  protected boolean isBatchProcessing = true;
    +  @Min(2)
    +  protected int batchSize = DEFAULT_BATCH_SIZE;
    +  protected transient PojoUtils.Getter keyMethod;
    +  protected transient Class<?> pojoClass;
    +
    +  public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      if(context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
    +        pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
    +      }
    +    }
    +
    +    @Override
    +    public void process(Object tuple)
    +    {
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  /**
    +   * setup producer configuration.
    +   * @return ProducerConfig
    +   */
    +  @Override
    +  protected ProducerConfig createKafkaProducerConfig(){
    +    if(brokerList != null) {
    --- End diff --
    
    Add a space after "if". Please follow the checkstyle rules.



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48875311
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,266 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    --- End diff --
    
    Remove unused imports



[GitHub] incubator-apex-malhar pull request: MLHR-1956 Added POJO Kafka Out...

Posted by siyuanh <gi...@git.apache.org>.
Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/148#discussion_r48557364
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/POJOKafkaOutputOperator.java ---
    @@ -0,0 +1,272 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.kafka;
    +
    +import java.lang.reflect.Field;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +
    +import javax.validation.constraints.Min;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.commons.lang3.StringUtils;
    +
    +import com.datatorrent.api.AutoMetric;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DefaultInputPort;
    +import com.datatorrent.lib.util.PojoUtils;
    +
    +import kafka.producer.KeyedMessage;
    +import kafka.producer.ProducerConfig;
    +
    +/**
    + * POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
    + * from upstream and converts to Kafka Messages and writes to kafka topic.&nbsp;
    + * <p>
    + * <br>
    + * Ports:<br>
    + * <b>Input</b>: Have only one input port<br>
    + * <b>Output</b>: No Output Port <br>
    + * <br>
    + * Properties:<br>
    + * <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
    + * <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
    + * <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
    + *                           the value is true <br>
    + * <b>batchSize</b>: Specifies the batch size.<br>
    + * <br>
    + * <br>
    + * </p>
    + *
    + * @displayName POJO Kafka Output
    + * @category Messaging
    + * @tags Output operator
    + *
    + */
    +public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
    +{
    +  @AutoMetric
    +  private long outputMessagesPerSec;
    +  @AutoMetric
    +  private long outputBytesPerSec;
    +  protected final Integer DEFAULT_BATCH_SIZE = 200;
    +  protected final String BROKER_KEY = "metadata.broker.list";
    +  protected final String BATCH_NUM_KEY = "batch.num.messages";
    +  private long messageCount;
    +  private long byteCount;
    +  private String brokerList;
    +  private double windowTimeSec;
    +  private String keyField = "";
    +  protected boolean isBatchProcessing = true;
    +  @Min(2)
    +  protected int batchSize = DEFAULT_BATCH_SIZE;
    +  protected transient List<KeyedMessage<Object,Object>> messageList = new ArrayList<>();
    +  protected transient PojoUtils.Getter keyMethod;
    +  protected transient Class<?> pojoClass;
    +  protected transient ProducerConfig producerConfig;
    +
    +  public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
    +    @Override
    +    public void setup(Context.PortContext context)
    +    {
    +      if(context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
    +        pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
    +      }
    +    }
    +
    +    @Override
    +    public void process(Object tuple)
    +    {
    +      processTuple(tuple);
    +    }
    +  };
    +
    +  /**
    +   * setup producer configuration.
    +   * @return ProducerConfig
    +   */
    +  @Override
    +  protected ProducerConfig createKafkaProducerConfig(){
    +    if(brokerList != null) {
    +      getConfigProperties().setProperty(BROKER_KEY, brokerList);
    +    }
    +    getConfigProperties().setProperty(BATCH_NUM_KEY, String.valueOf(batchSize));
    +    producerConfig = super.createKafkaProducerConfig();
    +    return producerConfig;
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    super.setup(context);
    +    windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)
    +      * context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0;
    +    if(pojoClass != null && keyField != "") {
    +      try {
    +        keyMethod = generateGetterForKeyField();
    +      } catch (NoSuchFieldException e) {
    +        throw new RuntimeException("Field " + keyField + " is invalid: " + e);
    +      }
    +    }
    +    batchSize = producerConfig.batchNumMessages();
    +  }
    +
    +  @Override
    +  public void beginWindow(long windowId)
    +  {
    +    super.beginWindow(windowId);
    +    outputMessagesPerSec = 0;
    +    outputBytesPerSec = 0;
    +    messageCount = 0;
    +    byteCount = 0;
    +  }
    +
    +  /**
    +   * Write the incoming tuple to Kafka
    +   * @param tuple incoming tuple
    +   */
    +  protected void processTuple(Object tuple)
    +  {
    +    // Get the getter method from the keyField
    +    if(keyMethod == null && keyField != "") {
    +      pojoClass = tuple.getClass();
    +      try {
    +        keyMethod = generateGetterForKeyField();
    +      } catch (NoSuchFieldException e) {
    +        throw new RuntimeException("Field " + keyField + " is invalid: " + e);
    +      }
    +    }
    +
    +    // Convert the given tuple to KeyedMessage
    +    KeyedMessage msg;
    +    if(keyMethod != null) {
    +      msg = new KeyedMessage(getTopic(), keyMethod.get(tuple), tuple);
    +    } else {
    +      msg = new KeyedMessage(getTopic(), tuple, tuple);
    +    }
    +
    +    if (isBatchProcessing) {
    +      if (messageList.size() == batchSize) {
    --- End diff --
    
    Unless you want to clear the buffer exactly at end window.

