You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by xuanyuanking <gi...@git.apache.org> on 2017/12/06 09:38:48 UTC
[GitHub] spark pull request #18975: [SPARK-4131] Support "Writing data into the files...
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/18975#discussion_r155185768
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources._
+
+/**
+ * A command used to write the result of a query to a directory.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * INSERT OVERWRITE DIRECTORY (path=STRING)?
+ * USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
+ * SELECT ...
+ * }}}
+ *
+ * @param storage storage format used to describe how the query result is stored.
+ * @param provider the data source type to be used
+ * @param query the logical plan representing data to write to
+ * @param overwrite whether to overwrite the existing directory
+ */
+case class InsertIntoDataSourceDirCommand(
+ storage: CatalogStorageFormat,
+ provider: String,
+ query: LogicalPlan,
+ overwrite: Boolean) extends RunnableCommand {
+
+ override def children: Seq[LogicalPlan] = Seq(query)
+
+ override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
+ assert(children.length == 1)
+ assert(storage.locationUri.nonEmpty, "Directory path is required")
+ assert(provider.nonEmpty, "Data source is required")
+
+ // Create the relation based on the input logical plan: `query`.
+ val pathOption = storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+
+ val dataSource = DataSource(
+ sparkSession,
+ className = provider,
+ options = storage.properties ++ pathOption,
+ catalogTable = None)
+
+ val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
+ if (!isFileFormat) {
+ throw new SparkException(
+ "Only Data Sources providing FileFormat are supported: " + dataSource.providingClass)
+ }
+
+ val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
+ try {
+ sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query))
+ dataSource.writeAndRead(saveMode, query)
--- End diff --
The implementation here confuses me; I just want to leave a question: why do we need to call both `writeAndRead` and `planForWriting`?
@janewangfb @gatorsmile @cloud-fan
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org