Posted to reviews@spark.apache.org by "HyukjinKwon (via GitHub)" <gi...@apache.org> on 2023/08/09 00:36:54 UTC

[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41832: [SPARK-44265][SQL] Built-in XML data source support

HyukjinKwon commented on code in PR #41832:
URL: https://github.com/apache/spark/pull/41832#discussion_r1287807929


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlReader.scala:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.xml
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.util.FailFastMode
+import org.apache.spark.sql.execution.datasources.xml.util.XmlFile
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A collection of static functions for working with XML files in Spark SQL
+ */
+class XmlReader(private var schema: StructType,

Review Comment:
   Actually I think we can remove the whole file, but that can be done separately in a follow-up.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/functions.scala:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.xml
+
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Support functions for working with XML columns directly.
+ */
+// scalastyle:off: object.name
+object functions {

Review Comment:
   As a follow-up, we should move this function to `org.apache.spark.sql.functions`.
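   
   For orientation, a rough usage sketch of the helper being discussed. It assumes the `from_xml(column, schema, options)` shape that this `object functions` ports over from spark-xml; after the proposed move, only the import would change to `org.apache.spark.sql.functions.from_xml`. This is a sketch of the follow-up direction, not the final public API.
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.col
   import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
   // Today the helper lives in the package under review; after the proposed move this
   // import would become org.apache.spark.sql.functions.from_xml.
   import org.apache.spark.sql.execution.datasources.xml.functions.from_xml

   object FromXmlUsageSketch {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().master("local[*]").appName("from_xml sketch").getOrCreate()
       import spark.implicits._

       val schema = StructType(Seq(StructField("age", IntegerType)))
       val df = Seq("<person><age>25</age></person>").toDF("payload")

       // Call shape assumed to mirror from_json: from_xml(column, schema, options).
       df.select(from_xml(col("payload"), schema, Map.empty[String, String]).as("parsed"))
         .show(truncate = false)

       spark.stop()
     }
   }
   ```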



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/util/InferSchema.scala:
##########
@@ -0,0 +1,336 @@
+/*

Review Comment:
   This would have to be placed together with `CSVInferSchema` in catalyst.
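   
   For reference, the CSV counterpart already lives in catalyst as `org.apache.spark.sql.catalyst.csv.CSVInferSchema`, so a follow-up could mirror that layout. The package and object names below are assumptions about where the XML inference could land, not something this PR does:
   
   ```scala
   // Assumed follow-up layout only; mirrors how CSVInferSchema sits in catalyst.
   package org.apache.spark.sql.catalyst.xml

   import org.apache.spark.sql.types.{DataType, StringType}

   // Skeleton showing the intended home of the inference logic, not its behavior;
   // the real object would carry over the type-merging rules from InferSchema.scala.
   private[sql] object XmlInferSchema {
     // Placeholder: fall back to StringType, as inference does for unrecognized values.
     def inferFrom(value: String): DataType = StringType
   }
   ```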



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/util/InferSchema.scala:
##########
@@ -0,0 +1,336 @@
+/*

Review Comment:
   The parsers, too.



##########
sql/core/src/test/resources/test-data/xml-resources/ages-with-spaces.xml:
##########
@@ -0,0 +1,20 @@
+<people>
+  <person>
+    <age born=" 1990-02-24 "> 25 </age>

Review Comment:
   I was so young :-)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/util/XmlFile.scala:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.xml.util
+
+import java.io.CharArrayWriter
+import java.nio.charset.Charset
+import javax.xml.stream.XMLOutputFactory
+
+import scala.collection.Map
+
+import com.sun.xml.txw2.output.IndentingXMLStreamWriter
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.io.compress.CompressionCodec
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.xml.{XmlInputFormat, XmlOptions}
+import org.apache.spark.sql.execution.datasources.xml.parsers.StaxXmlGenerator
+
+private[xml] object XmlFile {

Review Comment:
   We likely won't need this file either.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlDataToCatalyst.scala:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.xml
+
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.execution.datasources.xml.parsers.StaxXmlParser
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+case class XmlDataToCatalyst(

Review Comment:
   We would need to register this as a SQL expression. That can be done in a follow-up.
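   
   For context, built-in expressions are registered in catalyst's `FunctionRegistry`, next to entries like `expression[JsonToStructs]("from_json")`. The sketch below shows what calling the expression from SQL could look like once that follow-up lands; the function name `from_xml` and the argument shape are assumptions, so this will not run until the registration exists:
   
   ```scala
   import org.apache.spark.sql.SparkSession

   object FromXmlSqlSketch {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().master("local[*]").appName("from_xml SQL sketch").getOrCreate()
       // Assumed post-follow-up call shape: from_xml(<xml string>, <DDL schema string>).
       // This only works after XmlDataToCatalyst is registered in FunctionRegistry.
       spark.sql("SELECT from_xml('<person><age>25</age></person>', 'age INT') AS parsed").show()
       spark.stop()
     }
   }
   ```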



##########
sql/core/src/test/java/test/org/apache/spark/sql/execution/datasources/xml/JavaXmlSuite.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package test.org.apache.spark.sql.execution.datasources.xml;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.datasources.xml.XmlOptions;
+import org.apache.spark.sql.execution.datasources.xml.XmlReader;
+
+public final class JavaXmlSuite {
+
+    private static final int numBooks = 12;
+    private static final String booksFile = "src/test/resources/test-data/xml-resources/books.xml";

Review Comment:
   I wonder if we should just name it `resources` instead of `xml-resources`. Or create a subdirectory?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/package.scala:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.io.compress.CompressionCodec
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Dataset}
+import org.apache.spark.sql.{Encoders, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.execution.datasources.xml.parsers.StaxXmlParser
+import org.apache.spark.sql.execution.datasources.xml.util.{InferSchema, XmlFile}
+import org.apache.spark.sql.types.{ArrayType, StructType}
+
+package object xml {

Review Comment:
   We would have to remove this file too, and move utilities such as `schema_of_xml` somewhere else.
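   
   To illustrate what needs a new home, here is the JSON analogue that already lives in `org.apache.spark.sql.functions`; an XML counterpart could be exposed the same way. Only the commented line is hypothetical: the final location and signature of `schema_of_xml` are assumptions about the follow-up.
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.{lit, schema_of_json}

   object SchemaOfXmlAnalogy {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().master("local[*]").appName("schema_of_xml analogy").getOrCreate()
       import spark.implicits._

       // The JSON helper that already lives in org.apache.spark.sql.functions:
       Seq("{}").toDF("dummy")
         .select(schema_of_json(lit("""{"age": 25}""")).as("json_schema"))
         .show(truncate = false)

       // A follow-up could expose the XML helper alongside it, e.g. (assumed):
       //   .select(schema_of_xml(lit("<person><age>25</age></person>")))

       spark.stop()
     }
   }
   ```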



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/util/PartialResultException.scala:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.xml.util

Review Comment:
   We actually have it in the codebase already :-).



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlReader.scala:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.xml
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.util.FailFastMode
+import org.apache.spark.sql.execution.datasources.xml.util.XmlFile
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A collection of static functions for working with XML files in Spark SQL
+ */
+class XmlReader(private var schema: StructType,

Review Comment:
   We should probably add a couple of methods to `DataFrameReader`/`DataStreamReader`.
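   
   A short sketch of the difference this would make for users, assuming the new source is registered under the short name "xml" and keeps spark-xml's `rowTag` option; the shorthand method itself is hypothetical until the follow-up adds it:
   
   ```scala
   import org.apache.spark.sql.{DataFrame, SparkSession}

   object XmlReadSketch {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().master("local[*]").appName("xml read sketch").getOrCreate()

       // What already works with this PR: the generic data source API.
       val viaFormat: DataFrame = spark.read
         .format("xml")
         .option("rowTag", "person")
         .load("sql/core/src/test/resources/test-data/xml-resources/ages-with-spaces.xml")
       viaFormat.printSchema()

       // The proposed shorthand could mirror DataFrameReader.csv/json (signature assumed):
       //   spark.read.option("rowTag", "person").xml(path)
       // and similarly on DataStreamReader for streaming reads.

       spark.stop()
     }
   }
   ```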



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlRelation.scala:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.xml
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.xml.parsers.StaxXmlParser
+import org.apache.spark.sql.execution.datasources.xml.util.{InferSchema, XmlFile}
+import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, PrunedScan}
+import org.apache.spark.sql.types._
+
+case class XmlRelation protected[spark] (
+    baseRDD: () => RDD[String],
+    location: Option[String],
+    parameters: Map[String, String],
+    userSchema: StructType = null)(@transient val sqlContext: SQLContext)
+  extends BaseRelation
+  with InsertableRelation
+  with PrunedScan {
+
+  // Hacky: ensure RDD's underlying data actually already exists early on
+  baseRDD().partitions
+
+  private val options = XmlOptions(parameters)
+
+  override val schema: StructType = {
+    Option(userSchema).getOrElse {
+      InferSchema.infer(
+        baseRDD(),
+        options)
+    }
+  }
+
+  override def buildScan(requiredColumns: Array[String]): RDD[Row] = {
+    val requiredFields = requiredColumns.map(schema(_))
+    val requestedSchema = StructType(requiredFields)
+    StaxXmlParser.parse(
+      baseRDD(),
+      requestedSchema,
+      options)
+  }
+
+  // The function below was borrowed from JSONRelation
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    val filesystemPath = location match {
+      case Some(p) => new Path(p)
+      case None =>
+        throw new IOException("Cannot INSERT into table with no path defined")
+    }
+
+    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+
+    if (overwrite) {
+      try {
+        fs.delete(filesystemPath, true)
+      } catch {
+        case e: IOException =>
+          throw new IOException(

Review Comment:
   As a follow-up, we should leverage the error framework we added (see `QueryExecutionErrors`).
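   
   A minimal sketch of that direction, assuming the follow-up adds a dedicated helper to `QueryExecutionErrors` backed by an entry in error-classes.json; the helper name and error class below are placeholders, not existing identifiers:
   
   ```scala
   import java.io.IOException

   import org.apache.spark.SparkException

   // Sketch only: the real method would live in QueryExecutionErrors, and
   // XmlRelation.insert would call it instead of wrapping the IOException by hand.
   object XmlWriteErrorsSketch {
     def cannotClearXmlOutputPathError(path: String, cause: IOException): Throwable =
       new SparkException(
         "_LEGACY_ERROR_TEMP_XML_WRITE",   // placeholder error class; would be defined in error-classes.json
         Map("path" -> path),              // message parameters referenced by the error class template
         cause)
   }
   ```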



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

