You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@arrow.apache.org by "Rock Wang (JIRA)" <ji...@apache.org> on 2017/02/01 21:38:51 UTC

[jira] [Created] (ARROW-522) VectorLoader throws exception data schema contains list of maps.

Rock Wang created ARROW-522:
-------------------------------

             Summary: VectorLoader throws an exception when the data schema contains a list of maps.
                 Key: ARROW-522
                 URL: https://issues.apache.org/jira/browse/ARROW-522
             Project: Apache Arrow
          Issue Type: Bug
          Components: Java - Vectors
    Affects Versions: 0.1.0
            Reporter: Rock Wang
            Priority: Critical


I encountered this exception:
{code:java}
Exception in thread "main" java.lang.IllegalArgumentException: should have as many children as in the schema: found 0 expected 2
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
    at org.apache.arrow.vector.VectorLoader.loadBuffers(VectorLoader.java:91)
    at org.apache.arrow.vector.VectorLoader.loadBuffers(VectorLoader.java:95)
    at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:69)
{code}

The test code is:

{code:java}
public class ArrowTest {
    /**
     * Read-only {@link SeekableByteChannel} that serves its content from an in-memory byte array.
     *
     * <p>The channel keeps a reference to the array passed to the constructor (no copy is made)
     * and drops that reference on {@link #close()}. Write operations are not supported.
     */
    public static class ByteArrayReadableSeekableByteChannel implements SeekableByteChannel {
        /** Backing data; set to {@code null} once the channel has been closed. */
        private byte[] byteArray;
        /** Current read offset. Kept as a long so offsets beyond the array length stay representable. */
        private long position = 0;

        /**
         * Creates a channel positioned at offset 0 of the given array.
         *
         * @param byteArray the data to expose; must not be {@code null}
         * @throws NullPointerException if {@code byteArray} is {@code null}
         */
        public ByteArrayReadableSeekableByteChannel(byte[] byteArray) {
            if (byteArray == null) {
                throw new NullPointerException("byteArray");
            }
            this.byteArray = byteArray;
        }

        /** Returns {@code true} until {@link #close()} has been called. */
        @Override
        public boolean isOpen() {
            return byteArray != null;
        }

        /** Closes the channel by releasing the reference to the backing array. */
        @Override
        public void close() throws IOException {
            byteArray = null;
        }

        /**
         * Copies as many bytes as fit from the current position into {@code dst} and advances
         * the position accordingly.
         *
         * @param dst the buffer to fill
         * @return the number of bytes copied, or {@code -1} when the position is at or past the
         *         end of the backing array
         * @throws java.nio.channels.ClosedChannelException if the channel has been closed
         */
        @Override
        public int read(final ByteBuffer dst) throws IOException {
            ensureOpen();
            // Signal end-of-stream with -1 (not 0) so callers that loop until EOF terminate.
            if (this.position >= this.byteArray.length) {
                return -1;
            }
            // Safe narrowing: position is below byteArray.length, which is an int.
            int offset = (int) this.position;
            int remainingInBuf = this.byteArray.length - offset;
            int length = Math.min(dst.remaining(), remainingInBuf);
            dst.put(this.byteArray, offset, length);
            this.position += length;
            return length;
        }

        /**
         * Returns the current read offset.
         *
         * @throws java.nio.channels.ClosedChannelException if the channel has been closed
         */
        @Override
        public long position() throws IOException {
            ensureOpen();
            return this.position;
        }

        /**
         * Moves the read offset. An offset past the end of the data is accepted; subsequent
         * reads then report end-of-stream.
         *
         * @param newPosition the new offset, a non-negative number of bytes from the start
         * @return this channel
         * @throws IllegalArgumentException if {@code newPosition} is negative
         * @throws java.nio.channels.ClosedChannelException if the channel has been closed
         */
        @Override
        public SeekableByteChannel position(final long newPosition) throws IOException {
            ensureOpen();
            if (newPosition < 0) {
                throw new IllegalArgumentException("Negative position: " + newPosition);
            }
            this.position = newPosition;
            return this;
        }

        /**
         * Returns the length of the backing array.
         *
         * @throws java.nio.channels.ClosedChannelException if the channel has been closed
         */
        @Override
        public long size() throws IOException {
            ensureOpen();
            return this.byteArray.length;
        }

        /**
         * Always fails: this channel is read-only.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public int write(final ByteBuffer src) throws IOException {
            throw new UnsupportedOperationException("Read only");
        }

        /**
         * Always fails: this channel is read-only.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public SeekableByteChannel truncate(final long size) throws IOException {
            throw new UnsupportedOperationException("Read only");
        }

        /** Throws {@code ClosedChannelException} if {@link #close()} has already been called. */
        private void ensureOpen() throws java.nio.channels.ClosedChannelException {
            if (this.byteArray == null) {
                throw new java.nio.channels.ClosedChannelException();
            }
        }
    }

    /**
     * Reproduction driver: writes a batch of rows into an in-memory Arrow file, then reads the
     * file back through {@code ArrowReader} / {@code VectorLoader} and prints the list entries.
     *
     * @param argv unused
     * @throws Exception if writing, reading or closing any of the resources fails
     */
    public static void main(String[] argv) throws Exception {
        final int rowCount = 10;
        ByteArrayOutputStream sink = new ByteArrayOutputStream();

        // Phase 1: populate the vectors and serialize them into the in-memory sink.
        try (BufferAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);
                BufferAllocator writeAllocator = rootAllocator
                        .newChildAllocator("child allocator", 1024, Integer.MAX_VALUE);
                MapVector writeParent = new MapVector("parent", writeAllocator, null)
        ) {
            writeData(rowCount, writeParent);
            write(writeParent.getChild("root"), Channels.newChannel(sink));
        }

        byte[] serialized = sink.toByteArray();

        // Phase 2: load every record batch from the serialized bytes into fresh vectors.
        try (BufferAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);
                BufferAllocator readAllocator = rootAllocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
                ArrowReader reader = new ArrowReader(new ByteArrayReadableSeekableByteChannel(serialized),
                        readAllocator);
                BufferAllocator loadAllocator = rootAllocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
                MapVector readParent = new MapVector("parent", loadAllocator, null)
        ) {
            ArrowFooter fileFooter = reader.readFooter();
            Schema fileSchema = fileFooter.getSchema();

            NullableMapVector rootVector =
                    readParent.addOrGet("root", Types.MinorType.MAP, NullableMapVector.class);

            VectorLoader loader = new VectorLoader(fileSchema, rootVector);

            for (ArrowBlock block : fileFooter.getRecordBatches()) {
                try (ArrowRecordBatch batch = reader.readRecordBatch(block)) {
                    loader.load(batch);
                }

                readData(rowCount, readParent);
            }
        }
    }

    /**
     * Fills {@code parent} with {@code count} rows under a "root" map. Each row gets an integer
     * "id" and a "list" holding two maps, each with an int "type" and a bigint "id".
     *
     * @param count number of rows to write
     * @param parent container vector the complex writer writes into
     * @throws Exception declared for callers; propagated from the writer calls
     */
    private static void writeData(int count, MapVector parent) throws Exception {
        BaseWriter.ComplexWriter complexWriter = new ComplexWriterImpl("root", parent, true);
        BaseWriter.MapWriter rowWriter = complexWriter.rootAsMap();
        IntWriter rowIdWriter = rowWriter.integer("id");
        BaseWriter.ListWriter entriesWriter = rowWriter.list("list");
        BaseWriter.MapWriter entryWriter = entriesWriter.map();

        int row = 0;
        while (row < count) {
            rowWriter.start();

            // Scalar "id" column of the row.
            rowIdWriter.setPosition(row);
            rowIdWriter.writeInt(row);

            // "list" column: two map entries per row.
            entriesWriter.setPosition(row);
            entriesWriter.startList();
            for (int entry = 0; entry < 2; entry++) {
                entryWriter.start();
                entryWriter.integer("type").writeInt(entry);
                entryWriter.bigInt("id").writeBigInt(entry * 1000L);
                entryWriter.end();
            }
            entriesWriter.endList();

            rowWriter.end();
            row++;
        }

        complexWriter.setValueCount(count);
    }

    /**
     * Prints the "id" and "type" of every entry in the "list" field of the first
     * {@code count} rows under "root" to standard output.
     *
     * @param count number of rows to visit
     * @param parent container vector holding the "root" map
     */
    private static void readData(int count, MapVector parent) {
        BaseReader.MapReader rowReader = new SingleMapReaderImpl(parent).reader("root");
        FieldReader entriesReader = rowReader.reader("list");

        int row = 0;
        while (row < count) {
            entriesReader.setPosition(row);
            // Walk the entries of this row's list, one map per iteration.
            while (entriesReader.next()) {
                System.out.println(row + " id " + entriesReader.reader().reader("id").readLong());
                System.out.println(row + " type " + entriesReader.reader().reader("type").readInteger());
            }
            row++;
        }
    }

    /**
     * Writes the content of {@code parent} to {@code out} as a single record batch, using the
     * schema reported by a {@code VectorUnloader} created for that vector.
     *
     * @param parent vector to unload
     * @param out channel the Arrow writer writes to
     * @throws IOException if writing the batch or closing the resources fails
     */
    private static void write(FieldVector parent, WritableByteChannel out) throws IOException {
        VectorUnloader unloader = new VectorUnloader(parent);
        Schema unloadedSchema = unloader.getSchema();
        // Both the writer and the batch are closed on exit, batch first.
        try (ArrowWriter fileWriter = new ArrowWriter(out, unloadedSchema);
                ArrowRecordBatch batch = unloader.getRecordBatch()) {
            fileWriter.writeRecordBatch(batch);
        }
    }
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)