You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@arrow.apache.org by Antonio Vilches <an...@shapelets.io> on 2019/02/04 14:41:10 UTC

Issue while loading an arrow file with dictionaries

Hi there,

I´m a newbie arrow user and I want to develop an application which loads/stores information from/to arrow. For this purpose, I started a very simple Kotlin application that first writes an arrow file and later reads the information
from that arrow file. It makes use of the Arrow Java API (See code below).
This application sets an schema with 3 fields: 1 Float, 1 Int and 1 Varchar. Later, it writes 3 batches of ten elements (in Section Writer). In the last section (called Reader), It tries to load the information that was stored in the Arrow file.

The application works properly if I avoid the usage of dictionaries for storing and loading Strings in Utf-8 format. However, If I use dictionaries I get an exception (see below) while trying to load the first batch by using the ArrowFileReader.loadRecordsBatch() function (Line annoted with //ERROR HERE tag.).

Exception in thread "main" java.lang.IllegalArgumentException: not all nodes and buffers were consumed. nodes: [] buffers: [ArrowBuf[52], udle: [34 448..458]]
                at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:65)
                at org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:220)
                at org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:114)
                at org.apache.arrow.vector.ipc.ArrowFileReader.loadRecordBatch(ArrowFileReader.java:139)
                at ArrowFileSymbolicalWithDictionaryKt.main(ArrowFileSymbolicalWithDictionary.kt:157)

I´m not sure if I´m doing something wrong or it is just an issue, could anyone assist me with this issue?
Thanks in advance,
Antonio Vilches.




fun main(args: Array<String>) {

    println("Writing and reading an apache arrow file")
    val numRecords = (10 * 3) // 30 in total
    val initialBatchSize = 10


    val recordsPerBatch = initialBatchSize

    val numBatches  = if ((numRecords % recordsPerBatch) == 0){
        numRecords / recordsPerBatch
    }else{
        numRecords / recordsPerBatch + 1
    }

    println("Reading and writing $numRecords records in $numBatches batches.")
    // *****************************************************************************************************************
    // Generate dictionary                                                                                             *
    // *****************************************************************************************************************
    val dictionaryProvider = DictionaryProvider.MapDictionaryProvider()
    val vector = newVarCharVector("Tags", RootAllocator(Long.MAX_VALUE))
    vector.setInitialCapacity(4)
    vector.allocateNew()

    vector.set(0, "a".toByteArray(Charsets.UTF_8))
    vector.set(1, "b".toByteArray(Charsets.UTF_8))
    vector.set(2, "c".toByteArray(Charsets.UTF_8))
    vector.set(3, "d".toByteArray(Charsets.UTF_8))

    vector.valueCount = 4
    val dictionary = Dictionary(vector, DictionaryEncoding("Tags".hashCode().toLong(), false, null))
    dictionaryProvider.put(dictionary)

    // *****************************************************************************************************************
    // Create schema                                                                                                   *
    // *****************************************************************************************************************
    val filePath = "./example.arrow"

    //Create Fields
    val temperature = Field("temperature", FieldType.nullable(ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null)
    val humidity = Field("humidity", FieldType.nullable(ArrowType.Int(32, true)), null)
    val dic = dictionaryProvider.lookup("Tags".hashCode().toLong())
    val tag = Field("tag", FieldType(true, ArrowType.Utf8(), /*null*/ dic.encoding, null), null)

    //Create schema
    val builder = mutableListOf<Field>()
    builder.add(temperature)
    builder.add(humidity)
    builder.add(tag)
    val schema = Schema(builder, null)

    // *****************************************************************************************************************
    // Write to Arrow                                                                                                  *
    // *****************************************************************************************************************

    val fileToWrite = File(filePath)
    val writeStream = fileToWrite.outputStream()
    val schemaRoot = VectorSchemaRoot.create(schema, RootAllocator(Long.MAX_VALUE))
    val writer = ArrowFileWriter(schemaRoot, /*DictionaryProvider.MapDictionaryProvider()*/ dictionaryProvider, writeStream.channel)

    writer.start()

    schemaRoot.rowCount = recordsPerBatch
    for (batch in 0 until numBatches){
        val numItemsBatch = min(recordsPerBatch, numRecords - batch * recordsPerBatch)
        schemaRoot.rowCount = numItemsBatch
        schemaRoot.schema.fields.forEach {
            when(it.type){
                ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) -> {
                    val vector = schemaRoot.getVector(it.name) as Float8Vector
                    vector.setInitialCapacity(recordsPerBatch)
                    vector.allocateNew(recordsPerBatch)
                    for (i in 0 until numItemsBatch){
                        vector.set(i, i.toDouble())
                    }
                    vector.valueCount = numItemsBatch
                }
                ArrowType.Int(32, true) -> {
                    val vector = schemaRoot.getVector(it.name) as IntVector
                    vector.setInitialCapacity(recordsPerBatch)
                    vector.allocateNew(recordsPerBatch)
                    for (i in 0 until numItemsBatch){
                        vector.set(i, i.toInt())
                    }
                    vector.valueCount = numItemsBatch
                }
                ArrowType.Utf8() -> {
                    val vec = schemaRoot.getVector(it.name) as VarCharVector
                    vec.setInitialCapacity(recordsPerBatch)
                    vec.allocateNew()

                    //val encoded = DictionaryEncoder.encode(vec, dictionaryProvider.lookup("Tags".hashCode().toLong()))

                    for (i in 0 until recordsPerBatch) {
                        when(i % 4){
                            0 -> { vec.set(i, "a".toByteArray(Charsets.UTF_8)) }
                            1 -> { vec.set(i, "b".toByteArray(Charsets.UTF_8)) }
                            2 -> { vec.set(i, "c".toByteArray(Charsets.UTF_8)) }
                            3 -> { vec.set(i, "d".toByteArray(Charsets.UTF_8)) }
                        }
                    }
                    vec.valueCount = recordsPerBatch
                }
            }
        }
        writer.writeBatch()
    }
    writer.end()
    writer.close()

    // Need to close dictionary vectors
    for (id in dictionaryProvider.dictionaryIds) {
        dictionaryProvider.lookup(id).vector.close()
    }

    // *****************************************************************************************************************
    // Read from Arrow                                                                                                 *
    // *****************************************************************************************************************

    //Accum results
    var accumDouble = 0.0
    var accumInt = 0L

    // Setting reading
    val fileToRead = File(filePath)
    val readStream = fileToRead.inputStream()
    val reader = ArrowFileReader(readStream.channel, RootAllocator(Long.MAX_VALUE))

    //println("Reading the arrow file : $filePath")
    val readRoot = reader.vectorSchemaRoot
    //val readSchema = readRoot.schema



    val arrowBlocks = reader.recordBlocks

    reader.recordBlocks.forEachIndexed { index, arrowBlock ->
        reader.loadRecordBatch(arrowBlock)  // ERROR HERE

        println("Reading Block[$index]: ${readRoot.rowCount} elements.")

        readRoot.fieldVectors.forEachIndexed { index2, fieldVector ->
            val minorType = fieldVector.minorType
            when(minorType){
                Types.MinorType.FLOAT8 -> {
                    val vecDouble = fieldVector as Float8Vector
                    val cap = vecDouble.valueCapacity
                    var address = vecDouble.dataBufferAddress
                    for (i in 0 until vecDouble.valueCapacity){
                        accumDouble += vecDouble.get(i)
                    }
                }
                Types.MinorType.INT -> {
                    val vecDouble = fieldVector as IntVector
                    val cap = vecDouble.valueCapacity
                    var address = vecDouble.dataBufferAddress
                    for (i in 0 until vecDouble.valueCapacity){
                        accumInt += vecDouble.get(i)
                    }
                }
                Types.MinorType.VARCHAR -> {
                    val vec = fieldVector as VarCharVector
                    val cap = vec.valueCapacity
                    var address = vec.dataBufferAddress
                    for (i in 0 until vec.valueCapacity){
                        println("Reading tag[$i] ${vec.get(i).toString(Charsets.UTF_8)}")
                    }
                }
            }
        }
    }

    reader.close()

    println("Double accum: $accumDouble")
    println("Long accum: $accumInt")

}

/**
 * This is a helper function to create VarCharVector for dictionary purposes.
 */
fun newVarCharVector(name: String, allocator: BufferAllocator): VarCharVector {
    return FieldType.nullable(ArrowType.Utf8()).createNewSingleVector(name, allocator, null) as VarCharVector


Re: Issue while loading an arrow file with dictionaries

Posted by Li Jin <ic...@gmail.com>.
Hi Antonio,

My memory is a little rusty now but if I remember correctly, when writing
to a dictionary encoded vector, the value should be of encoded value
instead of decoded, in your code:


    //
*****************************************************************************************************************
    // Write to Arrow
                                                *
    //
*****************************************************************************************************************

 ArrowType.Utf8() -> {
                    val vec = schemaRoot.getVector(it.name) as VarCharVector
                    vec.setInitialCapacity(recordsPerBatch)
                    vec.allocateNew()

                    //val encoded = DictionaryEncoder.encode(vec,
dictionaryProvider.lookup("Tags".hashCode().toLong()))

                    for (i in 0 until recordsPerBatch) {
                        when(i % 4){
                            0 -> { vec.set(i,
"a".toByteArray(Charsets.UTF_8)) }
                            1 -> { vec.set(i,
"b".toByteArray(Charsets.UTF_8)) }
                            2 -> { vec.set(i,
"c".toByteArray(Charsets.UTF_8)) }
                            3 -> { vec.set(i,
"d".toByteArray(Charsets.UTF_8)) }
                        }
                    }
                    vec.valueCount = recordsPerBatch
                }
}

Is writing the decoded value (UTF-8) however it should be writing the
encoded value.

However, is there an issue that:
VectorSchemaRoot.create(schema, RootAllocator(Long.MAX_VALUE))

will create an decoded type vector instead of encoded, so you would
probably need to create vectors yourself instead of using
VectorSchemaRoot.create(schema, RootAllocator(Long.MAX_VALUE)).

For more details please see:
https://github.com/apache/arrow/pull/2681



On Mon, Feb 4, 2019 at 2:07 PM Wes McKinney <we...@gmail.com> wrote:

> not sure if you're subscribed to user@ but in case you have advice
>
> ---------- Forwarded message ---------
> From: Antonio Vilches <an...@shapelets.io>
> Date: Mon, Feb 4, 2019 at 8:41 AM
> Subject: Issue while loading an arrow file with dictionaries
> To: user@arrow.apache.org <us...@arrow.apache.org>
>
>
> Hi there,
>
>
>
> I´m a newbie arrow user and I want to develop an application which
> loads/stores information from/to arrow. For this purpose, I started a
> very simple Kotlin application that first writes an arrow file and
> later reads the information
>
> from that arrow file. It makes use of the Arrow Java API (See code below).
>
> This application sets an schema with 3 fields: 1 Float, 1 Int and 1
> Varchar. Later, it writes 3 batches of ten elements (in Section
> Writer). In the last section (called Reader), It tries to load the
> information that was stored in the Arrow file.
>
>
>
> The application works properly if I avoid the usage of dictionaries
> for storing and loading Strings in Utf-8 format. However, If I use
> dictionaries I get an exception (see below) while trying to load the
> first batch by using the ArrowFileReader.loadRecordsBatch() function
> (Line annoted with //ERROR HERE tag.).
>
>
>
> Exception in thread "main" java.lang.IllegalArgumentException: not all
> nodes and buffers were consumed. nodes: [] buffers: [ArrowBuf[52],
> udle: [34 448..458]]
>
>                 at
> org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:65)
>
>                 at
>
> org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:220)
>
>                 at
>
> org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:114)
>
>                 at
>
> org.apache.arrow.vector.ipc.ArrowFileReader.loadRecordBatch(ArrowFileReader.java:139)
>
>                 at
>
> ArrowFileSymbolicalWithDictionaryKt.main(ArrowFileSymbolicalWithDictionary.kt:157)
>
>
>
> I´m not sure if I´m doing something wrong or it is just an issue,
> could anyone assist me with this issue?
>
> Thanks in advance,
>
> Antonio Vilches.
>
>
>
>
>
>
>
> fun main(args: Array<String>) {
>
>     println("Writing and reading an apache arrow file")
>     val numRecords = (10 * 3) // 30 in total
>     val initialBatchSize = 10
>
>
>     val recordsPerBatch = initialBatchSize
>
>     val numBatches  = if ((numRecords % recordsPerBatch) == 0){
>         numRecords / recordsPerBatch
>     }else{
>         numRecords / recordsPerBatch + 1
>     }
>
>     println("Reading and writing $numRecords records in $numBatches
> batches.")
>     //
> *****************************************************************************************************************
>     // Generate dictionary
>                                                 *
>     //
> *****************************************************************************************************************
>     val dictionaryProvider = DictionaryProvider.MapDictionaryProvider()
>     val vector = newVarCharVector("Tags", RootAllocator(Long.MAX_VALUE))
>     vector.setInitialCapacity(4)
>     vector.allocateNew()
>
>     vector.set(0, "a".toByteArray(Charsets.UTF_8))
>     vector.set(1, "b".toByteArray(Charsets.UTF_8))
>     vector.set(2, "c".toByteArray(Charsets.UTF_8))
>     vector.set(3, "d".toByteArray(Charsets.UTF_8))
>
>     vector.valueCount = 4
>     val dictionary = Dictionary(vector,
> DictionaryEncoding("Tags".hashCode().toLong(), false, null))
>     dictionaryProvider.put(dictionary)
>
>     //
> *****************************************************************************************************************
>     // Create schema
>                                                 *
>     //
> *****************************************************************************************************************
>     val filePath = "./example.arrow"
>
>     //Create Fields
>     val temperature = Field("temperature",
> FieldType.nullable(ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)),
> null)
>     val humidity = Field("humidity",
> FieldType.nullable(ArrowType.Int(32, true)), null)
>     val dic = dictionaryProvider.lookup("Tags".hashCode().toLong())
>     val tag = Field("tag", FieldType(true, ArrowType.Utf8(), /*null*/
> dic.encoding, null), null)
>
>     //Create schema
>     val builder = mutableListOf<Field>()
>     builder.add(temperature)
>     builder.add(humidity)
>     builder.add(tag)
>     val schema = Schema(builder, null)
>
>     //
> *****************************************************************************************************************
>     // Write to Arrow
>                                                 *
>     //
> *****************************************************************************************************************
>
>     val fileToWrite = File(filePath)
>     val writeStream = fileToWrite.outputStream()
>     val schemaRoot = VectorSchemaRoot.create(schema,
> RootAllocator(Long.MAX_VALUE))
>     val writer = ArrowFileWriter(schemaRoot,
> /*DictionaryProvider.MapDictionaryProvider()*/ dictionaryProvider,
> writeStream.channel)
>
>     writer.start()
>
>     schemaRoot.rowCount = recordsPerBatch
>     for (batch in 0 until numBatches){
>         val numItemsBatch = min(recordsPerBatch, numRecords - batch *
> recordsPerBatch)
>         schemaRoot.rowCount = numItemsBatch
>         schemaRoot.schema.fields.forEach {
>             when(it.type){
>                 ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) -> {
>                     val vector = schemaRoot.getVector(it.name) as
> Float8Vector
>                     vector.setInitialCapacity(recordsPerBatch)
>                     vector.allocateNew(recordsPerBatch)
>                     for (i in 0 until numItemsBatch){
>                         vector.set(i, i.toDouble())
>                     }
>                     vector.valueCount = numItemsBatch
>                 }
>                 ArrowType.Int(32, true) -> {
>                     val vector = schemaRoot.getVector(it.name) as
> IntVector
>                     vector.setInitialCapacity(recordsPerBatch)
>                     vector.allocateNew(recordsPerBatch)
>                     for (i in 0 until numItemsBatch){
>                         vector.set(i, i.toInt())
>                     }
>                     vector.valueCount = numItemsBatch
>                 }
>                 ArrowType.Utf8() -> {
>                     val vec = schemaRoot.getVector(it.name) as
> VarCharVector
>                     vec.setInitialCapacity(recordsPerBatch)
>                     vec.allocateNew()
>
>                     //val encoded = DictionaryEncoder.encode(vec,
> dictionaryProvider.lookup("Tags".hashCode().toLong()))
>
>                     for (i in 0 until recordsPerBatch) {
>                         when(i % 4){
>                             0 -> { vec.set(i,
> "a".toByteArray(Charsets.UTF_8)) }
>                             1 -> { vec.set(i,
> "b".toByteArray(Charsets.UTF_8)) }
>                             2 -> { vec.set(i,
> "c".toByteArray(Charsets.UTF_8)) }
>                             3 -> { vec.set(i,
> "d".toByteArray(Charsets.UTF_8)) }
>                         }
>                     }
>                     vec.valueCount = recordsPerBatch
>                 }
>             }
>         }
>         writer.writeBatch()
>     }
>     writer.end()
>     writer.close()
>
>     // Need to close dictionary vectors
>     for (id in dictionaryProvider.dictionaryIds) {
>         dictionaryProvider.lookup(id).vector.close()
>     }
>
>     //
> *****************************************************************************************************************
>     // Read from Arrow
>                                                 *
>     //
> *****************************************************************************************************************
>
>     //Accum results
>     var accumDouble = 0.0
>     var accumInt = 0L
>
>     // Setting reading
>     val fileToRead = File(filePath)
>     val readStream = fileToRead.inputStream()
>     val reader = ArrowFileReader(readStream.channel,
> RootAllocator(Long.MAX_VALUE))
>
>     //println("Reading the arrow file : $filePath")
>     val readRoot = reader.vectorSchemaRoot
>     //val readSchema = readRoot.schema
>
>
>
>     val arrowBlocks = reader.recordBlocks
>
>     reader.recordBlocks.forEachIndexed { index, arrowBlock ->
>         reader.loadRecordBatch(arrowBlock)  // ERROR HERE
>
>         println("Reading Block[$index]: ${readRoot.rowCount} elements.")
>
>         readRoot.fieldVectors.forEachIndexed { index2, fieldVector ->
>             val minorType = fieldVector.minorType
>             when(minorType){
>                 Types.MinorType.FLOAT8 -> {
>                     val vecDouble = fieldVector as Float8Vector
>                     val cap = vecDouble.valueCapacity
>                     var address = vecDouble.dataBufferAddress
>                     for (i in 0 until vecDouble.valueCapacity){
>                         accumDouble += vecDouble.get(i)
>                     }
>                 }
>                 Types.MinorType.INT -> {
>                     val vecDouble = fieldVector as IntVector
>                     val cap = vecDouble.valueCapacity
>                     var address = vecDouble.dataBufferAddress
>                     for (i in 0 until vecDouble.valueCapacity){
>                         accumInt += vecDouble.get(i)
>                     }
>                 }
>                 Types.MinorType.VARCHAR -> {
>                     val vec = fieldVector as VarCharVector
>                     val cap = vec.valueCapacity
>                     var address = vec.dataBufferAddress
>                     for (i in 0 until vec.valueCapacity){
>                         println("Reading tag[$i]
> ${vec.get(i).toString(Charsets.UTF_8)}")
>                     }
>                 }
>             }
>         }
>     }
>
>     reader.close()
>
>     println("Double accum: $accumDouble")
>     println("Long accum: $accumInt")
>
> }
>
> /**
>  * This is a helper function to create VarCharVector for dictionary
> purposes.
>  */
> fun newVarCharVector(name: String, allocator: BufferAllocator):
> VarCharVector {
>     return FieldType.nullable(ArrowType.Utf8()).createNewSingleVector(name,
> allocator, null) as VarCharVector
>