You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/02/21 07:36:59 UTC
apex-malhar git commit: APEXMALHAR-2412 Provide emitTuple overriding
functionality for user in kinesis Input operator
Repository: apex-malhar
Updated Branches:
refs/heads/master 6c42103f8 -> c5af27b1e
APEXMALHAR-2412 Provide emitTuple overriding functionality for user in kinesis Input operator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c5af27b1
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c5af27b1
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c5af27b1
Branch: refs/heads/master
Commit: c5af27b1ee5dc430a42c01c112e0f6bd84a5ac07
Parents: 6c42103
Author: deepak-narkhede <ma...@gmail.com>
Authored: Mon Feb 20 16:52:54 2017 +0530
Committer: deepak-narkhede <ma...@gmail.com>
Committed: Mon Feb 20 16:53:36 2017 +0530
----------------------------------------------------------------------
.../contrib/kinesis/AbstractKinesisInputOperator.java | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c5af27b1/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index 18a6399..30ceadb 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -158,6 +158,14 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
*/
public abstract T getTuple(Record rc);
+ /**
+ * Default: emits getTuple(data.getSecond()) on outputPort, ignoring data.getFirst() (the shard id). Subclasses may override this method to customize how tuples are emitted.
+ */
+ public void emitTuple(Pair<String, Record> data)
+ {
+ outputPort.emit(getTuple(data.getSecond()));
+ }
+
@Override
public void partitioned(Map<Integer, Partition<AbstractKinesisInputOperator>> partitions)
{
@@ -465,7 +473,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
List<Record> records = KinesisUtil.getInstance().getRecords(consumer.streamName, rc.getValue().getSecond(),
rc.getKey(), ShardIteratorType.AT_SEQUENCE_NUMBER, rc.getValue().getFirst());
for (Record record : records) {
- outputPort.emit(getTuple(record));
+ emitTuple(new Pair<String, Record>(rc.getKey(), record));
shardPosition.put(rc.getKey(), record.getSequenceNumber());
}
} catch(Exception e)
@@ -569,8 +577,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
Pair<String, Record> data = consumer.pollRecord();
String shardId = data.getFirst();
String recordId = data.getSecond().getSequenceNumber();
- T tuple = getTuple(data.getSecond());
- outputPort.emit(tuple);
+ emitTuple(data);
if(!currentWindowRecoveryState.containsKey(shardId))
{
currentWindowRecoveryState.put(shardId, new KinesisPair<String, Integer>(recordId, 1));