Posted to user@spark.apache.org by Ryan Blue <rb...@netflix.com.INVALID> on 2019/03/21 22:31:11 UTC
Re: Manually reading parquet files.
You're getting InternalRow instances. They probably have the data you want,
but the toString representation doesn't match the data for InternalRow.
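A minimal sketch of what this means in practice, assuming the reader hands back `UnsafeRow` instances (which would explain the hex-looking output: the last word, `66647361`, is the bytes of "asdf" in little-endian order). The schema is an assumption taken from the StructFields in the stack trace quoted elsewhere in this thread, and the object and helper names are hypothetical. Instead of printing the row, read each column by ordinal with the accessor that matches its type:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

object InspectInternalRow {
  // Assumed schema (pk, ordering, col_a), copied from the stack trace in this thread.
  val schema: StructType = StructType(Seq(
    StructField("pk", IntegerType, nullable = false),
    StructField("ordering", IntegerType, nullable = false),
    StructField("col_a", StringType, nullable = true)))

  // Builds an UnsafeRow by hand so the sketch needs no Parquet file or SparkSession.
  def sampleRow(): UnsafeRow =
    UnsafeProjection.create(schema)(InternalRow(1, 1, UTF8String.fromString("asdf")))

  // Reads each column by ordinal with the accessor matching its data type.
  def fields(row: InternalRow): (Int, Int, String) =
    (row.getInt(0), row.getInt(1), row.getUTF8String(2).toString)

  def main(args: Array[String]): Unit = {
    val row = sampleRow()
    println(row)               // dumps the row's backing words, not the column values
    println(fields(row))       // typed access by ordinal
    println(row.toSeq(schema)) // generic access driven by the schema
  }
}
```

One more thing to watch for: the file readers may reuse a single mutable row object across calls to `next()`, so it is probably safer to call `copy()` on each row before collecting them with `toList`.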
On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew <lo...@amazon.com.invalid>
wrote:
> Hello Friends,
>
>
>
> I’m working on a performance improvement that reads additional Parquet
> files in the middle of a lambda and I’m running into some issues. This is
> what I’d like to do:
>
>
>
> ds.mapPartitions(x=>{
> //read parquet file in and perform an operation with x
> })
>
>
>
>
>
> Here’s my current POC code but I’m getting nonsense back from the row
> reader.
>
>
>
> import com.amazon.horizon.azulene.util.SparkFileUtils._
>
> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
>
> val data = List(
>   TestRow(1, 1, "asdf"),
>   TestRow(2, 1, "asdf"),
>   TestRow(3, 1, "asdf"),
>   TestRow(4, 1, "asdf")
> )
>
> val df = spark.createDataFrame(data)
>
> val folder = Files.createTempDirectory("azulene-test")
>
> val folderPath = folder.toAbsolutePath.toString + "/"
> df.write.mode("overwrite").parquet(folderPath)
>
> val files = spark.fs.listStatus(folder.toUri)
>
> val file = files(1) // skip _SUCCESS file
>
> val partitionSchema = StructType(Seq.empty)
> val dataSchema = df.schema
> val fileFormat = new ParquetFileFormat()
>
> val path = file.getPath
>
> val status = spark.fs.getFileStatus(path)
>
> val pFile = new PartitionedFile(
>   partitionValues = InternalRow.empty, // this should be empty for non-partitioned values
>   filePath = path.toString,
>   start = 0,
>   length = status.getLen
> )
>
> val readFile: (PartitionedFile) => Iterator[Any] = // Iterator[InternalRow]
>   fileFormat.buildReaderWithPartitionValues(
>     sparkSession = spark,
>     dataSchema = dataSchema,
>     partitionSchema = partitionSchema, // this should be empty for non-partitioned fields
>     requiredSchema = dataSchema,
>     filters = Seq.empty,
>     options = Map.empty,
>     hadoopConf = spark.sparkContext.hadoopConfiguration
>     // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
>   )
>
> import scala.collection.JavaConverters._
>
> val rows = readFile(pFile).flatMap(_ match {
>   case r: InternalRow => Seq(r)
>
>   // This doesn't work. vector mode is doing something screwy
>   case b: ColumnarBatch => b.rowIterator().asScala
> }).toList
>
> println(rows)
> // List([0,1,5b,2000000004,66647361])
> // ?? this is wrong I think ????
>
>
>
> Has anyone attempted something similar?
>
>
>
> Cheers Andrew
>
>
>
--
Ryan Blue
Software Engineer
Netflix
Re: Manually reading parquet files.
Posted by Wenchen Fan <cl...@gmail.com>.
Try `val enconder = RowEncoder(df.schema).resolveAndBind()` ?
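A small sketch of why this should help, under the assumption that the Spark 2.4-era encoder API seen in the stack trace (`ExpressionEncoder.fromRow`) is in use. A freshly created `RowEncoder` still contains unresolved `getcolumnbyordinal(...)` placeholders, which cannot be evaluated; `resolveAndBind()` binds them to ordinals of the input row. The schema below is assumed from the StructFields in the error message, and the object and helper names are hypothetical. On Spark 3.x `fromRow` is no longer available; `createDeserializer()` on the bound encoder is the rough equivalent there.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

object DecodeWithBoundEncoder {
  // Assumed schema (pk, ordering, col_a), copied from the error message in this thread.
  val schema: StructType = StructType(Seq(
    StructField("pk", IntegerType, nullable = false),
    StructField("ordering", IntegerType, nullable = false),
    StructField("col_a", StringType, nullable = true)))

  // Converts catalyst InternalRows into external Rows.
  // resolveAndBind() must be called once before fromRow can be used.
  def toExternal(rows: Iterator[InternalRow]): List[Row] = {
    val encoder = RowEncoder(schema).resolveAndBind()
    rows.map(r => encoder.fromRow(r)).toList
  }

  def main(args: Array[String]): Unit = {
    // Hand-built input row; in the thread these would come from readFile(pFile).
    val internal = InternalRow(1, 1, UTF8String.fromString("asdf"))
    toExternal(Iterator(internal)).foreach(println)
  }
}
```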
On Thu, Mar 21, 2019 at 5:39 PM Long, Andrew <lo...@amazon.com.invalid>
wrote:
> Thanks a ton for the help!
>
>
>
> Is there a standardized way of converting the internal row to rows?
>
>
>
> I’ve tried this but I’m getting an exception:
>
>
>
> val enconder = RowEncoder(df.schema)
> val rows = readFile(pFile).flatMap(_ match {
>   case r: InternalRow => Seq(r)
>   case b: ColumnarBatch => b.rowIterator().asScala
> })
>   .map(enconder.fromRow(_))
>   .toList
>
>
>
> java.lang.RuntimeException: Error while decoding:
> java.lang.UnsupportedOperationException: Cannot evaluate expression:
> getcolumnbyordinal(0, IntegerType)
>
> createexternalrow(getcolumnbyordinal(0, IntegerType),
> getcolumnbyordinal(1, IntegerType), getcolumnbyordinal(2,
> StringType).toString, StructField(pk,IntegerType,false),
> StructField(ordering,IntegerType,false), StructField(col_a,StringType,true))
>
>
>
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
>
> at
> com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
>
> at
> com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
>
>
>
Re: Manually reading parquet files.
Posted by "Long, Andrew" <lo...@amazon.com.INVALID>.
Thanks a ton for the help!

Is there a standardized way of converting the internal row to rows?

I’ve tried this but I’m getting an exception:

val enconder = RowEncoder(df.schema)
val rows = readFile(pFile).flatMap(_ match {
  case r: InternalRow => Seq(r)
  case b: ColumnarBatch => b.rowIterator().asScala
})
  .map(enconder.fromRow(_))
  .toList

java.lang.RuntimeException: Error while decoding: java.lang.UnsupportedOperationException: Cannot evaluate expression: getcolumnbyordinal(0, IntegerType)
createexternalrow(getcolumnbyordinal(0, IntegerType), getcolumnbyordinal(1, IntegerType), getcolumnbyordinal(2, StringType).toString, StructField(pk,IntegerType,false), StructField(ordering,IntegerType,false), StructField(col_a,StringType,true))
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
    at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
    at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)