Posted to reviews@spark.apache.org by "sandip-db (via GitHub)" <gi...@apache.org> on 2023/08/11 21:51:20 UTC

[GitHub] [spark] sandip-db opened a new pull request, #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

sandip-db opened a new pull request, #42462:
URL: https://github.com/apache/spark/pull/42462

   ### What changes were proposed in this pull request?
   This is the second PR related to the built-in XML data source implementation ([jira](https://issues.apache.org/jira/browse/SPARK-44751)).
   The previous [PR](https://github.com/apache/spark/pull/41832) ported the spark-xml package.
   This PR addresses the following:
   - Implement FileFormat interface
   - Address the review comments in the previous [XML PR](https://github.com/apache/spark/pull/41832)
   - Moved from_xml and schema_of_xml to sql/functions
   - Moved ".xml" to DataFrameReader/DataFrameWriter
   - Removed old APIs like XmlRelation, XmlReader, etc.
   - StaxXmlParser changes:
      - Use FailureSafeParser
      - Convert 'Row' usage to 'InternalRow'
      - Convert String to UTF8String
      - Handle MapData and ArrayData for MapType and ArrayType respectively
      - Use TimestampFormatter to parse timestamp
      - Use DateFormatter to parse date
   - StaxXmlGenerator changes:
      - Convert 'Row' usage to 'InternalRow'
      - Handle UTF8String for StringType
      - Handle MapData and ArrayData for MapType and ArrayType respectively
      - Use TimestampFormatter to format timestamp
      - Use DateFormatter to format date
   - Update XML tests accordingly because of the above changes
   
   
   ### Why are the changes needed?
   These changes are required to bring the XML data source to parity with CSV and JSON and to support features such as streaming, which require the FileFormat interface to be implemented.
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, it adds support for the XML data source.
   
   
   ### How was this patch tested?
   - Ran all the XML unit tests.
   - GitHub Actions
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300728773


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala:
##########
@@ -470,6 +470,34 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     format("csv").save(path)
   }
 
+  /**
+   * Saves the content of the `DataFrame` in XML format at the specified path. This is equivalent
+   * to:
+   * {{{
+   *   format("xml").save(path)
+   * }}}
+   *
+   * Note that writing a XML file from `DataFrame` having a field `ArrayType` with its element as
+   * `ArrayType` would have an additional nested field for the element. For example, the
+   * `DataFrame` having a field below,
+   *
+   * {@code fieldA [[data1, data2]]}

Review Comment:
   Done.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,150 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.toDDL), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.

Review Comment:
   Done





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300847730


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.xml
+
+import java.nio.charset.StandardCharsets
+import java.time.ZoneId
+import java.util.Locale
+import javax.xml.stream.XMLInputFactory
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, DateFormatter, DateTimeUtils, ParseMode, PermissiveMode, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+
+/**
+ * Options for the XML data source.
+ */
+private[sql] class XmlOptions(
+    @transient val parameters: CaseInsensitiveMap[String],
+    defaultTimeZoneId: String,
+    defaultColumnNameOfCorruptRecord: String)
+  extends FileSourceOptions(parameters) with Logging {
+
+  import XmlOptions._
+
+  def this(
+      parameters: Map[String, String] = Map.empty,
+      defaultTimeZoneId: String = SQLConf.get.sessionLocalTimeZone,
+      defaultColumnNameOfCorruptRecord: String = "") = {
+    this(
+      CaseInsensitiveMap(parameters),
+      defaultTimeZoneId,
+      defaultColumnNameOfCorruptRecord)
+  }
+
+  private def getBool(paramName: String, default: Boolean = false): Boolean = {
+    val param = parameters.getOrElse(paramName, default.toString)
+    if (param == null) {
+      default
+    } else if (param.toLowerCase(Locale.ROOT) == "true") {
+      true
+    } else if (param.toLowerCase(Locale.ROOT) == "false") {
+      false
+    } else {
+      throw QueryExecutionErrors.paramIsNotBooleanValueError(paramName)
+    }
+  }
+
+  val compressionCodec = parameters.get("compression").orElse(parameters.get("codec"))
+    .map(CompressionCodecs.getCodecClassName)
+  val rowTag = parameters.getOrElse("rowTag", XmlOptions.DEFAULT_ROW_TAG)
+  require(rowTag.nonEmpty, "'rowTag' option should not be empty string.")
+  require(!rowTag.startsWith("<") && !rowTag.endsWith(">"),
+          "'rowTag' should not include angle brackets")
+  val rootTag = parameters.getOrElse("rootTag", XmlOptions.DEFAULT_ROOT_TAG)
+  require(!rootTag.startsWith("<") && !rootTag.endsWith(">"),
+          "'rootTag' should not include angle brackets")
+  val declaration = parameters.getOrElse("declaration", XmlOptions.DEFAULT_DECLARATION)
+  require(!declaration.startsWith("<") && !declaration.endsWith(">"),
+          "'declaration' should not include angle brackets")
+  val arrayElementName = parameters.getOrElse("arrayElementName",
+    XmlOptions.DEFAULT_ARRAY_ELEMENT_NAME)
+  val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
+  require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
+  val excludeAttributeFlag = parameters.get("excludeAttribute").map(_.toBoolean).getOrElse(false)
+  val treatEmptyValuesAsNulls =
+    parameters.get("treatEmptyValuesAsNulls").map(_.toBoolean).getOrElse(false)
+  val attributePrefix =
+    parameters.getOrElse("attributePrefix", XmlOptions.DEFAULT_ATTRIBUTE_PREFIX)
+  val valueTag = parameters.getOrElse("valueTag", XmlOptions.DEFAULT_VALUE_TAG)
+  require(valueTag.nonEmpty, "'valueTag' option should not be empty string.")
+  require(valueTag != attributePrefix,
+    "'valueTag' and 'attributePrefix' options should not be the same.")
+  val nullValue = parameters.getOrElse("nullValue", XmlOptions.DEFAULT_NULL_VALUE)
+  val columnNameOfCorruptRecord =
+    parameters.getOrElse("columnNameOfCorruptRecord", "_corrupt_record")
+  val ignoreSurroundingSpaces =
+    parameters.get("ignoreSurroundingSpaces").map(_.toBoolean).getOrElse(false)
+  val parseMode = ParseMode.fromString(parameters.getOrElse("mode", PermissiveMode.name))
+  val inferSchema = parameters.get("inferSchema").map(_.toBoolean).getOrElse(true)
+  val rowValidationXSDPath = parameters.get("rowValidationXSDPath").orNull
+  val wildcardColName =
+    parameters.getOrElse("wildcardColName", XmlOptions.DEFAULT_WILDCARD_COL_NAME)
+  val ignoreNamespace = parameters.get("ignoreNamespace").map(_.toBoolean).getOrElse(false)
+
+  /**
+   * Infer columns with all valid date entries as date type (otherwise inferred as string or
+   * timestamp type) if schema inference is enabled.
+   *
+   * Enabled by default.
+   *
+   * Not compatible with legacyTimeParserPolicy == LEGACY since legacy date parser will accept
+   * extra trailing characters. Thus, disabled when legacyTimeParserPolicy == LEGACY
+   */
+  val preferDate = {
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      false
+    } else {
+      getBool(PREFER_DATE, true)
+    }
+  }
+
+  val dateFormatOption: Option[String] = parameters.get(DATE_FORMAT)
+  // Provide a default value for dateFormatInRead when preferDate. This ensures that the
+  // Iso8601DateFormatter (with strict date parsing) is used for date inference
+  val dateFormatInRead: Option[String] =
+  if (preferDate) {
+    Option(dateFormatOption.getOrElse(DateFormatter.defaultPattern))
+  } else {
+    dateFormatOption
+  }
+  val dateFormatInWrite: String = parameters.getOrElse(DATE_FORMAT, DateFormatter.defaultPattern)
+
+
+  val timestampFormatInRead: Option[String] =
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      Some(parameters.getOrElse(TIMESTAMP_FORMAT,
+        s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"))
+    } else {
+      parameters.get(TIMESTAMP_FORMAT)
+    }
+  val timestampFormatInWrite: String = parameters.getOrElse(TIMESTAMP_FORMAT,
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
+    } else {
+      s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
+    })
+
+  // SPARK-39731: Enables the backward compatible parsing behavior.

Review Comment:
   Here too





[GitHub] [spark] cloud-fan commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1296727683


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,150 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.toDDL), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the xml string
+   * @param options
+   *   options to control how the xml is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: DataType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.json), options.iterator)

Review Comment:
   For `StructType` we call `lit(schema.toDDL)`; why do we use a JSON string here for `DataType`?





[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300465613


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala:
##########
@@ -83,21 +86,21 @@ private[xml] object StaxXmlGenerator {
 
     def writeElement(dt: DataType, v: Any, options: XmlOptions): Unit = (dt, v) match {
       case (_, null) | (NullType, _) => writer.writeCharacters(options.nullValue)
+      case (StringType, v: UTF8String) => writer.writeCharacters(v.toString)
       case (StringType, v: String) => writer.writeCharacters(v)
       case (TimestampType, v: Timestamp) =>

Review Comment:
   Most `StringType` values arrive here as `UTF8String`.
   
   Also, JacksonGenerator supports DecimalType and I was planning to add support for DecimalType in a followup. Is that not required?





[GitHub] [spark] HyukjinKwon commented on pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42462:
URL: https://github.com/apache/spark/pull/42462#issuecomment-1675685658

   License-wise there is no problem because I wrote them. I filed CCLA/ICLA when I became a committer.
   
   Through SPIP, we have reached a lazy consensus including PMC votes.
   
   IP clearance is for an external project, but this is really a plugin with a very small codebase. We haven't done that for Avro, CSV, or cloudpickle, for example, where the codebase is also really small and we reviewed them line by line.
   
   In addition, we're NOT just porting it as is; we are releasing a modified version built on Spark's internal interfaces, which will allow many features such as partitioned tables. TBH it's more work and code to modify them than to just port them.




[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294190523


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -589,6 +589,11 @@
           "<errors>"

Review Comment:
     Thanks for reviewing this PR.





[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294190669


##########
sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:
##########
@@ -22,7 +22,7 @@ org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
 org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2
-org.apache.spark.sql.execution.datasources.xml.DefaultSource
+org.apache.spark.sql.execution.datasources.xml.XMLFileFormat

Review Comment:
   Done





[GitHub] [spark] ericsun95 commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "ericsun95 (via GitHub)" <gi...@apache.org>.
ericsun95 commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294137197


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/parsers/StaxXmlGenerator.scala:
##########
@@ -106,9 +109,11 @@ private[xml] object StaxXmlGenerator {
       // this case only can happen when we convert a normal [[DataFrame]] to XML file.
       // When [[ArrayType]] has [[ArrayType]] as elements, it is confusing what is element name
       // for XML file.
-      case (ArrayType(ty, _), v: scala.collection.Seq[_]) =>
-        v.foreach { e =>
-          writeChild(options.arrayElementName, ty, e)
+      case (ArrayType(ty, _), v: ArrayData) =>
+        var i = 0;

Review Comment:
   Same pattern as above; I personally prefer not using `var`.
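   
   One way to avoid the mutable counter can be sketched as follows. This is a minimal illustration, not the code from the PR: `SimpleArrayData` is a hypothetical stand-in that only mimics an element count plus positional access, and `writeChildren` is an invented helper name.

   ```scala
   // Hypothetical stand-in for an index-addressable container.
   // It is NOT Spark's ArrayData; it only offers a count and positional access.
   final case class SimpleArrayData(values: Vector[Any]) {
     def numElements(): Int = values.length
     def get(ordinal: Int): Any = values(ordinal)
   }

   object NoVarLoop {
     // Visits every element by index without declaring a `var` counter.
     def writeChildren(data: SimpleArrayData)(writeChild: Any => Unit): Unit =
       (0 until data.numElements()).foreach { i =>
         writeChild(data.get(i))
       }

     def main(args: Array[String]): Unit = {
       val out = scala.collection.mutable.ArrayBuffer.empty[String]
       writeChildren(SimpleArrayData(Vector("data1", "data2")))(e => out += s"<item>$e</item>")
       println(out.mkString)
     }
   }
   ```

   A `while` loop with a local counter is sometimes kept deliberately in hot paths, so whether the range-based form is preferable here is a style and performance trade-off for the reviewers to settle.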





[GitHub] [spark] cloud-fan commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1296713510


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala:
##########
@@ -117,4 +117,26 @@ object ExprUtils extends QueryErrorsBase {
       TypeCheckSuccess
     }
   }
+
+  /**
+   * Check if the schema is valid for XML
+   *
+   * @param schema The schema to check.
+   * @return
+   * `TypeCheckSuccess` if the schema is valid
+   * `DataTypeMismatch` with an error error if the schema is not valid
+   */
+  def checkXmlSchema(schema: DataType): TypeCheckResult = {
+    val isInvalid = schema.existsRecursively {
+      case MapType(keyType, _, _) if keyType != StringType => true

Review Comment:
   Can we add some comments to explain the rationale for only allowing string-typed map keys?
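   
   The rationale itself is not stated in this thread. The sketch below only illustrates what the quoted check appears to do: walk the schema recursively and flag any map whose key type is not a string. All types here (`MiniType`, `MiniMap`, and so on) are a hypothetical miniature model, not Spark's `DataType` hierarchy.

   ```scala
   // Hypothetical miniature type model; not Spark's DataType hierarchy.
   sealed trait MiniType
   case object MiniString extends MiniType
   case object MiniInt extends MiniType
   final case class MiniArray(element: MiniType) extends MiniType
   final case class MiniMap(key: MiniType, value: MiniType) extends MiniType
   final case class MiniStruct(fields: List[(String, MiniType)]) extends MiniType

   object MapKeyCheck {
     // True if `p` holds for `t` itself or for any type nested inside it.
     def existsRecursively(t: MiniType)(p: MiniType => Boolean): Boolean =
       p(t) || (t match {
         case MiniArray(e)   => existsRecursively(e)(p)
         case MiniMap(k, v)  => existsRecursively(k)(p) || existsRecursively(v)(p)
         case MiniStruct(fs) => fs.exists { case (_, ft) => existsRecursively(ft)(p) }
         case _              => false
       })

     // Mirrors the shape of the quoted check: a schema is invalid if any
     // map, at any nesting depth, has a non-string key type.
     def hasNonStringMapKey(schema: MiniType): Boolean =
       existsRecursively(schema) {
         case MiniMap(key, _) if key != MiniString => true
         case _                                    => false
       }
   }
   ```

   With this model, a struct that contains an array of maps keyed by `MiniInt` is rejected, while the same struct with `MiniString` keys passes.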





[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300462435


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions.xml
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, ExprUtils, NullIntolerant, TimeZoneAwareExpression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, GenericArrayData, PermissiveMode}
+import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, ValidatorUtil, XmlInferSchema, XmlOptions}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+case class XmlToStructs(
+    schema: DataType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression
+  with TimeZoneAwareExpression
+  with CodegenFallback
+  with ExpectsInputTypes
+  with NullIntolerant
+  with QueryErrorsBase {
+
+  def this(child: Expression, schema: Expression, options: Map[String, String]) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),

Review Comment:
   The `from_xml` function above calls into the following case class, which takes the schema as a `DataType`:
   ```
   case class XmlToStructs(
       schema: DataType,
       options: Map[String, String],
       child: Expression,
       timeZoneId: Option[String] = None)
   ```



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala:
##########
@@ -392,6 +392,46 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
   def csv(csvDataset: Dataset[String]): DataFrame =
     parse(csvDataset, ParseFormat.PARSE_FORMAT_CSV)
 
+  /**
+   * Loads a XML file and returns the result as a `DataFrame`. See the documentation on the other
+   * overloaded `xml()` method for more details.
+   *
+   * @since 4.0.0
+   */
+  def xml(path: String): DataFrame = {

Review Comment:
   Done.





[GitHub] [spark] HyukjinKwon closed pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation
URL: https://github.com/apache/spark/pull/42462




[GitHub] [spark] cloud-fan commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300852384


##########
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7314,6 +7314,103 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Map.empty[String, String].asJava)
 
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e       a string column containing XML data.
+   * @param schema  the schema to use when parsing the XML string
+   * @param options options to control how the XML is parsed. accepts the same options and the
+   *                XML data source.
+   *                See
+   *                <a href=
+   *                "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   *                Data Source Option</a> in the version you use.
+   * @group collection_funcs
+   * @since
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    XmlToStructs(CharVarcharUtils.failIfHasCharVarchar(schema), options, e.expr)
+  }
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * (Java-specific) Parses a column containing a XML string into a `StructType`
+   * with the specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e       a string column containing XML data.
+   * @param schema  the schema to use when parsing the XML string
+   * @param options options to control how the XML is parsed. accepts the same options and the
+   *                XML data source.
+   *                See
+   *                <a href=
+   *                "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   *                Data Source Option</a> in the version you use.
+   * @group collection_funcs
+   * @since
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: Column, options: java.util.Map[String, String]): Column = {
+    withExpr(new XmlToStructs(e.expr, schema.expr, options.asScala.toMap))
+  }
+
+  /**
+   * Parses a column containing a XML string into the data type
+   * corresponding to the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e       a string column containing XML data.
+   * @param schema  the schema to use when parsing the XML string
+
+   * @group collection_funcs
+   * @since
+   */
+  def from_xml(e: Column, schema: StructType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: String): Column = schema_of_xml(lit(xml))
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml a foldable string column containing a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: Column): Column = withExpr(new SchemaOfXml(xml.expr))
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a XML string and infers its schema in DDL format using options.
+   *
+   * @param xml    a foldable string column containing XML data.
+   * @param options options to control how the xml is parsed. accepts the same options and the
+   *                XML data source.
+   *                See
+   *                <a href=
+   *                "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   *                Data Source Option</a> in the version you use.
+   * @return a column with string literal containing schema in DDL format.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def schema_of_xml(xml: Column, options: java.util.Map[String, String]): Column = {

Review Comment:
   Shall we at least have an overload with Scala options?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294192878


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XMLFileFormat.scala:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.xml
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.xml.parsers.StaxXmlParser
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * Provides access to XML data from pure SQL statements.
+ */
+class XMLFileFormat extends TextBasedFileFormat with DataSourceRegister {
+
+  override def shortName(): String = "xml"
+
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = {
+    val parsedOptions = new XmlOptions(

Review Comment:
   `XmlOptions` needs to be initialized before calling into `XmlDataSource`.





[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300835598


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,112 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =

Review Comment:
   from_csv has just two. I can trim from_xml overloads too. Let me know.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300853023


##########
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7314,6 +7314,103 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Map.empty[String, String].asJava)
 
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e       a string column containing XML data.
+   * @param schema  the schema to use when parsing the XML string
+   * @param options options to control how the XML is parsed. accepts the same options and the
+   *                XML data source.
+   *                See
+   *                <a href=
+   *                "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   *                Data Source Option</a> in the version you use.
+   * @group collection_funcs
+   * @since
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    XmlToStructs(CharVarcharUtils.failIfHasCharVarchar(schema), options, e.expr)
+  }
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * (Java-specific) Parses a column containing a XML string into a `StructType`
+   * with the specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e       a string column containing XML data.
+   * @param schema  the schema to use when parsing the XML string
+   * @param options options to control how the XML is parsed. accepts the same options and the
+   *                XML data source.
+   *                See
+   *                <a href=
+   *                "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   *                Data Source Option</a> in the version you use.
+   * @group collection_funcs
+   * @since
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: Column, options: java.util.Map[String, String]): Column = {
+    withExpr(new XmlToStructs(e.expr, schema.expr, options.asScala.toMap))
+  }
+
+  /**
+   * Parses a column containing a XML string into the data type
+   * corresponding to the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e       a string column containing XML data.
+   * @param schema  the schema to use when parsing the XML string
+
+   * @group collection_funcs
+   * @since
+   */
+  def from_xml(e: Column, schema: StructType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: String): Column = schema_of_xml(lit(xml))
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml a foldable string column containing a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: Column): Column = withExpr(new SchemaOfXml(xml.expr))
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a XML string and infers its schema in DDL format using options.
+   *
+   * @param xml    a foldable string column containing XML data.
+   * @param options options to control how the xml is parsed. accepts the same options and the
+   *                XML data source.
+   *                See
+   *                <a href=
+   *                "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   *                Data Source Option</a> in the version you use.
+   * @return a column with string literal containing schema in DDL format.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def schema_of_xml(xml: Column, options: java.util.Map[String, String]): Column = {

Review Comment:
   Actually, this is the same as `schema_of_json`. I suggested having only the Java map version for now, to avoid too many overloaded versions.
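
   With only a `java.util.Map` overload, a Scala caller pays for one conversion at the call site. A minimal sketch of that pattern follows. It uses a stand-in function rather than Spark itself: `describeOptions` is hypothetical and only mimics the shape of an API that takes `java.util.Map[String, String]`. The import assumes Scala 2.13 or later; older versions would use `scala.collection.JavaConverters` instead.

```scala
import scala.jdk.CollectionConverters._

object JavaMapOverloadSketch {
  // Hypothetical stand-in for an API that only exposes a java.util.Map overload,
  // in the same shape as schema_of_xml(xml, options). It is not Spark code.
  def describeOptions(xml: String, options: java.util.Map[String, String]): String = {
    // Sort by key so the rendered string is deterministic.
    val rendered = options.asScala.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }
    s"${xml.length} chars; options: ${rendered.mkString(", ")}"
  }

  // A Scala caller converts its immutable Map at the call site with .asJava.
  def callFromScala(xml: String, options: Map[String, String]): String =
    describeOptions(xml, options.asJava)
}
```

   The conversion is a thin wrapper around the Scala map, so the single Java-map overload stays usable from both languages without adding a second signature.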





[GitHub] [spark] cloud-fan commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1296694795


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,150 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.toDDL), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the xml string
+   * @param options
+   *   options to control how the xml is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: DataType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.json), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * (Java-specific) Parses a column containing a XML string into a `StructType` with the
+   * specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: Column, options: java.util.Map[String, String]): Column =
+    from_xml(e, schema, options.asScala.iterator)
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  def from_xml(e: Column, schema: StructType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  def from_xml(e: Column, schema: DataType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  private def from_xml(e: Column, schema: Column, options: Iterator[(String, String)]): Column = {
+    fnWithOptions("from_xml", options, e, schema)
+  }
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml
+   *   a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: String): Column = schema_of_xml(lit(xml))
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml
+   *   a foldable string column containing a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: Column): Column = Column.fn("schema_of_xml", xml)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a XML string and infers its schema in DDL format using options.
+   *
+   * @param xml
+   *   a foldable string column containing XML data.
+   * @param options
+   *   options to control how the xml is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @return
+   *   a column with string literal containing schema in DDL format.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def schema_of_xml(xml: Column, options: java.util.Map[String, String]): Column = {
+    fnWithOptions("schema_of_xml", options.asScala.iterator, xml)
+  }
+

Review Comment:
   Is there no `to_xml`?





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1299885611


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala:
##########
@@ -470,6 +470,34 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     format("csv").save(path)
   }
 
+  /**
+   * Saves the content of the `DataFrame` in XML format at the specified path. This is equivalent
+   * to:
+   * {{{
+   *   format("xml").save(path)
+   * }}}
+   *
+   * Note that writing a XML file from `DataFrame` having a field `ArrayType` with its element as
+   * `ArrayType` would have an additional nested field for the element. For example, the
+   * `DataFrame` having a field below,
+   *
+   * {@code fieldA [[data1, data2]]}

Review Comment:
   Yeah, let's fix them.





[GitHub] [spark] cloud-fan commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1299680974


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala:
##########
@@ -470,6 +470,34 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     format("csv").save(path)
   }
 
+  /**
+   * Saves the content of the `DataFrame` in XML format at the specified path. This is equivalent
+   * to:
+   * {{{
+   *   format("xml").save(path)
+   * }}}
+   *
+   * Note that writing a XML file from `DataFrame` having a field `ArrayType` with its element as
+   * `ArrayType` would have an additional nested field for the element. For example, the
+   * `DataFrame` having a field below,
+   *
+   * {@code fieldA [[data1, data2]]}

Review Comment:
   Let's fix it here then. Also cc @HyukjinKwon 





[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300881436


##########
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7314,6 +7314,103 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Map.empty[String, String].asJava)
 
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e       a string column containing XML data.
+   * @param schema  the schema to use when parsing the XML string
+   * @param options options to control how the XML is parsed. accepts the same options and the
+   *                XML data source.
+   *                See
+   *                <a href=
+   *                "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   *                Data Source Option</a> in the version you use.
+   * @group collection_funcs
+   * @since
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {

Review Comment:
   https://issues.apache.org/jira/browse/SPARK-44753





[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300838579


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions.xml
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, ExprUtils, NullIntolerant, TimeZoneAwareExpression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, GenericArrayData, PermissiveMode}
+import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, ValidatorUtil, XmlInferSchema, XmlOptions}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+case class XmlToStructs(
+    schema: DataType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression
+  with TimeZoneAwareExpression
+  with CodegenFallback
+  with ExpectsInputTypes
+  with NullIntolerant
+  with QueryErrorsBase {
+
+  def this(child: Expression, schema: Expression, options: Map[String, String]) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),
+      options = options,
+      child = child,
+      timeZoneId = None)
+
+  override def nullable: Boolean = true
+
+  // The XML input data might be missing certain fields. We force the nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema = schema.asNullable
+
+  def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),
+      options = ExprUtils.convertToMapData(options),
+      child = child,
+      timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = nullableSchema match {
+    case _: StructType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
+    case _: ArrayType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
+    case _: MapType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
+  }
+
+  val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
+
+  @transient lazy val parser = {
+    val parsedOptions = new XmlOptions(options, timeZoneId.get, nameOfCorruptRecord)
+    val mode = parsedOptions.parseMode
+    if (mode != PermissiveMode && mode != FailFastMode) {
+      throw QueryCompilationErrors.parseModeUnsupportedError("from_xml", mode)
+    }
+    val (parserSchema, actualSchema) = nullableSchema match {
+      case s: StructType =>
+        ExprUtils.verifyColumnNameOfCorruptRecord(s, parsedOptions.columnNameOfCorruptRecord)
+        (s, StructType(s.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)))
+      case other =>
+        (StructType(Array(StructField("value", other))), other)
+    }
+
+    val rowSchema: StructType = schema match {
+      case st: StructType => st
+      case ArrayType(st: StructType, _) => st
+    }
+    val rawParser = new StaxXmlParser(rowSchema, parsedOptions)
+    val xsdSchema = Option(parsedOptions.rowValidationXSDPath).map(ValidatorUtil.getSchema)
+
+    new FailureSafeParser[String](
+      input => rawParser.doParseColumn(input, mode, xsdSchema),
+      mode,
+      parserSchema,
+      parsedOptions.columnNameOfCorruptRecord)
+  }
+
+  override def dataType: DataType = nullableSchema
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = {
+    copy(timeZoneId = Option(timeZoneId))
+  }
+  override def nullSafeEval(xml: Any): Any = xml match {
+    case arr: GenericArrayData =>

Review Comment:
   You are right. It shouldn't be there. Can I address this in a follow-up?
   https://issues.apache.org/jira/browse/SPARK-44810





[GitHub] [spark] cloud-fan commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300802715


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala:
##########
@@ -470,6 +470,34 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     format("csv").save(path)
   }
 
+  /**
+   * Saves the content of the `DataFrame` in XML format at the specified path. This is equivalent
+   * to:
+   * {{{
+   *   format("xml").save(path)
+   * }}}
+   *
+   * Note that writing a XML file from `DataFrame` having a field `ArrayType` with its element as
+   * `ArrayType` would have an additional nested field for the element. For example, the
+   * `DataFrame` having a field below,
+   *
+   * {@code fieldA [[data1], [data2]]}
+   *
+   * would produce a XML file below. { @code <fieldA> <item>data1</item> </fieldA> <fieldA>
+   * <item>data2</item> </fieldA>}

Review Comment:
   Is this the fixed version?
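
   For reference, the layout the Scaladoc describes can be sketched without Spark. The helper below is hypothetical and is not how `StaxXmlGenerator` is implemented. It only renders an array-of-arrays field the way the comment states: each inner array becomes its own `<fieldA>` element, and each inner value is wrapped in `<item>` (the element name used in the Scaladoc example).

```scala
object NestedArrayXmlSketch {
  // Renders a field whose value is an array of arrays. Each inner array is written
  // as a separate <name> element, and each of its values is wrapped in <item>.
  // No XML escaping is performed; this only illustrates the layout.
  def renderNested(name: String, value: Seq[Seq[String]]): String =
    value.map { inner =>
      val items = inner.map(v => s"<item>$v</item>").mkString
      s"<$name>$items</$name>"
    }.mkString
}
```

   Calling `NestedArrayXmlSketch.renderNested("fieldA", Seq(Seq("data1"), Seq("data2")))` yields two sibling `<fieldA>` elements, each holding one `<item>`, which matches the `[[data1], [data2]]` example in the diff.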





[GitHub] [spark] ericsun95 commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "ericsun95 (via GitHub)" <gi...@apache.org>.
ericsun95 commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294136842


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/parsers/StaxXmlGenerator.scala:
##########
@@ -71,9 +72,11 @@ private[xml] object StaxXmlGenerator {
           writer.writeAttribute(name.substring(options.attributePrefix.length), v.toString)
 
         // For ArrayType, we just need to write each as XML element.
-        case (ArrayType(ty, _), v: scala.collection.Seq[_]) =>
-          v.foreach { e =>
-            writeChildElement(name, ty, e)
+        case (ArrayType(ty, _), v: ArrayData) =>
+          var i = 0;
+          while (i < v.numElements()) {

Review Comment:
   Nit: to avoid the `var`, use
   `(0 until v.numElements()).foreach { i =>
       writeChildElement(name, ty, v.get(i, ty))
   }`
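
   The two loop styles can be compared side by side in a small, Spark-free sketch. The object and method names below are hypothetical, and a plain `Array[String]` stands in for `ArrayData`; the point is only that both forms visit the same indices in the same order.

```scala
object IndexLoopSketch {
  // Index-based loop with a mutable counter, as in the diff under review.
  def collectWithWhile(values: Array[String]): Vector[String] = {
    val out = Vector.newBuilder[String]
    var i = 0
    while (i < values.length) {
      out += values(i)
      i += 1
    }
    out.result()
  }

  // The same traversal over a Range, which avoids the `var`.
  def collectWithRange(values: Array[String]): Vector[String] = {
    val out = Vector.newBuilder[String]
    (0 until values.length).foreach { i =>
      out += values(i)
    }
    out.result()
  }
}
```

   The `Range` form is shorter and has no mutable counter. A `while` loop is sometimes kept in hot paths because it avoids the closure call per element, so which form fits here is a judgment call.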





[GitHub] [spark] ericsun95 commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "ericsun95 (via GitHub)" <gi...@apache.org>.
ericsun95 commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294138473


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/util/XmlFile.scala:
##########
@@ -16,23 +16,14 @@
  */
 package org.apache.spark.sql.execution.datasources.xml.util
 
-import java.io.CharArrayWriter
 import java.nio.charset.Charset
-import javax.xml.stream.XMLOutputFactory
 
-import scala.collection.Map
-
-import com.sun.xml.txw2.output.IndentingXMLStreamWriter
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.io.compress.CompressionCodec
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.catalyst.util.CompressionCodecs
-import org.apache.spark.sql.execution.datasources.xml.{XmlInputFormat, XmlOptions}
-import org.apache.spark.sql.execution.datasources.xml.parsers.StaxXmlGenerator
+import org.apache.spark.sql.execution.datasources.xml.XmlInputFormat
 
 private[xml] object XmlFile {
   val DEFAULT_INDENT = "    "

Review Comment:
   Duplicated constants?





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1299580676


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala:
##########
@@ -392,6 +392,46 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
   def csv(csvDataset: Dataset[String]): DataFrame =
     parse(csvDataset, ParseFormat.PARSE_FORMAT_CSV)
 
+  /**
+   * Loads a XML file and returns the result as a `DataFrame`. See the documentation on the other
+   * overloaded `xml()` method for more details.
+   *
+   * @since 4.0.0
+   */
+  def xml(path: String): DataFrame = {

Review Comment:
   FYI, we should also add this to `DataStreamReader` / `DataStreamWriter`, and likewise for Python in `readwriter.py`. This can be done as a follow-up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ericsun95 commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "ericsun95 (via GitHub)" <gi...@apache.org>.
ericsun95 commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294599244


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -589,6 +589,11 @@
           "<errors>"

Review Comment:
   You're welcome! This is great!





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1299583398


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala:
##########
@@ -83,21 +86,21 @@ private[xml] object StaxXmlGenerator {
 
     def writeElement(dt: DataType, v: Any, options: XmlOptions): Unit = (dt, v) match {
       case (_, null) | (NullType, _) => writer.writeCharacters(options.nullValue)
+      case (StringType, v: UTF8String) => writer.writeCharacters(v.toString)
       case (StringType, v: String) => writer.writeCharacters(v)
       case (TimestampType, v: Timestamp) =>

Review Comment:
   I think you can remove this, and `case (DecimalType(), v: java.math.BigDecimal) => writer.writeCharacters(v.toString)` (See also `JacksonGenerator` and which types are being handled).
   
   BTW, we should also add support for `TimestampNTZType`, `YearMonthIntervalType` and `DayTimeIntervalType`. But those are orthogonal and can be done separately.





[GitHub] [spark] sandip-db commented on pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on PR #42462:
URL: https://github.com/apache/spark/pull/42462#issuecomment-1686820801

   > Thanks for the explanation @HyukjinKwon. I'm OK with it if we already have precedents like Avro and CSV
   
   @yaooqinn, @HyukjinKwon has addressed your concern. Could you please approve?




[GitHub] [spark] cloud-fan commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300806528


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,112 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =

Review Comment:
   The schema parameter can be a `StructType` or a `Column`, and the options parameter can be a Scala map, a Java map, or omitted. This means we need 6 overloads of `from_xml`.
   
   Is it really worth it? I know we did the same thing for `from_json`, but this is really convoluted.
   
   How about something like
   ```
   TextParsingFunction.newBuilder()
     .withSchema(...) // It has multiple overloads
     .withOptions(...) // It has multiple overloads
     .xml() // returns a Column
   ```
   
   Anyway, it's unrelated to this PR. We can do it later. cc @HyukjinKwon 
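
   To make the builder idea above concrete, here is a rough, self-contained sketch. Everything in it is hypothetical: `TextParsingFunction` does not exist in Spark, the schema is taken as a DDL string, and `xml()` returns a description string instead of a `Column` so that the example runs without Spark. It only shows how putting the overloads on the builder steps avoids multiplying them on the final function.

   ```scala
   // Hypothetical sketch: none of these names exist in Spark.
   final class TextParsingFunction private (
       schemaDdl: Option[String],
       options: Map[String, String]) {

     // Schema as a DDL string; StructType or Column overloads would sit next to this one.
     def withSchema(ddl: String): TextParsingFunction =
       new TextParsingFunction(Some(ddl), options)

     // Scala map overload.
     def withOptions(opts: Map[String, String]): TextParsingFunction =
       new TextParsingFunction(schemaDdl, options ++ opts)

     // Java map overload, converted by hand to stay independent of the Scala version.
     def withOptions(opts: java.util.Map[String, String]): TextParsingFunction = {
       val builder = Map.newBuilder[String, String]
       val it = opts.entrySet().iterator()
       while (it.hasNext) {
         val e = it.next()
         builder += (e.getKey -> e.getValue)
       }
       withOptions(builder.result())
     }

     // Terminal call. The proposal returns a Column; this sketch returns a description.
     def xml(): String = {
       require(schemaDdl.isDefined, "a schema is required")
       val rendered = options.toSeq.sorted.map { case (k, v) => s"$k=$v" }.mkString(",")
       s"from_xml(${schemaDdl.get}; $rendered)"
     }
   }

   object TextParsingFunction {
     def newBuilder(): TextParsingFunction = new TextParsingFunction(None, Map.empty)
   }
   ```

   With this shape, two schema overloads and two options overloads cost four small methods in total, and omitting the options is simply not calling `withOptions`.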





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300816102


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,112 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.toDDL), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * (Java-specific) Parses a column containing a XML string into the data type corresponding to
+   * the specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: Column, options: java.util.Map[String, String]): Column =
+    from_xml(e, schema, options.asScala.iterator)
+
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  def from_xml(e: Column, schema: StructType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  private def from_xml(e: Column, schema: Column, options: Iterator[(String, String)]): Column = {
+    fnWithOptions("from_xml", options, e, schema)
+  }
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml
+   *   a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: String): Column = schema_of_xml(lit(xml))

Review Comment:
   Ideally we should only use `Column` signatures for newly added functions (see the comments at the top).





[GitHub] [spark] ericsun95 commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "ericsun95 (via GitHub)" <gi...@apache.org>.
ericsun95 commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294131118


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XMLFileFormat.scala:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.xml
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.xml.parsers.StaxXmlParser
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * Provides access to CSV data from pure SQL statements.
+ */
+class XMLFileFormat extends TextBasedFileFormat with DataSourceRegister {
+
+  override def shortName(): String = "xml"
+
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = {
+    val parsedOptions = new XmlOptions(

Review Comment:
   Nit: the `XmlOptions` initialization seems to be duplicated across these functions.
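
   One possible shape for removing the duplication is a single private helper that every method calls. The sketch below is self-contained and uses hypothetical stand-ins (`OptionsSketch`, `FileFormatSketch`, and the `charset` and `timeZone` methods); the real `XmlOptions` constructor arguments are not shown in this hunk, so they are not reproduced here.

   ```scala
   // Hypothetical stand-in for XmlOptions; the fields are illustrative only.
   final case class OptionsSketch(
       parameters: Map[String, String],
       timeZoneId: String,
       corruptRecordColumn: String)

   // Hypothetical stand-in for the file format class.
   class FileFormatSketch(sessionTimeZone: String, corruptRecordColumn: String) {

     // The only place where the options object is constructed.
     private def parseOptions(options: Map[String, String]): OptionsSketch =
       OptionsSketch(options, sessionTimeZone, corruptRecordColumn)

     // Both methods reuse the helper instead of repeating the constructor call.
     def charset(options: Map[String, String]): String =
       parseOptions(options).parameters.getOrElse("charset", "UTF-8")

     def timeZone(options: Map[String, String]): String =
       parseOptions(options).timeZoneId
   }
   ```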





[GitHub] [spark] ericsun95 commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "ericsun95 (via GitHub)" <gi...@apache.org>.
ericsun95 commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294137466


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/parsers/StaxXmlGenerator.scala:
##########
@@ -136,7 +143,25 @@ private[xml] object StaxXmlGenerator {
           s"Failed to convert value $v (class of ${v.getClass}) in type $dt to XML.")
     }
 
-    val (attributes, elements) = schema.zip(row.toSeq).partition { case (f, _) =>
+    def writeMapData(mapType: MapType, map: MapData): Unit = {
+      val keyArray = map.keyArray()
+      val valueArray = map.valueArray()
+      // write attributes first
+      Seq (true, false).foreach { writeAttribute =>
+        var i = 0
+        while (i < map.numElements()) {

Review Comment:
   Same as above.





[GitHub] [spark] ericsun95 commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "ericsun95 (via GitHub)" <gi...@apache.org>.
ericsun95 commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294128260


##########
sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:
##########
@@ -22,7 +22,7 @@ org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
 org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2
-org.apache.spark.sql.execution.datasources.xml.DefaultSource
+org.apache.spark.sql.execution.datasources.xml.XMLFileFormat

Review Comment:
   Nit: consider renaming `XML` to `Xml` (camel case) throughout, to be consistent with the other data sources.





[GitHub] [spark] cloud-fan commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300807797


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions.xml
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, ExprUtils, NullIntolerant, TimeZoneAwareExpression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, GenericArrayData, PermissiveMode}
+import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, ValidatorUtil, XmlInferSchema, XmlOptions}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+case class XmlToStructs(
+    schema: DataType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression
+  with TimeZoneAwareExpression
+  with CodegenFallback
+  with ExpectsInputTypes
+  with NullIntolerant
+  with QueryErrorsBase {
+
+  def this(child: Expression, schema: Expression, options: Map[String, String]) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),
+      options = options,
+      child = child,
+      timeZoneId = None)
+
+  override def nullable: Boolean = true
+
+  // The XML input data might be missing certain fields. We force the nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema = schema.asNullable
+
+  def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),
+      options = ExprUtils.convertToMapData(options),
+      child = child,
+      timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = nullableSchema match {
+    case _: StructType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
+    case _: ArrayType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
+    case _: MapType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
+  }
+
+  val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
+
+  @transient lazy val parser = {
+    val parsedOptions = new XmlOptions(options, timeZoneId.get, nameOfCorruptRecord)
+    val mode = parsedOptions.parseMode
+    if (mode != PermissiveMode && mode != FailFastMode) {
+      throw QueryCompilationErrors.parseModeUnsupportedError("from_xml", mode)
+    }
+    val (parserSchema, actualSchema) = nullableSchema match {
+      case s: StructType =>
+        ExprUtils.verifyColumnNameOfCorruptRecord(s, parsedOptions.columnNameOfCorruptRecord)
+        (s, StructType(s.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)))
+      case other =>
+        (StructType(Array(StructField("value", other))), other)
+    }
+
+    val rowSchema: StructType = schema match {
+      case st: StructType => st
+      case ArrayType(st: StructType, _) => st
+    }
+    val rawParser = new StaxXmlParser(rowSchema, parsedOptions)
+    val xsdSchema = Option(parsedOptions.rowValidationXSDPath).map(ValidatorUtil.getSchema)
+
+    new FailureSafeParser[String](
+      input => rawParser.doParseColumn(input, mode, xsdSchema),
+      mode,
+      parserSchema,
+      parsedOptions.columnNameOfCorruptRecord)
+  }
+
+  override def dataType: DataType = nullableSchema
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = {
+    copy(timeZoneId = Option(timeZoneId))
+  }
+  override def nullSafeEval(xml: Any): Any = xml match {
+    case arr: GenericArrayData =>

Review Comment:
   Why do we match this case if the handling is exactly the same as for `case arr: ArrayData`?
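
   A small, self-contained illustration of the point, using stand-in types (`ArrayDataLike` and `GenericArrayDataLike` are hypothetical; the real classes live in Spark catalyst). When the subtype case and the supertype case have the same body, the supertype pattern alone already matches both, so the narrower case can be dropped without changing behaviour.

   ```scala
   // Stand-ins for ArrayData and GenericArrayData, for illustration only.
   abstract class ArrayDataLike { def numElements: Int }

   final class GenericArrayDataLike(values: Array[Any]) extends ArrayDataLike {
     def numElements: Int = values.length
   }

   object MatchSketch {
     // Subtype matched first, then the supertype, with identical handling.
     def withBothCases(x: Any): Int = x match {
       case arr: GenericArrayDataLike => arr.numElements
       case arr: ArrayDataLike => arr.numElements
       case _ => -1
     }

     // The supertype pattern alone covers the subtype as well.
     def withSupertypeOnly(x: Any): Int = x match {
       case arr: ArrayDataLike => arr.numElements
       case _ => -1
     }
   }
   ```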





[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300489578


##########
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7314,6 +7314,136 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Map.empty[String, String].asJava)
 
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e       a string column containing XML data.
+   * @param schema  the schema to use when parsing the XML string
+   * @param options options to control how the XML is parsed. accepts the same options and the
+   *                XML data source.
+   *                See
+   *                <a href=
+   *                "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   *                Data Source Option</a> in the version you use.
+   * @group collection_funcs
+   * @since
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    XmlToStructs(CharVarcharUtils.failIfHasCharVarchar(schema), options, e.expr)
+  }
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e       a string column containing XML data.
+   * @param schema  the schema to use when parsing the json string
+   * @param options options to control how the json is parsed. accepts the same options and the
+   *                XML data source.
+   *                See
+   *                <a href=
+   *                "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   *                Data Source Option</a> in the version you use.
+   * @group collection_funcs
+   * @since
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr {

Review Comment:
   `DataType` is not fully functional here, so I removed the `from_xml` overload that takes a `DataType` argument.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1299581325


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,150 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.toDDL), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the xml string
+   * @param options
+   *   options to control how the xml is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: DataType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.json), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * (Java-specific) Parses a column containing a XML string into a `StructType` with the
+   * specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: Column, options: java.util.Map[String, String]): Column =
+    from_xml(e, schema, options.asScala.iterator)
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  def from_xml(e: Column, schema: StructType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  def from_xml(e: Column, schema: DataType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  private def from_xml(e: Column, schema: Column, options: Iterator[(String, String)]): Column = {
+    fnWithOptions("from_xml", options, e, schema)
+  }
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml
+   *   a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: String): Column = schema_of_xml(lit(xml))
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml
+   *   a foldable string column containing a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: Column): Column = Column.fn("schema_of_xml", xml)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a XML string and infers its schema in DDL format using options.

Review Comment:
   ```suggestion
      * (Java-specific) Parses a XML string and infers its schema in DDL format using options.
   ```



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,150 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.toDDL), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the xml string
+   * @param options
+   *   options to control how the xml is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: DataType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.json), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * (Java-specific) Parses a column containing a XML string into a `StructType` with the
+   * specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: Column, options: java.util.Map[String, String]): Column =
+    from_xml(e, schema, options.asScala.iterator)
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  def from_xml(e: Column, schema: StructType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  def from_xml(e: Column, schema: DataType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  private def from_xml(e: Column, schema: Column, options: Iterator[(String, String)]): Column = {
+    fnWithOptions("from_xml", options, e, schema)
+  }
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml
+   *   a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: String): Column = schema_of_xml(lit(xml))
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml
+   *   a foldable string column containing a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: Column): Column = Column.fn("schema_of_xml", xml)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a XML string and infers its schema in DDL format using options.

Review Comment:
   ```suggestion
      * (Java-specific) Parses a XML string and infers its schema in DDL format using options.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300842239


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,112 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =

Review Comment:
   Let's remove this signature with the Scala `Map` for now, in a follow-up.





[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300835963


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala:
##########
@@ -470,6 +470,34 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     format("csv").save(path)
   }
 
+  /**
+   * Saves the content of the `DataFrame` in XML format at the specified path. This is equivalent
+   * to:
+   * {{{
+   *   format("xml").save(path)
+   * }}}
+   *
+   * Note that writing a XML file from `DataFrame` having a field `ArrayType` with its element as
+   * `ArrayType` would have an additional nested field for the element. For example, the
+   * `DataFrame` having a field below,
+   *
+   * {@code fieldA [[data1], [data2]]}
+   *
+   * would produce a XML file below. { @code <fieldA> <item>data1</item> </fieldA> <fieldA>
+   * <item>data2</item> </fieldA>}

Review Comment:
   Yes. Changed:
   `{@code fieldA [[data1, data2]]}`
   to
   `{@code fieldA [[data1], [data2]]}`





[GitHub] [spark] ericsun95 commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "ericsun95 (via GitHub)" <gi...@apache.org>.
ericsun95 commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294127930


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -589,6 +589,11 @@
           "<errors>"

Review Comment:
   This is great! I was thinking about upgrading the XML reader to data source v2 before, but was stopped by the refactoring work involved. Thanks for adding it to the Spark mainline to unify the interfaces and catch up with the main changes.





[GitHub] [spark] ericsun95 commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "ericsun95 (via GitHub)" <gi...@apache.org>.
ericsun95 commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294129649


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XMLFileFormat.scala:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.xml
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.xml.parsers.StaxXmlParser
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * Provides access to CSV data from pure SQL statements.

Review Comment:
   Nit: typo, this should be XML.





[GitHub] [spark] cloud-fan commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1296693769


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala:
##########
@@ -470,6 +470,34 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     format("csv").save(path)
   }
 
+  /**
+   * Saves the content of the `DataFrame` in XML format at the specified path. This is equivalent
+   * to:
+   * {{{
+   *   format("xml").save(path)
+   * }}}
+   *
+   * Note that writing a XML file from `DataFrame` having a field `ArrayType` with its element as
+   * `ArrayType` would have an additional nested field for the element. For example, the
+   * `DataFrame` having a field below,
+   *
+   * {@code fieldA [[data1, data2]]}

Review Comment:
   In this example, the outer `fieldA` array has only one element, right? Why do we generate two `<fieldA></fieldA>` pairs?





[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1296723330


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala:
##########
@@ -470,6 +470,34 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     format("csv").save(path)
   }
 
+  /**
+   * Saves the content of the `DataFrame` in XML format at the specified path. This is equivalent
+   * to:
+   * {{{
+   *   format("xml").save(path)
+   * }}}
+   *
+   * Note that writing a XML file from `DataFrame` having a field `ArrayType` with its element as
+   * `ArrayType` would have an additional nested field for the element. For example, the
+   * `DataFrame` having a field below,
+   *
+   * {@code fieldA [[data1, data2]]}

Review Comment:
   You are right. I checked the code and it generates only one `<fieldA>...</fieldA>`.
   It's a typo carried over from [here](https://github.com/databricks/spark-xml/commit/13539b41d6054442824457c67f0150e00ae353ea#diff-6571ce640b4e8e7d4b308679755e5f38503a0e2cec0e0ea4ee7d507afd181176R75)
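
   For illustration only, here is a minimal sketch of the nesting rule the Scaladoc describes. It assumes each element of the outer array becomes one `<fieldA>` element and each element of an inner array becomes one `<item>`. `NestedArrayXmlSketch` and `render` are hypothetical names, not Spark's `StaxXmlGenerator`, and the sketch does no XML escaping.

```scala
// Hypothetical helper, NOT the actual StaxXmlGenerator: it only mirrors the
// nesting rule stated in the Scaladoc under review.
object NestedArrayXmlSketch {
  // One <fieldName> element per outer entry, one <item> per inner entry.
  // Values are emitted verbatim (no escaping), which is fine for this demo only.
  def render(fieldName: String, value: Seq[Seq[String]]): String =
    value.map { inner =>
      val items = inner.map(v => s"<item>$v</item>").mkString
      s"<$fieldName>$items</$fieldName>"
    }.mkString
}
```

   Under that assumption, `[[data1, data2]]` yields a single `<fieldA>` with two `<item>` children, while `[[data1], [data2]]` yields two `<fieldA>` elements, which is what the corrected Scaladoc example shows.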



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,150 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.toDDL), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the xml string
+   * @param options
+   *   options to control how the xml is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: DataType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.json), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * (Java-specific) Parses a column containing a XML string into a `StructType` with the
+   * specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: Column, options: java.util.Map[String, String]): Column =
+    from_xml(e, schema, options.asScala.iterator)
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  def from_xml(e: Column, schema: StructType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  def from_xml(e: Column, schema: DataType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  private def from_xml(e: Column, schema: Column, options: Iterator[(String, String)]): Column = {
+    fnWithOptions("from_xml", options, e, schema)
+  }
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml
+   *   a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: String): Column = schema_of_xml(lit(xml))
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml
+   *   a foldable string column containing a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: Column): Column = Column.fn("schema_of_xml", xml)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a XML string and infers its schema in DDL format using options.
+   *
+   * @param xml
+   *   a foldable string column containing XML data.
+   * @param options
+   *   options to control how the xml is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @return
+   *   a column with string literal containing schema in DDL format.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def schema_of_xml(xml: Column, options: java.util.Map[String, String]): Column = {
+    fnWithOptions("schema_of_xml", options.asScala.iterator, xml)
+  }
+

Review Comment:
   Not yet. I have filed a sub-task: https://issues.apache.org/jira/browse/SPARK-44790





[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300462435


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions.xml
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, ExprUtils, NullIntolerant, TimeZoneAwareExpression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, GenericArrayData, PermissiveMode}
+import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, ValidatorUtil, XmlInferSchema, XmlOptions}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+case class XmlToStructs(
+    schema: DataType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression
+  with TimeZoneAwareExpression
+  with CodegenFallback
+  with ExpectsInputTypes
+  with NullIntolerant
+  with QueryErrorsBase {
+
+  def this(child: Expression, schema: Expression, options: Map[String, String]) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),

Review Comment:
   The `from_xml` function above calls into the following case class, with the schema as a `DataType`:
   ```
   case class XmlToStructs(
       schema: DataType,
       options: Map[String, String],
       child: Expression,
       timeZoneId: Option[String] = None)
   ```





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300815546


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,112 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =

Review Comment:
   I think we can remove the `(Scala-specific)` signature for now. For the same reason, we don't have a Scala-specific version of `to_csv`, etc.
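
   One practical consequence of keeping only the `java.util.Map` overload is that Scala callers convert their map with `.asJava`. The sketch below shows that pattern in isolation, assuming Scala 2.13's `scala.jdk.CollectionConverters`. `renderOptions` is a hypothetical stand-in for an API that accepts only a Java map, and the option names are purely illustrative.

```scala
import scala.jdk.CollectionConverters._

object JavaMapOverloadSketch {
  // Hypothetical stand-in for a function that exposes only a java.util.Map
  // overload; internally it walks the entries through a Scala iterator,
  // much like the `options.asScala.iterator` call in the diff.
  def renderOptions(options: java.util.Map[String, String]): String =
    options.asScala.iterator.toSeq.sorted
      .map { case (k, v) => s"$k=$v" }
      .mkString(",")

  // A Scala caller bridges its immutable Map with a single `.asJava` call.
  val fromScala: String =
    renderOptions(Map("rowTag" -> "book", "mode" -> "PERMISSIVE").asJava)
}
```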





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300847390


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/CreateXmlParser.scala:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.xml
+
+import java.io.{ByteArrayInputStream, InputStream, InputStreamReader, StringReader}
+import java.nio.channels.Channels
+import java.nio.charset.{Charset, StandardCharsets}
+import javax.xml.stream.{EventFilter, XMLEventReader, XMLInputFactory, XMLStreamConstants}
+import javax.xml.stream.events.XMLEvent
+
+import org.apache.hadoop.io.Text
+import sun.nio.cs.StreamDecoder
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.unsafe.types.UTF8String
+
+private[sql] object CreateXmlParser extends Serializable {
+  val filter = new EventFilter {
+    override def accept(event: XMLEvent): Boolean =
+    // Ignore comments and processing instructions
+      event.getEventType match {
+        case XMLStreamConstants.COMMENT | XMLStreamConstants.PROCESSING_INSTRUCTION => false
+        case _ => true
+      }
+  }
+
+  def string(xmlInputFactory: XMLInputFactory, record: String): XMLEventReader = {
+    // It does not have to skip for white space, since `XmlInputFormat`
+    // always finds the root tag without a heading space.
+    val eventReader = xmlInputFactory.createXMLEventReader(new StringReader(record))
+    xmlInputFactory.createFilteredReader(eventReader, filter)
+  }
+
+  def utf8String(xmlInputFactory: XMLInputFactory, record: UTF8String): XMLEventReader = {
+    val bb = record.getByteBuffer
+    assert(bb.hasArray)
+
+    val bain = new ByteArrayInputStream(
+      bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
+
+    val eventReader = xmlInputFactory.createXMLEventReader(
+      new InputStreamReader(bain, StandardCharsets.UTF_8))
+    xmlInputFactory.createFilteredReader(eventReader, filter)
+  }
+
+  def text(xmlInputFactory: XMLInputFactory, record: Text): XMLEventReader = {
+    val bs = new ByteArrayInputStream(record.getBytes, 0, record.getLength)
+
+    val eventReader = xmlInputFactory.createXMLEventReader(bs)
+    xmlInputFactory.createFilteredReader(eventReader, filter)
+  }
+
+  // Jackson parsers can be ranked according to their performance:

Review Comment:
   Let's also update the docs



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1299580956


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala:
##########
@@ -392,6 +392,46 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
   def csv(csvDataset: Dataset[String]): DataFrame =
     parse(csvDataset, ParseFormat.PARSE_FORMAT_CSV)
 
+  /**
+   * Loads a XML file and returns the result as a `DataFrame`. See the documentation on the other
+   * overloaded `xml()` method for more details.
+   *
+   * @since 4.0.0
+   */
+  def xml(path: String): DataFrame = {
+    // This method ensures that calls that explicit need single argument works, see SPARK-16009
+    xml(Seq(path): _*)
+  }
+
+  /**
+   * Loads XML files and returns the result as a `DataFrame`.
+   *
+   * This function will go through the input once to determine the input schema if `inferSchema`
+   * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
+   * specify the schema explicitly using `schema`.
+   *
+   * You can find the XML-specific options for reading XML files in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">

Review Comment:
   The doc is also a follow-up, right?





[GitHub] [spark] ericsun95 commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "ericsun95 (via GitHub)" <gi...@apache.org>.
ericsun95 commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294598633


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XMLFileFormat.scala:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.xml
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.xml.parsers.StaxXmlParser
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * Provides access to CSV data from pure SQL statements.
+ */
+class XMLFileFormat extends TextBasedFileFormat with DataSourceRegister {
+
+  override def shortName(): String = "xml"
+
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = {
+    val parsedOptions = new XmlOptions(

Review Comment:
   Yeah, I mean something like 
   ```
   val getXmlOptions = (options: Map[String, String], sparkSession: SparkSession) => new XmlOptions(options,
         sparkSession.sessionState.conf.sessionLocalTimeZone,
         sparkSession.sessionState.conf.columnNameOfCorruptRecord)
   ``` 
   and then simplify each line to `val parsedOptions = getXmlOptions(options, sparkSession)`
   
   Or just create a `def apply(options: Map[String, String], sparkSession: SparkSession)` in `object XmlOptions` so you can write `val parsedOptions = XmlOptions(options, sparkSession)`.
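   
   A rough sketch of that second option, assuming stand-in types rather than Spark's real classes (`XmlOptionsSketch`, `Session`, and `SessionConf` are hypothetical names used only so the snippet compiles on its own):
   
   ```scala
   // Hypothetical stand-ins for SparkSession and its session conf, so that
   // the sketch runs without Spark on the classpath.
   final case class SessionConf(
       sessionLocalTimeZone: String,
       columnNameOfCorruptRecord: String)
   final case class Session(conf: SessionConf)
   
   // Stand-in for the options class with the three-argument constructor.
   class XmlOptionsSketch(
       val parameters: Map[String, String],
       val defaultTimeZoneId: String,
       val defaultColumnNameOfCorruptRecord: String)
   
   object XmlOptionsSketch {
     // Factory that reads the session-level defaults in one place, so call
     // sites no longer repeat the same three constructor arguments.
     def apply(options: Map[String, String], session: Session): XmlOptionsSketch =
       new XmlOptionsSketch(
         options,
         session.conf.sessionLocalTimeZone,
         session.conf.columnNameOfCorruptRecord)
   }
   ```
   
   Each call site then shrinks to a single line such as `val parsedOptions = XmlOptionsSketch(options, session)`.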
         





[GitHub] [spark] ericsun95 commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "ericsun95 (via GitHub)" <gi...@apache.org>.
ericsun95 commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294131843


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XMLFileFormat.scala:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.xml
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.xml.parsers.StaxXmlParser
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * Provides access to CSV data from pure SQL statements.
+ */
+class XMLFileFormat extends TextBasedFileFormat with DataSourceRegister {
+
+  override def shortName(): String = "xml"
+
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = {
+    val parsedOptions = new XmlOptions(
+      options,
+      sparkSession.sessionState.conf.sessionLocalTimeZone,
+      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    val xmlDataSource = XMLDataSource(parsedOptions)
+    xmlDataSource.isSplitable && super.isSplitable(sparkSession, options, path)
+  }
+
+  override def inferSchema(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    val parsedOptions = new XmlOptions(
+      options,
+      sparkSession.sessionState.conf.sessionLocalTimeZone,
+      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+
+    XMLDataSource(parsedOptions).inferSchema(
+      sparkSession, files, parsedOptions)
+  }
+
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    val conf = job.getConfiguration
+    val xmlOptions = new XmlOptions(
+      options,
+      sparkSession.sessionState.conf.sessionLocalTimeZone,
+      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+    xmlOptions.compressionCodec.foreach { codec =>
+      CompressionCodecs.setCodecConfiguration(conf, codec)
+    }
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new XMLOutputWriter(path, dataSchema, context, xmlOptions)
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        ".xml" + CodecStreams.getCompressionExtension(context)
+      }
+    }
+  }
+
+  override def buildReader(
+      sparkSession: SparkSession,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    val parsedOptions = new XmlOptions(
+      options,
+      sparkSession.sessionState.conf.sessionLocalTimeZone,
+      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+
+    // Check a field requirement for corrupt records here to throw an exception in a driver side
+    ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)

Review Comment:
   Nit: introduce `val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord` to simplify the code below.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300815546


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,112 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =

Review Comment:
   I think we can remove the `(Scala-specific)` signature that takes a Scala `Map` for now. For the same reason, we don't have a Scala-specific version of `to_csv`, etc.





[GitHub] [spark] HyukjinKwon commented on pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42462:
URL: https://github.com/apache/spark/pull/42462#issuecomment-1687328991

   The tests passed.
   
   Merged to master.




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1299583398


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala:
##########
@@ -83,21 +86,21 @@ private[xml] object StaxXmlGenerator {
 
     def writeElement(dt: DataType, v: Any, options: XmlOptions): Unit = (dt, v) match {
       case (_, null) | (NullType, _) => writer.writeCharacters(options.nullValue)
+      case (StringType, v: UTF8String) => writer.writeCharacters(v.toString)
       case (StringType, v: String) => writer.writeCharacters(v)
       case (TimestampType, v: Timestamp) =>

Review Comment:
   I think you can remove this, as well as `case (DecimalType(), v: java.math.BigDecimal) => writer.writeCharacters(v.toString)`.
   
   BTW, we should also add support for `TimestampNTZType`, `YearMonthIntervalType` and `DayTimeIntervalType`. But those are orthogonal and can be done separately.





[GitHub] [spark] cloud-fan commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1299684790


##########
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7314,6 +7314,136 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Map.empty[String, String].asJava)
 
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e       a string column containing XML data.
+   * @param schema  the schema to use when parsing the XML string
+   * @param options options to control how the XML is parsed. accepts the same options and the
+   *                XML data source.
+   *                See
+   *                <a href=
+   *                "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   *                Data Source Option</a> in the version you use.
+   * @group collection_funcs
+   * @since
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    XmlToStructs(CharVarcharUtils.failIfHasCharVarchar(schema), options, e.expr)
+  }
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e       a string column containing XML data.
+   * @param schema  the schema to use when parsing the json string
+   * @param options options to control how the json is parsed. accepts the same options and the
+   *                XML data source.
+   *                See
+   *                <a href=
+   *                "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   *                Data Source Option</a> in the version you use.
+   * @group collection_funcs
+   * @since
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr {

Review Comment:
   Why do we add this API? Looking at the code, `XmlToStructs.schema` must be a `StructType`, right?
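   
   A minimal sketch of the concern, assuming stand-in types rather than Spark's `DataType` hierarchy (every name below is hypothetical): if the expression can only handle struct schemas, an overload typed as the broader type has to reject everything else at runtime.
   
   ```scala
   object SchemaCheckSketch {
     // Hypothetical, simplified stand-ins for a data type hierarchy.
     sealed trait DataTypeSketch
     case object StringTypeSketch extends DataTypeSketch
     final case class StructTypeSketch(fieldNames: Seq[String]) extends DataTypeSketch
   
     // Narrow the broad type to the only one the parser can use; any other
     // input is reported as an error instead of being accepted silently.
     def requireStruct(schema: DataTypeSketch): Either[String, StructTypeSketch] =
       schema match {
         case s: StructTypeSketch => Right(s)
         case other => Left(s"expected a struct schema, got $other")
       }
   }
   ```
   
   With a parameter typed as the struct type directly, this check would move to compile time and the extra overload would add nothing.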





[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300463633


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala:
##########
@@ -392,6 +392,46 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
   def csv(csvDataset: Dataset[String]): DataFrame =
     parse(csvDataset, ParseFormat.PARSE_FORMAT_CSV)
 
+  /**
+   * Loads a XML file and returns the result as a `DataFrame`. See the documentation on the other
+   * overloaded `xml()` method for more details.
+   *
+   * @since 4.0.0
+   */
+  def xml(path: String): DataFrame = {
+    // This method ensures that calls that explicit need single argument works, see SPARK-16009
+    xml(Seq(path): _*)
+  }
+
+  /**
+   * Loads XML files and returns the result as a `DataFrame`.
+   *
+   * This function will go through the input once to determine the input schema if `inferSchema`
+   * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
+   * specify the schema explicitly using `schema`.
+   *
+   * You can find the XML-specific options for reading XML files in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">

Review Comment:
   Yes. https://issues.apache.org/jira/browse/SPARK-44752





[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300728996


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala:
##########
@@ -117,4 +117,26 @@ object ExprUtils extends QueryErrorsBase {
       TypeCheckSuccess
     }
   }
+
+  /**
+   * Check if the schema is valid for XML
+   *
+   * @param schema The schema to check.
+   * @return
+   * `TypeCheckSuccess` if the schema is valid
+   * `DataTypeMismatch` with an error error if the schema is not valid
+   */
+  def checkXmlSchema(schema: DataType): TypeCheckResult = {
+    val isInvalid = schema.existsRecursively {
+      case MapType(keyType, _, _) if keyType != StringType => true

Review Comment:
   Done.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300816102


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,112 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.toDDL), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * (Java-specific) Parses a column containing a XML string into the data type corresponding to
+   * the specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: Column, options: java.util.Map[String, String]): Column =
+    from_xml(e, schema, options.asScala.iterator)
+
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  def from_xml(e: Column, schema: StructType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  private def from_xml(e: Column, schema: Column, options: Iterator[(String, String)]): Column = {
+    fnWithOptions("from_xml", options, e, schema)
+  }
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml
+   *   a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: String): Column = schema_of_xml(lit(xml))

Review Comment:
   Ideally we should only use `Column` signatures for newly added functions (see the comments at the top).



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,112 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.toDDL), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * (Java-specific) Parses a column containing a XML string into the data type corresponding to
+   * the specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: Column, options: java.util.Map[String, String]): Column =
+    from_xml(e, schema, options.asScala.iterator)
+
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  def from_xml(e: Column, schema: StructType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  private def from_xml(e: Column, schema: Column, options: Iterator[(String, String)]): Column = {
+    fnWithOptions("from_xml", options, e, schema)
+  }
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml
+   *   a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: String): Column = schema_of_xml(lit(xml))

Review Comment:
   Let's probably remove this signature.





[GitHub] [spark] cloud-fan commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1296729012


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions.xml
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, ExprUtils, NullIntolerant, TimeZoneAwareExpression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, GenericArrayData, PermissiveMode}
+import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, ValidatorUtil, XmlInferSchema, XmlOptions}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+case class XmlToStructs(
+    schema: DataType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression
+  with TimeZoneAwareExpression
+  with CodegenFallback
+  with ExpectsInputTypes
+  with NullIntolerant
+  with QueryErrorsBase {
+
+  def this(child: Expression, schema: Expression, options: Map[String, String]) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),

Review Comment:
   This requires `StructType`. How does `def from_xml(e: Column, schema: DataType, options: Map[String, String])` work?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1294893031


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XMLFileFormat.scala:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.xml
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.ExprUtils
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.xml.parsers.StaxXmlParser
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * Provides access to CSV data from pure SQL statements.
+ */
+class XMLFileFormat extends TextBasedFileFormat with DataSourceRegister {
+
+  override def shortName(): String = "xml"
+
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = {
+    val parsedOptions = new XmlOptions(

Review Comment:
   Done. Thanks for the suggestions. 





[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300465613


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala:
##########
@@ -83,21 +86,21 @@ private[xml] object StaxXmlGenerator {
 
     def writeElement(dt: DataType, v: Any, options: XmlOptions): Unit = (dt, v) match {
       case (_, null) | (NullType, _) => writer.writeCharacters(options.nullValue)
+      case (StringType, v: UTF8String) => writer.writeCharacters(v.toString)
       case (StringType, v: String) => writer.writeCharacters(v)
       case (TimestampType, v: Timestamp) =>

Review Comment:
   JacksonParser supports DecimalType and I was planning to add support for DecimalType in a followup. Is that not required?
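
As a rough illustration of the follow-up mentioned above, here is how a decimal case could slot into a `(type, value)` match shaped like the quoted `writeElement`. This is a toy model, not Spark code: `ToyType` and its members, `elementText`, and the choice of `toPlainString` are all assumptions made for this sketch.

```scala
object WriteElementSketch {
  // Toy stand-ins for Spark's DataType objects; these are NOT the real classes.
  sealed trait ToyType
  case object ToyNull extends ToyType
  case object ToyString extends ToyType
  case object ToyDecimal extends ToyType

  // Returns the text that would be written for one element.
  // `nullValue` plays the role of `options.nullValue` in the quoted diff.
  def elementText(dt: ToyType, v: Any, nullValue: String): String = (dt, v) match {
    case (_, null) | (ToyNull, _) => nullValue
    case (ToyString, s: String) => s
    // Hypothetical decimal case: plain notation, no exponent.
    case (ToyDecimal, d: java.math.BigDecimal) => d.toPlainString
    case (other, value) =>
      throw new IllegalArgumentException(s"Unsupported pair: ${other} / ${value}")
  }

  def main(args: Array[String]): Unit =
    println(elementText(ToyDecimal, new java.math.BigDecimal("12.50"), "null"))
}
```

Keeping the null case first means a missing value never reaches the type-specific cases, which is the same ordering the quoted match uses.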





[GitHub] [spark] sandip-db commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "sandip-db (via GitHub)" <gi...@apache.org>.
sandip-db commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300493066


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions.xml
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, ExprUtils, NullIntolerant, TimeZoneAwareExpression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, GenericArrayData, PermissiveMode}
+import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, ValidatorUtil, XmlInferSchema, XmlOptions}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+case class XmlToStructs(
+    schema: DataType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression
+  with TimeZoneAwareExpression
+  with CodegenFallback
+  with ExpectsInputTypes
+  with NullIntolerant
+  with QueryErrorsBase {
+
+  def this(child: Expression, schema: Expression, options: Map[String, String]) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),

Review Comment:
   `DataType` is not fully functional here, so I removed the `from_xml` overload that takes a `DataType` argument.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,150 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.toDDL), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the xml string
+   * @param options
+   *   options to control how the xml is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: DataType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.json), options.iterator)

Review Comment:
   `DataType` is not fully functional here, so I removed the `from_xml` overload that takes a `DataType` argument.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300815881


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,112 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.toDDL), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * (Java-specific) Parses a column containing a XML string into the data type corresponding to
+   * the specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: Column, options: java.util.Map[String, String]): Column =
+    from_xml(e, schema, options.asScala.iterator)
+
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified
+   * schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  def from_xml(e: Column, schema: StructType): Column =
+    from_xml(e, schema, Map.empty[String, String])
+
+  private def from_xml(e: Column, schema: Column, options: Iterator[(String, String)]): Column = {
+    fnWithOptions("from_xml", options, e, schema)
+  }
+
+  /**
+   * Parses a XML string and infers its schema in DDL format.
+   *
+   * @param xml
+   *   a XML string.
+   * @group collection_funcs
+   * @since 4.0.0
+   */
+  def schema_of_xml(xml: String): Column = schema_of_xml(lit(xml))

Review Comment:
   Let's probably remove this signature.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300848391


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala:
##########
@@ -392,6 +392,46 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
   def csv(csvDataset: Dataset[String]): DataFrame =
     parse(csvDataset, ParseFormat.PARSE_FORMAT_CSV)
 
+  /**
+   * Loads a XML file and returns the result as a `DataFrame`. See the documentation on the other
+   * overloaded `xml()` method for more details.
+   *
+   * @since 4.0.0
+   */
+  def xml(path: String): DataFrame = {

Review Comment:
   ditto for https://github.com/apache/spark/pull/42462/files#r1300848164



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala:
##########
@@ -470,6 +470,34 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) {
     format("csv").save(path)
   }
 
+  /**
+   * Saves the content of the `DataFrame` in XML format at the specified path. This is equivalent
+   * to:
+   * {{{
+   *   format("xml").save(path)
+   * }}}
+   *
+   * Note that writing a XML file from `DataFrame` having a field `ArrayType` with its element as
+   * `ArrayType` would have an additional nested field for the element. For example, the
+   * `DataFrame` having a field below,
+   *
+   * {@code fieldA [[data1], [data2]]}
+   *
+   * would produce a XML file below. { @code <fieldA> <item>data1</item> </fieldA> <fieldA>
+   * <item>data2</item> </fieldA>}
+   *
+   * Namely, roundtrip in writing and reading can end up in different schema structure.
+   *
+   * You can find the XML-specific options for writing XML files in <a
+   * href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 4.0.0
+   */
+  def xml(path: String): Unit = {

Review Comment:
   ditto for https://github.com/apache/spark/pull/42462/files#r1300848164
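
The nested-array note in the scaladoc quoted above can be made concrete with a small sketch. It uses plain Scala strings rather than the real writer. `render` is a hypothetical helper invented for this example, and it performs no XML escaping.

```scala
object NestedArrayXmlSketch {
  // Hypothetical helper, not part of Spark: renders an array-of-arrays field
  // the way the scaladoc describes, with one <field> element per inner array
  // and one <item> element per inner value.
  def render(field: String, rows: Seq[Seq[String]]): String =
    rows.map { inner =>
      val items = inner.map(v => s"<item>${v}</item>").mkString
      s"<${field}>${items}</${field}>"
    }.mkString

  def main(args: Array[String]): Unit =
    // prints <fieldA><item>data1</item></fieldA><fieldA><item>data2</item></fieldA>
    println(render("fieldA", Seq(Seq("data1"), Seq("data2"))))
}
```

The extra `<item>` level is what the scaladoc means by "an additional nested field for the element", and it is why a write followed by a read may not return the original schema.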





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300848314


##########
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala:
##########
@@ -259,6 +259,27 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    */
   def csv(path: String): DataFrame = format("csv").load(path)
 
+  /**
+   * Loads a XML file stream and returns the result as a `DataFrame`.
+   *
+   * This function will go through the input once to determine the input schema if `inferSchema`
+   * is enabled. To avoid going through the entire data once, disable `inferSchema` option or
+   * specify the schema explicitly using `schema`.
+   *
+   * You can set the following option(s):
+   * <ul>
+   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
+   * considered in every trigger.</li>
+   * </ul>
+   *
+   * You can find the XML-specific options for reading XML file stream in
+   * <a href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   * Data Source Option</a> in the version you use.
+   *
+   * @since 4.0.0
+   */
+  def xml(path: String): DataFrame = format("xml").load(path)

Review Comment:
   ditto for https://github.com/apache/spark/pull/42462/files#r1300848164





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300848164


##########
sql/core/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7314,6 +7314,103 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Map.empty[String, String].asJava)
 
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into the data type corresponding to the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e       a string column containing XML data.
+   * @param schema  the schema to use when parsing the XML string
+   * @param options options to control how the XML is parsed. accepts the same options and the
+   *                XML data source.
+   *                See
+   *                <a href=
+   *                "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
+   *                Data Source Option</a> in the version you use.
+   * @group collection_funcs
+   * @since
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {

Review Comment:
   TODOs:
   - Scala and Python implementation for Spark Connect
   - Python and R implementation
   





[GitHub] [spark] cloud-fan commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1300853517


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions.xml
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, ExprUtils, NullIntolerant, TimeZoneAwareExpression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, GenericArrayData, PermissiveMode}
+import org.apache.spark.sql.catalyst.xml.{StaxXmlParser, ValidatorUtil, XmlInferSchema, XmlOptions}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+case class XmlToStructs(
+    schema: DataType,
+    options: Map[String, String],
+    child: Expression,
+    timeZoneId: Option[String] = None)
+  extends UnaryExpression
+  with TimeZoneAwareExpression
+  with CodegenFallback
+  with ExpectsInputTypes
+  with NullIntolerant
+  with QueryErrorsBase {
+
+  def this(child: Expression, schema: Expression, options: Map[String, String]) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),
+      options = options,
+      child = child,
+      timeZoneId = None)
+
+  override def nullable: Boolean = true
+
+  // The XML input data might be missing certain fields. We force the nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema = schema.asNullable
+
+  def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+    this(
+      schema = ExprUtils.evalSchemaExpr(schema),
+      options = ExprUtils.convertToMapData(options),
+      child = child,
+      timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = nullableSchema match {
+    case _: StructType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
+    case _: ArrayType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
+    case _: MapType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
+  }
+
+  val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
+
+  @transient lazy val parser = {
+    val parsedOptions = new XmlOptions(options, timeZoneId.get, nameOfCorruptRecord)
+    val mode = parsedOptions.parseMode
+    if (mode != PermissiveMode && mode != FailFastMode) {
+      throw QueryCompilationErrors.parseModeUnsupportedError("from_xml", mode)
+    }
+    val (parserSchema, actualSchema) = nullableSchema match {
+      case s: StructType =>
+        ExprUtils.verifyColumnNameOfCorruptRecord(s, parsedOptions.columnNameOfCorruptRecord)
+        (s, StructType(s.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)))
+      case other =>
+        (StructType(Array(StructField("value", other))), other)
+    }
+
+    val rowSchema: StructType = schema match {
+      case st: StructType => st
+      case ArrayType(st: StructType, _) => st
+    }
+    val rawParser = new StaxXmlParser(rowSchema, parsedOptions)
+    val xsdSchema = Option(parsedOptions.rowValidationXSDPath).map(ValidatorUtil.getSchema)
+
+    new FailureSafeParser[String](
+      input => rawParser.doParseColumn(input, mode, xsdSchema),
+      mode,
+      parserSchema,
+      parsedOptions.columnNameOfCorruptRecord)
+  }
+
+  override def dataType: DataType = nullableSchema
+
+  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = {
+    copy(timeZoneId = Option(timeZoneId))
+  }
+  override def nullSafeEval(xml: Any): Any = xml match {
+    case arr: GenericArrayData =>

Review Comment:
   sure
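
For readers following the diff above: the `rowSchema` match in `XmlToStructs` has only two cases, a struct and an array of structs, so any other schema type falls through. The sketch below models that shape with toy types. `ToyType` and its subclasses are stand-ins invented for this example, not Spark's `DataType` classes, and returning `None` is a choice made here where a bare two-case match would throw a `MatchError`.

```scala
object RowSchemaSketch {
  // Toy stand-ins for Spark's DataType hierarchy; these are NOT the real classes.
  sealed trait ToyType
  final case class ToyStruct(fieldNames: Seq[String]) extends ToyType
  final case class ToyArray(element: ToyType) extends ToyType
  final case class ToyMap(key: ToyType, value: ToyType) extends ToyType
  case object ToyString extends ToyType

  // Mirrors the two cases of the quoted match: a struct, or an array whose
  // element is a struct, yields a row schema. Every other shape yields None.
  def rowSchema(schema: ToyType): Option[ToyStruct] = schema match {
    case st: ToyStruct => Some(st)
    case ToyArray(st: ToyStruct) => Some(st)
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(rowSchema(ToyArray(ToyStruct(Seq("id")))))
    println(rowSchema(ToyMap(ToyString, ToyString)))
  }
}
```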





[GitHub] [spark] HyukjinKwon commented on pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42462:
URL: https://github.com/apache/spark/pull/42462#issuecomment-1687328861

   Let me get this in first because @sandip-db seems to be working on another followup. Let's address the remaining comments in a followup.




[GitHub] [spark] cloud-fan commented on a diff in pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42462:
URL: https://github.com/apache/spark/pull/42462#discussion_r1296694672


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -7227,6 +7227,150 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing XML data.
+   * @param schema
+   *   the schema to use when parsing the XML string
+   * @param options
+   *   options to control how the XML is parsed. accepts the same options and the XML data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @group collection_funcs
+   *
+   * @since 4.0.0
+   */
+  // scalastyle:on line.size.limit
+  def from_xml(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_xml(e, lit(schema.toDDL), options.iterator)
+
+  // scalastyle:off line.size.limit
+
+  /**
+   * Parses a column containing a XML string into a `StructType` with the specified schema.

Review Comment:
   ```suggestion
      * Parses a column containing a XML string into the given data type with the specified schema.
   ```





[GitHub] [spark] yaooqinn commented on pull request #42462: [SPARK-44751][SQL] XML FileFormat Interface implementation

Posted by "yaooqinn (via GitHub)" <gi...@apache.org>.
yaooqinn commented on PR #42462:
URL: https://github.com/apache/spark/pull/42462#issuecomment-1676567114

   Thanks for the explanation, @HyukjinKwon. I'm OK with it if we already have precedents like Avro and CSV.

