Posted to commits@flink.apache.org by se...@apache.org on 2019/05/06 16:36:34 UTC

[flink] branch master updated (099106d -> 7b46f60)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 099106d  [FLINK-12366][table] Clean up catalog APIs to make them more consistent and coherent
     new 33ec4f2  [FLINK-12368] [Kafka] Add subtask index to FlinkKafkaConsumerBase logging
     new 7b46f60  [hotfix] [docs] Fix typo in "Window Operator" documentation

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/stream/operators/windows.md                    |  2 +-
 docs/dev/stream/operators/windows.zh.md                 |  2 +-
 .../connectors/kafka/FlinkKafkaConsumerBase.java        | 17 +++++++++++------
 3 files changed, 13 insertions(+), 8 deletions(-)


[flink] 01/02: [FLINK-12368] [Kafka] Add subtask index to FlinkKafkaConsumerBase logging

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 33ec4f2f97cc726860859ba3503a35a6997f1854
Author: Steven Wu <st...@netflix.com>
AuthorDate: Wed Feb 27 22:19:02 2019 -0800

    [FLINK-12368] [Kafka] Add subtask index to FlinkKafkaConsumerBase logging
    
    This closes #8315
---
 .../connectors/kafka/FlinkKafkaConsumerBase.java        | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 6cf1839..d49e3fb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -662,6 +662,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		// initialize commit metrics and default offset callback method
 		this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
 		this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
+		final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
 
 		this.offsetCommitCallback = new KafkaCommitCallback() {
 			@Override
@@ -671,7 +672,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 			@Override
 			public void onException(Throwable cause) {
-				LOG.warn("Async Kafka commit failed.", cause);
+				LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
 				failedCommits.inc();
 			}
 		};
@@ -683,6 +684,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			sourceContext.markAsTemporarilyIdle();
 		}
 
+		LOG.info("Consumer subtask {} creating fetcher with offsets {}.",
+			getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets);
 		// from this point forward:
 		//   - 'snapshotState' will draw offsets from the fetcher,
 		//     instead of being built from `subscribedPartitionsToStartOffsets`
@@ -877,9 +880,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
 			}
 
-			LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
+			LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState);
 		} else {
-			LOG.info("No restore state for FlinkKafkaConsumer.");
+			LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());
 		}
 	}
 
@@ -943,13 +946,15 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
 			// only one commit operation must be in progress
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
+				LOG.debug("Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.",
+					getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
 			}
 
 			try {
 				final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
 				if (posInMap == -1) {
-					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+					LOG.warn("Consumer subtask {} received confirmation for unknown checkpoint id {}",
+						getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
 					return;
 				}
 
@@ -963,7 +968,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				}
 
 				if (offsets == null || offsets.size() == 0) {
-					LOG.debug("Checkpoint state was empty.");
+					LOG.debug("Consumer subtask {} has empty checkpoint state.", getRuntimeContext().getIndexOfThisSubtask());
 					return;
 				}
 

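The core pattern in the commit above is to read the subtask index once into a final local and let the anonymous callback capture it, so every failure message names the subtask that produced it. The following is a minimal, self-contained sketch of that pattern only. `SubtaskLoggingSketch`, `CommitCallback`, `createCallback`, and `LOG_LINES` are hypothetical names chosen for illustration; they are not Flink or Kafka APIs, and the in-memory list stands in for a real logger so the example runs without any dependency.

```java
import java.util.ArrayList;
import java.util.List;

public class SubtaskLoggingSketch {

    /** Hypothetical stand-in for a commit callback; not the Flink interface. */
    interface CommitCallback {
        void onSuccess();

        void onException(Throwable cause);
    }

    /** Collects log lines in memory so the sketch needs no logging library. */
    static final List<String> LOG_LINES = new ArrayList<>();

    /** Builds a callback whose failure message includes the given subtask index. */
    static CommitCallback createCallback(final int subtaskIndex) {
        // The index is passed in once and captured by the anonymous class,
        // mirroring the final local variable introduced in the diff.
        return new CommitCallback() {
            @Override
            public void onSuccess() {
                // Nothing to record on success in this sketch.
            }

            @Override
            public void onException(Throwable cause) {
                LOG_LINES.add(String.format(
                    "Consumer subtask %d failed async Kafka commit. Cause: %s",
                    subtaskIndex, cause.getMessage()));
            }
        };
    }

    public static void main(String[] args) {
        CommitCallback callback = createCallback(3);
        callback.onException(new RuntimeException("broker unavailable"));
        System.out.println(LOG_LINES.get(0));
    }
}
```

With several parallel consumer subtasks writing to the same log, the captured index is what lets a reader attribute a failed commit to one specific subtask.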

[flink] 02/02: [hotfix] [docs] Fix typo in "Window Operator" documentation

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7b46f6046a909e20f1bd9083c5913a394cc578b0
Author: Humberto Rodríguez A <rh...@users.noreply.github.com>
AuthorDate: Fri Apr 26 16:31:51 2019 +0200

    [hotfix] [docs] Fix typo in "Window Operator" documentation
    
    Before: ... windows together if `their` are closer to each
    After: ... windows together if `they` are closer to each
    
    This closes #8285
---
 docs/dev/stream/operators/windows.md    | 2 +-
 docs/dev/stream/operators/windows.zh.md | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index 5861463..d317b2e 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -371,7 +371,7 @@ Dynamic gaps are specified by implementing the `SessionWindowTimeGapExtractor` i
 
 <span class="label label-danger">Attention</span> Since session windows do not have a fixed start and end,
 they are  evaluated differently than tumbling and sliding windows. Internally, a session window operator
-creates a new window for each arriving record and merges windows together if their are closer to each other
+creates a new window for each arriving record and merges windows together if they are closer to each other
 than the defined gap.
 In order to be mergeable, a session window operator requires a merging [Trigger](#triggers) and a merging
 [Window Function](#window-functions), such as `ReduceFunction`, `AggregateFunction`, or `ProcessWindowFunction`
diff --git a/docs/dev/stream/operators/windows.zh.md b/docs/dev/stream/operators/windows.zh.md
index 2f32d8a..45ef36f 100644
--- a/docs/dev/stream/operators/windows.zh.md
+++ b/docs/dev/stream/operators/windows.zh.md
@@ -371,7 +371,7 @@ Dynamic gaps are specified by implementing the `SessionWindowTimeGapExtractor` i
 
 <span class="label label-danger">Attention</span> Since session windows do not have a fixed start and end,
 they are  evaluated differently than tumbling and sliding windows. Internally, a session window operator
-creates a new window for each arriving record and merges windows together if their are closer to each other
+creates a new window for each arriving record and merges windows together if they are closer to each other
 than the defined gap.
 In order to be mergeable, a session window operator requires a merging [Trigger](#triggers) and a merging
 [Window Function](#window-functions), such as `ReduceFunction`, `AggregateFunction`, or `ProcessWindowFunction`
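
The corrected sentence describes session windows as created per record and then merged when they lie closer together than the gap. The sketch below illustrates that idea in a simplified form and is not Flink's implementation: `SessionMergeSketch` and `mergeSessions` are hypothetical names, each timestamp is assumed to open a window `[ts, ts + gap)`, and windows are assumed to merge only when they strictly overlap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionMergeSketch {

    /**
     * Returns merged sessions as {start, end} pairs. Each timestamp first gets
     * its own window [ts, ts + gap); a window that starts before the previous
     * session ends is folded into that session.
     */
    static List<long[]> mergeSessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);

        List<long[]> sessions = new ArrayList<>();
        for (long ts : sorted) {
            long[] window = {ts, ts + gap};
            if (!sessions.isEmpty()) {
                long[] last = sessions.get(sessions.size() - 1);
                if (window[0] < last[1]) {
                    // Closer than the gap: extend the existing session.
                    last[1] = Math.max(last[1], window[1]);
                    continue;
                }
            }
            // Far enough from the previous session: start a new one.
            sessions.add(window);
        }
        return sessions;
    }

    public static void main(String[] args) {
        for (long[] s : mergeSessions(new long[] {1, 3, 10, 12, 30}, 5)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

For the timestamps 1, 3, 10, 12, 30 and a gap of 5, this yields the sessions [1, 8), [10, 17), and [30, 35).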