Posted to commits@flink.apache.org by mx...@apache.org on 2018/09/20 09:26:10 UTC
[flink] branch master updated: [FLINK-10358] fix NPE when running flink-kinesis connector against dynamodb streams
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 26bac51 [FLINK-10358] fix NPE when running flink-kinesis connector against dynamodb streams
26bac51 is described below
commit 26bac51cae1d298078902a02e196fffc16ea5704
Author: Ying Xu <yx...@lyft.com>
AuthorDate: Thu Sep 20 02:26:02 2018 -0700
[FLINK-10358] fix NPE when running flink-kinesis connector against dynamodb streams
This closes #6708.
---
.../flink/streaming/connectors/kinesis/internals/ShardConsumer.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
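
The patch reads the boxed Long into a local variable and skips the gauge update when it is null. A minimal, self-contained sketch of why that matters follows. It assumes the reporter's setter takes a primitive long (the diff does not show its signature), in which case passing a null Long fails during auto-unboxing. The Reporter class and both update methods are hypothetical stand-ins, not Flink code.

```java
public class NullGuardSketch {

    // Hypothetical stand-in for the shard metrics reporter.
    // Assumption: the setter takes a primitive long, so a boxed Long is auto-unboxed.
    static class Reporter {
        private volatile long millisBehindLatest = -1;

        void setMillisBehindLatest(long millisBehindLatest) {
            this.millisBehindLatest = millisBehindLatest;
        }

        long getMillisBehindLatest() {
            return millisBehindLatest;
        }
    }

    // Shape of the call before the patch: unboxing a null Long throws NullPointerException.
    static void updateUnguarded(Reporter reporter, Long value) {
        reporter.setMillisBehindLatest(value);
    }

    // Shape of the call after the patch: a null value leaves the gauge untouched.
    static void updateGuarded(Reporter reporter, Long value) {
        if (value != null) {
            reporter.setMillisBehindLatest(value);
        }
    }

    public static void main(String[] args) {
        Reporter reporter = new Reporter();

        // A null result (as reported for DynamoDB streams in this commit) is ignored.
        updateGuarded(reporter, null);
        System.out.println("after null: " + reporter.getMillisBehindLatest());

        // A real value is recorded as before.
        updateGuarded(reporter, 1500L);
        System.out.println("after 1500: " + reporter.getMillisBehindLatest());

        // The unguarded form fails on the same null input.
        try {
            updateUnguarded(reporter, null);
        } catch (NullPointerException e) {
            System.out.println("unguarded call threw NullPointerException");
        }
    }
}
```

With the guard in place the gauge keeps its previous value when no figure is available, rather than the consumer thread failing.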
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 5845eea..36a4e92 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -373,7 +373,10 @@ public class ShardConsumer<T> implements Runnable {
         getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
 
         // Update millis behind latest so it gets reported by the millisBehindLatest gauge
-        shardMetricsReporter.setMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
+        Long millisBehindLatest = getRecordsResult.getMillisBehindLatest();
+        if (millisBehindLatest != null) {
+            shardMetricsReporter.setMillisBehindLatest(millisBehindLatest);
+        }
     } catch (ExpiredIteratorException eiEx) {
         LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
             " refreshing the iterator ...", shardItr, subscribedShard);