Posted to commits@flink.apache.org by se...@apache.org on 2019/05/06 16:36:34 UTC
[flink] branch master updated (099106d -> 7b46f60)
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.
from 099106d [FLINK-12366][table] Clean up catalog APIs to make them more consistent and coherent
new 33ec4f2 [FLINK-12368] [Kafka] Add subtask index to FlinkKafkaConsumerBase logging
new 7b46f60 [hotfix] [docs] Fix typo in "Window Operator" documentation
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
docs/dev/stream/operators/windows.md | 2 +-
docs/dev/stream/operators/windows.zh.md | 2 +-
.../connectors/kafka/FlinkKafkaConsumerBase.java | 17 +++++++++++------
3 files changed, 13 insertions(+), 8 deletions(-)
[flink] 01/02: [FLINK-12368] [Kafka] Add subtask index to FlinkKafkaConsumerBase logging
Posted by se...@apache.org.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 33ec4f2f97cc726860859ba3503a35a6997f1854
Author: Steven Wu <st...@netflix.com>
AuthorDate: Wed Feb 27 22:19:02 2019 -0800
[FLINK-12368] [Kafka] Add subtask index to FlinkKafkaConsumerBase logging
This closes #8315
---
.../connectors/kafka/FlinkKafkaConsumerBase.java | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 6cf1839..d49e3fb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -662,6 +662,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
// initialize commit metrics and default offset callback method
this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
this.failedCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
+ final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
this.offsetCommitCallback = new KafkaCommitCallback() {
@Override
@@ -671,7 +672,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
@Override
public void onException(Throwable cause) {
- LOG.warn("Async Kafka commit failed.", cause);
+ LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
failedCommits.inc();
}
};
@@ -683,6 +684,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
sourceContext.markAsTemporarilyIdle();
}
+ LOG.info("Consumer subtask {} creating fetcher with offsets {}.",
+ getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets);
// from this point forward:
// - 'snapshotState' will draw offsets from the fetcher,
// instead of being built from `subscribedPartitionsToStartOffsets`
@@ -877,9 +880,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
}
- LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
+ LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState);
} else {
- LOG.info("No restore state for FlinkKafkaConsumer.");
+ LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());
}
}
@@ -943,13 +946,15 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// only one commit operation must be in progress
if (LOG.isDebugEnabled()) {
- LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
+ LOG.debug("Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.",
+ getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
}
try {
final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
if (posInMap == -1) {
- LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+ LOG.warn("Consumer subtask {} received confirmation for unknown checkpoint id {}",
+ getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
return;
}
@@ -963,7 +968,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
}
if (offsets == null || offsets.size() == 0) {
- LOG.debug("Checkpoint state was empty.");
+ LOG.debug("Consumer subtask {} has empty checkpoint state.", getRuntimeContext().getIndexOfThisSubtask());
return;
}
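The pattern this diff applies can be sketched outside of Flink. The following is a minimal, hypothetical sketch (the `CommitCallback` interface and the hard-coded subtask index are stand-ins, not Flink's API): a local variable must be effectively final to be captured by an anonymous inner class, which is why the commit hoists `getIndexOfThisSubtask()` into a local before constructing the callback, and the warning uses `String.format` rather than SLF4J `{}` placeholders because the `Throwable` already occupies the last logger argument slot.

```java
// Minimal sketch, not Flink code: illustrates capturing a hoisted subtask
// index inside an anonymous callback, as the commit does.
public class CaptureSketch {

    // Stand-in for Flink's KafkaCommitCallback interface.
    interface CommitCallback {
        void onException(Throwable cause);
    }

    // String.format is used (instead of "{}" placeholders) because in the
    // real code the Throwable claims the logger's trailing argument slot.
    static String failureMessage(int subtaskIndex) {
        return String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex);
    }

    public static void main(String[] args) {
        // Must be (effectively) final to be captured by the inner class below;
        // in the commit this would come from getIndexOfThisSubtask().
        final int subtaskIndex = 3;
        CommitCallback callback = new CommitCallback() {
            @Override
            public void onException(Throwable cause) {
                System.out.println(failureMessage(subtaskIndex) + " " + cause.getMessage());
            }
        };
        callback.onException(new RuntimeException("broker unavailable"));
    }
}
```

Run standalone, this prints the formatted message followed by the exception text.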
[flink] 02/02: [hotfix] [docs] Fix typo in "Window Operator" documentation
Posted by se...@apache.org.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7b46f6046a909e20f1bd9083c5913a394cc578b0
Author: Humberto Rodríguez A <rh...@users.noreply.github.com>
AuthorDate: Fri Apr 26 16:31:51 2019 +0200
[hotfix] [docs] Fix typo in "Window Operator" documentation
Before: ... windows together if `their` are closer to each
After: ... windows together if `they` are closer to each
This closes #8285
---
docs/dev/stream/operators/windows.md | 2 +-
docs/dev/stream/operators/windows.zh.md | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index 5861463..d317b2e 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -371,7 +371,7 @@ Dynamic gaps are specified by implementing the `SessionWindowTimeGapExtractor` i
<span class="label label-danger">Attention</span> Since session windows do not have a fixed start and end,
they are evaluated differently than tumbling and sliding windows. Internally, a session window operator
-creates a new window for each arriving record and merges windows together if their are closer to each other
+creates a new window for each arriving record and merges windows together if they are closer to each other
than the defined gap.
In order to be mergeable, a session window operator requires a merging [Trigger](#triggers) and a merging
[Window Function](#window-functions), such as `ReduceFunction`, `AggregateFunction`, or `ProcessWindowFunction`
diff --git a/docs/dev/stream/operators/windows.zh.md b/docs/dev/stream/operators/windows.zh.md
index 2f32d8a..45ef36f 100644
--- a/docs/dev/stream/operators/windows.zh.md
+++ b/docs/dev/stream/operators/windows.zh.md
@@ -371,7 +371,7 @@ Dynamic gaps are specified by implementing the `SessionWindowTimeGapExtractor` i
<span class="label label-danger">Attention</span> Since session windows do not have a fixed start and end,
they are evaluated differently than tumbling and sliding windows. Internally, a session window operator
-creates a new window for each arriving record and merges windows together if their are closer to each other
+creates a new window for each arriving record and merges windows together if they are closer to each other
than the defined gap.
In order to be mergeable, a session window operator requires a merging [Trigger](#triggers) and a merging
[Window Function](#window-functions), such as `ReduceFunction`, `AggregateFunction`, or `ProcessWindowFunction`
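The behaviour the corrected paragraph describes (each arriving record opens its own window, and windows closer together than the gap are merged) can be modelled with a short standalone sketch. This is not Flink code; the merging logic below is a simplification that assumes timestamps arrive sorted, which the real operator does not require.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch, not Flink code: each timestamp opens a [ts, ts + gap)
// window, and a window that starts before the previous session ends is
// merged into it, extending the session's end.
public class SessionMergeSketch {

    static List<long[]> sessions(List<Long> sortedTimestamps, long gap) {
        List<long[]> merged = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            long[] window = {ts, ts + gap}; // a new window per arriving record
            if (!merged.isEmpty() && window[0] < merged.get(merged.size() - 1)[1]) {
                // Closer than the defined gap: merge into the previous session.
                long[] last = merged.get(merged.size() - 1);
                last[1] = Math.max(last[1], window[1]);
            } else {
                merged.add(window);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Records at t=1,2,3 are within a gap of 5 of each other -> one
        // session [1, 8); t=20 is farther away and starts a new session.
        for (long[] w : sessions(List.of(1L, 2L, 3L, 20L), 5)) {
            System.out.println("session [" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a gap of 5, the example collapses three nearby records into one session and leaves the distant record in its own session, which is the merge behaviour the documentation paragraph describes.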