Posted to commits@spark.apache.org by GitBox <gi...@apache.org> on 2019/01/15 21:56:52 UTC
[spark] Diff for: [GitHub] asfgit closed pull request #23208:
[SPARK-25530][SQL] data source v2 API refactor (batch write)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
deleted file mode 100644
index df439e2c02fe3..0000000000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability for batch processing.
- *
- * This interface is used to create {@link BatchWriteSupport} instances when end users run
- * {@code Dataset.write.format(...).option(...).save()}.
- */
-@Evolving
-public interface BatchWriteSupportProvider extends DataSourceV2 {
-
- /**
- * Creates an optional {@link BatchWriteSupport} instance to save the data to this data source,
- * which is called by Spark at the beginning of each batch query.
- *
- * Data sources can return None if there is no writing needed to be done according to the save
- * mode.
- *
- * @param queryId A unique string for the writing query. It's possible that there are many
- * writing queries running at the same time, and the returned
- * {@link BatchWriteSupport} can use this id to distinguish itself from others.
- * @param schema the schema of the data to be written.
- * @param mode the save mode which determines what to do when the data are already in this data
- * source, please refer to {@link SaveMode} for more details.
- * @param options the options for the returned data source writer, which is an immutable
- * case-insensitive string-to-string map.
- * @return a write support to write data to this data source.
- */
- Optional<BatchWriteSupport> createBatchWriteSupport(
- String queryId,
- StructType schema,
- SaveMode mode,
- DataSourceOptions options);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
index eae7a45d1d446..4aaa57dd4db9d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
@@ -20,15 +20,7 @@
import org.apache.spark.annotation.Evolving;
/**
- * The base interface for data source v2. Implementations must have a public, 0-arg constructor.
- *
- * Note that this is an empty interface. Data source implementations must mix in interfaces such as
- * {@link BatchReadSupportProvider} or {@link BatchWriteSupportProvider}, which can provide
- * batch or streaming read/write support instances. Otherwise it's just a dummy data source which
- * is un-readable/writable.
- *
- * If Spark fails to execute any methods in the implementations of this interface (by throwing an
- * exception), the read action will fail and no Spark job will be submitted.
+ * TODO: remove it when we finish the API refactor for the streaming side.
*/
@Evolving
public interface DataSourceV2 {}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
new file mode 100644
index 0000000000000..08caadd5308e6
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
+
+/**
+ * An empty mix-in interface for {@link Table}, to indicate this table supports batch write.
+ * <p>
+ * If a {@link Table} implements this interface, the
+ * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder}
+ * with {@link WriteBuilder#buildForBatch()} implemented.
+ * </p>
+ */
+@Evolving
+public interface SupportsBatchWrite extends SupportsWrite {}
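
To make the new shape concrete, here is a minimal sketch of a table that opts into batch writes. The class name EchoTable is invented for illustration, it assumes Table only requires name() and schema() at this stage of the refactor, and the BatchWrite itself is left as a stub:

    import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchWrite}
    import org.apache.spark.sql.sources.v2.writer.{BatchWrite, WriteBuilder}
    import org.apache.spark.sql.types.StructType

    // Hypothetical table that only supports batch writes.
    class EchoTable extends SupportsBatchWrite {
      override def name(): String = "echo"
      override def schema(): StructType = new StructType().add("value", "string")

      // Since the table mixes in SupportsBatchWrite, the returned builder
      // must implement buildForBatch().
      override def newWriteBuilder(options: DataSourceOptions): WriteBuilder =
        new WriteBuilder {
          override def buildForBatch(): BatchWrite = ??? // plug in the table's BatchWrite here
        }
    }
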
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
index e22738d20d507..5031c71c0fd4d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java
@@ -29,7 +29,10 @@
/**
* Returns a {@link ScanBuilder} which can be used to build a {@link Scan}. Spark will call this
- * method to configure each scan.
+ * method to configure each data source scan.
+ *
+ * @param options The options for reading, which is an immutable case-insensitive
+ * string-to-string map.
*/
ScanBuilder newScanBuilder(DataSourceOptions options);
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java
new file mode 100644
index 0000000000000..ecdfe20730254
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.sources.v2.writer.BatchWrite;
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
+
+/**
+ * An internal base interface of mix-in interfaces for writable {@link Table}. This adds
+ * {@link #newWriteBuilder(DataSourceOptions)} that is used to create a write
+ * for batch or streaming.
+ */
+interface SupportsWrite extends Table {
+
+ /**
+ * Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call
+ * this method to configure each data source write.
+ */
+ WriteBuilder newWriteBuilder(DataSourceOptions options);
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWrite.java
similarity index 99%
rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java
rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWrite.java
index efe1ac4f78db1..91297759971b5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWrite.java
@@ -38,7 +38,7 @@
* Please refer to the documentation of commit/abort methods for detailed specifications.
*/
@Evolving
-public interface BatchWriteSupport {
+public interface BatchWrite {
/**
* Creates a writer factory which will be serialized and sent to executors.
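
The driver-side half of the protocol stays as it was in BatchWriteSupport, just under the new name. A hedged sketch of an implementation, with the name LoggingBatchWrite and its behavior invented for illustration (the factory passed in is whatever DataWriterFactory the source wants to ship to executors):

    import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, WriterCommitMessage}

    // Hypothetical BatchWrite that only logs what the tasks reported.
    class LoggingBatchWrite(factory: DataWriterFactory) extends BatchWrite {
      // Serialized and sent to executors; one DataWriter is created per task.
      override def createBatchWriterFactory(): DataWriterFactory = factory

      // Called once on the driver after every task has committed successfully.
      override def commit(messages: Array[WriterCommitMessage]): Unit =
        println(s"committing output of ${messages.length} tasks")

      // Called on the driver if the job fails; clean up anything the tasks staged.
      override def abort(messages: Array[WriterCommitMessage]): Unit =
        println("aborting write, discarding staged output")
    }
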
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
index d142ee523ef9f..11228ad1ea672 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
@@ -36,11 +36,11 @@
*
* If this data writer succeeds(all records are successfully written and {@link #commit()}
* succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
- * {@link BatchWriteSupport#commit(WriterCommitMessage[])} with commit messages from other data
+ * {@link BatchWrite#commit(WriterCommitMessage[])} with commit messages from other data
* writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
* exception will be sent to the driver side, and Spark may retry this writing task a few times.
* In each retry, {@link DataWriterFactory#createWriter(int, long)} will receive a
- * different `taskId`. Spark will call {@link BatchWriteSupport#abort(WriterCommitMessage[])}
+ * different `taskId`. Spark will call {@link BatchWrite#abort(WriterCommitMessage[])}
* when the configured number of retries is exhausted.
*
* Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
@@ -71,11 +71,11 @@
/**
* Commits this writer after all records are written successfully, returns a commit message which
* will be sent back to driver side and passed to
- * {@link BatchWriteSupport#commit(WriterCommitMessage[])}.
+ * {@link BatchWrite#commit(WriterCommitMessage[])}.
*
* The written data should only be visible to data source readers after
- * {@link BatchWriteSupport#commit(WriterCommitMessage[])} succeeds, which means this method
- * should still "hide" the written data and ask the {@link BatchWriteSupport} at driver side to
+ * {@link BatchWrite#commit(WriterCommitMessage[])} succeeds, which means this method
+ * should still "hide" the written data and ask the {@link BatchWrite} at driver side to
* do the final commit via {@link WriterCommitMessage}.
*
* If this method fails (by throwing an exception), {@link #abort()} will be called and this
@@ -93,7 +93,7 @@
* failed.
*
* If this method fails(by throwing an exception), the underlying data source may have garbage
- * that need to be cleaned by {@link BatchWriteSupport#abort(WriterCommitMessage[])} or manually,
+ * that need to be cleaned by {@link BatchWrite#abort(WriterCommitMessage[])} or manually,
* but these garbage should not be visible to data source readers.
*
* @throws IOException if failure happens during disk/network IO like writing files.
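
On the executor side the contract is unchanged apart from the renamed driver interface. A minimal sketch of the task-level pieces, with the names CountingWriterFactory, CountingDataWriter and RowCountMessage invented for illustration:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

    // Hypothetical commit message: each task reports how many rows it wrote.
    case class RowCountMessage(numRows: Long) extends WriterCommitMessage

    // The factory is serialized to executors; each write task gets its own writer.
    class CountingWriterFactory extends DataWriterFactory {
      override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] =
        new CountingDataWriter
    }

    // Hypothetical writer: stage nothing, just count records. The written data
    // should stay invisible to readers until BatchWrite.commit(messages) succeeds
    // on the driver.
    class CountingDataWriter extends DataWriter[InternalRow] {
      private var count = 0L

      override def write(record: InternalRow): Unit = { count += 1 }

      override def commit(): WriterCommitMessage = RowCountMessage(count)

      override def abort(): Unit = { /* discard any staged data for this task */ }
    }
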
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
index 65105f46b82d5..bf2db9059b088 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
@@ -24,7 +24,7 @@
import org.apache.spark.sql.catalyst.InternalRow;
/**
- * A factory of {@link DataWriter} returned by {@link BatchWriteSupport#createBatchWriterFactory()},
+ * A factory of {@link DataWriter} returned by {@link BatchWrite#createBatchWriterFactory()},
* which is responsible for creating and initializing the actual data writer at executor side.
*
* Note that, the writer factory will be serialized and sent to executors, then the data writer
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java
new file mode 100644
index 0000000000000..c4295f2371877
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.sql.SaveMode;
+
+// A temporary mixin trait for `WriteBuilder` to support `SaveMode`. Will be removed before
+// Spark 3.0 when all the new write operators are finished. See SPARK-26356 for more details.
+public interface SupportsSaveMode extends WriteBuilder {
+ WriteBuilder mode(SaveMode mode);
+}
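
A brief sketch of how this temporary mixin is meant to be used by a source's builder until the new write operators land; the name ModeAwareWriteBuilder and its behavior are made up here, but the same pattern appears in SimpleWritableDataSource further down:

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsSaveMode, WriteBuilder}

    // Hypothetical builder that records the SaveMode passed in by Spark.
    class ModeAwareWriteBuilder extends WriteBuilder with SupportsSaveMode {
      private var saveMode: SaveMode = SaveMode.ErrorIfExists

      override def mode(mode: SaveMode): WriteBuilder = {
        this.saveMode = mode
        this
      }

      override def buildForBatch(): BatchWrite = {
        // With SupportsSaveMode, returning null signals that no write is needed,
        // e.g. SaveMode.Ignore when the output already exists.
        if (saveMode == SaveMode.Ignore) null else ??? // plug in the real BatchWrite here
      }
    }
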
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
new file mode 100644
index 0000000000000..e861c72af9e68
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
+import org.apache.spark.sql.sources.v2.Table;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface for building the {@link BatchWrite}. Implementations can mix in some interfaces to
+ * support different ways to write data to data sources.
+ *
+ * Unless modified by a mixin interface, the {@link BatchWrite} configured by this builder is to
+ * append data without affecting existing data.
+ */
+@Evolving
+public interface WriteBuilder {
+
+ /**
+ * Passes the `queryId` from Spark to the data source. `queryId` is a unique string for the query.
+ * It's possible that there are many queries running at the same time, or that a query is restarted
+ * and resumed. {@link BatchWrite} can use this id to identify the query.
+ *
+ * @return a new builder with the `queryId`. By default it returns `this`, which means the given
+ * `queryId` is ignored. Please override this method to take the `queryId`.
+ */
+ default WriteBuilder withQueryId(String queryId) {
+ return this;
+ }
+
+ /**
+ * Passes the schema of the input data from Spark to the data source.
+ *
+ * @return a new builder with the `schema`. By default it returns `this`, which means the given
+ * `schema` is ignored. Please override this method to take the `schema`.
+ */
+ default WriteBuilder withInputDataSchema(StructType schema) {
+ return this;
+ }
+
+ /**
+ * Returns a {@link BatchWrite} to write data to a batch source. By default this method throws an
+ * exception; data sources must override it to provide an implementation if the
+ * {@link Table} that creates this write implements {@link SupportsBatchWrite}.
+ *
+ * Note that the returned {@link BatchWrite} can be null if the implementation supports SaveMode,
+ * to indicate that no writing is needed. This can be cleaned up after
+ * {@link SupportsSaveMode} is removed.
+ */
+ default BatchWrite buildForBatch() {
+ throw new UnsupportedOperationException("Batch writes are not supported");
+ }
+}
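
For reference, the driver-side call sequence against this builder (mirrored in the DataFrameWriter and DataSourceV2Relation changes below) can be summarized as a small helper; WritePath and buildBatchWrite are illustrative names only:

    import java.util.UUID
    import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchWrite}
    import org.apache.spark.sql.sources.v2.writer.BatchWrite
    import org.apache.spark.sql.types.StructType

    object WritePath {
      // Sketch: configure the builder with query id and input schema, then build.
      def buildBatchWrite(
          table: SupportsBatchWrite,
          options: DataSourceOptions,
          inputSchema: StructType): BatchWrite = {
        table.newWriteBuilder(options)
          .withQueryId(UUID.randomUUID().toString)   // unique per write query
          .withInputDataSchema(inputSchema)          // schema of the data to be written
          .buildForBatch()
      }
    }
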
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
index 9216e34399092..6334c8f643098 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
@@ -24,7 +24,7 @@
/**
* A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side
- * as the input parameter of {@link BatchWriteSupport#commit(WriterCommitMessage[])} or
+ * as the input parameter of {@link BatchWrite#commit(WriterCommitMessage[])} or
* {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}.
*
* This is an empty interface, data sources should define their own message class and use it when
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index ce8e4c8f5b82b..af369a5bca464 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
/**
@@ -209,10 +209,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
case _ => provider.getTable(dsOptions)
}
table match {
- case s: SupportsBatchRead =>
- Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
- provider, s, finalOptions, userSpecifiedSchema = userSpecifiedSchema))
-
+ case _: SupportsBatchRead =>
+ Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, finalOptions))
case _ => loadV1Source(paths: _*)
}
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 981b3a8fd4ac1..228dcb94b9acc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, Logi
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, WriteToDataSourceV2}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
import org.apache.spark.sql.types.StructType
/**
@@ -241,33 +242,38 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertNotBucketed("save")
- val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
- if (classOf[DataSourceV2].isAssignableFrom(cls)) {
- val source = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
- source match {
- case provider: BatchWriteSupportProvider =>
- val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
- source,
- df.sparkSession.sessionState.conf)
- val options = sessionOptions ++ extraOptions
-
+ val session = df.sparkSession
+ val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
+ if (classOf[TableProvider].isAssignableFrom(cls)) {
+ val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+ val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+ provider, session.sessionState.conf)
+ val options = sessionOptions ++ extraOptions
+ val dsOptions = new DataSourceOptions(options.asJava)
+ provider.getTable(dsOptions) match {
+ case table: SupportsBatchWrite =>
if (mode == SaveMode.Append) {
- val relation = DataSourceV2Relation.createRelationForWrite(source, options)
+ val relation = DataSourceV2Relation.create(table, options)
runCommand(df.sparkSession, "save") {
AppendData.byName(relation, df.logicalPlan)
}
-
} else {
- val writer = provider.createBatchWriteSupport(
- UUID.randomUUID().toString,
- df.logicalPlan.output.toStructType,
- mode,
- new DataSourceOptions(options.asJava))
-
- if (writer.isPresent) {
- runCommand(df.sparkSession, "save") {
- WriteToDataSourceV2(writer.get, df.logicalPlan)
- }
+ val writeBuilder = table.newWriteBuilder(dsOptions)
+ .withQueryId(UUID.randomUUID().toString)
+ .withInputDataSchema(df.logicalPlan.schema)
+ writeBuilder match {
+ case s: SupportsSaveMode =>
+ val write = s.mode(mode).buildForBatch()
+ // It can only return null with `SupportsSaveMode`. We can clean it up after
+ // removing `SupportsSaveMode`.
+ if (write != null) {
+ runCommand(df.sparkSession, "save") {
+ WriteToDataSourceV2(write, df.logicalPlan)
+ }
+ }
+
+ case _ => throw new AnalysisException(
+ s"data source ${table.name} does not support SaveMode $mode")
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 7bf2b8bff3732..6321578184346 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -21,46 +21,51 @@ import java.util.UUID
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
+import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.types.StructType
/**
- * A logical plan representing a data source v2 scan.
+ * A logical plan representing a data source v2 table.
*
- * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param table The table that this relation represents.
+ * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]]
+ * and [[WriteBuilder]].
*/
case class DataSourceV2Relation(
- // TODO: remove `source` when we finish API refactor for write.
- source: TableProvider,
- table: SupportsBatchRead,
+ table: Table,
output: Seq[AttributeReference],
- options: Map[String, String],
- userSpecifiedSchema: Option[StructType] = None)
+ options: Map[String, String])
extends LeafNode with MultiInstanceRelation with NamedRelation {
- import DataSourceV2Relation._
-
override def name: String = table.name()
override def simpleString(maxFields: Int): String = {
s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
}
- def newWriteSupport(): BatchWriteSupport = source.createWriteSupport(options, schema)
+ def newScanBuilder(): ScanBuilder = table match {
+ case s: SupportsBatchRead =>
+ val dsOptions = new DataSourceOptions(options.asJava)
+ s.newScanBuilder(dsOptions)
+ case _ => throw new AnalysisException(s"Table is not readable: ${table.name()}")
+ }
+
- def newScanBuilder(): ScanBuilder = {
- val dsOptions = new DataSourceOptions(options.asJava)
- table.newScanBuilder(dsOptions)
+
+ def newWriteBuilder(schema: StructType): WriteBuilder = table match {
+ case s: SupportsBatchWrite =>
+ val dsOptions = new DataSourceOptions(options.asJava)
+ s.newWriteBuilder(dsOptions)
+ .withQueryId(UUID.randomUUID().toString)
+ .withInputDataSchema(schema)
+ case _ => throw new AnalysisException(s"Table is not writable: ${table.name()}")
}
override def computeStats(): Statistics = {
@@ -126,52 +131,8 @@ case class StreamingDataSourceV2Relation(
}
object DataSourceV2Relation {
- private implicit class SourceHelpers(source: DataSourceV2) {
- def asWriteSupportProvider: BatchWriteSupportProvider = {
- source match {
- case provider: BatchWriteSupportProvider =>
- provider
- case _ =>
- throw new AnalysisException(s"Data source is not writable: $name")
- }
- }
-
- def name: String = {
- source match {
- case registered: DataSourceRegister =>
- registered.shortName()
- case _ =>
- source.getClass.getSimpleName
- }
- }
-
- def createWriteSupport(
- options: Map[String, String],
- schema: StructType): BatchWriteSupport = {
- asWriteSupportProvider.createBatchWriteSupport(
- UUID.randomUUID().toString,
- schema,
- SaveMode.Append,
- new DataSourceOptions(options.asJava)).get
- }
- }
-
- def create(
- provider: TableProvider,
- table: SupportsBatchRead,
- options: Map[String, String],
- userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
+ def create(table: Table, options: Map[String, String]): DataSourceV2Relation = {
val output = table.schema().toAttributes
- DataSourceV2Relation(provider, table, output, options, userSpecifiedSchema)
- }
-
- // TODO: remove this when we finish API refactor for write.
- def createRelationForWrite(
- source: DataSourceV2,
- options: Map[String, String]): DataSourceV2Relation = {
- val provider = source.asInstanceOf[TableProvider]
- val dsOptions = new DataSourceOptions(options.asJava)
- val table = provider.getTable(dsOptions)
- create(provider, table.asInstanceOf[SupportsBatchRead], options)
+ DataSourceV2Relation(table, output, options)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 2e26fce880b68..79540b0246214 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
import scala.collection.mutable
-import org.apache.spark.sql.{sources, Strategy}
+import org.apache.spark.sql.{sources, AnalysisException, SaveMode, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Repartition}
@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport
+import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
object DataSourceV2Strategy extends Strategy {
@@ -110,7 +111,7 @@ object DataSourceV2Strategy extends Strategy {
val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters)
logInfo(
s"""
- |Pushing operators to ${relation.source.getClass}
+ |Pushing operators to ${relation.name}
|Pushed Filters: ${pushedFilters.mkString(", ")}
|Post-Scan Filters: ${postScanFilters.mkString(",")}
|Output: ${output.mkString(", ")}
@@ -136,7 +137,14 @@ object DataSourceV2Strategy extends Strategy {
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
case AppendData(r: DataSourceV2Relation, query, _) =>
- WriteToDataSourceV2Exec(r.newWriteSupport(), planLater(query)) :: Nil
+ val writeBuilder = r.newWriteBuilder(query.schema)
+ writeBuilder match {
+ case s: SupportsSaveMode =>
+ val write = s.mode(SaveMode.Append).buildForBatch()
+ assert(write != null)
+ WriteToDataSourceV2Exec(write, planLater(query)) :: Nil
+ case _ => throw new AnalysisException(s"data source ${r.name} does not support SaveMode")
+ }
case WriteToContinuousDataSource(writer, query) =>
WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index d7e20eed4cbc0..406fb8c3a3834 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{LongAccumulator, Utils}
* specific logical plans, like [[org.apache.spark.sql.catalyst.plans.logical.AppendData]].
*/
@deprecated("Use specific logical plans like AppendData instead", "2.4.0")
-case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPlan)
+case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan)
extends LogicalPlan {
override def children: Seq[LogicalPlan] = Seq(query)
override def output: Seq[Attribute] = Nil
@@ -44,7 +44,7 @@ case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPl
/**
* The physical plan for writing data into data source v2.
*/
-case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: SparkPlan)
+case class WriteToDataSourceV2Exec(batchWrite: BatchWrite, query: SparkPlan)
extends UnaryExecNode {
var commitProgress: Option[StreamWriterCommitProgress] = None
@@ -53,13 +53,13 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark
override def output: Seq[Attribute] = Nil
override protected def doExecute(): RDD[InternalRow] = {
- val writerFactory = writeSupport.createBatchWriterFactory()
- val useCommitCoordinator = writeSupport.useCommitCoordinator
+ val writerFactory = batchWrite.createBatchWriterFactory()
+ val useCommitCoordinator = batchWrite.useCommitCoordinator
val rdd = query.execute()
val messages = new Array[WriterCommitMessage](rdd.partitions.length)
val totalNumRowsAccumulator = new LongAccumulator()
- logInfo(s"Start processing data source write support: $writeSupport. " +
+ logInfo(s"Start processing data source write support: $batchWrite. " +
s"The input RDD has ${messages.length} partitions.")
try {
@@ -72,26 +72,26 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark
val commitMessage = result.writerCommitMessage
messages(index) = commitMessage
totalNumRowsAccumulator.add(result.numRows)
- writeSupport.onDataWriterCommit(commitMessage)
+ batchWrite.onDataWriterCommit(commitMessage)
}
)
- logInfo(s"Data source write support $writeSupport is committing.")
- writeSupport.commit(messages)
- logInfo(s"Data source write support $writeSupport committed.")
+ logInfo(s"Data source write support $batchWrite is committing.")
+ batchWrite.commit(messages)
+ logInfo(s"Data source write support $batchWrite committed.")
commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
} catch {
case cause: Throwable =>
- logError(s"Data source write support $writeSupport is aborting.")
+ logError(s"Data source write support $batchWrite is aborting.")
try {
- writeSupport.abort(messages)
+ batchWrite.abort(messages)
} catch {
case t: Throwable =>
- logError(s"Data source write support $writeSupport failed to abort.")
+ logError(s"Data source write support $batchWrite failed to abort.")
cause.addSuppressed(t)
throw new SparkException("Writing job failed.", cause)
}
- logError(s"Data source write support $writeSupport aborted.")
+ logError(s"Data source write support $batchWrite aborted.")
cause match {
// Only wrap non fatal exceptions.
case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 38ecb0dd12daa..db1bf32a156c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan,
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport}
+import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateControlMicroBatchReadSupport}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2}
@@ -515,7 +515,7 @@ class MicroBatchExecution(
newAttributePlan.schema,
outputMode,
new DataSourceOptions(extraOptions.asJava))
- WriteToDataSourceV2(new MicroBatchWritSupport(currentBatchId, writer), newAttributePlan)
+ WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan)
case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
similarity index 84%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
index 9f88416871f8e..143235efee81d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala
@@ -18,16 +18,15 @@
package org.apache.spark.sql.execution.streaming.sources
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.writer.{BatchWriteSupport, DataWriter, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
/**
- * A [[BatchWriteSupport]] used to hook V2 stream writers into a microbatch plan. It implements
+ * A [[BatchWrite]] used to hook V2 stream writers into a microbatch plan. It implements
* the non-streaming interface, forwarding the epoch ID determined at construction to a wrapped
* streaming write support.
*/
-class MicroBatchWritSupport(eppchId: Long, val writeSupport: StreamingWriteSupport)
- extends BatchWriteSupport {
+class MicroBatchWrite(eppchId: Long, val writeSupport: StreamingWriteSupport) extends BatchWrite {
override def commit(messages: Array[WriterCommitMessage]): Unit = {
writeSupport.commit(eppchId, messages)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
index ac3c71cc222b1..fd4cb444ce580 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
@@ -21,12 +21,12 @@ import scala.collection.mutable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.writer.{BatchWriteSupport, DataWriter, DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingDataWriterFactory
/**
* A simple [[DataWriterFactory]] whose tasks just pack rows into the commit message for delivery
- * to a [[BatchWriteSupport]] on the driver.
+ * to a [[BatchWrite]] on the driver.
*
* Note that, because it sends all rows to the driver, this factory will generally be unsuitable
* for production-quality sinks. It's intended for use in tests.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index d282193d35d76..c60ea4a2f9f5e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -329,8 +329,8 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
.format(classOf[DataSourceV2WithSessionConfig].getName).load()
val options = df.queryExecution.optimizedPlan.collectFirst {
case d: DataSourceV2Relation => d.options
- }
- assert(options.get.get(optionName) == Some("false"))
+ }.get
+ assert(options.get(optionName).get == "false")
}
}
@@ -356,13 +356,11 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
val cls = classOf[SimpleWriteOnlyDataSource]
val path = file.getCanonicalPath
val df = spark.range(5).select('id as 'i, -'id as 'j)
- try {
- df.write.format(cls.getName).option("path", path).mode("error").save()
- df.write.format(cls.getName).option("path", path).mode("overwrite").save()
- df.write.format(cls.getName).option("path", path).mode("ignore").save()
- } catch {
- case e: SchemaReadAttemptException => fail("Schema read was attempted.", e)
- }
+ // Non-append modes should not throw an exception, as they don't access the schema.
+ df.write.format(cls.getName).option("path", path).mode("error").save()
+ df.write.format(cls.getName).option("path", path).mode("overwrite").save()
+ df.write.format(cls.getName).option("path", path).mode("ignore").save()
+ // Append mode will access the schema and should throw an exception.
intercept[SchemaReadAttemptException] {
df.write.format(cls.getName).option("path", path).mode("append").save()
}
@@ -680,10 +678,12 @@ object SpecificReaderFactory extends PartitionReaderFactory {
class SchemaReadAttemptException(m: String) extends RuntimeException(m)
class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
- override def writeSchema(): StructType = {
- // This is a bit hacky since this source implements read support but throws
- // during schema retrieval. Might have to rewrite but it's done
- // such so for minimised changes.
- throw new SchemaReadAttemptException("read is not supported")
+
+ override def getTable(options: DataSourceOptions): Table = {
+ new MyTable(options) {
+ override def schema(): StructType = {
+ throw new SchemaReadAttemptException("schema should not be read.")
+ }
+ }
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 82bb4fa33a3ae..6e4f2bbcd6b61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.sources.v2
import java.io.{BufferedReader, InputStreamReader, IOException}
-import java.util.Optional
import scala.collection.JavaConverters._
@@ -35,15 +34,13 @@ import org.apache.spark.util.SerializableConfiguration
/**
* A HDFS based transactional writable data source.
- * Each task writes data to `target/_temporary/queryId/$jobId-$partitionId-$attemptNumber`.
- * Each job moves files from `target/_temporary/queryId/` to `target`.
+ * Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`.
+ * Each job moves files from `target/_temporary/uniqueId/` to `target`.
*/
class SimpleWritableDataSource extends DataSourceV2
- with TableProvider
- with BatchWriteSupportProvider
- with SessionConfigSupport {
+ with TableProvider with SessionConfigSupport {
- protected def writeSchema(): StructType = new StructType().add("i", "long").add("j", "long")
+ private val tableSchema = new StructType().add("i", "long").add("j", "long")
override def keyPrefix: String = "simpleWritableDataSource"
@@ -68,22 +65,50 @@ class SimpleWritableDataSource extends DataSourceV2
new CSVReaderFactory(serializableConf)
}
- override def readSchema(): StructType = writeSchema
+ override def readSchema(): StructType = tableSchema
}
- override def getTable(options: DataSourceOptions): Table = {
- val path = new Path(options.get("path").get())
- val conf = SparkContext.getActive.get.hadoopConfiguration
- new SimpleBatchTable {
- override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
- new MyScanBuilder(path.toUri.toString, conf)
+ class MyWriteBuilder(path: String) extends WriteBuilder with SupportsSaveMode {
+ private var queryId: String = _
+ private var mode: SaveMode = _
+
+ override def withQueryId(queryId: String): WriteBuilder = {
+ this.queryId = queryId
+ this
+ }
+
+ override def mode(mode: SaveMode): WriteBuilder = {
+ this.mode = mode
+ this
+ }
+
+ override def buildForBatch(): BatchWrite = {
+ assert(mode != null)
+
+ val hadoopPath = new Path(path)
+ val hadoopConf = SparkContext.getActive.get.hadoopConfiguration
+ val fs = hadoopPath.getFileSystem(hadoopConf)
+
+ if (mode == SaveMode.ErrorIfExists) {
+ if (fs.exists(hadoopPath)) {
+ throw new RuntimeException("data already exists.")
+ }
+ }
+ if (mode == SaveMode.Ignore) {
+ if (fs.exists(hadoopPath)) {
+ return null
+ }
+ }
+ if (mode == SaveMode.Overwrite) {
+ fs.delete(hadoopPath, true)
}
- override def schema(): StructType = writeSchema
+ val pathStr = hadoopPath.toUri.toString
+ new MyBatchWrite(queryId, pathStr, hadoopConf)
}
}
- class WritSupport(queryId: String, path: String, conf: Configuration) extends BatchWriteSupport {
+ class MyBatchWrite(queryId: String, path: String, conf: Configuration) extends BatchWrite {
override def createBatchWriterFactory(): DataWriterFactory = {
SimpleCounter.resetCounter
new CSVDataWriterFactory(path, queryId, new SerializableConfiguration(conf))
@@ -116,33 +141,23 @@ class SimpleWritableDataSource extends DataSourceV2
}
}
- override def createBatchWriteSupport(
- queryId: String,
- schema: StructType,
- mode: SaveMode,
- options: DataSourceOptions): Optional[BatchWriteSupport] = {
- assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false))
+ class MyTable(options: DataSourceOptions) extends SimpleBatchTable with SupportsBatchWrite {
+ private val path = options.get("path").get()
+ private val conf = SparkContext.getActive.get.hadoopConfiguration
- val path = new Path(options.get("path").get())
- val conf = SparkContext.getActive.get.hadoopConfiguration
- val fs = path.getFileSystem(conf)
+ override def schema(): StructType = tableSchema
- if (mode == SaveMode.ErrorIfExists) {
- if (fs.exists(path)) {
- throw new RuntimeException("data already exists.")
- }
+ override def newScanBuilder(options: DataSourceOptions): ScanBuilder = {
+ new MyScanBuilder(new Path(path).toUri.toString, conf)
}
- if (mode == SaveMode.Ignore) {
- if (fs.exists(path)) {
- return Optional.empty()
- }
- }
- if (mode == SaveMode.Overwrite) {
- fs.delete(path, true)
+
+ override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
+ new MyWriteBuilder(path)
}
+ }
- val pathStr = path.toUri.toString
- Optional.of(new WritSupport(queryId, pathStr, conf))
+ override def getTable(options: DataSourceOptions): Table = {
+ new MyTable(options)
}
}