Posted to reviews@spark.apache.org by ljwagerfield <gi...@git.apache.org> on 2016/05/19 17:30:21 UTC

[GitHub] spark pull request: [SPARK-10548] [SPARK-10563] [SQL] Fix concurre...

Github user ljwagerfield commented on the pull request:

    https://github.com/apache/spark/pull/8710#issuecomment-220395710
  
    We're seeing this exception too. We're also running our operations serially (at least, on the surface it appears we are). If we execute a `df.save` operation in a `Future` and wait for that `Future` to complete, every `df.save` operation we perform within a subsequent `Future` fails.
    
    This happens specifically when we load Avro files from S3 and save them back to S3 as Parquet. The load works fine, but the save fails on the second attempt. Notably, if we instead generate the `DataFrame` from an in-memory list (so we're only saving to S3, not loading from it), the error goes away; I'm not sure how helpful that is.
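    
    For reference, the in-memory variant we tried was along these lines (a minimal sketch; the schema, values, and output path here are illustrative, not from our actual job):
    ```
    import org.apache.spark.sql.SaveMode
    import sqlContext.implicits._
    
    // Build a DataFrame from a local collection -- no S3 read involved.
    val inMemoryDf = Seq((1, "alpha"), (2, "beta")).toDF("id", "label")
    
    // Only the save touches S3; this path never produced the error for us.
    inMemoryDf.write.mode(SaveMode.Overwrite).save("s3://bucket/output-in-memory")
    ```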
    
    We're using Java 1.8, Scala 2.10.5, with our Spark codebase at commit https://github.com/apache/spark/commit/15de51c238a7340fa81cb0b80d029a05d97bfc5c.
    
    Our exact reproduction steps are:
    
    **1. Run a Spark Shell with appropriate dependencies**
    ```
    ./spark-shell --packages com.amazonaws:aws-java-sdk:1.10.75,org.apache.hadoop:hadoop-aws:2.7.2,com.databricks:spark-avro_2.10:2.0.1
    ```
    
    **2. Run the following setup code within the shell**
    ```
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import org.apache.spark.sql._
    
    // Create the SQLContext before importing its implicits, so the import
    // refers to this instance rather than the shell's predefined one.
    implicit val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
    
    // Route the s3:// scheme through the native S3 filesystem and supply credentials.
    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    hadoopConf.set("fs.s3.awsAccessKeyId", "...")
    hadoopConf.set("fs.s3.awsSecretAccessKey", "...")
    
    // Load the Avro input from S3 via spark-avro.
    val df = sqlContext.read.format("com.databricks.spark.avro").load("s3://bucket/input.avro")
    
    // Write the same DataFrame back to S3 as Parquet.
    def doWrite(): Unit = {
        df.write.format("org.apache.spark.sql.parquet").mode(SaveMode.Overwrite).save("s3://bucket/output")
    }
    ```
    
    **3. Run this _twice_, leaving time for the first execution to finish (so the two operations run serially)**
    ```
    Future { doWrite(); println("SUCCEEDED") }.recover { case e: Throwable => println("FAILED: " + e.getMessage()); e.printStackTrace() }
    ```
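    
    An equivalent way to guarantee the serial ordering, instead of waiting by hand, is to block on each write before issuing the next (a sketch using `Await`; the timeout is arbitrary):
    ```
    import scala.concurrent.Await
    import scala.concurrent.duration._
    
    // Block until the first write completes, then issue the second.
    // The second write is the one that throws for us.
    Await.result(Future { doWrite() }, 10.minutes)
    Await.result(Future { doWrite() }, 10.minutes)
    ```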
    
    **Result:**
    ```
    spark.sql.execution.id is already set
    java.lang.IllegalArgumentException: spark.sql.execution.id is already set
    	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
    	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    	at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
    	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    	at $line38.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.doWrite(<console>:41)
    	at $line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply$mcV$sp(<console>:43)
    	at $line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:43)
    	at $line40.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:43)
    	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    	at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
    	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    ```
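    
    As a data point, we can suppress the failure by clearing the `spark.sql.execution.id` local property before each write. This is only a workaround sketch, assuming the cause is the execution id leaking across the pooled threads that `ExecutionContext.Implicits.global` reuses (the property name comes straight from the exception; `setLocalProperty` with `null` removes a thread-local property):
    ```
    def doWriteCleared(): Unit = {
        // Clear any execution id left on this pooled thread by a previous write;
        // SQLExecution.withNewExecutionId throws if it is already set.
        sc.setLocalProperty("spark.sql.execution.id", null)
        df.write.format("org.apache.spark.sql.parquet").mode(SaveMode.Overwrite).save("s3://bucket/output")
    }
    ```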

