You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/02/21 07:36:59 UTC

apex-malhar git commit: APEXMALHAR-2412 Provide emitTuple overriding functionality for user in kinesis Input operator

Repository: apex-malhar
Updated Branches:
  refs/heads/master 6c42103f8 -> c5af27b1e


APEXMALHAR-2412 Provide emitTuple overriding functionality for user in kinesis Input operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c5af27b1
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c5af27b1
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c5af27b1

Branch: refs/heads/master
Commit: c5af27b1ee5dc430a42c01c112e0f6bd84a5ac07
Parents: 6c42103
Author: deepak-narkhede <ma...@gmail.com>
Authored: Mon Feb 20 16:52:54 2017 +0530
Committer: deepak-narkhede <ma...@gmail.com>
Committed: Mon Feb 20 16:53:36 2017 +0530

----------------------------------------------------------------------
 .../contrib/kinesis/AbstractKinesisInputOperator.java  | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5af27b1/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index 18a6399..30ceadb 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -158,6 +158,14 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
    */
   public abstract T getTuple(Record rc);
 
+  /**
+   * Any concrete class derived from AbstractKinesisInputOperator may implement this method to emit tuples to an output port.
+   */
+  public void emitTuple(Pair<String, Record> data)
+  {
+    outputPort.emit(getTuple(data.getSecond()));
+  }
+
   @Override
   public void partitioned(Map<Integer, Partition<AbstractKinesisInputOperator>> partitions)
   {
@@ -465,7 +473,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
           List<Record> records = KinesisUtil.getInstance().getRecords(consumer.streamName, rc.getValue().getSecond(),
               rc.getKey(), ShardIteratorType.AT_SEQUENCE_NUMBER, rc.getValue().getFirst());
           for (Record record : records) {
-            outputPort.emit(getTuple(record));
+            emitTuple(new Pair<String, Record>(rc.getKey(), record));
             shardPosition.put(rc.getKey(), record.getSequenceNumber());
           }
         } catch(Exception e)
@@ -569,8 +577,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
       Pair<String, Record> data = consumer.pollRecord();
       String shardId = data.getFirst();
       String recordId = data.getSecond().getSequenceNumber();
-      T tuple = getTuple(data.getSecond());
-      outputPort.emit(tuple);
+      emitTuple(data);
       if(!currentWindowRecoveryState.containsKey(shardId))
       {
         currentWindowRecoveryState.put(shardId, new KinesisPair<String, Integer>(recordId, 1));