You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2018/01/18 00:40:06 UTC
spark git commit: [SPARK-23119][SS] Minor fixes to V2 streaming APIs
Repository: spark
Updated Branches:
refs/heads/master 7823d43ec -> bac0d661a
[SPARK-23119][SS] Minor fixes to V2 streaming APIs
## What changes were proposed in this pull request?
- Added `InterfaceStability.Evolving` annotations
- Improved docs.
## How was this patch tested?
Existing tests.
Author: Tathagata Das <ta...@gmail.com>
Closes #20286 from tdas/SPARK-23119.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bac0d661
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bac0d661
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bac0d661
Branch: refs/heads/master
Commit: bac0d661af6092dd26638223156827aceb901229
Parents: 7823d43
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Jan 17 16:40:02 2018 -0800
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Wed Jan 17 16:40:02 2018 -0800
----------------------------------------------------------------------
.../v2/streaming/ContinuousReadSupport.java | 2 ++
.../v2/streaming/reader/ContinuousDataReader.java | 2 ++
.../v2/streaming/reader/ContinuousReader.java | 9 +++++++--
.../v2/streaming/reader/MicroBatchReader.java | 5 +++++
.../sql/sources/v2/streaming/reader/Offset.java | 18 +++++++++++++-----
.../v2/streaming/reader/PartitionOffset.java | 3 +++
.../sql/sources/v2/writer/DataSourceV2Writer.java | 5 ++++-
7 files changed, 36 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/bac0d661/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
index 3136cee..9a93a80 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.sources.v2.streaming;
import java.util.Optional;
+import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
@@ -28,6 +29,7 @@ import org.apache.spark.sql.types.StructType;
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data reading ability for continuous stream processing.
*/
+@InterfaceStability.Evolving
public interface ContinuousReadSupport extends DataSourceV2 {
/**
* Creates a {@link ContinuousReader} to scan the data from this data source.
http://git-wip-us.apache.org/repos/asf/spark/blob/bac0d661/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
index ca9a290..3f13a4d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
@@ -17,11 +17,13 @@
package org.apache.spark.sql.sources.v2.streaming.reader;
+import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReader;
/**
* A variation on {@link DataReader} for use with streaming in continuous processing mode.
*/
+@InterfaceStability.Evolving
public interface ContinuousDataReader<T> extends DataReader<T> {
/**
* Get the offset of the current record, or the start offset if no records have been read.
http://git-wip-us.apache.org/repos/asf/spark/blob/bac0d661/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
index f0b2058..745f1ce 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
@@ -17,6 +17,7 @@
package org.apache.spark.sql.sources.v2.streaming.reader;
+import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
@@ -27,11 +28,15 @@ import java.util.Optional;
* interface to allow reading in a continuous processing mode stream.
*
* Implementations must ensure each read task output is a {@link ContinuousDataReader}.
+ *
+ * Note: This interface currently extends {@link BaseStreamingSource} to maintain compatibility with
+ * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
*/
+@InterfaceStability.Evolving
public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reader {
/**
- * Merge offsets coming from {@link ContinuousDataReader} instances in each partition to
- * a single global offset.
+ * Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each
+ * partition to a single global offset.
*/
Offset mergeOffsets(PartitionOffset[] offsets);
http://git-wip-us.apache.org/repos/asf/spark/blob/bac0d661/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
index 70ff756..02f37ce 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
@@ -17,6 +17,7 @@
package org.apache.spark.sql.sources.v2.streaming.reader;
+import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
@@ -25,7 +26,11 @@ import java.util.Optional;
/**
* A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
* interface to indicate they allow micro-batch streaming reads.
+ *
+ * Note: This interface currently extends {@link BaseStreamingSource} to maintain compatibility with
+ * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
*/
+@InterfaceStability.Evolving
public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource {
/**
* Set the desired offset range for read tasks created from this reader. Read tasks will
http://git-wip-us.apache.org/repos/asf/spark/blob/bac0d661/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
index 60b87f2..abba3e7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
@@ -17,12 +17,20 @@
package org.apache.spark.sql.sources.v2.streaming.reader;
+import org.apache.spark.annotation.InterfaceStability;
+
/**
- * An abstract representation of progress through a [[MicroBatchReader]] or [[ContinuousReader]].
- * During execution, Offsets provided by the data source implementation will be logged and used as
- * restart checkpoints. Sources should provide an Offset implementation which they can use to
- * reconstruct the stream position where the offset was taken.
+ * An abstract representation of progress through a {@link MicroBatchReader} or
+ * {@link ContinuousReader}.
+ * During execution, offsets provided by the data source implementation will be logged and used as
+ * restart checkpoints. Each source should provide an offset implementation which the source can use
+ * to reconstruct a position in the stream up to which data has been seen/processed.
+ *
+ * Note: This class currently extends {@link org.apache.spark.sql.execution.streaming.Offset} to
+ * maintain compatibility with DataSource V1 APIs. This extension will be removed once we
+ * get rid of V1 completely.
*/
+@InterfaceStability.Evolving
public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
/**
* A JSON-serialized representation of an Offset that is
@@ -37,7 +45,7 @@ public abstract class Offset extends org.apache.spark.sql.execution.streaming.Of
/**
* Equality based on JSON string representation. We leverage the
* JSON representation for normalization between the Offset's
- * in memory and on disk representations.
+ * deserialized and serialized representations.
*/
@Override
public boolean equals(Object obj) {
http://git-wip-us.apache.org/repos/asf/spark/blob/bac0d661/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
index eca0085..4688b85 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
@@ -19,11 +19,14 @@ package org.apache.spark.sql.sources.v2.streaming.reader;
import java.io.Serializable;
+import org.apache.spark.annotation.InterfaceStability;
+
/**
* Used for per-partition offsets in continuous processing. ContinuousReader implementations will
* provide a method to merge these into a global Offset.
*
* These offsets must be serializable.
*/
+@InterfaceStability.Evolving
public interface PartitionOffset extends Serializable {
}
http://git-wip-us.apache.org/repos/asf/spark/blob/bac0d661/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
index fc37b9a..317ac45 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
@@ -22,11 +22,14 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;
/**
* A data source writer that is returned by
- * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}.
+ * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}/
+ * {@link org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport#createMicroBatchWriter(String, long, StructType, OutputMode, DataSourceV2Options)}/
+ * {@link org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport#createContinuousWriter(String, StructType, OutputMode, DataSourceV2Options)}.
* It can mix in various writing optimization interfaces to speed up the data saving. The actual
* writing logic is delegated to {@link DataWriter}.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org