Posted to commits@spark.apache.org by we...@apache.org on 2019/04/23 14:40:26 UTC

[spark] branch master updated: [SPARK-27128][SQL] Migrate JSON to File Data Source V2

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 00f2f31  [SPARK-27128][SQL] Migrate JSON to File Data Source V2
00f2f31 is described below

commit 00f2f311f775761cbb5f3ff0b8091217080293c0
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Tue Apr 23 22:39:59 2019 +0800

    [SPARK-27128][SQL] Migrate JSON to File Data Source V2
    
    ## What changes were proposed in this pull request?
    Migrate JSON to File Data Source V2
    
    ## How was this patch tested?
    
    Unit test
    
    Closes #24058 from gengliangwang/jsonV2.
    
    Authored-by: Gengliang Wang <ge...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  2 +-
 ...org.apache.spark.sql.sources.DataSourceRegister |  2 +-
 .../spark/sql/execution/command/tables.scala       |  4 +-
 .../datasources/json/JsonFileFormat.scala          | 35 ----------
 .../datasources/json/JsonOutputWriter.scala        | 63 +++++++++++++++++
 .../datasources/v2/json/JsonDataSourceV2.scala     | 44 ++++++++++++
 .../v2/json/JsonPartitionReaderFactory.scala       | 61 +++++++++++++++++
 .../execution/datasources/v2/json/JsonScan.scala   | 80 ++++++++++++++++++++++
 .../datasources/v2/json/JsonScanBuilder.scala      | 36 ++++++++++
 .../execution/datasources/v2/json/JsonTable.scala  | 73 ++++++++++++++++++++
 .../datasources/v2/json/JsonWriteBuilder.scala     | 63 +++++++++++++++++
 .../spark/sql/FileBasedDataSourceSuite.scala       |  2 +-
 .../execution/OptimizeMetadataOnlyQuerySuite.scala |  5 +-
 .../sql/execution/datasources/json/JsonSuite.scala |  4 +-
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 20 +++---
 .../spark/sql/sources/HadoopFsRelationTest.scala   |  8 ++-
 16 files changed, 445 insertions(+), 57 deletions(-)
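
For context, the end-user API is unchanged by this migration: the short name "json" now resolves to JsonDataSourceV2 for reads, while writes keep falling back to the V1 JsonFileFormat because "json" is added to the disabled-V2-writers default in SQLConf (first hunk below). A minimal sketch, assuming a SparkSession `spark` and an existing path `/tmp/people.json`:

    // Read path: resolved through JsonDataSourceV2 (registered under the short name "json").
    val df = spark.read.json("/tmp/people.json")

    // Write path: still handled by the V1 JsonFileFormat via the fallback list
    // that this commit extends to "csv,json,orc,text".
    df.write.mode("overwrite").json("/tmp/people_out")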

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9ebd2c0..3a2e736 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1494,7 +1494,7 @@ object SQLConf {
       " register class names for which data source V2 write paths are disabled. Writes from these" +
       " sources will fall back to the V1 sources.")
     .stringConf
-    .createWithDefault("csv,orc,text")
+    .createWithDefault("csv,json,orc,text")
 
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index d988287..12e9067 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,6 +1,6 @@
 org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2
 org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
-org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2
 org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 395d61b..ea29eff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -37,9 +37,9 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier}
 import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
-import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2
+import org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -238,7 +238,7 @@ case class AlterTableAddColumnsCommand(
         // TextFileFormat only default to one column "value"
         // Hive type is already considered as hive serde table, so the logic will not
         // come in here.
-        case _: JsonFileFormat | _: CSVDataSourceV2 | _: ParquetFileFormat | _: OrcDataSourceV2 =>
+        case _: JsonDataSourceV2 | _: CSVDataSourceV2 | _: ParquetFileFormat | _: OrcDataSourceV2 =>
         case s if s.getClass.getCanonicalName.endsWith("OrcFileFormat") =>
         case s =>
           throw new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index d3f0414..95a63c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -157,38 +157,3 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     case _ => false
   }
 }
-
-private[json] class JsonOutputWriter(
-    path: String,
-    options: JSONOptions,
-    dataSchema: StructType,
-    context: TaskAttemptContext)
-  extends OutputWriter with Logging {
-
-  private val encoding = options.encoding match {
-    case Some(charsetName) => Charset.forName(charsetName)
-    case None => StandardCharsets.UTF_8
-  }
-
-  if (JSONOptionsInRead.blacklist.contains(encoding)) {
-    logWarning(s"The JSON file ($path) was written in the encoding ${encoding.displayName()}" +
-         " which can be read back by Spark only if multiLine is enabled.")
-  }
-
-  private var jacksonGenerator: Option[JacksonGenerator] = None
-
-  override def write(row: InternalRow): Unit = {
-    val gen = jacksonGenerator.getOrElse {
-      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
-      // create the Generator without separator inserted between 2 records
-      val newGen = new JacksonGenerator(dataSchema, os, options)
-      jacksonGenerator = Some(newGen)
-      newGen
-    }
-
-    gen.write(row)
-    gen.writeLineEnding()
-  }
-
-  override def close(): Unit = jacksonGenerator.foreach(_.close())
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala
new file mode 100644
index 0000000..b3cd570
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.json
+
+import java.nio.charset.{Charset, StandardCharsets}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions, JSONOptionsInRead}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
+import org.apache.spark.sql.types.StructType
+
+class JsonOutputWriter(
+    path: String,
+    options: JSONOptions,
+    dataSchema: StructType,
+    context: TaskAttemptContext)
+  extends OutputWriter with Logging {
+
+  private val encoding = options.encoding match {
+    case Some(charsetName) => Charset.forName(charsetName)
+    case None => StandardCharsets.UTF_8
+  }
+
+  if (JSONOptionsInRead.blacklist.contains(encoding)) {
+    logWarning(s"The JSON file ($path) was written in the encoding ${encoding.displayName()}" +
+      " which can be read back by Spark only if multiLine is enabled.")
+  }
+
+  private var jacksonGenerator: Option[JacksonGenerator] = None
+
+  override def write(row: InternalRow): Unit = {
+    val gen = jacksonGenerator.getOrElse {
+      val os = CodecStreams.createOutputStreamWriter(context, new Path(path), encoding)
+      // create the Generator without separator inserted between 2 records
+      val newGen = new JacksonGenerator(dataSchema, os, options)
+      jacksonGenerator = Some(newGen)
+      newGen
+    }
+
+    gen.write(row)
+    gen.writeLineEnding()
+  }
+
+  override def close(): Unit = jacksonGenerator.foreach(_.close())
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
new file mode 100644
index 0000000..610bd4c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonDataSourceV2.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.sources.v2.Table
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class JsonDataSourceV2 extends FileDataSourceV2 {
+
+  override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[JsonFileFormat]
+
+  override def shortName(): String = "json"
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    val paths = getPaths(options)
+    val tableName = getTableName(paths)
+    JsonTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
+  }
+
+  override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
+    val paths = getPaths(options)
+    val tableName = getTableName(paths)
+    JsonTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
+  }
+}
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala
new file mode 100644
index 0000000..e5b7ae0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptionsInRead}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.PartitionReader
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A factory used to create JSON readers.
+ *
+ * @param sqlConf SQL configuration.
+ * @param broadcastedConf Broadcast serializable Hadoop Configuration.
+ * @param dataSchema Schema of JSON files.
+ * @param readDataSchema Required schema of JSON files.
+ * @param partitionSchema Schema of partitions.
+ * @param parsedOptions Options for parsing JSON files.
+ */
+case class JsonPartitionReaderFactory(
+    sqlConf: SQLConf,
+    broadcastedConf: Broadcast[SerializableConfiguration],
+    dataSchema: StructType,
+    readDataSchema: StructType,
+    partitionSchema: StructType,
+    parsedOptions: JSONOptionsInRead) extends FilePartitionReaderFactory {
+
+  override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
+    val actualSchema =
+      StructType(readDataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+    val parser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
+    val iter = JsonDataSource(parsedOptions).readFile(
+      broadcastedConf.value.value,
+      partitionedFile,
+      parser,
+      readDataSchema)
+    val fileReader = new PartitionReaderFromIterator[InternalRow](iter)
+    new PartitionReaderWithPartitionValues(fileReader, readDataSchema,
+      partitionSchema, partitionedFile.partitionValues)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
new file mode 100644
index 0000000..201572b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.execution.datasources.v2.{FileScan, TextBasedFileScan}
+import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.SerializableConfiguration
+
+case class JsonScan(
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex,
+    dataSchema: StructType,
+    readDataSchema: StructType,
+    readPartitionSchema: StructType,
+    options: CaseInsensitiveStringMap)
+  extends TextBasedFileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) {
+
+  private val parsedOptions = new JSONOptionsInRead(
+    CaseInsensitiveMap(options.asScala.toMap),
+    sparkSession.sessionState.conf.sessionLocalTimeZone,
+    sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+
+  override def isSplitable(path: Path): Boolean = {
+    JsonDataSource(parsedOptions).isSplitable && super.isSplitable(path)
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    // Check a field requirement for corrupt records here to throw an exception in a driver side
+    ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
+
+    if (readDataSchema.length == 1 &&
+      readDataSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
+      throw new AnalysisException(
+        "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\n" +
+          "referenced columns only include the internal corrupt record column\n" +
+          s"(named _corrupt_record by default). For example:\n" +
+          "spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()\n" +
+          "and spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().\n" +
+          "Instead, you can cache or save the parsed results and then send the same query.\n" +
+          "For example, val df = spark.read.schema(schema).json(file).cache() and then\n" +
+          "df.filter($\"_corrupt_record\".isNotNull).count()."
+      )
+    }
+    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+    // Hadoop Configurations are case sensitive.
+    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+    val broadcastedConf = sparkSession.sparkContext.broadcast(
+      new SerializableConfiguration(hadoopConf))
+    // The partition values are already truncated in `FileScan.partitions`.
+    // We should use `readPartitionSchema` as the partition schema here.
+    JsonPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
+      dataSchema, readDataSchema, readPartitionSchema, parsedOptions)
+  }
+}
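
The corrupt-record check above carries over the existing restriction from the V1 path; the workaround the error message describes looks like this in user code (illustrative only; `schema` and `path` are placeholders):

    // Cache the parsed result first, then query the internal corrupt record column.
    val df = spark.read.schema(schema).json(path).cache()
    df.filter($"_corrupt_record".isNotNull).count()
    df.select("_corrupt_record").show()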
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScanBuilder.scala
new file mode 100644
index 0000000..bb3c036
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScanBuilder.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.sources.v2.reader.Scan
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class JsonScanBuilder (
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex,
+    schema: StructType,
+    dataSchema: StructType,
+    options: CaseInsensitiveStringMap)
+  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+  override def build(): Scan = {
+    JsonScan(sparkSession, fileIndex, dataSchema, readDataSchema(), readPartitionSchema(), options)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
new file mode 100644
index 0000000..bbdd3ae
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
+import org.apache.spark.sql.execution.datasources.v2.FileTable
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+case class JsonTable(
+    name: String,
+    sparkSession: SparkSession,
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    userSpecifiedSchema: Option[StructType],
+    fallbackFileFormat: Class[_ <: FileFormat])
+  extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+  override def newScanBuilder(options: CaseInsensitiveStringMap): JsonScanBuilder =
+    new JsonScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+
+  override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
+    val parsedOptions = new JSONOptionsInRead(
+      options.asScala.toMap,
+      sparkSession.sessionState.conf.sessionLocalTimeZone,
+      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    JsonDataSource(parsedOptions).inferSchema(
+      sparkSession, files, parsedOptions)
+  }
+
+  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
+    new JsonWriteBuilder(options, paths, formatName, supportsDataType)
+
+  override def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: AtomicType => true
+
+    case st: StructType => st.forall { f => supportsDataType(f.dataType) }
+
+    case ArrayType(elementType, _) => supportsDataType(elementType)
+
+    case MapType(keyType, valueType, _) =>
+      supportsDataType(keyType) && supportsDataType(valueType)
+
+    case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
+
+    case _: NullType => true
+
+    case _ => false
+  }
+
+  override def formatName: String = "JSON"
+}
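
Taken together, a JSON read now flows through JsonDataSourceV2.getTable -> JsonTable.inferSchema -> JsonScanBuilder.build -> JsonScan.createReaderFactory -> JsonPartitionReaderFactory.buildReader, while JsonTable.newWriteBuilder wires up the JsonWriteBuilder added below (only exercised once "json" is removed from the V1 write fallback list). A hedged sketch of exercising both ends from user code, with placeholder paths:

    // Schema inference goes through JsonTable.inferSchema / JsonDataSource.inferSchema.
    val df = spark.read.option("multiLine", "false").json("/tmp/input.json")

    // Output part files end in ".json" plus any codec extension,
    // per JsonWriteBuilder.getFileExtension below.
    df.write.option("compression", "gzip").json("/tmp/output")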
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala
new file mode 100644
index 0000000..3c99e07
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.json
+
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.catalyst.json.JSONOptions
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.json.JsonOutputWriter
+import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class JsonWriteBuilder(
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    formatName: String,
+    supportsDataType: DataType => Boolean)
+  extends FileWriteBuilder(options, paths, formatName, supportsDataType) {
+  override def prepareWrite(
+      sqlConf: SQLConf,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    val conf = job.getConfiguration
+    val parsedOptions = new JSONOptions(
+      options,
+      sqlConf.sessionLocalTimeZone,
+      sqlConf.columnNameOfCorruptRecord)
+    parsedOptions.compressionCodec.foreach { codec =>
+      CompressionCodecs.setCodecConfiguration(conf, codec)
+    }
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new JsonOutputWriter(path, parsedOptions, dataSchema, context)
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        ".json" + CodecStreams.getCompressionExtension(context)
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 8fcffbf..c6fdf41 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -332,7 +332,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
       // TODO: test file source V2 after write path is fixed.
       Seq(true).foreach { useV1 =>
         val useV1List = if (useV1) {
-          "csv,orc"
+          "csv,json,orc"
         } else {
           ""
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
index a543eb8..e20a82b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
@@ -130,7 +130,10 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("Incorrect result caused by the rule OptimizeMetadataOnlyQuery") {
-    withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
+    // This test case is only for file source V1. As the rule OptimizeMetadataOnlyQuery is disabled
+    // by default, we can skip testing file source v2 in current stage.
+    withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true",
+      SQLConf.USE_V1_SOURCE_READER_LIST.key -> "json") {
       withTempPath { path =>
         val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
         Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 6976177..6316e89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1961,9 +1961,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         spark.read.schema(schema).json(path).select("_corrupt_record").collect()
       }.getMessage
       assert(msg.contains("only include the internal corrupt record column"))
-      intercept[catalyst.errors.TreeNodeException[_]] {
-        spark.read.schema(schema).json(path).filter($"_corrupt_record".isNotNull).count()
-      }
+
       // workaround
       val df = spark.read.schema(schema).json(path).cache()
       assert(df.filter($"_corrupt_record".isNotNull).count() == 1)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 6a64599..95a57aa 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -913,24 +913,24 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
     withTable("appendParquetToJson") {
       createDF(0, 9).write.format("json").saveAsTable("appendParquetToJson")
-      val e = intercept[AnalysisException] {
+      val msg = intercept[AnalysisException] {
         createDF(10, 19).write.mode(SaveMode.Append).format("parquet")
           .saveAsTable("appendParquetToJson")
-      }
-      assert(e.getMessage.contains(
-        "The format of the existing table default.appendParquetToJson is `JsonFileFormat`. " +
-        "It doesn't match the specified format `ParquetFileFormat`"))
+      }.getMessage
+      // The format of the existing table can be JsonDataSourceV2 or JsonFileFormat.
+      assert(msg.contains("The format of the existing table default.appendParquetToJson is `Json"))
+      assert(msg.contains("It doesn't match the specified format `ParquetFileFormat`"))
     }
 
     withTable("appendTextToJson") {
       createDF(0, 9).write.format("json").saveAsTable("appendTextToJson")
-      val e = intercept[AnalysisException] {
+      val msg = intercept[AnalysisException] {
         createDF(10, 19).write.mode(SaveMode.Append).format("text")
           .saveAsTable("appendTextToJson")
-      }
-      assert(e.getMessage.contains(
-        "The format of the existing table default.appendTextToJson is `JsonFileFormat`. " +
-        "It doesn't match the specified format"))
+      }.getMessage
+      // The format of the existing table can be JsonDataSourceV2 or JsonFileFormat.
+      assert(msg.contains("The format of the existing table default.appendTextToJson is `Json"))
+      assert(msg.contains("It doesn't match the specified format"))
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index bf6d0ea..0587cfe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -817,10 +817,12 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
         assert(preferredLocations.distinct.length == 2)
       }
 
-      checkLocality()
-
-      withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
+      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> dataSourceName) {
         checkLocality()
+
+        withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {
+          checkLocality()
+        }
       }
     }
   }
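
The test changes above pin individual suites to the V1 JSON reader through SQLConf.USE_V1_SOURCE_READER_LIST rather than reworking them for V2. A minimal sketch of that pattern, assuming a suite that mixes in SQLTestUtils so withSQLConf is available:

    // Force "json" back onto the V1 read path for assertions that depend on V1-only
    // behavior, e.g. the OptimizeMetadataOnlyQuery and locality checks above.
    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "json") {
      val df = spark.read.json(path)  // resolved via the V1 JsonFileFormat here
      // ... V1-specific assertions ...
    }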


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org