You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2018/01/18 00:40:06 UTC

spark git commit: [SPARK-23119][SS] Minor fixes to V2 streaming APIs

Repository: spark
Updated Branches:
  refs/heads/master 7823d43ec -> bac0d661a


[SPARK-23119][SS] Minor fixes to V2 streaming APIs

## What changes were proposed in this pull request?

- Added `InterfaceStability.Evolving` annotations
- Improved docs.

## How was this patch tested?
Existing tests.

Author: Tathagata Das <ta...@gmail.com>

Closes #20286 from tdas/SPARK-23119.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bac0d661
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bac0d661
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bac0d661

Branch: refs/heads/master
Commit: bac0d661af6092dd26638223156827aceb901229
Parents: 7823d43
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Jan 17 16:40:02 2018 -0800
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Wed Jan 17 16:40:02 2018 -0800

----------------------------------------------------------------------
 .../v2/streaming/ContinuousReadSupport.java       |  2 ++
 .../v2/streaming/reader/ContinuousDataReader.java |  2 ++
 .../v2/streaming/reader/ContinuousReader.java     |  9 +++++++--
 .../v2/streaming/reader/MicroBatchReader.java     |  5 +++++
 .../sql/sources/v2/streaming/reader/Offset.java   | 18 +++++++++++++-----
 .../v2/streaming/reader/PartitionOffset.java      |  3 +++
 .../sql/sources/v2/writer/DataSourceV2Writer.java |  5 ++++-
 7 files changed, 36 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bac0d661/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
index 3136cee..9a93a80 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.sources.v2.streaming;
 
 import java.util.Optional;
 
+import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.DataSourceV2Options;
 import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
@@ -28,6 +29,7 @@ import org.apache.spark.sql.types.StructType;
  * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
  * provide data reading ability for continuous stream processing.
  */
+@InterfaceStability.Evolving
 public interface ContinuousReadSupport extends DataSourceV2 {
   /**
    * Creates a {@link ContinuousReader} to scan the data from this data source.

http://git-wip-us.apache.org/repos/asf/spark/blob/bac0d661/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
index ca9a290..3f13a4d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.sources.v2.streaming.reader;
 
+import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.v2.reader.DataReader;
 
 /**
  * A variation on {@link DataReader} for use with streaming in continuous processing mode.
  */
+@InterfaceStability.Evolving
 public interface ContinuousDataReader<T> extends DataReader<T> {
     /**
      * Get the offset of the current record, or the start offset if no records have been read.

http://git-wip-us.apache.org/repos/asf/spark/blob/bac0d661/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
index f0b2058..745f1ce 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.sources.v2.streaming.reader;
 
+import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
 import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
 
@@ -27,11 +28,15 @@ import java.util.Optional;
  * interface to allow reading in a continuous processing mode stream.
  *
  * Implementations must ensure each read task output is a {@link ContinuousDataReader}.
+ *
+ * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
+ * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
  */
+@InterfaceStability.Evolving
 public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reader {
     /**
-     * Merge offsets coming from {@link ContinuousDataReader} instances in each partition to
-     * a single global offset.
+     * Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each
+     * partition to a single global offset.
      */
     Offset mergeOffsets(PartitionOffset[] offsets);
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bac0d661/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
index 70ff756..02f37ce 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.sources.v2.streaming.reader;
 
+import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
 import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
 
@@ -25,7 +26,11 @@ import java.util.Optional;
 /**
  * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
  * interface to indicate they allow micro-batch streaming reads.
+ *
+ * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
+ * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
  */
+@InterfaceStability.Evolving
 public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource {
     /**
      * Set the desired offset range for read tasks created from this reader. Read tasks will

http://git-wip-us.apache.org/repos/asf/spark/blob/bac0d661/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
index 60b87f2..abba3e7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
@@ -17,12 +17,20 @@
 
 package org.apache.spark.sql.sources.v2.streaming.reader;
 
+import org.apache.spark.annotation.InterfaceStability;
+
 /**
- * An abstract representation of progress through a [[MicroBatchReader]] or [[ContinuousReader]].
- * During execution, Offsets provided by the data source implementation will be logged and used as
- * restart checkpoints. Sources should provide an Offset implementation which they can use to
- * reconstruct the stream position where the offset was taken.
+ * An abstract representation of progress through a {@link MicroBatchReader} or
+ * {@link ContinuousReader}.
+ * During execution, offsets provided by the data source implementation will be logged and used as
+ * restart checkpoints. Each source should provide an offset implementation which the source can use
+ * to reconstruct a position in the stream up to which data has been seen/processed.
+ *
+ * Note: This class currently extends {@link org.apache.spark.sql.execution.streaming.Offset} to
+ * maintain compatibility with DataSource V1 APIs. This extension will be removed once we
+ * get rid of V1 completely.
  */
+@InterfaceStability.Evolving
 public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
     /**
      * A JSON-serialized representation of an Offset that is
@@ -37,7 +45,7 @@ public abstract class Offset extends org.apache.spark.sql.execution.streaming.Of
     /**
      * Equality based on JSON string representation. We leverage the
      * JSON representation for normalization between the Offset's
-     * in memory and on disk representations.
+     * in deserialized and serialized representations.
      */
     @Override
     public boolean equals(Object obj) {

http://git-wip-us.apache.org/repos/asf/spark/blob/bac0d661/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
index eca0085..4688b85 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
@@ -19,11 +19,14 @@ package org.apache.spark.sql.sources.v2.streaming.reader;
 
 import java.io.Serializable;
 
+import org.apache.spark.annotation.InterfaceStability;
+
 /**
  * Used for per-partition offsets in continuous processing. ContinuousReader implementations will
  * provide a method to merge these into a global Offset.
  *
  * These offsets must be serializable.
  */
+@InterfaceStability.Evolving
 public interface PartitionOffset extends Serializable {
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bac0d661/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
index fc37b9a..317ac45 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
@@ -22,11 +22,14 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.sources.v2.DataSourceV2Options;
 import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.streaming.OutputMode;
 import org.apache.spark.sql.types.StructType;
 
 /**
  * A data source writer that is returned by
- * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}.
+ * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}/
+ * {@link org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport#createMicroBatchWriter(String, long, StructType, OutputMode, DataSourceV2Options)}/
+ * {@link org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport#createContinuousWriter(String, StructType, OutputMode, DataSourceV2Options)}.
  * It can mix in various writing optimization interfaces to speed up the data saving. The actual
  * writing logic is delegated to {@link DataWriter}.
  *


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org