You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/27 20:01:11 UTC

[flink] 15/16: [hotfix][core] Improve JavaDocs for FLIP-27 sources.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0804de54315856b5c3e16c5ed97ac4682ad01ae6
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 27 13:40:33 2020 +0200

    [hotfix][core] Improve JavaDocs for FLIP-27 sources.
---
 .../flink/api/connector/source/ReaderOutput.java   | 40 +++++++++++++++++++---
 .../flink/api/connector/source/SourceOutput.java   | 30 +++++++++++++---
 2 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
index 1774a7c..dbfcba8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
@@ -19,17 +19,38 @@
 package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
 import org.apache.flink.api.common.eventtime.Watermark;
 
 /**
- * The interface provided by Flink task to the {@link SourceReader} to emit records
- * to downstream operators for message processing.
+ * The interface provided by the Flink runtime to the {@link SourceReader} to emit records, and
+ * optionally watermarks, to downstream operators for message processing.
+ *
+ * <p>The {@code ReaderOutput} is a {@link SourceOutput} and can be used directly to emit the stream
+ * of events from the source. This is recommended for sources where the SourceReader processes only a
+ * single split, or where NO split-specific characteristics are required (like per-split watermarks
+ * and idleness, split-specific event-time skew handling, etc.).
+ * As a special case, this is true for sources that are purely supporting bounded/batch data processing.
+ *
+ * <p>For most streaming sources, the {@code SourceReader} should use split-specific outputs, to allow
+ * the processing logic to run per-split watermark generators, idleness detection, etc.
+ * To create a split-specific {@code SourceOutput} use the {@link ReaderOutput#createOutputForSplit(String)}
+ * method, using the Source Split's ID. Make sure to release the output again once the source has finished
+ * processing that split.
  */
 @PublicEvolving
 public interface ReaderOutput<T> extends SourceOutput<T> {
 
 	/**
-	 * Emit a record without a timestamp. Equivalent to {@link #collect(Object, long) collect(timestamp, null)};
+	 * Emit a record without a timestamp.
+	 *
+	 * <p>Use this method if the source system does not have a notion of records with timestamps.
+	 *
+	 * <p>The events later pass through a {@link TimestampAssigner}, which attaches a timestamp
+	 * to the event based on the event's contents. For example a file source with JSON records would not
+	 * have a generic timestamp from the file reading and JSON parsing process, and thus use this
+	 * method to produce initially a record without a timestamp. The {@code TimestampAssigner} in
+	 * the next step would be used to extract the timestamp from a field of the JSON object.
 	 *
 	 * @param record the record to emit.
 	 */
@@ -37,9 +58,18 @@ public interface ReaderOutput<T> extends SourceOutput<T> {
 	void collect(T record);
 
 	/**
-	 * Emit a record with timestamp.
+	 * Emit a record with a timestamp.
 	 *
-	 * @param record the record to emit.
+	 * <p>Use this method if the source system has timestamps attached to records. Typical examples
+	 * would be Logs, PubSubs, or Message Queues, like Kafka or Kinesis, which store a timestamp with
+	 * each event.
+	 *
+	 * <p>The events typically still pass through a {@link TimestampAssigner}, which may decide to
+	 * either use this source-provided timestamp, or replace it with a timestamp stored within the
+	 * event (for example if the event was a JSON object one could configure a TimestampAssigner that
+	 * extracts one of the object's fields and uses that as a timestamp).
+	 *
+	 * @param record    the record to emit.
 	 * @param timestamp the timestamp of the record.
 	 */
 	@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
index ccf560b..ff088d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
@@ -19,24 +19,46 @@
 package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
 
 /**
- * The interface provided by Flink task to the {@link SourceReader} to emit records
- * to downstream operators for message processing.
+ * The {@code SourceOutput} is the gateway for a {@link SourceReader} to emit the produced
+ * records and watermarks.
+ *
+ * <p>A {@code SourceReader} may have multiple SourceOutputs, scoped to individual <i>Source Splits</i>. That
+ * way, streams of events from different splits can be identified and treated separately, for example
+ * for watermark generation, or event-time skew handling.
  */
 @PublicEvolving
 public interface SourceOutput<T> extends WatermarkOutput {
 
 	/**
-	 * Emit a record without a timestamp. Equivalent to {@link #collect(Object, long) collect(timestamp, null)};
+	 * Emit a record without a timestamp.
+	 *
+	 * <p>Use this method if the source system does not have a notion of records with timestamps.
+	 *
+	 * <p>The events later pass through a {@link TimestampAssigner}, which attaches a timestamp
+	 * to the event based on the event's contents. For example a file source with JSON records would not
+	 * have a generic timestamp from the file reading and JSON parsing process, and thus use this
+	 * method to produce initially a record without a timestamp. The {@code TimestampAssigner} in
+	 * the next step would be used to extract the timestamp from a field of the JSON object.
 	 *
 	 * @param record the record to emit.
 	 */
 	void collect(T record);
 
 	/**
-	 * Emit a record with timestamp.
+	 * Emit a record with a timestamp.
+	 *
+	 * <p>Use this method if the source system has timestamps attached to records. Typical examples
+	 * would be Logs, PubSubs, or Message Queues, like Kafka or Kinesis, which store a timestamp with
+	 * each event.
+	 *
+	 * <p>The events typically still pass through a {@link TimestampAssigner}, which may decide to
+	 * either use this source-provided timestamp, or replace it with a timestamp stored within the
+	 * event (for example if the event was a JSON object one could configure a TimestampAssigner that
+	 * extracts one of the object's fields and uses that as a timestamp).
 	 *
 	 * @param record the record to emit.
 	 * @param timestamp the timestamp of the record.