You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@arrow.apache.org by Antonio Vilches <an...@shapelets.io> on 2019/02/04 14:41:10 UTC
Issue while loading an arrow file with dictionaries
Hi there,
I´m a newbie arrow user and I want to develop an application which loads/stores information from/to arrow. For this purpose, I started a very simple Kotlin application that first writes an arrow file and later reads the information
from that arrow file. It makes use of the Arrow Java API (See code below).
This application sets an schema with 3 fields: 1 Float, 1 Int and 1 Varchar. Later, it writes 3 batches of ten elements (in Section Writer). In the last section (called Reader), It tries to load the information that was stored in the Arrow file.
The application works properly if I avoid the usage of dictionaries for storing and loading Strings in Utf-8 format. However, If I use dictionaries I get an exception (see below) while trying to load the first batch by using the ArrowFileReader.loadRecordsBatch() function (Line annoted with //ERROR HERE tag.).
Exception in thread "main" java.lang.IllegalArgumentException: not all nodes and buffers were consumed. nodes: [] buffers: [ArrowBuf[52], udle: [34 448..458]]
at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:65)
at org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:220)
at org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:114)
at org.apache.arrow.vector.ipc.ArrowFileReader.loadRecordBatch(ArrowFileReader.java:139)
at ArrowFileSymbolicalWithDictionaryKt.main(ArrowFileSymbolicalWithDictionary.kt:157)
I´m not sure if I´m doing something wrong or it is just an issue, could anyone assist me with this issue?
Thanks in advance,
Antonio Vilches.
fun main(args: Array<String>) {
println("Writing and reading an apache arrow file")
val numRecords = (10 * 3) // 30 in total
val initialBatchSize = 10
val recordsPerBatch = initialBatchSize
val numBatches = if ((numRecords % recordsPerBatch) == 0){
numRecords / recordsPerBatch
}else{
numRecords / recordsPerBatch + 1
}
println("Reading and writing $numRecords records in $numBatches batches.")
// *****************************************************************************************************************
// Generate dictionary *
// *****************************************************************************************************************
val dictionaryProvider = DictionaryProvider.MapDictionaryProvider()
val vector = newVarCharVector("Tags", RootAllocator(Long.MAX_VALUE))
vector.setInitialCapacity(4)
vector.allocateNew()
vector.set(0, "a".toByteArray(Charsets.UTF_8))
vector.set(1, "b".toByteArray(Charsets.UTF_8))
vector.set(2, "c".toByteArray(Charsets.UTF_8))
vector.set(3, "d".toByteArray(Charsets.UTF_8))
vector.valueCount = 4
val dictionary = Dictionary(vector, DictionaryEncoding("Tags".hashCode().toLong(), false, null))
dictionaryProvider.put(dictionary)
// *****************************************************************************************************************
// Create schema *
// *****************************************************************************************************************
val filePath = "./example.arrow"
//Create Fields
val temperature = Field("temperature", FieldType.nullable(ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null)
val humidity = Field("humidity", FieldType.nullable(ArrowType.Int(32, true)), null)
val dic = dictionaryProvider.lookup("Tags".hashCode().toLong())
val tag = Field("tag", FieldType(true, ArrowType.Utf8(), /*null*/ dic.encoding, null), null)
//Create schema
val builder = mutableListOf<Field>()
builder.add(temperature)
builder.add(humidity)
builder.add(tag)
val schema = Schema(builder, null)
// *****************************************************************************************************************
// Write to Arrow *
// *****************************************************************************************************************
val fileToWrite = File(filePath)
val writeStream = fileToWrite.outputStream()
val schemaRoot = VectorSchemaRoot.create(schema, RootAllocator(Long.MAX_VALUE))
val writer = ArrowFileWriter(schemaRoot, /*DictionaryProvider.MapDictionaryProvider()*/ dictionaryProvider, writeStream.channel)
writer.start()
schemaRoot.rowCount = recordsPerBatch
for (batch in 0 until numBatches){
val numItemsBatch = min(recordsPerBatch, numRecords - batch * recordsPerBatch)
schemaRoot.rowCount = numItemsBatch
schemaRoot.schema.fields.forEach {
when(it.type){
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) -> {
val vector = schemaRoot.getVector(it.name) as Float8Vector
vector.setInitialCapacity(recordsPerBatch)
vector.allocateNew(recordsPerBatch)
for (i in 0 until numItemsBatch){
vector.set(i, i.toDouble())
}
vector.valueCount = numItemsBatch
}
ArrowType.Int(32, true) -> {
val vector = schemaRoot.getVector(it.name) as IntVector
vector.setInitialCapacity(recordsPerBatch)
vector.allocateNew(recordsPerBatch)
for (i in 0 until numItemsBatch){
vector.set(i, i.toInt())
}
vector.valueCount = numItemsBatch
}
ArrowType.Utf8() -> {
val vec = schemaRoot.getVector(it.name) as VarCharVector
vec.setInitialCapacity(recordsPerBatch)
vec.allocateNew()
//val encoded = DictionaryEncoder.encode(vec, dictionaryProvider.lookup("Tags".hashCode().toLong()))
for (i in 0 until recordsPerBatch) {
when(i % 4){
0 -> { vec.set(i, "a".toByteArray(Charsets.UTF_8)) }
1 -> { vec.set(i, "b".toByteArray(Charsets.UTF_8)) }
2 -> { vec.set(i, "c".toByteArray(Charsets.UTF_8)) }
3 -> { vec.set(i, "d".toByteArray(Charsets.UTF_8)) }
}
}
vec.valueCount = recordsPerBatch
}
}
}
writer.writeBatch()
}
writer.end()
writer.close()
// Need to close dictionary vectors
for (id in dictionaryProvider.dictionaryIds) {
dictionaryProvider.lookup(id).vector.close()
}
// *****************************************************************************************************************
// Read from Arrow *
// *****************************************************************************************************************
//Accum results
var accumDouble = 0.0
var accumInt = 0L
// Setting reading
val fileToRead = File(filePath)
val readStream = fileToRead.inputStream()
val reader = ArrowFileReader(readStream.channel, RootAllocator(Long.MAX_VALUE))
//println("Reading the arrow file : $filePath")
val readRoot = reader.vectorSchemaRoot
//val readSchema = readRoot.schema
val arrowBlocks = reader.recordBlocks
reader.recordBlocks.forEachIndexed { index, arrowBlock ->
reader.loadRecordBatch(arrowBlock) // ERROR HERE
println("Reading Block[$index]: ${readRoot.rowCount} elements.")
readRoot.fieldVectors.forEachIndexed { index2, fieldVector ->
val minorType = fieldVector.minorType
when(minorType){
Types.MinorType.FLOAT8 -> {
val vecDouble = fieldVector as Float8Vector
val cap = vecDouble.valueCapacity
var address = vecDouble.dataBufferAddress
for (i in 0 until vecDouble.valueCapacity){
accumDouble += vecDouble.get(i)
}
}
Types.MinorType.INT -> {
val vecDouble = fieldVector as IntVector
val cap = vecDouble.valueCapacity
var address = vecDouble.dataBufferAddress
for (i in 0 until vecDouble.valueCapacity){
accumInt += vecDouble.get(i)
}
}
Types.MinorType.VARCHAR -> {
val vec = fieldVector as VarCharVector
val cap = vec.valueCapacity
var address = vec.dataBufferAddress
for (i in 0 until vec.valueCapacity){
println("Reading tag[$i] ${vec.get(i).toString(Charsets.UTF_8)}")
}
}
}
}
}
reader.close()
println("Double accum: $accumDouble")
println("Long accum: $accumInt")
}
/**
* This is a helper function to create VarCharVector for dictionary purposes.
*/
fun newVarCharVector(name: String, allocator: BufferAllocator): VarCharVector {
return FieldType.nullable(ArrowType.Utf8()).createNewSingleVector(name, allocator, null) as VarCharVector
Re: Issue while loading an arrow file with dictionaries
Posted by Li Jin <ic...@gmail.com>.
Hi Antonio,
My memory is a little rusty now but if I remember correctly, when writing
to a dictionary encoded vector, the value should be of encoded value
instead of decoded, in your code:
//
*****************************************************************************************************************
// Write to Arrow
*
//
*****************************************************************************************************************
ArrowType.Utf8() -> {
val vec = schemaRoot.getVector(it.name) as VarCharVector
vec.setInitialCapacity(recordsPerBatch)
vec.allocateNew()
//val encoded = DictionaryEncoder.encode(vec,
dictionaryProvider.lookup("Tags".hashCode().toLong()))
for (i in 0 until recordsPerBatch) {
when(i % 4){
0 -> { vec.set(i,
"a".toByteArray(Charsets.UTF_8)) }
1 -> { vec.set(i,
"b".toByteArray(Charsets.UTF_8)) }
2 -> { vec.set(i,
"c".toByteArray(Charsets.UTF_8)) }
3 -> { vec.set(i,
"d".toByteArray(Charsets.UTF_8)) }
}
}
vec.valueCount = recordsPerBatch
}
}
Is writing the decoded value (UTF-8) however it should be writing the
encoded value.
However, is there an issue that:
VectorSchemaRoot.create(schema, RootAllocator(Long.MAX_VALUE))
will create an decoded type vector instead of encoded, so you would
probably need to create vectors yourself instead of using
VectorSchemaRoot.create(schema, RootAllocator(Long.MAX_VALUE)).
For more details please see:
https://github.com/apache/arrow/pull/2681
On Mon, Feb 4, 2019 at 2:07 PM Wes McKinney <we...@gmail.com> wrote:
> not sure if you're subscribed to user@ but in case you have advice
>
> ---------- Forwarded message ---------
> From: Antonio Vilches <an...@shapelets.io>
> Date: Mon, Feb 4, 2019 at 8:41 AM
> Subject: Issue while loading an arrow file with dictionaries
> To: user@arrow.apache.org <us...@arrow.apache.org>
>
>
> Hi there,
>
>
>
> I´m a newbie arrow user and I want to develop an application which
> loads/stores information from/to arrow. For this purpose, I started a
> very simple Kotlin application that first writes an arrow file and
> later reads the information
>
> from that arrow file. It makes use of the Arrow Java API (See code below).
>
> This application sets an schema with 3 fields: 1 Float, 1 Int and 1
> Varchar. Later, it writes 3 batches of ten elements (in Section
> Writer). In the last section (called Reader), It tries to load the
> information that was stored in the Arrow file.
>
>
>
> The application works properly if I avoid the usage of dictionaries
> for storing and loading Strings in Utf-8 format. However, If I use
> dictionaries I get an exception (see below) while trying to load the
> first batch by using the ArrowFileReader.loadRecordsBatch() function
> (Line annoted with //ERROR HERE tag.).
>
>
>
> Exception in thread "main" java.lang.IllegalArgumentException: not all
> nodes and buffers were consumed. nodes: [] buffers: [ArrowBuf[52],
> udle: [34 448..458]]
>
> at
> org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:65)
>
> at
>
> org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:220)
>
> at
>
> org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:114)
>
> at
>
> org.apache.arrow.vector.ipc.ArrowFileReader.loadRecordBatch(ArrowFileReader.java:139)
>
> at
>
> ArrowFileSymbolicalWithDictionaryKt.main(ArrowFileSymbolicalWithDictionary.kt:157)
>
>
>
> I´m not sure if I´m doing something wrong or it is just an issue,
> could anyone assist me with this issue?
>
> Thanks in advance,
>
> Antonio Vilches.
>
>
>
>
>
>
>
> fun main(args: Array<String>) {
>
> println("Writing and reading an apache arrow file")
> val numRecords = (10 * 3) // 30 in total
> val initialBatchSize = 10
>
>
> val recordsPerBatch = initialBatchSize
>
> val numBatches = if ((numRecords % recordsPerBatch) == 0){
> numRecords / recordsPerBatch
> }else{
> numRecords / recordsPerBatch + 1
> }
>
> println("Reading and writing $numRecords records in $numBatches
> batches.")
> //
> *****************************************************************************************************************
> // Generate dictionary
> *
> //
> *****************************************************************************************************************
> val dictionaryProvider = DictionaryProvider.MapDictionaryProvider()
> val vector = newVarCharVector("Tags", RootAllocator(Long.MAX_VALUE))
> vector.setInitialCapacity(4)
> vector.allocateNew()
>
> vector.set(0, "a".toByteArray(Charsets.UTF_8))
> vector.set(1, "b".toByteArray(Charsets.UTF_8))
> vector.set(2, "c".toByteArray(Charsets.UTF_8))
> vector.set(3, "d".toByteArray(Charsets.UTF_8))
>
> vector.valueCount = 4
> val dictionary = Dictionary(vector,
> DictionaryEncoding("Tags".hashCode().toLong(), false, null))
> dictionaryProvider.put(dictionary)
>
> //
> *****************************************************************************************************************
> // Create schema
> *
> //
> *****************************************************************************************************************
> val filePath = "./example.arrow"
>
> //Create Fields
> val temperature = Field("temperature",
> FieldType.nullable(ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)),
> null)
> val humidity = Field("humidity",
> FieldType.nullable(ArrowType.Int(32, true)), null)
> val dic = dictionaryProvider.lookup("Tags".hashCode().toLong())
> val tag = Field("tag", FieldType(true, ArrowType.Utf8(), /*null*/
> dic.encoding, null), null)
>
> //Create schema
> val builder = mutableListOf<Field>()
> builder.add(temperature)
> builder.add(humidity)
> builder.add(tag)
> val schema = Schema(builder, null)
>
> //
> *****************************************************************************************************************
> // Write to Arrow
> *
> //
> *****************************************************************************************************************
>
> val fileToWrite = File(filePath)
> val writeStream = fileToWrite.outputStream()
> val schemaRoot = VectorSchemaRoot.create(schema,
> RootAllocator(Long.MAX_VALUE))
> val writer = ArrowFileWriter(schemaRoot,
> /*DictionaryProvider.MapDictionaryProvider()*/ dictionaryProvider,
> writeStream.channel)
>
> writer.start()
>
> schemaRoot.rowCount = recordsPerBatch
> for (batch in 0 until numBatches){
> val numItemsBatch = min(recordsPerBatch, numRecords - batch *
> recordsPerBatch)
> schemaRoot.rowCount = numItemsBatch
> schemaRoot.schema.fields.forEach {
> when(it.type){
> ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) -> {
> val vector = schemaRoot.getVector(it.name) as
> Float8Vector
> vector.setInitialCapacity(recordsPerBatch)
> vector.allocateNew(recordsPerBatch)
> for (i in 0 until numItemsBatch){
> vector.set(i, i.toDouble())
> }
> vector.valueCount = numItemsBatch
> }
> ArrowType.Int(32, true) -> {
> val vector = schemaRoot.getVector(it.name) as
> IntVector
> vector.setInitialCapacity(recordsPerBatch)
> vector.allocateNew(recordsPerBatch)
> for (i in 0 until numItemsBatch){
> vector.set(i, i.toInt())
> }
> vector.valueCount = numItemsBatch
> }
> ArrowType.Utf8() -> {
> val vec = schemaRoot.getVector(it.name) as
> VarCharVector
> vec.setInitialCapacity(recordsPerBatch)
> vec.allocateNew()
>
> //val encoded = DictionaryEncoder.encode(vec,
> dictionaryProvider.lookup("Tags".hashCode().toLong()))
>
> for (i in 0 until recordsPerBatch) {
> when(i % 4){
> 0 -> { vec.set(i,
> "a".toByteArray(Charsets.UTF_8)) }
> 1 -> { vec.set(i,
> "b".toByteArray(Charsets.UTF_8)) }
> 2 -> { vec.set(i,
> "c".toByteArray(Charsets.UTF_8)) }
> 3 -> { vec.set(i,
> "d".toByteArray(Charsets.UTF_8)) }
> }
> }
> vec.valueCount = recordsPerBatch
> }
> }
> }
> writer.writeBatch()
> }
> writer.end()
> writer.close()
>
> // Need to close dictionary vectors
> for (id in dictionaryProvider.dictionaryIds) {
> dictionaryProvider.lookup(id).vector.close()
> }
>
> //
> *****************************************************************************************************************
> // Read from Arrow
> *
> //
> *****************************************************************************************************************
>
> //Accum results
> var accumDouble = 0.0
> var accumInt = 0L
>
> // Setting reading
> val fileToRead = File(filePath)
> val readStream = fileToRead.inputStream()
> val reader = ArrowFileReader(readStream.channel,
> RootAllocator(Long.MAX_VALUE))
>
> //println("Reading the arrow file : $filePath")
> val readRoot = reader.vectorSchemaRoot
> //val readSchema = readRoot.schema
>
>
>
> val arrowBlocks = reader.recordBlocks
>
> reader.recordBlocks.forEachIndexed { index, arrowBlock ->
> reader.loadRecordBatch(arrowBlock) // ERROR HERE
>
> println("Reading Block[$index]: ${readRoot.rowCount} elements.")
>
> readRoot.fieldVectors.forEachIndexed { index2, fieldVector ->
> val minorType = fieldVector.minorType
> when(minorType){
> Types.MinorType.FLOAT8 -> {
> val vecDouble = fieldVector as Float8Vector
> val cap = vecDouble.valueCapacity
> var address = vecDouble.dataBufferAddress
> for (i in 0 until vecDouble.valueCapacity){
> accumDouble += vecDouble.get(i)
> }
> }
> Types.MinorType.INT -> {
> val vecDouble = fieldVector as IntVector
> val cap = vecDouble.valueCapacity
> var address = vecDouble.dataBufferAddress
> for (i in 0 until vecDouble.valueCapacity){
> accumInt += vecDouble.get(i)
> }
> }
> Types.MinorType.VARCHAR -> {
> val vec = fieldVector as VarCharVector
> val cap = vec.valueCapacity
> var address = vec.dataBufferAddress
> for (i in 0 until vec.valueCapacity){
> println("Reading tag[$i]
> ${vec.get(i).toString(Charsets.UTF_8)}")
> }
> }
> }
> }
> }
>
> reader.close()
>
> println("Double accum: $accumDouble")
> println("Long accum: $accumInt")
>
> }
>
> /**
> * This is a helper function to create VarCharVector for dictionary
> purposes.
> */
> fun newVarCharVector(name: String, allocator: BufferAllocator):
> VarCharVector {
> return FieldType.nullable(ArrowType.Utf8()).createNewSingleVector(name,
> allocator, null) as VarCharVector
>