Posted to issues@arrow.apache.org by "Liya Fan (JIRA)" <ji...@apache.org> on 2019/04/29 06:03:00 UTC

[jira] [Commented] (ARROW-5230) [Java] Read Struct Vector from ArrowStreamReader bugs

    [ https://issues.apache.org/jira/browse/ARROW-5230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16828932#comment-16828932 ] 

Liya Fan commented on ARROW-5230:
---------------------------------

Hi [~Chaokun Yang], this seems like an interesting problem. If you agree, I'd like to take a look.

> [Java] Read Struct Vector from ArrowStreamReader bugs
> -----------------------------------------------------
>
>                 Key: ARROW-5230
>                 URL: https://issues.apache.org/jira/browse/ARROW-5230
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Java
>         Environment: Mac OS 10.13.6, Arrow 0.13.0, JDK8
>            Reporter: Chaokun Yang
>            Priority: Major
>
> After writing a struct vector to a file with ArrowStreamWriter, reading it back with ArrowStreamReader throws an exception:
> {quote}Exception in thread "main" java.lang.IllegalArgumentException: not all nodes and buffers were consumed. nodes: [ArrowFieldNode [length=100, nullCount=0], ArrowFieldNode [length=100, nullCount=0]] buffers: [ArrowBuf[26], udle: [11 16..29], ArrowBuf[27], udle: [11 32..432], ArrowBuf[28], udle: [11 432..445], ArrowBuf[29], udle: [11 448..848]]
>  at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:64)
>  at org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:219)
>  at org.apache.arrow.vector.ipc.ArrowStreamReader.loadNextBatch(ArrowStreamReader.java:121)
> {quote}
> Here's the code to reproduce this exception:
> {code:java}
> import org.apache.arrow.memory.RootAllocator;
> import org.apache.arrow.vector.FieldVector;
> import org.apache.arrow.vector.IntVector;
> import org.apache.arrow.vector.VectorSchemaRoot;
> import org.apache.arrow.vector.complex.StructVector;
> import org.apache.arrow.vector.dictionary.DictionaryProvider;
> import org.apache.arrow.vector.ipc.ArrowStreamReader;
> import org.apache.arrow.vector.ipc.ArrowStreamWriter;
> import org.apache.arrow.vector.types.pojo.ArrowType;
> import org.apache.arrow.vector.types.pojo.Field;
> import org.apache.arrow.vector.types.pojo.FieldType;
> import org.apache.arrow.vector.types.pojo.Schema;
> import java.io.ByteArrayInputStream;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.OutputStream;
> import java.nio.file.Files;
> import java.nio.file.Paths;
> import java.util.Collections;
> import java.util.List;
> import java.util.concurrent.ThreadLocalRandom;
> public class StructTest {
>     public static void writeBatch(OutputStream os) throws IOException {
>         List<Field> fields = Collections.singletonList(new Field("f-Struct(Int, Int)", FieldType.nullable(ArrowType.Struct.INSTANCE), null));
>         Schema schema = new Schema(fields);
>         VectorSchemaRoot root = VectorSchemaRoot.create(schema, new RootAllocator(Integer.MAX_VALUE));
>         DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();
>         ArrowStreamWriter writer = new ArrowStreamWriter(root, provider, os);
>         writer.start();
>         for (int i = 0; i < 2; i++) {
>             root.setRowCount(100);
>             List<FieldVector> vectors = root.getFieldVectors();
>             StructVector vector = (StructVector) vectors.get(0);
>             fillVector(vector, 100);
>             for (int j = 0; j < 100; j++) {
>                 if (!vector.isNull(j)) {
>                     System.out.println(vector.getObject(j));
>                 }
>             }
>             writer.writeBatch();
>         }
>         writer.end();
>         writer.close();
>     }
>     public static void fillVector(StructVector vector, int batchSize) {
>         vector.setInitialCapacity(batchSize);
>         vector.allocateNew();
>         vector.addOrGet("s1", FieldType.nullable(new ArrowType.Int(32, true)), IntVector.class);
>         vector.addOrGet("s2", FieldType.nullable(new ArrowType.Int(32, true)), IntVector.class);
>         fillVector((IntVector)(vector.getChild("s1")), batchSize);
>         fillVector((IntVector) (vector.getChild("s2")), batchSize);
>         for (int i = 0; i < batchSize; i++) {
>             vector.setIndexDefined(i);
>         }
>         vector.setValueCount(batchSize);
>     }
>     public static void fillVector(IntVector vector, int batchSize) {
>         vector.setInitialCapacity(batchSize);
>         vector.allocateNew();
>         for (int i = 0; i < batchSize; i++) {
>             vector.setSafe(i, 1, ThreadLocalRandom.current().nextInt());
>         }
>         vector.setValueCount(batchSize);
>     }
>     public static void main(String[] args) throws IOException {
>         try (FileOutputStream fos = new FileOutputStream("result/struct.arrow")) {
>             writeBatch(fos);
>             System.out.println("write succeed");
>             fos.flush();
>         }
>         RootAllocator allocator = new RootAllocator(1000000000);
>         ByteArrayInputStream in = new ByteArrayInputStream(Files.readAllBytes(Paths.get("result/struct.arrow")));
>         ArrowStreamReader reader = new ArrowStreamReader(in, allocator);
>         reader.loadNextBatch();
>     }
> }
> {code}
> If I create the struct record batches in Python, Java can read them back:
> Write data:
>  
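> {code:python}
> import pyarrow as pa
> # get_struct_obj is the reporter's own helper and is not shown in this report.
> def make_struct(path, batch_size=200, num_batch=2):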
>     obj = get_struct_obj(batch_size)
>     batch = pa.RecordBatch.from_arrays([obj], ['fo'])
>     writer = pa.RecordBatchStreamWriter(path, batch.schema)
>     for _ in range(num_batch):
>         writer.write_batch(batch)
>     writer.close()
> make_struct("struct.arrow")
> {code}
> Read back:
> {code:java}
> RootAllocator allocator = new RootAllocator(1000000000);
> ByteArrayInputStream in = new ByteArrayInputStream(Files.readAllBytes(Paths.get("struct.arrow")));
> ArrowStreamReader reader = new ArrowStreamReader(in, allocator);
> reader.loadNextBatch();
> {code}
>  
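One possible reading of the quoted exception, offered as a guess rather than a confirmed diagnosis: the reproduction declares the struct field with `null` children and only adds `s1` and `s2` to the vector afterwards, so the schema written to the stream may describe a struct with no children while each record batch still carries three field nodes. The leftovers in the message fit that reading: two nodes of length 100, plus four buffers of 13, 400, 13 and 400 bytes (a 100-bit validity bitmap and 100 four-byte ints per child). The sketch below is a simplified stand-alone model of a depth-first loader check, not Arrow's actual `VectorLoader` code; `LoaderModel`, `SchemaField`, `consume` and `leftoverNodes` are hypothetical names introduced only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LoaderModel {
    /** Hypothetical stand-in for a schema field: a name plus its declared children. */
    public static final class SchemaField {
        final String name;
        final List<SchemaField> children;

        public SchemaField(String name, List<SchemaField> children) {
            this.name = name;
            // Treat null children as "no children declared", as in the reproduction.
            this.children = children == null
                    ? Collections.<SchemaField>emptyList()
                    : children;
        }
    }

    /**
     * Consumes one field node for this field, then recurses into the children
     * the schema declares. Returns the index of the next unread node.
     */
    public static int consume(SchemaField field, int nodeCount, int next) {
        if (next >= nodeCount) {
            throw new IllegalArgumentException("no node left for field " + field.name);
        }
        next++;
        for (SchemaField child : field.children) {
            next = consume(child, nodeCount, next);
        }
        return next;
    }

    /** Number of batch nodes the schema walk never reached. */
    public static int leftoverNodes(List<SchemaField> schema, int nodeCount) {
        int next = 0;
        for (SchemaField field : schema) {
            next = consume(field, nodeCount, next);
        }
        return nodeCount - next;
    }

    public static void main(String[] args) {
        // Schema as declared in the reproduction: a struct with no children.
        SchemaField noChildren = new SchemaField("f-Struct(Int, Int)", null);
        // Schema that also declares the two int children.
        SchemaField withChildren = new SchemaField("f-Struct(Int, Int)",
                Arrays.asList(new SchemaField("s1", null), new SchemaField("s2", null)));

        // The batch carries three nodes: the struct and its two int children.
        System.out.println(leftoverNodes(Collections.singletonList(noChildren), 3));   // prints 2
        System.out.println(leftoverNodes(Collections.singletonList(withChildren), 3)); // prints 0
    }
}
```

If this reading is right, one thing to try in the reproduction is passing the child fields as the third argument of the `Field` constructor instead of `null`, so that the schema written to the stream matches the vectors. This has not been verified against Arrow 0.13.0 here.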


