You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/27 20:01:11 UTC
[flink] 15/16: [hotfix][core] Improve JavaDocs for FLIP-27 sources.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0804de54315856b5c3e16c5ed97ac4682ad01ae6
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 27 13:40:33 2020 +0200
[hotfix][core] Improve JavaDocs for FLIP-27 sources.
---
.../flink/api/connector/source/ReaderOutput.java | 40 +++++++++++++++++++---
.../flink/api/connector/source/SourceOutput.java | 30 +++++++++++++---
2 files changed, 61 insertions(+), 9 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
index 1774a7c..dbfcba8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
@@ -19,17 +19,38 @@
package org.apache.flink.api.connector.source;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
/**
- * The interface provided by Flink task to the {@link SourceReader} to emit records
- * to downstream operators for message processing.
+ * The interface provided by the Flink runtime to the {@link SourceReader} to emit records, and
+ * optionally watermarks, to downstream operators for message processing.
+ *
+ * <p>The {@code ReaderOutput} is a {@link SourceOutput} and can be used directly to emit the stream
+ * of events from the source. This is recommended for sources where the SourceReader processes only a
+ * single split, or where NO split-specific characteristics are required (like per-split watermarks
+ * and idleness, split-specific event-time skew handling, etc.).
+ * As a special case, this is true for sources that are purely supporting bounded/batch data processing.
+ *
+ * <p>For most streaming sources, the {@code SourceReader} should use split-specific outputs, to allow
+ * the processing logic to run per-split watermark generators, idleness detection, etc.
+ * To create a split-specific {@code SourceOutput} use the {@link ReaderOutput#createOutputForSplit(String)}
+ * method, using the Source Split's ID. Make sure to release the output again once the source has finished
+ * processing that split.
*/
@PublicEvolving
public interface ReaderOutput<T> extends SourceOutput<T> {
/**
- * Emit a record without a timestamp. Equivalent to {@link #collect(Object, long) collect(timestamp, null)};
+ * Emit a record without a timestamp.
+ *
+ * <p>Use this method if the source system does not have a notion of records with timestamps.
+ *
+ * <p>The events later pass through a {@link TimestampAssigner}, which attaches a timestamp
+ * to the event based on the event's contents. For example a file source with JSON records would not
+ * have a generic timestamp from the file reading and JSON parsing process, and thus use this
+ * method to produce initially a record without a timestamp. The {@code TimestampAssigner} in
+ * the next step would be used to extract the timestamp from a field of the JSON object.
*
* @param record the record to emit.
*/
@@ -37,9 +58,18 @@ public interface ReaderOutput<T> extends SourceOutput<T> {
void collect(T record);
/**
- * Emit a record with timestamp.
+ * Emit a record with a timestamp.
*
- * @param record the record to emit.
+ * <p>Use this method if the source system has timestamps attached to records. Typical examples
+ * would be Logs, PubSubs, or Message Queues, like Kafka or Kinesis, which store a timestamp with
+ * each event.
+ *
+ * <p>The events typically still pass through a {@link TimestampAssigner}, which may decide to
+ * either use this source-provided timestamp, or replace it with a timestamp stored within the
+ * event (for example if the event was a JSON object one could configure a TimestampAssigner that
+ * extracts one of the object's fields and uses that as a timestamp).
+ *
+ * @param record the record to emit.
* @param timestamp the timestamp of the record.
*/
@Override
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
index ccf560b..ff088d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceOutput.java
@@ -19,24 +19,46 @@
package org.apache.flink.api.connector.source;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
/**
- * The interface provided by Flink task to the {@link SourceReader} to emit records
- * to downstream operators for message processing.
+ * The {@code SourceOutput} is the gateway for a {@link SourceReader} to emit the produced
+ * records and watermarks.
+ *
+ * <p>A {@code SourceReader} may have multiple SourceOutputs, scoped to individual <i>Source Splits</i>. That
+ * way, streams of events from different splits can be identified and treated separately, for example
+ * for watermark generation, or event-time skew handling.
*/
@PublicEvolving
public interface SourceOutput<T> extends WatermarkOutput {
/**
- * Emit a record without a timestamp. Equivalent to {@link #collect(Object, long) collect(timestamp, null)};
+ * Emit a record without a timestamp.
+ *
+ * <p>Use this method if the source system does not have a notion of records with timestamps.
+ *
+ * <p>The events later pass through a {@link TimestampAssigner}, which attaches a timestamp
+ * to the event based on the event's contents. For example a file source with JSON records would not
+ * have a generic timestamp from the file reading and JSON parsing process, and thus use this
+ * method to produce initially a record without a timestamp. The {@code TimestampAssigner} in
+ * the next step would be used to extract the timestamp from a field of the JSON object.
*
* @param record the record to emit.
*/
void collect(T record);
/**
- * Emit a record with timestamp.
+ * Emit a record with a timestamp.
+ *
+ * <p>Use this method if the source system has timestamps attached to records. Typical examples
+ * would be Logs, PubSubs, or Message Queues, like Kafka or Kinesis, which store a timestamp with
+ * each event.
+ *
+ * <p>The events typically still pass through a {@link TimestampAssigner}, which may decide to
+ * either use this source-provided timestamp, or replace it with a timestamp stored within the
+ * event (for example if the event was a JSON object one could configure a TimestampAssigner that
+ * extracts one of the object's fields and uses that as a timestamp).
*
* @param record the record to emit.
* @param timestamp the timestamp of the record.