Posted to user@spark.apache.org by lminer <lm...@hotmail.com> on 2016/11/04 17:53:39 UTC

SAXParseException while writing to parquet on s3

I'm trying to read in some json, infer a schema, and write it out again as
parquet to s3 (s3a). For some reason, about a third of the way through the
writing portion of the run, spark always errors out with the error included
below. I can't find any obvious reasons for the issue: it isn't out of
memory; there are no long GC pauses. There don't seem to be any additional
error messages in the logs of the individual executors.

The script runs fine on another set of data that I have, which is of a very
similar structure, but several orders of magnitude smaller.

I am running spark 2.0.1-hadoop-2.7 and am using the FileOutputCommitter.
The algorithm version doesn't seem to matter.
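
For reference, I toggle the committer algorithm version through the standard Hadoop property, i.e. something like this alongside the conf shown at the bottom:

    spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version   2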

This does not appear to be a problem with badly formed json or corrupted
files: I have unzipped and read in each file individually with no errors.
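
That per-file check was roughly along these lines (the path is just a placeholder for whichever file I'm testing):

    import org.json4s.jackson.JsonMethods.parseOpt

    // read one input file at a time (sc.textFile decompresses .gz transparently)
    // and count any lines that json4s cannot parse
    val badLines = sc.textFile("s3a://bucket/prefix/part-00000.json.gz")
      .filter(line => parseOpt(line).isEmpty)
      .count()
    println(s"unparseable lines in this file: $badLines")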

Here's a simplified version of the script:

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parseOpt

    object Foo {

      // parse a json string into a Map; nulls become an empty Map,
      // unparseable strings become None
      def parseJson(json: String): Option[Map[String, Any]] = {
        if (json == null)
          Some(Map())
        else
          parseOpt(json).map((j: JValue) => j.values.asInstanceOf[Map[String, Any]])
      }
    }
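
    // e.g. (illustrative):
    //   Foo.parseJson("""{"a": 1}""")   // Some(Map("a" -> 1))  (numbers come back as BigInt)
    //   Foo.parseJson("not json")       // None
    //   Foo.parseJson(null)             // Some(Map())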

    // read in as text and keep only the rows that parse as json (json4s, via Foo.parseJson)
    // (note: jsonRDD stays as the raw strings, RDD[String], since the schema
    //  inference and spark.read.json below both expect json strings)
    val jsonRDD: RDD[String] = sc.textFile(inputPath)
      .filter(row => Foo.parseJson(row).isDefined)

    // infer a schema that will encapsulate the most rows in a sample of size sampleRowNum
    val schema: StructType = Infer.getMostCommonSchema(sc, jsonRDD, sampleRowNum)

    // get each document's compatibility with the schema
    val jsonWithCompatibilityRDD: RDD[(String, Boolean)] = jsonRDD
      .map(js => (js, Infer.getSchemaCompatibility(schema, Infer.inferSchema(js)).toBoolean))
      .repartition(partitions)

    val jsonCompatibleRDD: RDD[String] = jsonWithCompatibilityRDD
      .filter { case (js: String, compatible: Boolean) => compatible }
      .map { case (js: String, _: Boolean) => js }

    // create a dataframe from documents with compatible schema
    val dataFrame: DataFrame = spark.read.schema(schema).json(jsonCompatibleRDD)
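
    // (not shown above: the write that actually fails; it's just the usual
    //  DataFrameWriter call to an s3a:// destination, roughly like this,
    //  where outputPath is a placeholder for the real s3a path)
    dataFrame.write.parquet(outputPath)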

It completes the earlier schema inference steps successfully. The error
itself occurs on the last line, but I suppose that could encompass at least
the immediately preceding statement, if not earlier:

    org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: Failed to commit task
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:275)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:257)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
        ... 8 more
        Suppressed: java.lang.NullPointerException
            at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
            at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
            at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
            at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569)
            at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$abortTask$1(WriterContainer.scala:282)
            at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$2.apply$mcV$sp(WriterContainer.scala:258)
            at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1354)
            ... 9 more
    Caused by: com.amazonaws.AmazonClientException: Unable to unmarshall response (Failed to parse XML document with handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler). Response Code: 200, Response Text: OK
        at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:738)
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:399)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3480)
        at com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:604)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:962)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.deleteUnnecessaryFakeDirectories(S3AFileSystem.java:1147)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.finishedWrite(S3AFileSystem.java:1136)
        at org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:142)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:400)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
        at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569)
        at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:267)
        ... 13 more
    Caused by: com.amazonaws.AmazonClientException: Failed to parse XML document with handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListBucketHandler
        at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseXmlInputStream(XmlResponsesSaxParser.java:150)
        at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:279)
        at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:75)
        at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsUnmarshaller.unmarshall(Unmarshallers.java:72)
        at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62)
        at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31)
        at com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:712)
        ... 29 more
    Caused by: org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 2; XML document structures must start and end within the same entity.
        at org.apache.xerces.util.ErrorHandlerWrapper.createSAXParseException(Unknown Source)
        at org.apache.xerces.util.ErrorHandlerWrapper.fatalError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLErrorReporter.reportError(Unknown Source)
        at org.apache.xerces.impl.XMLScanner.reportFatalError(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.endEntity(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentScannerImpl.endEntity(Unknown Source)
        at org.apache.xerces.impl.XMLEntityManager.endEntity(Unknown Source)
        at org.apache.xerces.impl.XMLEntityScanner.load(Unknown Source)
        at org.apache.xerces.impl.XMLEntityScanner.skipChar(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentScannerImpl$PrologDispatcher.dispatch(Unknown Source)
        at org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown Source)
        at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
        at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
        at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
        at org.apache.xerces.parsers.AbstractSAXParser.parse(Unknown Source)
        at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseXmlInputStream(XmlResponsesSaxParser.java:141)
        ... 35 more

Here's my conf:

    spark.executor.extraJavaOptions -XX:+UseG1GC -XX:MaxPermSize=1G
-XX:+HeapDumpOnOutOfMemoryError
    spark.executor.memory   16G
    spark.executor.uri 
https://s3.amazonaws.com/foo/spark-2.0.1-bin-hadoop2.7.tgz
    spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
    spark.hadoop.fs.s3a.buffer.dir  /raid0/spark
    spark.hadoop.fs.s3n.buffer.dir  /raid0/spark
    spark.hadoop.fs.s3a.connection.timeout 500000
    spark.hadoop.fs.s3n.multipart.uploads.enabled   true
    spark.hadoop.parquet.block.size 2147483648
    spark.hadoop.parquet.enable.summary-metadata    false
    spark.jars.packages com.databricks:spark-avro_2.11:3.0.1
    spark.local.dir /raid0/spark
    spark.mesos.coarse  false
    spark.mesos.constraints  priority:1
    spark.network.timeout   600
    spark.rpc.message.maxSize    500
    spark.speculation   false
    spark.sql.parquet.mergeSchema   false
    spark.sql.planner.externalSort  true
    spark.submit.deployMode client
    spark.task.cpus 1


