Posted to commits@spark.apache.org by we...@apache.org on 2019/04/23 14:40:26 UTC
[spark] branch master updated: [SPARK-27128][SQL] Migrate JSON to File Data Source V2
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 00f2f31 [SPARK-27128][SQL] Migrate JSON to File Data Source V2
00f2f31 is described below
commit 00f2f311f775761cbb5f3ff0b8091217080293c0
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Tue Apr 23 22:39:59 2019 +0800
[SPARK-27128][SQL] Migrate JSON to File Data Source V2
## What changes were proposed in this pull request?
Migrate JSON to File Data Source V2
## How was this patch tested?
Unit test
Closes #24058 from gengliangwang/jsonV2.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
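
With this patch, `spark.read.json` is served by JsonDataSourceV2 by default, while writes still fall back to the V1 JsonFileFormat (see the SQLConf default change below). As a rough sketch, assuming a local SparkSession and an existing `/tmp/people.json`, the V1 read path can be restored through the same conf key the updated tests use:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("json-v2-fallback")
      .getOrCreate()

    // Default after this patch: reads go through JsonDataSourceV2.
    val dfV2 = spark.read.json("/tmp/people.json")

    // Add "json" to the V1 reader list to fall back to JsonFileFormat.
    spark.conf.set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "json")
    val dfV1 = spark.read.json("/tmp/people.json")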
---
.../org/apache/spark/sql/internal/SQLConf.scala | 2 +-
...org.apache.spark.sql.sources.DataSourceRegister | 2 +-
.../spark/sql/execution/command/tables.scala | 4 +-
.../datasources/json/JsonFileFormat.scala | 35 ----------
.../datasources/json/JsonOutputWriter.scala | 63 +++++++++++++++++
.../datasources/v2/json/JsonDataSourceV2.scala | 44 ++++++++++++
.../v2/json/JsonPartitionReaderFactory.scala | 61 +++++++++++++++++
.../execution/datasources/v2/json/JsonScan.scala | 80 ++++++++++++++++++++++
.../datasources/v2/json/JsonScanBuilder.scala | 36 ++++++++++
.../execution/datasources/v2/json/JsonTable.scala | 73 ++++++++++++++++++++
.../datasources/v2/json/JsonWriteBuilder.scala | 63 +++++++++++++++++
.../spark/sql/FileBasedDataSourceSuite.scala | 2 +-
.../execution/OptimizeMetadataOnlyQuerySuite.scala | 5 +-
.../sql/execution/datasources/json/JsonSuite.scala | 4 +-
.../spark/sql/hive/MetastoreDataSourcesSuite.scala | 20 +++---
.../spark/sql/sources/HadoopFsRelationTest.scala | 8 ++-
16 files changed, 445 insertions(+), 57 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9ebd2c0..3a2e736 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1494,7 +1494,7 @@ object SQLConf {
" register class names for which data source V2 write paths are disabled. Writes from these" +
" sources will fall back to the V1 sources.")
.stringConf
- .createWithDefault("csv,orc,text")
+ .createWithDefault("csv,json,orc,text")
val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
.doc("A comma-separated list of fully qualified data source register class names for which" +
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index d988287..12e9067 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,6 +1,6 @@
org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
-org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2
org.apache.spark.sql.execution.datasources.noop.NoopDataSource
org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 395d61b..ea29eff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -37,9 +37,9 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier}
import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
-import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2
+import org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2
import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -238,7 +238,7 @@ case class AlterTableAddColumnsCommand(
// TextFileFormat only default to one column "value"
// Hive type is already considered as hive serde table, so the logic will not
// come in here.
- case _: JsonFileFormat | _: CSVDataSourceV2 | _: ParquetFileFormat | _: OrcDataSourceV2 =>
+ case _: JsonDataSourceV2 | _: CSVDataSourceV2 | _: ParquetFileFormat | _: OrcDataSourceV2 =>
case s if s.getClass.getCanonicalName.endsWith("OrcFileFormat") =>
case s =>
throw new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index d3f0414..95a63c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -157,38 +157,3 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
case _ => false
}
}
-
-private[json] class JsonOutputWriter(
- path: String,
- options: JSONOptions,
- dataSchema: StructType,
- context: TaskAttemptContext)
- extends OutputWriter with Logging {
-
- private val encoding = options.encoding match {
- case Some(charsetName) => Charset.forName(charsetName)
- case None => StandardCharsets.UTF_8
- }
-
- if (JSONOptionsInRead.blacklist.contains(encoding)) {
- logWarning(s"The JSON file ($path) was written in the encoding ${encoding.displayName()}" +
- " which can be read back by Spark only if multiLine is enabled.")
- }
-
- private var jacksonGenerator: Option[JacksonGenerator] = None
-
- override def write(row: InternalRow): Unit = {
- val gen = jacksonGenerator.getOrElse {
- val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
- // create the Generator without separator inserted between 2 records
- val newGen = new JacksonGenerator(dataSchema, os, options)
- jacksonGenerator = Some(newGen)
- newGen
- }
-
- gen.write(row)
- gen.writeLineEnding()
- }
-
- override def close(): Unit = jacksonGenerator.foreach(_.close())
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala
new file mode 100644
index 0000000..b3cd570
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.json
+
+import java.nio.charset.{Charset, StandardCharsets}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions, JSONOptionsInRead}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
+import org.apache.spark.sql.types.StructType
+
+class JsonOutputWriter(
+ path: String,
+ options: JSONOptions,
+ dataSchema: StructType,
+ context: TaskAttemptContext)
+ extends OutputWriter with Logging {
+
+ private val encoding = options.encoding match {
+ case Some(charsetName) => Charset.forName(charsetName)
+ case None => StandardCharsets.UTF_8
+ }
+
+ if (JSONOptionsInRead.blacklist.contains(encoding)) {
+ logWarning(s"The JSON file ($path) was written in the encoding ${encoding.displayName()}" +
+ " which can be read back by Spark only if multiLine is enabled.")
+ }
+
+ private var jacksonGenerator: Option[JacksonGenerator] = None
+
+ override def write(row: InternalRow): Unit = {
+ val gen = jacksonGenerator.getOrElse {
+ val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
+ // create the Generator without separator inserted between 2 records
+ val newGen = new JacksonGenerator(dataSchema, os, options)
+ jacksonGenerator = Some(newGen)
+ newGen
+ }
+
+ gen.write(row)
+ gen.writeLineEnding()
+ }
+
+ override def close(): Unit = jacksonGenerator.foreach(_.close())
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
new file mode 100644
index 0000000..610bd4c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.sources.v2.Table
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class JsonDataSourceV2 extends FileDataSourceV2 {
+
+ override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[JsonFileFormat]
+
+ override def shortName(): String = "json"
+
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
+ val paths = getPaths(options)
+ val tableName = getTableName(paths)
+ JsonTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
+ }
+
+ override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
+ val paths = getPaths(options)
+ val tableName = getTableName(paths)
+ JsonTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
+ }
+}
+
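With JsonDataSourceV2 registered in the DataSourceRegister service file above, both the `format("json")` spelling and the `json(...)` convenience method resolve to this class through `shortName()`. A minimal sketch (the `spark` session and the sample path are assumed):

    // Both resolve "json" through the service-loader registration above.
    val byFormat = spark.read.format("json").load("/tmp/people.json")
    val byMethod = spark.read.json("/tmp/people.json")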
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala
new file mode 100644
index 0000000..e5b7ae0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptionsInRead}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.PartitionReader
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A factory used to create JSON readers.
+ *
+ * @param sqlConf SQL configuration.
+ * @param broadcastedConf Broadcast serializable Hadoop Configuration.
+ * @param dataSchema Schema of JSON files.
+ * @param readDataSchema Required schema of JSON files.
+ * @param partitionSchema Schema of partitions.
+ * @param parsedOptions Options for parsing JSON files.
+ */
+case class JsonPartitionReaderFactory(
+ sqlConf: SQLConf,
+ broadcastedConf: Broadcast[SerializableConfiguration],
+ dataSchema: StructType,
+ readDataSchema: StructType,
+ partitionSchema: StructType,
+ parsedOptions: JSONOptionsInRead) extends FilePartitionReaderFactory {
+
+ override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
+ val actualSchema =
+ StructType(readDataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+ val parser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
+ val iter = JsonDataSource(parsedOptions).readFile(
+ broadcastedConf.value.value,
+ partitionedFile,
+ parser,
+ readDataSchema)
+ val fileReader = new PartitionReaderFromIterator[InternalRow](iter)
+ new PartitionReaderWithPartitionValues(fileReader, readDataSchema,
+ partitionSchema, partitionedFile.partitionValues)
+ }
+}
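The factory above wraps a plain row iterator into the V2 `PartitionReader` contract: callers advance with `next()`, read with `get()`, and must always `close()`. A sketch of that consumption loop, using the same imports as this patch:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.reader.PartitionReader

    // Drain a reader the way the scan execution does: next()/get(),
    // then close() even on failure.
    def countRows(reader: PartitionReader[InternalRow]): Long = {
      var n = 0L
      try {
        while (reader.next()) {
          reader.get()  // the returned row may be reused by the reader
          n += 1
        }
      } finally {
        reader.close()
      }
      n
    }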
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
new file mode 100644
index 0000000..201572b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.execution.datasources.v2.{FileScan, TextBasedFileScan}
+import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.SerializableConfiguration
+
+case class JsonScan(
+ sparkSession: SparkSession,
+ fileIndex: PartitioningAwareFileIndex,
+ dataSchema: StructType,
+ readDataSchema: StructType,
+ readPartitionSchema: StructType,
+ options: CaseInsensitiveStringMap)
+ extends TextBasedFileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) {
+
+ private val parsedOptions = new JSONOptionsInRead(
+ CaseInsensitiveMap(options.asScala.toMap),
+ sparkSession.sessionState.conf.sessionLocalTimeZone,
+ sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+
+ override def isSplitable(path: Path): Boolean = {
+ JsonDataSource(parsedOptions).isSplitable && super.isSplitable(path)
+ }
+
+ override def createReaderFactory(): PartitionReaderFactory = {
+ // Check a field requirement for corrupt records here to throw an exception in a driver side
+ ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
+
+ if (readDataSchema.length == 1 &&
+ readDataSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
+ throw new AnalysisException(
+ "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
+ "referenced columns only include the internal corrupt record column\n" +
+ s"(named _corrupt_record by default). For example:\n" +
+ "spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
+ "and spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().\n" +
+ "Instead, you can cache or save the parsed results and then send the same query.\n" +
+ "For example, val df = spark.read.schema(schema).json(file).cache() and then\n" +
+ "df.filter($\"_corrupt_record\".isNotNull).count()."
+ )
+ }
+ val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+ // Hadoop Configurations are case sensitive.
+ val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+ val broadcastedConf = sparkSession.sparkContext.broadcast(
+ new SerializableConfiguration(hadoopConf))
+ // The partition values are already truncated in `FileScan.partitions`.
+ // We should use `readPartitionSchema` as the partition schema here.
+ JsonPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
+ dataSchema, readDataSchema, readPartitionSchema, parsedOptions)
+ }
+}
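The corrupt-record check above rejects plans that reference only `_corrupt_record`, and the error text points at caching the parsed result first. A sketch of both sides, assuming a `spark` session, `spark.implicits._`, and a sample input path:

    import org.apache.spark.sql.types.{StringType, StructType}
    import spark.implicits._

    val path = "/tmp/corrupt.json"  // assumed sample path
    val schema = new StructType()
      .add("a", StringType)
      .add("_corrupt_record", StringType)

    // Rejected by the check above: only the corrupt record column is used.
    // spark.read.schema(schema).json(path).select("_corrupt_record").show()

    // Workaround from the error message: parse once, cache, then query.
    val df = spark.read.schema(schema).json(path).cache()
    df.filter($"_corrupt_record".isNotNull).count()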
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScanBuilder.scala
new file mode 100644
index 0000000..bb3c036
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScanBuilder.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.sources.v2.reader.Scan
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class JsonScanBuilder (
+ sparkSession: SparkSession,
+ fileIndex: PartitioningAwareFileIndex,
+ schema: StructType,
+ dataSchema: StructType,
+ options: CaseInsensitiveStringMap)
+ extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+ override def build(): Scan = {
+ JsonScan(sparkSession, fileIndex, dataSchema, readDataSchema(), readPartitionSchema(), options)
+ }
+}
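`FileScanBuilder` hands `JsonScan` the schema left after column pruning via `readDataSchema()`, so a narrow projection reaches the JSON parser already pruned. A sketch, with the session and path assumed as before:

    // Only "name" survives into readDataSchema(); the parser can skip
    // the remaining fields of each record.
    spark.read.json("/tmp/people.json").select("name").explain()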
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
new file mode 100644
index 0000000..bbdd3ae
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.execution.datasources.v2.FileTable
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+case class JsonTable(
+ name: String,
+ sparkSession: SparkSession,
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ fallbackFileFormat: Class[_ <: FileFormat])
+ extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): JsonScanBuilder =
+ new JsonScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+
+ override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
+ val parsedOptions = new JSONOptionsInRead(
+ options.asScala.toMap,
+ sparkSession.sessionState.conf.sessionLocalTimeZone,
+ sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+ JsonDataSource(parsedOptions).inferSchema(
+ sparkSession, files, parsedOptions)
+ }
+
+ override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
+ new JsonWriteBuilder(options, paths, formatName, supportsDataType)
+
+ override def supportsDataType(dataType: DataType): Boolean = dataType match {
+ case _: AtomicType => true
+
+ case st: StructType => st.forall { f => supportsDataType(f.dataType) }
+
+ case ArrayType(elementType, _) => supportsDataType(elementType)
+
+ case MapType(keyType, valueType, _) =>
+ supportsDataType(keyType) && supportsDataType(valueType)
+
+ case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
+
+ case _: NullType => true
+
+ case _ => false
+ }
+
+ override def formatName: String = "JSON"
+}
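Because `supportsDataType` recurses through struct, array, and map types, arbitrarily nested combinations of atomic types pass the write-side check. A sketch, assuming `spark` and `spark.implicits._` (the output path is illustrative):

    import spark.implicits._

    // Nested array and map of atomic types: accepted by supportsDataType.
    val nested = Seq((1, Seq("a", "b"), Map("k" -> 2)))
      .toDF("id", "tags", "counts")
    nested.write.mode("overwrite").json("/tmp/nested-json")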
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala
new file mode 100644
index 0000000..3c99e07
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.json.JsonOutputWriter
+import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class JsonWriteBuilder(
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
+ formatName: String,
+ supportsDataType: DataType => Boolean)
+ extends FileWriteBuilder(options, paths, formatName, supportsDataType) {
+ override def prepareWrite(
+ sqlConf: SQLConf,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ val conf = job.getConfiguration
+ val parsedOptions = new JSONOptions(
+ options,
+ sqlConf.sessionLocalTimeZone,
+ sqlConf.columnNameOfCorruptRecord)
+ parsedOptions.compressionCodec.foreach { codec =>
+ CompressionCodecs.setCodecConfiguration(conf, codec)
+ }
+
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new JsonOutputWriter(path, parsedOptions, dataSchema, context)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ ".json" + CodecStreams.getCompressionExtension(context)
+ }
+ }
+ }
+}
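`prepareWrite` above propagates the compression option into the Hadoop conf, and `getFileExtension` appends the codec suffix, so compressed output keeps a `.json.<codec>` extension. A sketch with an assumed `df` and output path:

    // Yields part files such as part-00000-...-c000.json.gz.
    df.write.option("compression", "gzip").json("/tmp/people-json-gz")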
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 8fcffbf..c6fdf41 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -332,7 +332,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
// TODO: test file source V2 after write path is fixed.
Seq(true).foreach { useV1 =>
val useV1List = if (useV1) {
- "csv,orc"
+ "csv,json,orc"
} else {
""
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
index a543eb8..e20a82b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
@@ -130,7 +130,10 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
}
test("Incorrect result caused by the rule OptimizeMetadataOnlyQuery") {
- withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
+ // This test case is only for file source V1. As the rule OptimizeMetadataOnlyQuery is disabled
+ // by default, we can skip testing file source v2 in current stage.
+ withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true",
+ SQLConf.USE_V1_SOURCE_READER_LIST.key -> "json") {
withTempPath { path =>
val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 6976177..6316e89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1961,9 +1961,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
spark.read.schema(schema).json(path).select("_corrupt_record").collect()
}.getMessage
assert(msg.contains("only include the internal corrupt record column"))
- intercept[catalyst.errors.TreeNodeException[_]] {
- spark.read.schema(schema).json(path).filter($"_corrupt_record".isNotNull).count()
- }
+
// workaround
val df = spark.read.schema(schema).json(path).cache()
assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 6a64599..95a57aa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -913,24 +913,24 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
withTable("appendParquetToJson") {
createDF(0, 9).write.format("json").saveAsTable("appendParquetToJson")
- val e = intercept[AnalysisException] {
+ val msg = intercept[AnalysisException] {
createDF(10, 19).write.mode(SaveMode.Append).format("parquet")
.saveAsTable("appendParquetToJson")
- }
- assert(e.getMessage.contains(
- "The format of the existing table default.appendParquetToJson is `JsonFileFormat`. " +
- "It doesn't match the specified format `ParquetFileFormat`"))
+ }.getMessage
+ // The format of the existing table can be JsonDataSourceV2 or JsonFileFormat.
+ assert(msg.contains("The format of the existing table default.appendParquetToJson is `Json"))
+ assert(msg.contains("It doesn't match the specified format `ParquetFileFormat`"))
}
withTable("appendTextToJson") {
createDF(0, 9).write.format("json").saveAsTable("appendTextToJson")
- val e = intercept[AnalysisException] {
+ val msg = intercept[AnalysisException] {
createDF(10, 19).write.mode(SaveMode.Append).format("text")
.saveAsTable("appendTextToJson")
- }
- assert(e.getMessage.contains(
- "The format of the existing table default.appendTextToJson is `JsonFileFormat`. " +
- "It doesn't match the specified format"))
+ }.getMessage
+ // The format of the existing table can be JsonDataSourceV2 or JsonFileFormat.
+ assert(msg.contains("The format of the existing table default.appendTextToJson is `Json"))
+ assert(msg.contains("It doesn't match the specified format"))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index bf6d0ea..0587cfe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -817,10 +817,12 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
assert(preferredLocations.distinct.length == 2)
}
- checkLocality()
-
- withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
+ withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> dataSourceName) {
checkLocality()
+
+ withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
+ checkLocality()
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org