You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2018/09/20 09:26:10 UTC

[flink] branch master updated: [FLINK-10358] Fix NPE (NullPointerException) when running the flink-kinesis connector against DynamoDB Streams

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 26bac51  [FLINK-10358] fix NPE when running flink-kinesis connector against dynamodb streams
26bac51 is described below

commit 26bac51cae1d298078902a02e196fffc16ea5704
Author: Ying Xu <yx...@lyft.com>
AuthorDate: Thu Sep 20 02:26:02 2018 -0700

    [FLINK-10358] Fix NPE (NullPointerException) when running the flink-kinesis connector against DynamoDB Streams
    
    This closes #6708.
---
 .../flink/streaming/connectors/kinesis/internals/ShardConsumer.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 5845eea..36a4e92 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -373,7 +373,10 @@ public class ShardConsumer<T> implements Runnable {
 				getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
 
 				// Update millis behind latest so it gets reported by the millisBehindLatest gauge
-				shardMetricsReporter.setMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
+				Long millisBehindLatest = getRecordsResult.getMillisBehindLatest();
+				if (millisBehindLatest != null) {
+					shardMetricsReporter.setMillisBehindLatest(millisBehindLatest);
+				}
 			} catch (ExpiredIteratorException eiEx) {
 				LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
 					" refreshing the iterator ...", shardItr, subscribedShard);