Posted to user@spark.apache.org by Ryan Blue <rb...@netflix.com.INVALID> on 2019/03/21 22:31:11 UTC

Re: Manually reading parquet files.

You're getting InternalRow instances. They probably have the data you want,
but the toString representation doesn't match the data for InternalRow.
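
For example, something like this (a rough, untested sketch that assumes the
pk/ordering/col_a schema and the `rows` list from your snippet below) should
print readable values:

// Access fields positionally with the data types from df.schema. An
// UnsafeRow's toString just prints its backing 8-byte words in hex, which is
// why the printed list looks like gibberish even though the values are there.
rows.foreach { row =>
  val pk       = row.getInt(0)
  val ordering = row.getInt(1)
  val colA     = row.getUTF8String(2).toString
  println(s"pk=$pk ordering=$ordering col_a=$colA")
}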

On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew <lo...@amazon.com.invalid>
wrote:

> Hello Friends,
>
>
>
> I’m working on a performance improvement that reads additional parquet
> files in the middle of a lambda and I’m running into some issues. This is
> what I’d like to do:
>
>
>
> ds.mapPartitions(x=>{
>   //read parquet file in and perform an operation with x
> })
>
>
>
>
>
> Here’s my current POC code but I’m getting nonsense back from the row
> reader.
>
>
>
> import com.amazon.horizon.azulene.util.SparkFileUtils._
>
> spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")
>
> val data = List(
>   TestRow(1,1,"asdf"),
>   TestRow(2,1,"asdf"),
>   TestRow(3,1,"asdf"),
>   TestRow(4,1,"asdf")
> )
>
> val df = spark.createDataFrame(data)
>
> val folder = Files.createTempDirectory("azulene-test")
>
> val folderPath = folder.toAbsolutePath.toString + "/"
> df.write.mode("overwrite").parquet(folderPath)
>
> val files = spark.fs.listStatus(folder.toUri)
>
> val file = files(1) // skip the _SUCCESS file
>
> val partitionSchema = StructType(Seq.empty)
> val dataSchema = df.schema
> val fileFormat = new ParquetFileFormat()
>
> val path = file.getPath
>
> val status = spark.fs.getFileStatus(path)
>
> val pFile = new PartitionedFile(
>   partitionValues = InternalRow.empty, // This should be empty for non-partitioned values
>   filePath = path.toString,
>   start = 0,
>   length = status.getLen
> )
>
> val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
>   fileFormat.buildReaderWithPartitionValues(
>     sparkSession = spark,
>     dataSchema = dataSchema,
>     partitionSchema = partitionSchema, // this should be empty for non-partitioned fields
>     requiredSchema = dataSchema,
>     filters = Seq.empty,
>     options = Map.empty,
>     hadoopConf = spark.sparkContext.hadoopConfiguration
>     // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
>   )
>
> import scala.collection.JavaConverters._
>
> val rows = readFile(pFile).flatMap {
>   case r: InternalRow => Seq(r)
>
>   // This doesn't work. Vectorized mode is doing something screwy.
>   case b: ColumnarBatch => b.rowIterator().asScala
> }.toList
>
> println(rows)
> // List([0,1,5b,2000000004,66647361])
> // ?? this is wrong I think ??
>
>
>
> Has anyone attempted something similar?
>
>
>
> Cheers Andrew
>
>
>


-- 
Ryan Blue
Software Engineer
Netflix

Re: Manually reading parquet files.

Posted by Wenchen Fan <cl...@gmail.com>.
Try `val encoder = RowEncoder(df.schema).resolveAndBind()`?
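
In other words, something like this (a rough, untested sketch against the
Spark 2.x encoder API, reusing readFile, pFile and df from the code below;
names like externalRows are just for illustration):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.vectorized.ColumnarBatch
import scala.collection.JavaConverters._

// fromRow needs a resolved and bound deserializer; calling it on the
// unresolved encoder is what throws
// "Cannot evaluate expression: getcolumnbyordinal(0, IntegerType)".
val encoder = RowEncoder(df.schema).resolveAndBind()

val externalRows: List[Row] = readFile(pFile)
  .flatMap {
    case r: InternalRow => Seq(r)
    case b: ColumnarBatch => b.rowIterator().asScala
  }
  .map(encoder.fromRow(_))
  .toList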

On Thu, Mar 21, 2019 at 5:39 PM Long, Andrew <lo...@amazon.com.invalid>
wrote:

> Thanks a ton for the help!
>
>
>
> Is there a standardized way of converting the internal row to rows?
>
>
>
> I’ve tried this but I’m getting an exception:
>
>
>
> val encoder = RowEncoder(df.schema)
> val rows = readFile(pFile)
>   .flatMap {
>     case r: InternalRow => Seq(r)
>     case b: ColumnarBatch => b.rowIterator().asScala
>   }
>   .map(encoder.fromRow(_))
>   .toList
>
>
>
> java.lang.RuntimeException: Error while decoding:
> java.lang.UnsupportedOperationException: Cannot evaluate expression: getcolumnbyordinal(0, IntegerType)
>
> createexternalrow(getcolumnbyordinal(0, IntegerType), getcolumnbyordinal(1, IntegerType),
>   getcolumnbyordinal(2, StringType).toString, StructField(pk,IntegerType,false),
>   StructField(ordering,IntegerType,false), StructField(col_a,StringType,true))
>
>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
>   at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
>   at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
>
>
>
> From: Ryan Blue <rb...@netflix.com.INVALID>
> Reply-To: "rblue@netflix.com" <rb...@netflix.com>
> Date: Thursday, March 21, 2019 at 3:32 PM
> To: "Long, Andrew" <lo...@amazon.com.invalid>
> Cc: "dev@spark.apache.org" <de...@spark.apache.org>, "user@spark.apache.org" <us...@spark.apache.org>, "horizon-dev@amazon.com" <horizon-dev@amazon.com>
> Subject: Re: Manually reading parquet files.
>
>
>
> You're getting InternalRow instances. They probably have the data you
> want, but the toString representation doesn't match the data for
> InternalRow.
>
>
>
> On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew <lo...@amazon.com.invalid>
> wrote:
>
> Hello Friends,
>
>
>
> I’m working on a performance improvement that reads additional parquet
> files in the middle of a lambda and I’m running into some issues. This is
> what I’d like to do:
>
>
>
> ds.mapPartitions(x=>{
>   //read parquet file in and perform an operation with x
> })
>
>
>
>
>
> Here’s my current POC code but I’m getting nonsense back from the row
> reader.
>
>
>
> import com.amazon.horizon.azulene.util.SparkFileUtils._
>
> spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")
>
> val data = List(
>   TestRow(1,1,"asdf"),
>   TestRow(2,1,"asdf"),
>   TestRow(3,1,"asdf"),
>   TestRow(4,1,"asdf")
> )
>
> val df = spark.createDataFrame(data)
>
> val folder = Files.createTempDirectory("azulene-test")
>
> val folderPath = folder.toAbsolutePath.toString + "/"
> df.write.mode("overwrite").parquet(folderPath)
>
> val files = spark.fs.listStatus(folder.toUri)
>
> val file = files(1) // skip the _SUCCESS file
>
> val partitionSchema = StructType(Seq.empty)
> val dataSchema = df.schema
> val fileFormat = new ParquetFileFormat()
>
> val path = file.getPath
>
> val status = spark.fs.getFileStatus(path)
>
> val pFile = new PartitionedFile(
>   partitionValues = InternalRow.empty, // This should be empty for non-partitioned values
>   filePath = path.toString,
>   start = 0,
>   length = status.getLen
> )
>
> val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
>   fileFormat.buildReaderWithPartitionValues(
>     sparkSession = spark,
>     dataSchema = dataSchema,
>     partitionSchema = partitionSchema, // this should be empty for non-partitioned fields
>     requiredSchema = dataSchema,
>     filters = Seq.empty,
>     options = Map.empty,
>     hadoopConf = spark.sparkContext.hadoopConfiguration
>     // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
>   )
>
> import scala.collection.JavaConverters._
>
> val rows = readFile(pFile).flatMap {
>   case r: InternalRow => Seq(r)
>
>   // This doesn't work. Vectorized mode is doing something screwy.
>   case b: ColumnarBatch => b.rowIterator().asScala
> }.toList
>
> println(rows)
> // List([0,1,5b,2000000004,66647361])
> // ?? this is wrong I think ??
>
>
>
> Has anyone attempted something similar?
>
>
>
> Cheers Andrew
>
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>

Re: Manually reading parquet files.

Posted by "Long, Andrew" <lo...@amazon.com.INVALID>.
Thanks a ton for the help!

Is there a standardized way of converting the internal row to rows?

I’ve tried this but I’m getting an exception:

val encoder = RowEncoder(df.schema)
val rows = readFile(pFile)
  .flatMap {
    case r: InternalRow => Seq(r)
    case b: ColumnarBatch => b.rowIterator().asScala
  }
  .map(encoder.fromRow(_))
  .toList

java.lang.RuntimeException: Error while decoding: java.lang.UnsupportedOperationException: Cannot evaluate expression: getcolumnbyordinal(0, IntegerType)
createexternalrow(getcolumnbyordinal(0, IntegerType), getcolumnbyordinal(1, IntegerType), getcolumnbyordinal(2, StringType).toString, StructField(pk,IntegerType,false), StructField(ordering,IntegerType,false), StructField(col_a,StringType,true))

                at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
                at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
                at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)

From: Ryan Blue <rb...@netflix.com.INVALID>
Reply-To: "rblue@netflix.com" <rb...@netflix.com>
Date: Thursday, March 21, 2019 at 3:32 PM
To: "Long, Andrew" <lo...@amazon.com.invalid>
Cc: "dev@spark.apache.org" <de...@spark.apache.org>, "user@spark.apache.org" <us...@spark.apache.org>, "horizon-dev@amazon.com" <ho...@amazon.com>
Subject: Re: Manually reading parquet files.

You're getting InternalRow instances. They probably have the data you want, but the toString representation doesn't match the data for InternalRow.

On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew <lo...@amazon.com.invalid> wrote:
Hello Friends,

I’m working on a performance improvement that reads additional parquet files in the middle of a lambda and I’m running into some issues. This is what I’d like to do:


ds.mapPartitions(x=>{
  //read parquet file in and perform an operation with x
})


Here’s my current POC code but I’m getting nonsense back from the row reader.

import com.amazon.horizon.azulene.util.SparkFileUtils._

spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")

val data = List(
  TestRow(1,1,"asdf"),
  TestRow(2,1,"asdf"),
  TestRow(3,1,"asdf"),
  TestRow(4,1,"asdf")
)

val df = spark.createDataFrame(data)

val folder = Files.createTempDirectory("azulene-test")

val folderPath = folder.toAbsolutePath.toString + "/"
df.write.mode("overwrite").parquet(folderPath)

val files = spark.fs.listStatus(folder.toUri)

val file = files(1) // skip the _SUCCESS file

val partitionSchema = StructType(Seq.empty)
val dataSchema = df.schema
val fileFormat = new ParquetFileFormat()

val path = file.getPath

val status = spark.fs.getFileStatus(path)

val pFile = new PartitionedFile(
  partitionValues = InternalRow.empty, // This should be empty for non-partitioned values
  filePath = path.toString,
  start = 0,
  length = status.getLen
)

val readFile: (PartitionedFile) => Iterator[Any] = //Iterator[InternalRow]
  fileFormat.buildReaderWithPartitionValues(
    sparkSession = spark,
    dataSchema = dataSchema,
    partitionSchema = partitionSchema, // this should be empty for non-partitioned fields
    requiredSchema = dataSchema,
    filters = Seq.empty,
    options = Map.empty,
    hadoopConf = spark.sparkContext.hadoopConfiguration
    // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
  )

import scala.collection.JavaConverters._

val rows = readFile(pFile).flatMap {
  case r: InternalRow => Seq(r)

  // This doesn't work. Vectorized mode is doing something screwy.
  case b: ColumnarBatch => b.rowIterator().asScala
}.toList

println(rows)
//List([0,1,5b,2000000004,66647361])
//??this is wrong I think????

Has anyone attempted something similar?

Cheers Andrew



--
Ryan Blue
Software Engineer
Netflix