You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/02/01 04:33:56 UTC
[2/2] spark git commit: [SPARK-23268][SQL] Reorganize packages in data source V2
[SPARK-23268][SQL] Reorganize packages in data source V2
## What changes were proposed in this pull request?
1. Create a new package for partitioning/distribution-related classes.
As Spark will add new concrete implementations of `Distribution` in new releases, it is good to
have a new package for partitioning/distribution related classes.
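2. Move streaming-related classes to the packages `org.apache.spark.sql.sources.v2.reader.streaming` and `org.apache.spark.sql.sources.v2.writer.streaming`, instead of `org.apache.spark.sql.sources.v2.streaming.reader` and `org.apache.spark.sql.sources.v2.streaming.writer`.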
This way there won't be `reader`/`writer` packages nested inside the `streaming` package, which is quite confusing.
Before change:
```
v2
├── reader
├── streaming
│   ├── reader
│   └── writer
└── writer
```
After change:
```
v2
├── reader
│   └── streaming
└── writer
    └── streaming
```
## How was this patch tested?
Unit test.
Author: Wang Gengliang <lt...@gmail.com>
Closes #20435 from gengliangwang/new_pkg.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56ae3265
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56ae3265
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56ae3265
Branch: refs/heads/master
Commit: 56ae32657e9e5d1e30b62afe77d9e14eb07cf4fb
Parents: 2ac895b
Author: Wang Gengliang <lt...@gmail.com>
Authored: Wed Jan 31 20:33:51 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Jan 31 20:33:51 2018 -0800
----------------------------------------------------------------------
.../sql/kafka010/KafkaContinuousReader.scala | 2 +-
.../spark/sql/kafka010/KafkaSourceOffset.scala | 2 +-
.../sql/kafka010/KafkaSourceProvider.scala | 5 +-
.../spark/sql/kafka010/KafkaStreamWriter.scala | 2 +-
.../v2/reader/ClusteredDistribution.java | 38 ----------
.../v2/reader/ContinuousReadSupport.java | 48 ++++++++++++
.../sql/sources/v2/reader/Distribution.java | 39 ----------
.../v2/reader/MicroBatchReadSupport.java | 54 +++++++++++++
.../sql/sources/v2/reader/Partitioning.java | 46 ------------
.../v2/reader/SupportsReportPartitioning.java | 1 +
.../partitioning/ClusteredDistribution.java | 39 ++++++++++
.../v2/reader/partitioning/Distribution.java | 40 ++++++++++
.../v2/reader/partitioning/Partitioning.java | 48 ++++++++++++
.../reader/streaming/ContinuousDataReader.java | 36 +++++++++
.../v2/reader/streaming/ContinuousReader.java | 79 ++++++++++++++++++++
.../v2/reader/streaming/MicroBatchReader.java | 75 +++++++++++++++++++
.../sql/sources/v2/reader/streaming/Offset.java | 69 +++++++++++++++++
.../v2/reader/streaming/PartitionOffset.java | 32 ++++++++
.../v2/streaming/ContinuousReadSupport.java | 48 ------------
.../v2/streaming/MicroBatchReadSupport.java | 54 -------------
.../v2/streaming/StreamWriteSupport.java | 54 -------------
.../streaming/reader/ContinuousDataReader.java | 36 ---------
.../v2/streaming/reader/ContinuousReader.java | 79 --------------------
.../v2/streaming/reader/MicroBatchReader.java | 75 -------------------
.../sql/sources/v2/streaming/reader/Offset.java | 69 -----------------
.../v2/streaming/reader/PartitionOffset.java | 32 --------
.../v2/streaming/writer/StreamWriter.java | 72 ------------------
.../sql/sources/v2/writer/DataSourceWriter.java | 2 +-
.../sources/v2/writer/StreamWriteSupport.java | 53 +++++++++++++
.../v2/writer/streaming/StreamWriter.java | 72 ++++++++++++++++++
.../datasources/v2/DataSourcePartitioning.scala | 2 +-
.../datasources/v2/DataSourceV2ScanExec.scala | 2 +-
.../datasources/v2/WriteToDataSourceV2.scala | 2 +-
.../streaming/MicroBatchExecution.scala | 6 +-
.../streaming/RateSourceProvider.scala | 5 +-
.../execution/streaming/RateStreamOffset.scala | 2 +-
.../execution/streaming/StreamingRelation.scala | 2 +-
.../spark/sql/execution/streaming/console.scala | 4 +-
.../ContinuousDataSourceRDDIter.scala | 10 +--
.../continuous/ContinuousExecution.scala | 5 +-
.../continuous/ContinuousRateStreamSource.scala | 2 +-
.../streaming/continuous/EpochCoordinator.scala | 4 +-
.../streaming/sources/ConsoleWriter.scala | 2 +-
.../streaming/sources/MicroBatchWriter.scala | 2 +-
.../streaming/sources/RateStreamSourceV2.scala | 3 +-
.../execution/streaming/sources/memoryV2.scala | 3 +-
.../spark/sql/streaming/DataStreamReader.scala | 2 +-
.../spark/sql/streaming/DataStreamWriter.scala | 2 +-
.../sql/streaming/StreamingQueryManager.scala | 2 +-
.../v2/JavaPartitionAwareDataSource.java | 3 +
.../execution/streaming/RateSourceV2Suite.scala | 2 +-
.../sql/sources/v2/DataSourceV2Suite.scala | 1 +
.../sources/StreamingDataSourceV2Suite.scala | 8 +-
53 files changed, 690 insertions(+), 687 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 8c73342..41c443b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRo
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
index c82154c..8d41c0d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
-import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, PartitionOffset}
/**
* An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 85e96b6..694ca76 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -31,8 +31,9 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSessio
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
-import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
+import org.apache.spark.sql.sources.v2.reader.ContinuousReadSupport
+import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
index a24efde..9307bfc 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
@@ -22,8 +22,8 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
-import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.types.StructType
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java
deleted file mode 100644
index 27905e3..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * A concrete implementation of {@link Distribution}. Represents a distribution where records that
- * share the same values for the {@link #clusteredColumns} will be produced by the same
- * {@link DataReader}.
- */
-@InterfaceStability.Evolving
-public class ClusteredDistribution implements Distribution {
-
- /**
- * The names of the clustered columns. Note that they are order insensitive.
- */
- public final String[] clusteredColumns;
-
- public ClusteredDistribution(String[] clusteredColumns) {
- this.clusteredColumns = clusteredColumns;
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReadSupport.java
new file mode 100644
index 0000000..0c1d5d1
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousReadSupport.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data reading ability for continuous stream processing.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReadSupport extends DataSourceV2 {
+ /**
+ * Creates a {@link ContinuousReader} to scan the data from this data source.
+ *
+ * @param schema the user provided schema, or empty() if none was provided
+ * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+ * recovery. Readers for the same logical source in the same query
+ * will be given the same checkpointLocation.
+ * @param options the options for the returned data source reader, which is an immutable
+ * case-insensitive string-to-string map.
+ */
+ ContinuousReader createContinuousReader(
+ Optional<StructType> schema,
+ String checkpointLocation,
+ DataSourceOptions options);
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java
deleted file mode 100644
index b375621..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * An interface to represent data distribution requirement, which specifies how the records should
- * be distributed among the data partitions(one {@link DataReader} outputs data for one partition).
- * Note that this interface has nothing to do with the data ordering inside one
- * partition(the output records of a single {@link DataReader}).
- *
- * The instance of this interface is created and provided by Spark, then consumed by
- * {@link Partitioning#satisfy(Distribution)}. This means data source developers don't need to
- * implement this interface, but need to catch as more concrete implementations of this interface
- * as possible in {@link Partitioning#satisfy(Distribution)}.
- *
- * Concrete implementations until now:
- * <ul>
- * <li>{@link ClusteredDistribution}</li>
- * </ul>
- */
-@InterfaceStability.Evolving
-public interface Distribution {}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReadSupport.java
new file mode 100644
index 0000000..5e8f0c0
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/MicroBatchReadSupport.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide streaming micro-batch data reading ability.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReadSupport extends DataSourceV2 {
+ /**
+ * Creates a {@link MicroBatchReader} to read batches of data from this data source in a
+ * streaming query.
+ *
+ * The execution engine will create a micro-batch reader at the start of a streaming query,
+ * alternate calls to setOffsetRange and createDataReaderFactories for each batch to process, and
+ * then call stop() when the execution is complete. Note that a single query may have multiple
+ * executions due to restart or failure recovery.
+ *
+ * @param schema the user provided schema, or empty() if none was provided
+ * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
+ * recovery. Readers for the same logical source in the same query
+ * will be given the same checkpointLocation.
+ * @param options the options for the returned data source reader, which is an immutable
+ * case-insensitive string-to-string map.
+ */
+ MicroBatchReader createMicroBatchReader(
+ Optional<StructType> schema,
+ String checkpointLocation,
+ DataSourceOptions options);
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java
deleted file mode 100644
index 5e334d1..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * An interface to represent the output data partitioning for a data source, which is returned by
- * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work like a
- * snapshot. Once created, it should be deterministic and always report the same number of
- * partitions and the same "satisfy" result for a certain distribution.
- */
-@InterfaceStability.Evolving
-public interface Partitioning {
-
- /**
- * Returns the number of partitions(i.e., {@link DataReaderFactory}s) the data source outputs.
- */
- int numPartitions();
-
- /**
- * Returns true if this partitioning can satisfy the given distribution, which means Spark does
- * not need to shuffle the output data of this data source for some certain operations.
- *
- * Note that, Spark may add new concrete implementations of {@link Distribution} in new releases.
- * This method should be aware of it and always return false for unrecognized distributions. It's
- * recommended to check every Spark new release and support new distributions if possible, to
- * avoid shuffle at Spark side for more cases.
- */
- boolean satisfy(Distribution distribution);
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
index a2383a9..5405a91 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
@@ -18,6 +18,7 @@
package org.apache.spark.sql.sources.v2.reader;
import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
/**
* A mix in interface for {@link DataSourceReader}. Data source readers can implement this
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
new file mode 100644
index 0000000..2d0ee50
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.partitioning;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+
+/**
+ * A concrete implementation of {@link Distribution}. Represents a distribution where records that
+ * share the same values for the {@link #clusteredColumns} will be produced by the same
+ * {@link DataReader}.
+ */
+@InterfaceStability.Evolving
+public class ClusteredDistribution implements Distribution {
+
+ /**
+ * The names of the clustered columns. Note that they are order insensitive.
+ */
+ public final String[] clusteredColumns;
+
+ public ClusteredDistribution(String[] clusteredColumns) {
+ this.clusteredColumns = clusteredColumns;
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
new file mode 100644
index 0000000..f6b111f
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.partitioning;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+
+/**
+ * An interface to represent data distribution requirement, which specifies how the records should
+ * be distributed among the data partitions(one {@link DataReader} outputs data for one partition).
+ * Note that this interface has nothing to do with the data ordering inside one
+ * partition(the output records of a single {@link DataReader}).
+ *
+ * The instance of this interface is created and provided by Spark, then consumed by
+ * {@link Partitioning#satisfy(Distribution)}. This means data source developers don't need to
+ * implement this interface, but need to catch as more concrete implementations of this interface
+ * as possible in {@link Partitioning#satisfy(Distribution)}.
+ *
+ * Concrete implementations until now:
+ * <ul>
+ * <li>{@link ClusteredDistribution}</li>
+ * </ul>
+ */
+@InterfaceStability.Evolving
+public interface Distribution {}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
new file mode 100644
index 0000000..309d9e5
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.partitioning;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;
+
+/**
+ * An interface to represent the output data partitioning for a data source, which is returned by
+ * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work like a
+ * snapshot. Once created, it should be deterministic and always report the same number of
+ * partitions and the same "satisfy" result for a certain distribution.
+ */
+@InterfaceStability.Evolving
+public interface Partitioning {
+
+ /**
+ * Returns the number of partitions(i.e., {@link DataReaderFactory}s) the data source outputs.
+ */
+ int numPartitions();
+
+ /**
+ * Returns true if this partitioning can satisfy the given distribution, which means Spark does
+ * not need to shuffle the output data of this data source for some certain operations.
+ *
+ * Note that, Spark may add new concrete implementations of {@link Distribution} in new releases.
+ * This method should be aware of it and always return false for unrecognized distributions. It's
+ * recommended to check every Spark new release and support new distributions if possible, to
+ * avoid shuffle at Spark side for more cases.
+ */
+ boolean satisfy(Distribution distribution);
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java
new file mode 100644
index 0000000..47d2644
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.DataReader;
+
+/**
+ * A variation on {@link DataReader} for use with streaming in continuous processing mode.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousDataReader<T> extends DataReader<T> {
+ /**
+ * Get the offset of the current record, or the start offset if no records have been read.
+ *
+ * The execution engine will call this method along with {@link DataReader#get()} to keep
+ * track of the current offset. When an epoch ends, the offset of the previous record in each
+ * partition will be saved as a restart checkpoint.
+ */
+ PartitionOffset getOffset();
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
new file mode 100644
index 0000000..d1d1e7f
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
+ * interface to allow reading in a continuous processing mode stream.
+ *
+ * Implementations must ensure each reader factory output is a {@link ContinuousDataReader}.
+ *
+ * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
+ * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReader extends BaseStreamingSource, DataSourceReader {
+ /**
+ * Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each
+ * partition to a single global offset.
+ */
+ Offset mergeOffsets(PartitionOffset[] offsets);
+
+ /**
+ * Deserialize a JSON string into an Offset of the implementation-defined offset type.
+ * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
+ */
+ Offset deserializeOffset(String json);
+
+ /**
+ * Set the desired start offset for reader factories created from this reader. The scan will
+ * start from the first record after the provided offset, or from an implementation-defined
+ * inferred starting point if no offset is provided.
+ */
+ void setOffset(Optional<Offset> start);
+
+ /**
+ * Return the specified or inferred start offset for this reader.
+ *
+ * @throws IllegalStateException if {@link #setOffset} has not been called
+ */
+ Offset getStartOffset();
+
+ /**
+ * The execution engine will call this method in every epoch to determine if new reader
+ * factories need to be generated, which may be required if, for example, the underlying
+ * source system has had partitions added or removed.
+ *
+ * If true, the query will be shut down and restarted with a new reader.
+ */
+ default boolean needsReconfiguration() {
+ return false;
+ }
+
+ /**
+ * Informs the source that Spark has completed processing all data for offsets less than or
+ * equal to {@code end} and will only request offsets greater than {@code end} in the future.
+ */
+ void commit(Offset end);
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
new file mode 100644
index 0000000..67ebde3
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
+ * interface to indicate they allow micro-batch streaming reads.
+ *
+ * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
+ * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReader extends DataSourceReader, BaseStreamingSource {
+ /**
+ * Set the desired offset range for reader factories created from this reader. Reader factories
+ * will generate only data within ({@code start}, {@code end}]; that is, from the first record
+ * after {@code start} to the record with offset {@code end}.
+ *
+ * @param start The initial offset to scan from. If not specified, scan from an
+ * implementation-specified start point, such as the earliest available record.
+ * @param end The last offset to include in the scan. If not specified, scan up to an
+ * implementation-defined endpoint, such as the last available offset
+ * or the start offset plus a target batch size.
+ */
+ void setOffsetRange(Optional<Offset> start, Optional<Offset> end);
+
+ /**
+ * Returns the specified (if explicitly set through setOffsetRange) or inferred start offset
+ * for this reader.
+ *
+ * @throws IllegalStateException if {@link #setOffsetRange} has not been called
+ */
+ Offset getStartOffset();
+
+ /**
+ * Returns the specified (if explicitly set through setOffsetRange) or inferred end offset
+ * for this reader.
+ *
+ * @throws IllegalStateException if {@link #setOffsetRange} has not been called
+ */
+ Offset getEndOffset();
+
+ /**
+ * Deserialize a JSON string into an Offset of the implementation-defined offset type.
+ * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
+ */
+ Offset deserializeOffset(String json);
+
+ /**
+ * Informs the source that Spark has completed processing all data for offsets less than or
+ * equal to {@code end} and will only request offsets greater than {@code end} in the future.
+ */
+ void commit(Offset end);
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
new file mode 100644
index 0000000..e41c035
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * An abstract representation of progress through a {@link MicroBatchReader} or
+ * {@link ContinuousReader}.
+ * During execution, offsets provided by the data source implementation will be logged and used as
+ * restart checkpoints. Each source should provide an offset implementation which the source can use
+ * to reconstruct a position in the stream up to which data has been seen/processed.
+ *
+ * Note: This class currently extends {@link org.apache.spark.sql.execution.streaming.Offset} to
+ * maintain compatibility with DataSource V1 APIs. This extension will be removed once we
+ * get rid of V1 completely.
+ */
+@InterfaceStability.Evolving
+public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
+ /**
+ * A JSON-serialized representation of an Offset that is
+ * used for saving offsets to the offset log.
+ * Note: We assume that equivalent/equal offsets serialize to
+ * identical JSON strings.
+ *
+ * @return JSON string encoding
+ */
+ public abstract String json();
+
+ /**
+ * Equality based on JSON string representation. We leverage the
+ * JSON representation for normalization between Offsets
+ * in their deserialized and serialized representations.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
+ return this.json()
+ .equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json());
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return this.json().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return this.json();
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/PartitionOffset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/PartitionOffset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/PartitionOffset.java
new file mode 100644
index 0000000..383e73d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/PartitionOffset.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import java.io.Serializable;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * Used for per-partition offsets in continuous processing. {@link ContinuousReader}
+ * implementations merge these into a global {@link Offset} via their {@code mergeOffsets} method.
+ *
+ * These offsets must be serializable.
+ */
+@InterfaceStability.Evolving
+public interface PartitionOffset extends Serializable {
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
deleted file mode 100644
index f79424e..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data reading ability for continuous stream processing.
- */
-@InterfaceStability.Evolving
-public interface ContinuousReadSupport extends DataSourceV2 {
- /**
- * Creates a {@link ContinuousReader} to scan the data from this data source.
- *
- * @param schema the user provided schema, or empty() if none was provided
- * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
- * recovery. Readers for the same logical source in the same query
- * will be given the same checkpointLocation.
- * @param options the options for the returned data source reader, which is an immutable
- * case-insensitive string-to-string map.
- */
- ContinuousReader createContinuousReader(
- Optional<StructType> schema,
- String checkpointLocation,
- DataSourceOptions options);
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
deleted file mode 100644
index 22660e4..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/MicroBatchReadSupport.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide streaming micro-batch data reading ability.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchReadSupport extends DataSourceV2 {
- /**
- * Creates a {@link MicroBatchReader} to read batches of data from this data source in a
- * streaming query.
- *
- * The execution engine will create a micro-batch reader at the start of a streaming query,
- * alternate calls to setOffsetRange and createDataReaderFactories for each batch to process, and
- * then call stop() when the execution is complete. Note that a single query may have multiple
- * executions due to restart or failure recovery.
- *
- * @param schema the user provided schema, or empty() if none was provided
- * @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
- * recovery. Readers for the same logical source in the same query
- * will be given the same checkpointLocation.
- * @param options the options for the returned data source reader, which is an immutable
- * case-insensitive string-to-string map.
- */
- MicroBatchReader createMicroBatchReader(
- Optional<StructType> schema,
- String checkpointLocation,
- DataSourceOptions options);
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
deleted file mode 100644
index 7c5f304..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/StreamWriteSupport.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
-import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.streaming.OutputMode;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability for structured streaming.
- */
-@InterfaceStability.Evolving
-public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink {
-
- /**
- * Creates an optional {@link StreamWriter} to save the data to this data source. Data
- * sources can return None if there is no writing needed to be done.
- *
- * @param queryId A unique string for the writing query. It's possible that there are many
- * writing queries running at the same time, and the returned
- * {@link DataSourceWriter} can use this id to distinguish itself from others.
- * @param schema the schema of the data to be written.
- * @param mode the output mode which determines what successive epoch output means to this
- * sink, please refer to {@link OutputMode} for more details.
- * @param options the options for the returned data source writer, which is an immutable
- * case-insensitive string-to-string map.
- */
- StreamWriter createStreamWriter(
- String queryId,
- StructType schema,
- OutputMode mode,
- DataSourceOptions options);
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
deleted file mode 100644
index 3f13a4d..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
-
-/**
- * A variation on {@link DataReader} for use with streaming in continuous processing mode.
- */
-@InterfaceStability.Evolving
-public interface ContinuousDataReader<T> extends DataReader<T> {
- /**
- * Get the offset of the current record, or the start offset if no records have been read.
- *
- * The execution engine will call this method along with get() to keep track of the current
- * offset. When an epoch ends, the offset of the previous record in each partition will be saved
- * as a restart checkpoint.
- */
- PartitionOffset getOffset();
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
deleted file mode 100644
index 6e5177e..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-
-import java.util.Optional;
-
-/**
- * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
- * interface to allow reading in a continuous processing mode stream.
- *
- * Implementations must ensure each reader factory output is a {@link ContinuousDataReader}.
- *
- * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
- * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
- */
-@InterfaceStability.Evolving
-public interface ContinuousReader extends BaseStreamingSource, DataSourceReader {
- /**
- * Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each
- * partition to a single global offset.
- */
- Offset mergeOffsets(PartitionOffset[] offsets);
-
- /**
- * Deserialize a JSON string into an Offset of the implementation-defined offset type.
- * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
- */
- Offset deserializeOffset(String json);
-
- /**
- * Set the desired start offset for reader factories created from this reader. The scan will
- * start from the first record after the provided offset, or from an implementation-defined
- * inferred starting point if no offset is provided.
- */
- void setOffset(Optional<Offset> start);
-
- /**
- * Return the specified or inferred start offset for this reader.
- *
- * @throws IllegalStateException if setOffset has not been called
- */
- Offset getStartOffset();
-
- /**
- * The execution engine will call this method in every epoch to determine if new reader
- * factories need to be generated, which may be required if for example the underlying
- * source system has had partitions added or removed.
- *
- * If true, the query will be shut down and restarted with a new reader.
- */
- default boolean needsReconfiguration() {
- return false;
- }
-
- /**
- * Informs the source that Spark has completed processing all data for offsets less than or
- * equal to `end` and will only request offsets greater than `end` in the future.
- */
- void commit(Offset end);
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
deleted file mode 100644
index fcec446..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-
-import java.util.Optional;
-
-/**
- * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
- * interface to indicate they allow micro-batch streaming reads.
- *
- * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
- * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchReader extends DataSourceReader, BaseStreamingSource {
- /**
- * Set the desired offset range for reader factories created from this reader. Reader factories
- * will generate only data within (`start`, `end`]; that is, from the first record after `start`
- * to the record with offset `end`.
- *
- * @param start The initial offset to scan from. If not specified, scan from an
- * implementation-specified start point, such as the earliest available record.
- * @param end The last offset to include in the scan. If not specified, scan up to an
- * implementation-defined endpoint, such as the last available offset
- * or the start offset plus a target batch size.
- */
- void setOffsetRange(Optional<Offset> start, Optional<Offset> end);
-
- /**
- * Returns the specified (if explicitly set through setOffsetRange) or inferred start offset
- * for this reader.
- *
- * @throws IllegalStateException if setOffsetRange has not been called
- */
- Offset getStartOffset();
-
- /**
- * Return the specified (if explicitly set through setOffsetRange) or inferred end offset
- * for this reader.
- *
- * @throws IllegalStateException if setOffsetRange has not been called
- */
- Offset getEndOffset();
-
- /**
- * Deserialize a JSON string into an Offset of the implementation-defined offset type.
- * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
- */
- Offset deserializeOffset(String json);
-
- /**
- * Informs the source that Spark has completed processing all data for offsets less than or
- * equal to `end` and will only request offsets greater than `end` in the future.
- */
- void commit(Offset end);
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
deleted file mode 100644
index abba3e7..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming.reader;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * An abstract representation of progress through a {@link MicroBatchReader} or
- * {@link ContinuousReader}.
- * During execution, offsets provided by the data source implementation will be logged and used as
- * restart checkpoints. Each source should provide an offset implementation which the source can use
- * to reconstruct a position in the stream up to which data has been seen/processed.
- *
- * Note: This class currently extends {@link org.apache.spark.sql.execution.streaming.Offset} to
- * maintain compatibility with DataSource V1 APIs. This extension will be removed once we
- * get rid of V1 completely.
- */
-@InterfaceStability.Evolving
-public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
- /**
- * A JSON-serialized representation of an Offset that is
- * used for saving offsets to the offset log.
- * Note: We assume that equivalent/equal offsets serialize to
- * identical JSON strings.
- *
- * @return JSON string encoding
- */
- public abstract String json();
-
- /**
- * Equality based on JSON string representation. We leverage the
- * JSON representation for normalization between the Offset's
- * in deserialized and serialized representations.
- */
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
- return this.json()
- .equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json());
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return this.json().hashCode();
- }
-
- @Override
- public String toString() {
- return this.json();
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
deleted file mode 100644
index 4688b85..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming.reader;
-
-import java.io.Serializable;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * Used for per-partition offsets in continuous processing. ContinuousReader implementations will
- * provide a method to merge these into a global Offset.
- *
- * These offsets must be serializable.
- */
-@InterfaceStability.Evolving
-public interface PartitionOffset extends Serializable {
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
deleted file mode 100644
index 915ee6c..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.streaming.writer;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
-
-/**
- * A {@link DataSourceWriter} for use with structured streaming. This writer handles commits and
- * aborts relative to an epoch ID determined by the execution engine.
- *
- * {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs,
- * and so must reset any internal state after a successful commit.
- */
-@InterfaceStability.Evolving
-public interface StreamWriter extends DataSourceWriter {
- /**
- * Commits this writing job for the specified epoch with a list of commit messages. The commit
- * messages are collected from successful data writers and are produced by
- * {@link DataWriter#commit()}.
- *
- * If this method fails (by throwing an exception), this writing job is considered to have been
- * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
- *
- * To support exactly-once processing, writer implementations should ensure that this method is
- * idempotent. The execution engine may call commit() multiple times for the same epoch
- * in some circumstances.
- */
- void commit(long epochId, WriterCommitMessage[] messages);
-
- /**
- * Aborts this writing job because some data writers are failed and keep failing when retry, or
- * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
- *
- * If this method fails (by throwing an exception), the underlying data source may require manual
- * cleanup.
- *
- * Unless the abort is triggered by the failure of commit, the given messages should have some
- * null slots as there maybe only a few data writers that are committed before the abort
- * happens, or some data writers were committed but their commit messages haven't reached the
- * driver when the abort is triggered. So this is just a "best effort" for data sources to
- * clean up the data left by data writers.
- */
- void abort(long epochId, WriterCommitMessage[] messages);
-
- default void commit(WriterCommitMessage[] messages) {
- throw new UnsupportedOperationException(
- "Commit without epoch should not be called with StreamWriter");
- }
-
- default void abort(WriterCommitMessage[] messages) {
- throw new UnsupportedOperationException(
- "Abort without epoch should not be called with StreamWriter");
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
index d89d27d..7096aec 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType;
/**
* A data source writer that is returned by
* {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceOptions)}/
- * {@link org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport#createStreamWriter(
+ * {@link StreamWriteSupport#createStreamWriter(
* String, StructType, OutputMode, DataSourceOptions)}.
* It can mix in various writing optimization interfaces to speed up the data saving. The actual
* writing logic is delegated to {@link DataWriter}.
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/StreamWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/StreamWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/StreamWriteSupport.java
new file mode 100644
index 0000000..1c0e2e1
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/StreamWriteSupport.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
+ * provide data writing ability for structured streaming.
+ */
+@InterfaceStability.Evolving
+public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink {
+
+ /**
+ * Creates an optional {@link StreamWriter} to save the data to this data source. Data
+ * sources can return None if there is no writing needed to be done.
+ *
+ * @param queryId A unique string for the writing query. It's possible that there are many
+ * writing queries running at the same time, and the returned
+ * {@link DataSourceWriter} can use this id to distinguish itself from others.
+ * @param schema the schema of the data to be written.
+ * @param mode the output mode which determines what successive epoch output means to this
+ * sink, please refer to {@link OutputMode} for more details.
+ * @param options the options for the returned data source writer, which is an immutable
+ * case-insensitive string-to-string map.
+ */
+ StreamWriter createStreamWriter(
+ String queryId,
+ StructType schema,
+ OutputMode mode,
+ DataSourceOptions options);
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
new file mode 100644
index 0000000..4913341
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
+/**
+ * A {@link DataSourceWriter} for use with structured streaming. This writer handles commits and
+ * aborts relative to an epoch ID determined by the execution engine.
+ *
+ * {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs,
+ * and so must reset any internal state after a successful commit.
+ */
+@InterfaceStability.Evolving
+public interface StreamWriter extends DataSourceWriter {
+ /**
+ * Commits this writing job for the specified epoch with a list of commit messages. The commit
+ * messages are collected from successful data writers and are produced by
+ * {@link DataWriter#commit()}.
+ *
+ * If this method fails (by throwing an exception), this writing job is considered to have failed,
+ * and the execution engine will attempt to call {@link #abort(long, WriterCommitMessage[])}.
+ *
+ * To support exactly-once processing, writer implementations should ensure that this method is
+ * idempotent. The execution engine may call commit() multiple times for the same epoch
+ * in some circumstances.
+ */
+ void commit(long epochId, WriterCommitMessage[] messages);
+
+ /**
+ * Aborts this writing job because some data writers have failed and keep failing when retried, or
+ * the Spark job fails for unknown reasons, or {@link #commit(long, WriterCommitMessage[])} fails.
+ *
+ * If this method fails (by throwing an exception), the underlying data source may require manual
+ * cleanup.
+ *
+ * Unless the abort is triggered by the failure of commit, the given messages should have some
+ * null slots as there may be only a few data writers that are committed before the abort
+ * happens, or some data writers were committed but their commit messages haven't reached the
+ * driver when the abort is triggered. So this is just a "best effort" for data sources to
+ * clean up the data left by data writers.
+ */
+ void abort(long epochId, WriterCommitMessage[] messages);
+
+ default void commit(WriterCommitMessage[] messages) {
+ throw new UnsupportedOperationException(
+ "Commit without epoch should not be called with StreamWriter");
+ }
+
+ default void abort(WriterCommitMessage[] messages) {
+ throw new UnsupportedOperationException(
+ "Abort without epoch should not be called with StreamWriter");
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala
index 943d010..017a673 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression}
import org.apache.spark.sql.catalyst.plans.physical
-import org.apache.spark.sql.sources.v2.reader.{ClusteredDistribution, Partitioning}
+import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Partitioning}
/**
* An adapter from public data source partitioning to catalyst internal `Partitioning`.
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index ee08582..df469af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
import org.apache.spark.sql.types.StructType
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index c544adb..6592bd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
-import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
http://git-wip-us.apache.org/repos/asf/spark/blob/56ae3265/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 93572f7..d9aa857 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -30,9 +30,9 @@ import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupport, StreamWriteSupport}
-import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset => OffsetV2}
-import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
+import org.apache.spark.sql.sources.v2.reader.MicroBatchReadSupport
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
+import org.apache.spark.sql.sources.v2.writer.{StreamWriteSupport, SupportsWriteInternalRow}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.{Clock, Utils}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org