Posted to dev@spark.apache.org by "Long, Andrew" <lo...@amazon.com.INVALID> on 2019/04/11 20:00:19 UTC

Which parts of a parquet read happen on the driver vs the executor?

Hey Friends,

I’m working on a POC that involves reading and writing Parquet files mid-DAG.  Writes are working, but I’m struggling to get reads working because of serialization issues.  I’ve got code that works with master=local but not on YARN.  So here are my questions.


  1.  Is there an easy way to tell whether a particular function in Spark will run on the driver or on an executor?  My current heuristic is that if the function uses the SparkSession it runs on the driver, but….
  2.  Where does FileFormat.buildReaderWithPartitionValues(..) run?  The driver or the executor?  Because it takes the SparkSession I suspected it runs on the driver and the resulting iterator is then sent to the executor to do the actual read, but I’ve been running into serialization issues.

19/04/11 12:35:29 ERROR org.apache.spark.internal.Logging$class TaskSetManager: Failed to serialize task 26, not attempting to retry it.
java.io.NotSerializableException: scala.collection.Iterator$$anon$12
Serialization stack:
                - object not serializable (class: scala.collection.Iterator$$anon$12, value: non-empty iterator)
                - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
                - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6993864a)
                - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
                - object (class scala.collection.immutable.$colon$colon, List(non-empty iterator))
                - field (class: com.amazon.horizon.azulene.datasource.AzuleneSplit, name: readers, type: class scala.collection.immutable.List)

Is there something I’m missing here?

Here’s the code I’m using to read records.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

import scala.collection.JavaConverters._

def read(path: String, spark: SparkSession, fileSchema: StructType, requiredSchema: StructType): Iterator[InternalRow] = {
  val partitionSchema = StructType(Seq.empty)
  val hadoopConf = spark.sparkContext.hadoopConfiguration

  // Look up the file length through the Hadoop FileSystem for this path.
  val fsPath = new Path(path)
  val status = fsPath.getFileSystem(hadoopConf).getFileStatus(fsPath)

  val pFile = PartitionedFile(
    partitionValues = InternalRow.empty, // empty for non-partitioned data
    filePath = path,
    start = 0,
    length = status.getLen
  )

  // Build the PartitionedFile => rows function. Note this needs the live
  // SparkSession, so it can only be called on the driver.
  val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
    new ParquetFileFormat().buildReaderWithPartitionValues(
      sparkSession = spark,
      dataSchema = fileSchema,
      partitionSchema = partitionSchema, // empty for non-partitioned data
      requiredSchema = requiredSchema,
      filters = Seq.empty,
      options = Map.empty,
      hadoopConf = hadoopConf // or sessionState.newHadoopConfWithOptions(relation.options) when reading via a relation
    )

  // The vectorized reader can yield ColumnarBatch instances instead of rows,
  // so flatten batches into individual InternalRows.
  readFile(pFile).flatMap {
    case r: InternalRow => Seq(r)
    case b: ColumnarBatch => b.rowIterator().asScala
  }
}
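
For reference, the part that actually blows up looks roughly like this (a simplified reconstruction from the stack trace above, not the exact code; `paths` here just stands in for the list of files I’m iterating over):

// Simplified reconstruction of the failing pattern (the real AzuleneSplit has
// more fields). The iterators returned by read(...) wrap open Parquet readers
// and are not Serializable, so the tasks that carry them fail to serialize.
case class AzuleneSplit(readers: List[Iterator[InternalRow]])

val splits = paths.map(p => AzuleneSplit(List(read(p, spark, fileSchema, requiredSchema))))

// Task serialization fails here: each split holds live iterators.
spark.sparkContext.parallelize(splits).map(_.readers.map(_.length)).collect()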


Cheers Andrew

Re: Which parts of a parquet read happen on the driver vs the executor?

Posted by Sean Owen <sr...@gmail.com>.
Spark is a distributed compute framework, of course, so the functions
you pass to Spark operations like map, filter, groupBy, etc. do not run
on the driver. The function is serialized and sent to the executors.
The error here just indicates that you're building a function that
references something that can't be serialized.
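
As a contrived illustration of the general pattern (assuming a
SparkSession called spark; this isn't your code, just the rule):

import org.apache.spark.TaskContext

// On the driver (outside any task) TaskContext.get() returns null.
assert(TaskContext.get() == null)

// Fails with "Task not serializable": the closure captures a live,
// non-serializable iterator that was created on the driver.
val it = Iterator(1, 2, 3)
spark.sparkContext.parallelize(1 to 10).map(x => x + it.next()).collect()

// Works: the non-serializable resource is created inside the task, on
// the executor, where TaskContext.get() is non-null.
spark.sparkContext.parallelize(1 to 10).mapPartitions { nums =>
  require(TaskContext.get() != null)
  val localIt = Iterator.continually(1)
  nums.map(x => x + localIt.next())
}.collect()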

I'm not quite clear from your code what you're doing here, but it's not
using Spark operations to read Parquet. I don't see it invoking Spark's
end-user APIs at all. You're using some Spark internals directly, so
that code effectively executes driver-side, but this isn't how you'd
use Spark to read Parquet. Whatever this is, it can only execute
driver-side, since it uses the SparkSession.
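
For comparison, the normal end-user way to read Parquet is just
something like this (the path is a placeholder):

// Standard DataFrame API: the read is planned on the driver, and the
// actual Parquet scanning happens inside tasks on the executors.
val df = spark.read.parquet("/path/to/table")
df.show()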


---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org