You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ju...@apache.org on 2016/08/26 15:20:21 UTC
[1/2] arrow git commit: ARROW-264: File format
Repository: arrow
Updated Branches:
refs/heads/master ec51d5667 -> 803afeb50
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
new file mode 100644
index 0000000..9881a22
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.vector.schema.ArrowBuffer;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.schema.FBSerializable;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Writes a {@link Schema} and a sequence of {@link ArrowRecordBatch} to a channel.
+ * <p>
+ * Bytes are produced in this order: the magic, then for each record batch its flatbuffer
+ * metadata followed by its buffers (metadata and body both start on an 8 byte boundary),
+ * then the footer, the footer length as a little endian int, and the magic again.
+ * Closing the writer writes the footer and closes the underlying channel.
+ */
+public class ArrowWriter implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ArrowWriter.class);
+
+  /** file marker; the charset is explicit so the bytes never depend on the platform default */
+  private static final byte[] MAGIC = "ARROW1".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+
+  private final WritableByteChannel out;
+
+  private final Schema schema;
+
+  /** location of every record batch written so far; serialized in the footer on close */
+  private final List<ArrowBlock> recordBatches = new ArrayList<>();
+
+  /** number of bytes written to the channel so far */
+  private long currentPosition = 0;
+
+  /** whether the leading magic has been written */
+  private boolean started = false;
+
+  /**
+   * @param out the channel to write to; it is closed by {@link #close()}
+   * @param schema the schema recorded in the footer
+   */
+  public ArrowWriter(WritableByteChannel out, Schema schema) {
+    this.out = out;
+    this.schema = schema;
+  }
+
+  private void start() throws IOException {
+    writeMagic();
+  }
+
+  private long write(byte[] buffer) throws IOException {
+    return write(ByteBuffer.wrap(buffer));
+  }
+
+  private long writeZeros(int zeroCount) throws IOException {
+    return write(new byte[zeroCount]);
+  }
+
+  /**
+   * Pads with zeros up to the next 8 byte boundary.
+   * @return the number of padding bytes written
+   */
+  private long align() throws IOException {
+    if (currentPosition % 8 != 0) { // align on 8 byte boundaries
+      return writeZeros(8 - (int)(currentPosition % 8));
+    }
+    return 0;
+  }
+
+  /**
+   * Writes all the remaining bytes of the buffer and advances the current position.
+   * @return the number of bytes written
+   */
+  private long write(ByteBuffer buffer) throws IOException {
+    long length = buffer.remaining();
+    // a channel is allowed to accept only part of the buffer per call:
+    // keep writing until it is drained so that currentPosition stays accurate
+    while (buffer.hasRemaining()) {
+      out.write(buffer);
+    }
+    currentPosition += length;
+    return length;
+  }
+
+  /** @return the 4 bytes of the value, least significant byte first */
+  private static byte[] intToBytes(int value) {
+    byte[] outBuffer = new byte[4];
+    outBuffer[3] = (byte)(value >>> 24);
+    outBuffer[2] = (byte)(value >>> 16);
+    outBuffer[1] = (byte)(value >>> 8);
+    outBuffer[0] = (byte)(value >>> 0);
+    return outBuffer;
+  }
+
+  private long writeIntLittleEndian(int v) throws IOException {
+    return write(intToBytes(v));
+  }
+
+  // TODO: write dictionaries
+
+  /**
+   * Writes the metadata of the record batch followed by its buffers and remembers
+   * where they were written so the footer can reference them.
+   * @param recordBatch the batch to write
+   * @throws IOException if writing to the channel fails
+   * @throws IllegalStateException if the buffers do not match their declared layout
+   * @throws InvalidArrowFileException if no metadata was written for the batch
+   */
+  public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException {
+    checkStarted();
+    align();
+    // write metadata header
+    long offset = currentPosition;
+    write(recordBatch);
+    align();
+    // write body
+    long bodyOffset = currentPosition;
+    List<ArrowBuf> buffers = recordBatch.getBuffers();
+    List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
+    if (buffers.size() != buffersLayout.size()) {
+      throw new IllegalStateException("the layout does not match: " + buffers.size() + " != " + buffersLayout.size());
+    }
+    for (int i = 0; i < buffers.size(); i++) {
+      ArrowBuf buffer = buffers.get(i);
+      ArrowBuffer layout = buffersLayout.get(i);
+      long startPosition = bodyOffset + layout.getOffset();
+      long padding = startPosition - currentPosition;
+      if (padding < 0) {
+        // the layout places this buffer before the end of what was already written
+        throw new IllegalStateException("buffer " + i + " starts before the current position: " + startPosition + " < " + currentPosition);
+      }
+      if (padding > 0) {
+        writeZeros((int)padding);
+      }
+      write(buffer);
+      long expectedPosition = startPosition + layout.getSize();
+      if (currentPosition != expectedPosition) {
+        throw new IllegalStateException("wrong buffer size: " + currentPosition + " != " + expectedPosition);
+      }
+    }
+    int metadataLength = (int)(bodyOffset - offset);
+    if (metadataLength <= 0) {
+      throw new InvalidArrowFileException("invalid recordBatch");
+    }
+    long bodyLength = currentPosition - bodyOffset;
+    LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}", offset, metadataLength, bodyLength);
+    // add metadata to footer
+    recordBatches.add(new ArrowBlock(offset, metadataLength, bodyLength));
+  }
+
+  /** Writes the readable bytes of the buffer without moving its reader index. */
+  private void write(ArrowBuf buffer) throws IOException {
+    write(buffer.nioBuffer(buffer.readerIndex(), buffer.readableBytes()));
+  }
+
+  /** Writes the leading magic the first time it is called. */
+  private void checkStarted() throws IOException {
+    if (!started) {
+      started = true;
+      start();
+    }
+  }
+
+  /**
+   * Writes the footer, its length and the trailing magic, then closes the channel.
+   * The channel is closed even if writing the footer fails.
+   * @throws IOException if writing to or closing the channel fails
+   * @throws InvalidArrowFileException if the footer turned out empty
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      // a file without any record batch still needs its leading magic
+      checkStarted();
+      long footerStart = currentPosition;
+      writeFooter();
+      int footerLength = (int)(currentPosition - footerStart);
+      if (footerLength <= 0) {
+        throw new InvalidArrowFileException("invalid footer");
+      }
+      writeIntLittleEndian(footerLength);
+      LOGGER.debug("Footer starts at {}, length: {}", footerStart, footerLength);
+      writeMagic();
+    } finally {
+      out.close();
+    }
+  }
+
+  private void writeMagic() throws IOException {
+    write(MAGIC);
+    LOGGER.debug("magic written, now at {}", currentPosition);
+  }
+
+  private void writeFooter() throws IOException {
+    // TODO: dictionaries
+    write(new ArrowFooter(schema, Collections.<ArrowBlock>emptyList(), recordBatches));
+  }
+
+  /**
+   * Serializes the object with a new flatbuffer builder and writes the result.
+   * @return the number of bytes written
+   */
+  private long write(FBSerializable writer) throws IOException {
+    FlatBufferBuilder builder = new FlatBufferBuilder();
+    int root = writer.writeTo(builder);
+    builder.finish(root);
+    return write(builder.dataBuffer());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java b/java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java
new file mode 100644
index 0000000..3ec75dc
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+/**
+ * Unchecked exception carrying a message that describes invalid Arrow file content.
+ * In this commit ArrowWriter throws it when the metadata length of a record batch
+ * or the footer length it computed is not positive.
+ */
+public class InvalidArrowFileException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * @param message description of what is invalid
+ */
+ public InvalidArrowFileException(String message) {
+ super(message);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java
new file mode 100644
index 0000000..3aa3e52
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.schema;
+
+import org.apache.arrow.flatbuf.Buffer;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+/**
+ * Immutable description of where a buffer is located: a page, an offset and a size.
+ * It is serialized as a flatbuffers {@code Buffer} struct.
+ * ArrowRecordBatch creates instances with page 0, and offset and size expressed in bytes,
+ * the offset being relative to the start of the body of the batch.
+ */
+public class ArrowBuffer implements FBSerializable {
+
+  private final int page;
+  private final long offset;
+  private final long size;
+
+  /**
+   * @param page the page of the buffer
+   * @param offset where the buffer starts
+   * @param size the size of the buffer
+   */
+  public ArrowBuffer(int page, long offset, long size) {
+    this.page = page;
+    this.offset = offset;
+    this.size = size;
+  }
+
+  public int getPage() {
+    return page;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public long getSize() {
+    return size;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + (int) (offset ^ (offset >>> 32));
+    result = prime * result + page;
+    result = prime * result + (int) (size ^ (size >>> 32));
+    return result;
+  }
+
+  /**
+   * Two instances are equal when they are of the same class and their
+   * page, offset and size are all equal.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    ArrowBuffer other = (ArrowBuffer) obj;
+    return offset == other.offset
+        && page == other.page
+        && size == other.size;
+  }
+
+  @Override
+  public String toString() {
+    return "ArrowBuffer [page=" + page + ", offset=" + offset + ", size=" + size + "]";
+  }
+
+  /**
+   * Writes this buffer description as a {@code Buffer} struct.
+   * @return the value returned by {@code Buffer.createBuffer}
+   */
+  @Override
+  public int writeTo(FlatBufferBuilder builder) {
+    return Buffer.createBuffer(builder, page, offset, size);
+  }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java
new file mode 100644
index 0000000..71dd0ab
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.schema;
+
+import org.apache.arrow.flatbuf.FieldNode;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+public class ArrowFieldNode implements FBSerializable {
+
+ private final int length;
+ private final int nullCount;
+
+ public ArrowFieldNode(int length, int nullCount) {
+ super();
+ this.length = length;
+ this.nullCount = nullCount;
+ }
+
+ @Override
+ public int writeTo(FlatBufferBuilder builder) {
+ return FieldNode.createFieldNode(builder, length, nullCount);
+ }
+
+ public int getNullCount() {
+ return nullCount;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ @Override
+ public String toString() {
+ return "ArrowFieldNode [length=" + length + ", nullCount=" + nullCount + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
new file mode 100644
index 0000000..9162efd
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.schema;
+
+import static org.apache.arrow.vector.schema.FBSerializables.writeAllStructsToVector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.flatbuf.RecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * A batch of records: the number of records, the field nodes and the buffers holding the data,
+ * along with the layout of those buffers when they are written one after the other.
+ * The buffers are retained by the constructor and released by {@link #close()}.
+ */
+public class ArrowRecordBatch implements FBSerializable, AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ArrowRecordBatch.class);
+
+  /** number of records */
+  private final int length;
+
+  /** Nodes correspond to the pre-ordered flattened logical schema */
+  private final List<ArrowFieldNode> nodes;
+
+  private final List<ArrowBuf> buffers;
+
+  /** offset and size of each buffer, in the same order as {@link #buffers} */
+  private final List<ArrowBuffer> buffersLayout;
+
+  private boolean closed = false;
+
+  /**
+   * @param length how many rows in this batch
+   * @param nodes field level info
+   * @param buffers will be retained until this recordBatch is closed
+   */
+  public ArrowRecordBatch(int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
+    this.length = length;
+    this.nodes = nodes;
+    this.buffers = buffers;
+    List<ArrowBuffer> arrowBuffers = new ArrayList<>(buffers.size());
+    long offset = 0;
+    for (ArrowBuf arrowBuf : buffers) {
+      arrowBuf.retain();
+      long size = arrowBuf.readableBytes();
+      arrowBuffers.add(new ArrowBuffer(0, offset, size));
+      // parameterized message: nothing is formatted unless debug logging is enabled
+      LOGGER.debug("Buffer in RecordBatch at {}, length: {}", offset, size);
+      offset += size;
+      if (offset % 8 != 0) { // align on 8 byte boundaries
+        offset += 8 - (offset % 8);
+      }
+    }
+    this.buffersLayout = Collections.unmodifiableList(arrowBuffers);
+  }
+
+  public int getLength() {
+    return length;
+  }
+
+  /**
+   * @return the FieldNodes corresponding to the schema
+   */
+  public List<ArrowFieldNode> getNodes() {
+    return nodes;
+  }
+
+  /**
+   * @return the buffers containing the data
+   * @throws IllegalStateException if this record batch has been closed
+   */
+  public List<ArrowBuf> getBuffers() {
+    if (closed) {
+      throw new IllegalStateException("already closed");
+    }
+    return buffers;
+  }
+
+  /**
+   * @return the serialized layout if we send the buffers on the wire
+   */
+  public List<ArrowBuffer> getBuffersLayout() {
+    return buffersLayout;
+  }
+
+  /**
+   * Writes the nodes vector, the buffers layout vector and then the {@code RecordBatch} table.
+   * @return the value returned by {@code RecordBatch.endRecordBatch}
+   */
+  @Override
+  public int writeTo(FlatBufferBuilder builder) {
+    RecordBatch.startNodesVector(builder, nodes.size());
+    int nodesOffset = writeAllStructsToVector(builder, nodes);
+    // the vector is sized from the list whose elements are actually written into it
+    RecordBatch.startBuffersVector(builder, buffersLayout.size());
+    int buffersOffset = writeAllStructsToVector(builder, buffersLayout);
+    RecordBatch.startRecordBatch(builder);
+    RecordBatch.addLength(builder, length);
+    RecordBatch.addNodes(builder, nodesOffset);
+    RecordBatch.addBuffers(builder, buffersOffset);
+    return RecordBatch.endRecordBatch(builder);
+  }
+
+  /**
+   * releases the buffers; calling it again has no effect
+   */
+  @Override
+  public void close() {
+    if (!closed) {
+      closed = true;
+      for (ArrowBuf arrowBuf : buffers) {
+        arrowBuf.release();
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java
new file mode 100644
index 0000000..e3d3e34
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.schema;
+
+import org.apache.arrow.flatbuf.VectorType;
+
+/**
+ * Wraps one of the flatbuffers {@code VectorType} constants.
+ */
+public class ArrowVectorType {
+
+  public static final ArrowVectorType VALUES = new ArrowVectorType(VectorType.VALUES);
+  public static final ArrowVectorType OFFSET = new ArrowVectorType(VectorType.OFFSET);
+  public static final ArrowVectorType VALIDITY = new ArrowVectorType(VectorType.VALIDITY);
+  public static final ArrowVectorType TYPE = new ArrowVectorType(VectorType.TYPE);
+
+  private final short type;
+
+  /**
+   * @param type the {@code VectorType} value to wrap; it is not validated here
+   */
+  public ArrowVectorType(short type) {
+    this.type = type;
+  }
+
+  public short getType() {
+    return type;
+  }
+
+  /**
+   * @return the name given by {@code VectorType.name}, or a fallback text containing
+   *         the raw value when that lookup fails with an out of bounds index
+   */
+  @Override
+  public String toString() {
+    try {
+      return VectorType.name(type);
+    } catch (ArrayIndexOutOfBoundsException e) {
+      return "Unknown type " + type;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java
new file mode 100644
index 0000000..d23ed91
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.schema;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+/**
+ * Implemented by objects that can write themselves to a flatbuffers builder.
+ */
+public interface FBSerializable {
+ /**
+ * Writes this object to the builder.
+ * @param builder the builder to write to
+ * @return an int produced by the flatbuffers call; ArrowWriter passes it to
+ * {@code builder.finish} as the root (presumably the offset of the written object)
+ */
+ int writeTo(FlatBufferBuilder builder);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java
new file mode 100644
index 0000000..31c17ad
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.schema;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+/**
+ * Static helpers to write collections of {@link FBSerializable}.
+ */
+public class FBSerializables {
+
+  /**
+   * Writes every element to the builder, last element first, and ends the vector.
+   * The callers in this commit start the vector on the builder before calling this method.
+   * @param builder the builder to write to
+   * @param all the structs to write, in their logical order
+   * @return the value returned by {@code builder.endVector()}
+   */
+  public static int writeAllStructsToVector(FlatBufferBuilder builder, List<? extends FBSerializable> all) {
+    // struct vectors have to be created in reverse order
+    java.util.ListIterator<? extends FBSerializable> fromTheEnd = all.listIterator(all.size());
+    while (fromTheEnd.hasPrevious()) {
+      fromTheEnd.previous().writeTo(builder);
+    }
+    return builder.endVector();
+  }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java
new file mode 100644
index 0000000..1275e0e
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.schema;
+
+import static java.util.Arrays.asList;
+import static org.apache.arrow.flatbuf.Precision.DOUBLE;
+import static org.apache.arrow.flatbuf.Precision.SINGLE;
+import static org.apache.arrow.vector.schema.VectorLayout.booleanVector;
+import static org.apache.arrow.vector.schema.VectorLayout.byteVector;
+import static org.apache.arrow.vector.schema.VectorLayout.dataVector;
+import static org.apache.arrow.vector.schema.VectorLayout.offsetVector;
+import static org.apache.arrow.vector.schema.VectorLayout.typeVector;
+import static org.apache.arrow.vector.schema.VectorLayout.validityVector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.flatbuf.UnionMode;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeVisitor;
+import org.apache.arrow.vector.types.pojo.ArrowType.Binary;
+import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
+import org.apache.arrow.vector.types.pojo.ArrowType.Date;
+import org.apache.arrow.vector.types.pojo.ArrowType.Decimal;
+import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.arrow.vector.types.pojo.ArrowType.IntervalDay;
+import org.apache.arrow.vector.types.pojo.ArrowType.IntervalYear;
+import org.apache.arrow.vector.types.pojo.ArrowType.Null;
+import org.apache.arrow.vector.types.pojo.ArrowType.Time;
+import org.apache.arrow.vector.types.pojo.ArrowType.Timestamp;
+import org.apache.arrow.vector.types.pojo.ArrowType.Tuple;
+import org.apache.arrow.vector.types.pojo.ArrowType.Union;
+import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
+
+/**
+ * Describes which vectors make up a given type.
+ * Only the vectors owned by the type itself are listed; for nested types
+ * (Tuple, List, Union) the vectors of the children come after them.
+ */
+public class TypeLayout {
+
+  private final List<VectorLayout> vectors;
+
+  /**
+   * Builds the layout of the vectors owned by the given type.
+   * @param arrowType the type to describe
+   * @return the layout for that type
+   * @throws UnsupportedOperationException for an unknown union mode or floating point precision
+   */
+  public static TypeLayout getTypeLayout(final ArrowType arrowType) {
+    return arrowType.accept(new ArrowTypeVisitor<TypeLayout>() {
+
+      // ----- shared builders
+
+      private TypeLayout newPrimitiveTypeLayout(VectorLayout... vectors) {
+        return new TypeLayout(asList(vectors));
+      }
+
+      /** validity followed by the given data vector */
+      private TypeLayout newFixedWidthTypeLayout(VectorLayout dataVector) {
+        return newPrimitiveTypeLayout(validityVector(), dataVector);
+      }
+
+      /** validity, offsets, then the bytes */
+      private TypeLayout newVariableWidthTypeLayout() {
+        return newPrimitiveTypeLayout(validityVector(), offsetVector(), byteVector());
+      }
+
+      // ----- types without vectors of their own
+
+      @Override
+      public TypeLayout visit(Null type) {
+        return new TypeLayout(Collections.<VectorLayout>emptyList());
+      }
+
+      @Override
+      public TypeLayout visit(Tuple type) {
+        // TODO: add validity vector in Map (would be validityVector())
+        List<VectorLayout> noVectors = asList();
+        return new TypeLayout(noVectors);
+      }
+
+      // ----- nested types
+
+      @Override
+      public TypeLayout visit(org.apache.arrow.vector.types.pojo.ArrowType.List type) {
+        return new TypeLayout(validityVector(), offsetVector());
+      }
+
+      @Override
+      public TypeLayout visit(Union type) {
+        switch (type.getMode()) {
+          case UnionMode.Dense:
+            // TODO: validate this
+            return new TypeLayout(
+                validityVector(),
+                typeVector(),
+                offsetVector()); // offset to find the vector
+          case UnionMode.Sparse:
+            return new TypeLayout(validityVector(), typeVector());
+          default:
+            throw new UnsupportedOperationException("Unsupported Union Mode: " + type.getMode());
+        }
+      }
+
+      // ----- variable width types
+
+      @Override
+      public TypeLayout visit(Binary type) {
+        return newVariableWidthTypeLayout();
+      }
+
+      @Override
+      public TypeLayout visit(Utf8 type) {
+        return newVariableWidthTypeLayout();
+      }
+
+      // ----- fixed width types
+
+      @Override
+      public TypeLayout visit(Bool type) {
+        return newFixedWidthTypeLayout(booleanVector());
+      }
+
+      @Override
+      public TypeLayout visit(Int type) {
+        return newFixedWidthTypeLayout(dataVector(type.getBitWidth()));
+      }
+
+      @Override
+      public TypeLayout visit(FloatingPoint type) {
+        switch (type.getPrecision()) {
+          case SINGLE:
+            return newFixedWidthTypeLayout(dataVector(32));
+          case DOUBLE:
+            return newFixedWidthTypeLayout(dataVector(64));
+          default:
+            throw new UnsupportedOperationException("Unsupported Precision: " + type.getPrecision());
+        }
+      }
+
+      @Override
+      public TypeLayout visit(Decimal type) {
+        // TODO: check size
+        return newFixedWidthTypeLayout(dataVector(64)); // actually depends on the type fields
+      }
+
+      @Override
+      public TypeLayout visit(Timestamp type) {
+        return newFixedWidthTypeLayout(dataVector(64));
+      }
+
+      @Override
+      public TypeLayout visit(Date type) {
+        return newFixedWidthTypeLayout(dataVector(64));
+      }
+
+      @Override
+      public TypeLayout visit(Time type) {
+        return newFixedWidthTypeLayout(dataVector(64));
+      }
+
+      @Override
+      public TypeLayout visit(IntervalDay type) { // TODO: check size
+        return newFixedWidthTypeLayout(dataVector(64));
+      }
+
+      @Override
+      public TypeLayout visit(IntervalYear type) { // TODO: check size
+        return newFixedWidthTypeLayout(dataVector(64));
+      }
+    });
+  }
+
+  /**
+   * @param vectors the vectors of the type, in order; the list is kept as is
+   */
+  public TypeLayout(List<VectorLayout> vectors) {
+    this.vectors = vectors;
+  }
+
+  /**
+   * @param vectors the vectors of the type, in order
+   */
+  public TypeLayout(VectorLayout... vectors) {
+    this(asList(vectors));
+  }
+
+  /**
+   * @return the vectors of this layout, in order
+   */
+  public List<VectorLayout> getVectors() {
+    return vectors;
+  }
+
+  /**
+   * @return a new list holding the type of each vector, in the same order as the vectors
+   */
+  public List<ArrowVectorType> getVectorTypes() {
+    final int count = vectors.size();
+    List<ArrowVectorType> vectorTypes = new ArrayList<>(count);
+    for (VectorLayout vectorLayout : vectors) {
+      ArrowVectorType vectorType = vectorLayout.getType();
+      vectorTypes.add(vectorType);
+    }
+    return vectorTypes;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("TypeLayout{");
+    text.append(vectors);
+    text.append("}");
+    return text.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java
new file mode 100644
index 0000000..421ebcb
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.schema;
+
+import static org.apache.arrow.vector.schema.ArrowVectorType.OFFSET;
+import static org.apache.arrow.vector.schema.ArrowVectorType.TYPE;
+import static org.apache.arrow.vector.schema.ArrowVectorType.VALIDITY;
+import static org.apache.arrow.vector.schema.ArrowVectorType.VALUES;
+
+/**
+ * Pairs an {@link ArrowVectorType} with the bit width of a single entry of that vector.
+ *
+ * <p>The constructor is private: callers obtain one of the shared, pre-built instances
+ * through the static factory methods below.
+ */
+public class VectorLayout {
+
+  private static final VectorLayout VALIDITY_VECTOR = new VectorLayout(VALIDITY, 1);
+  private static final VectorLayout OFFSET_VECTOR = new VectorLayout(OFFSET, 32);
+  private static final VectorLayout TYPE_VECTOR = new VectorLayout(TYPE, 32);
+  private static final VectorLayout BOOLEAN_VECTOR = new VectorLayout(VALUES, 1);
+  private static final VectorLayout VALUES_64 = new VectorLayout(VALUES, 64);
+  private static final VectorLayout VALUES_32 = new VectorLayout(VALUES, 32);
+  private static final VectorLayout VALUES_16 = new VectorLayout(VALUES, 16);
+  private static final VectorLayout VALUES_8 = new VectorLayout(VALUES, 8);
+
+  /** @return the shared layout of kind {@code TYPE}, 32 bits per entry */
+  public static VectorLayout typeVector() {
+    return TYPE_VECTOR;
+  }
+
+  /** @return the shared layout of kind {@code OFFSET}, 32 bits per entry */
+  public static VectorLayout offsetVector() {
+    return OFFSET_VECTOR;
+  }
+
+  /**
+   * Returns the shared {@code VALUES} layout for the requested entry width.
+   *
+   * @param typeBitWidth width of one value in bits; must be 8, 16, 32 or 64
+   * @return the shared layout for that width
+   * @throws IllegalArgumentException if the width is not one of the supported values
+   */
+  public static VectorLayout dataVector(int typeBitWidth) {
+    switch (typeBitWidth) {
+      case 8:
+        return VALUES_8;
+      case 16:
+        return VALUES_16;
+      case 32:
+        return VALUES_32;
+      case 64:
+        return VALUES_64;
+      default:
+        // Report the rejected width so a bad caller can be identified from the message alone.
+        throw new IllegalArgumentException(
+            "only 8, 16, 32, or 64 bits supported, got " + typeBitWidth);
+    }
+  }
+
+  /** @return the shared layout of kind {@code VALUES}, 1 bit per entry */
+  public static VectorLayout booleanVector() {
+    return BOOLEAN_VECTOR;
+  }
+
+  /** @return the shared layout of kind {@code VALIDITY}, 1 bit per entry */
+  public static VectorLayout validityVector() {
+    return VALIDITY_VECTOR;
+  }
+
+  /** @return the 8-bit {@code VALUES} layout; equivalent to {@code dataVector(8)} */
+  public static VectorLayout byteVector() {
+    return dataVector(8);
+  }
+
+  private final int typeBitWidth;
+
+  private final ArrowVectorType type;
+
+  private VectorLayout(ArrowVectorType type, int typeBitWidth) {
+    this.type = type;
+    this.typeBitWidth = typeBitWidth;
+  }
+
+  /** @return the width of one entry, in bits */
+  public int getTypeBitWidth() {
+    return typeBitWidth;
+  }
+
+  /** @return the kind of vector this layout describes */
+  public ArrowVectorType getType() {
+    return type;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("{width=%s,type=%s}", typeBitWidth, type);
+  }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java
index c34882a..4d0d9ee 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java
@@ -17,8 +17,14 @@
*/
package org.apache.arrow.vector.types;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.flatbuf.Precision;
import org.apache.arrow.flatbuf.Type;
+import org.apache.arrow.flatbuf.UnionMode;
import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.NullableBigIntVector;
import org.apache.arrow.vector.NullableBitVector;
import org.apache.arrow.vector.NullableDateVector;
@@ -38,7 +44,6 @@ import org.apache.arrow.vector.NullableUInt4Vector;
import org.apache.arrow.vector.NullableUInt8Vector;
import org.apache.arrow.vector.NullableVarBinaryVector;
import org.apache.arrow.vector.NullableVarCharVector;
-import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.complex.ListVector;
@@ -85,9 +90,6 @@ import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.util.CallBack;
-import java.util.HashMap;
-import java.util.Map;
-
public class Types {
public static final Field NULL_FIELD = new Field("", true, Null.INSTANCE, null);
@@ -104,8 +106,8 @@ public class Types {
public static final Field TIMESTAMP_FIELD = new Field("", true, new Timestamp(""), null);
public static final Field INTERVALDAY_FIELD = new Field("", true, IntervalDay.INSTANCE, null);
public static final Field INTERVALYEAR_FIELD = new Field("", true, IntervalYear.INSTANCE, null);
- public static final Field FLOAT4_FIELD = new Field("", true, new FloatingPoint(0), null);
- public static final Field FLOAT8_FIELD = new Field("", true, new FloatingPoint(1), null);
+ public static final Field FLOAT4_FIELD = new Field("", true, new FloatingPoint(Precision.SINGLE), null);
+ public static final Field FLOAT8_FIELD = new Field("", true, new FloatingPoint(Precision.DOUBLE), null);
public static final Field LIST_FIELD = new Field("", true, List.INSTANCE, null);
public static final Field VARCHAR_FIELD = new Field("", true, Utf8.INSTANCE, null);
public static final Field VARBINARY_FIELD = new Field("", true, Binary.INSTANCE, null);
@@ -120,7 +122,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return ZeroVector.INSTANCE;
}
@@ -136,7 +138,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new MapVector(name, allocator, callBack);
}
@@ -153,7 +155,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableTinyIntVector(name, allocator);
}
@@ -169,8 +171,8 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
- return new SmallIntVector(name, allocator);
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ return new NullableSmallIntVector(name, allocator);
}
@Override
@@ -185,7 +187,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableIntVector(name, allocator);
}
@@ -201,7 +203,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableBigIntVector(name, allocator);
}
@@ -217,7 +219,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableDateVector(name, allocator);
}
@@ -233,7 +235,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableTimeVector(name, allocator);
}
@@ -249,7 +251,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableTimeStampVector(name, allocator);
}
@@ -265,7 +267,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableIntervalDayVector(name, allocator);
}
@@ -281,7 +283,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
- return new NullableIntervalDayVector(name, allocator);
+ // Must be the year-interval vector: the getNewFieldWriter that follows casts its argument
+ // to NullableIntervalYearVector, so a day-interval vector would fail with ClassCastException.
+ return new NullableIntervalYearVector(name, allocator);
}
@@ -290,14 +292,14 @@ public class Types {
return new IntervalYearWriterImpl((NullableIntervalYearVector) vector);
}
},
- FLOAT4(new FloatingPoint(0)) {
+ FLOAT4(new FloatingPoint(Precision.SINGLE)) {
@Override
public Field getField() {
return FLOAT4_FIELD;
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableFloat4Vector(name, allocator);
}
@@ -306,14 +308,14 @@ public class Types {
return new Float4WriterImpl((NullableFloat4Vector) vector);
}
}, // 4 byte ieee 754
- FLOAT8(new FloatingPoint(1)) {
+ FLOAT8(new FloatingPoint(Precision.DOUBLE)) {
@Override
public Field getField() {
return FLOAT8_FIELD;
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableFloat8Vector(name, allocator);
}
@@ -329,7 +331,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableBitVector(name, allocator);
}
@@ -345,7 +347,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableVarCharVector(name, allocator);
}
@@ -361,7 +363,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableVarBinaryVector(name, allocator);
}
@@ -381,7 +383,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableDecimalVector(name, allocator, precisionScale[0], precisionScale[1]);
}
@@ -397,7 +399,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableUInt1Vector(name, allocator);
}
@@ -413,7 +415,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableUInt2Vector(name, allocator);
}
@@ -429,7 +431,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableUInt4Vector(name, allocator);
}
@@ -445,7 +447,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new NullableUInt8Vector(name, allocator);
}
@@ -461,7 +463,7 @@ public class Types {
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new ListVector(name, allocator, callBack);
}
@@ -470,14 +472,14 @@ public class Types {
return new UnionListWriter((ListVector) vector);
}
},
- UNION(Union.INSTANCE) {
+ UNION(new Union(UnionMode.Sparse)) {
@Override
public Field getField() {
throw new UnsupportedOperationException("Cannot get simple field for Union type");
}
@Override
- public ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
+ public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) {
return new UnionVector(name, allocator, callBack);
}
@@ -499,7 +501,7 @@ public class Types {
public abstract Field getField();
- public abstract ValueVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale);
+ public abstract FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale);
public abstract FieldWriter getNewFieldWriter(ValueVector vector);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
index 49d0503..36712b9 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
@@ -18,19 +18,24 @@
package org.apache.arrow.vector.types.pojo;
-import com.google.common.collect.ImmutableList;
-import com.google.flatbuffers.FlatBufferBuilder;
+import static org.apache.arrow.vector.types.pojo.ArrowType.getTypeForField;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import static org.apache.arrow.vector.types.pojo.ArrowType.getTypeForField;
+import org.apache.arrow.vector.schema.ArrowVectorType;
+import org.apache.arrow.vector.schema.TypeLayout;
+
+import com.google.common.collect.ImmutableList;
+import com.google.flatbuffers.FlatBufferBuilder;
public class Field {
private final String name;
private final boolean nullable;
private final ArrowType type;
private final List<Field> children;
+ private final TypeLayout typeLayout;
public Field(String name, boolean nullable, ArrowType type, List<Field> children) {
this.name = name;
@@ -41,18 +46,32 @@ public class Field {
} else {
this.children = children;
}
+ this.typeLayout = TypeLayout.getTypeLayout(type);
}
+ /**
+  * Builds a {@link Field} from its flatbuffer representation, converting children recursively.
+  *
+  * @param field the flatbuffer field to convert
+  * @return the converted field
+  * @throws IllegalArgumentException if the number of buffers declared by the flatbuffer field
+  *         differs from the number of vectors in the layout computed for the field's type
+  */
public static Field convertField(org.apache.arrow.flatbuf.Field field) {
String name = field.name();
boolean nullable = field.nullable();
ArrowType type = getTypeForField(field);
+ // Vector types declared by the serialized field; reported below if the layout does not match.
+ List<ArrowVectorType> buffers = new ArrayList<>();
+ for (int i = 0; i < field.buffersLength(); ++i) {
+ buffers.add(new ArrowVectorType(field.buffers(i)));
+ }
ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
for (int i = 0; i < field.childrenLength(); i++) {
childrenBuilder.add(convertField(field.children(i)));
}
List<Field> children = childrenBuilder.build();
- return new Field(name, nullable, type, children);
+ Field result = new Field(name, nullable, type, children);
+ TypeLayout typeLayout = result.getTypeLayout();
+ // Only the number of vectors is compared; the individual vector types are not checked.
+ if (typeLayout.getVectors().size() != field.buffersLength()) {
+ throw new IllegalArgumentException("Deserialized field does not match expected vectors. expected: " + typeLayout.getVectorTypes() + " got " + buffers);
+ }
+ return result;
}
public int getField(FlatBufferBuilder builder) {
@@ -63,12 +82,18 @@ public class Field {
childrenData[i] = children.get(i).getField(builder);
}
int childrenOffset = org.apache.arrow.flatbuf.Field.createChildrenVector(builder, childrenData);
+ short[] buffersData = new short[typeLayout.getVectors().size()];
+ for (int i = 0; i < buffersData.length; i++) {
+ buffersData[i] = typeLayout.getVectors().get(i).getType().getType();
+ }
+ int buffersOffset = org.apache.arrow.flatbuf.Field.createBuffersVector(builder, buffersData );
org.apache.arrow.flatbuf.Field.startField(builder);
org.apache.arrow.flatbuf.Field.addName(builder, nameOffset);
org.apache.arrow.flatbuf.Field.addNullable(builder, nullable);
org.apache.arrow.flatbuf.Field.addTypeType(builder, type.getTypeType());
org.apache.arrow.flatbuf.Field.addType(builder, typeOffset);
org.apache.arrow.flatbuf.Field.addChildren(builder, childrenOffset);
+ org.apache.arrow.flatbuf.Field.addBuffers(builder, buffersOffset);
return org.apache.arrow.flatbuf.Field.endField(builder);
}
@@ -88,6 +113,10 @@ public class Field {
return children;
}
+ /**
+  * Returns the layout that the constructor derived from this field's type via
+  * {@code TypeLayout.getTypeLayout(type)}.
+  */
+ public TypeLayout getTypeLayout() {
+ return typeLayout;
+ }
+
@Override
public boolean equals(Object obj) {
if (!(obj instanceof Field)) {
@@ -102,4 +131,9 @@ public class Field {
(this.children.size() == 0 && that.children == null));
}
+
+ /** Returns a debugging representation listing the name, type, children and layout of this field. */
+ @Override
+ public String toString() {
+ return String.format("Field{name=%s, type=%s, children=%s, layout=%s}", name, type, children, typeLayout);
+ }
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java
index 9e28941..231be9b 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java
@@ -18,15 +18,13 @@
package org.apache.arrow.vector.types.pojo;
-import com.google.common.collect.ImmutableList;
-import com.google.flatbuffers.FlatBufferBuilder;
+import static org.apache.arrow.vector.types.pojo.Field.convertField;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
-import static org.apache.arrow.vector.types.pojo.ArrowType.getTypeForField;
-import static org.apache.arrow.vector.types.pojo.Field.convertField;
+import com.google.common.collect.ImmutableList;
+import com.google.flatbuffers.FlatBufferBuilder;
public class Schema {
private List<Field> fields;
@@ -71,4 +69,9 @@ public class Schema {
List<Field> fields = childrenBuilder.build();
return new Schema(fields);
}
+
+ /** Returns the literal {@code Schema} followed by the string form of the field list. */
+ @Override
+ public String toString() {
+ return "Schema" + fields;
+ }
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
new file mode 100644
index 0000000..85bb2cf
--- /dev/null
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector;
+
+import java.io.IOException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
+import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl;
+import org.apache.arrow.vector.complex.reader.BaseReader.MapReader;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
+import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.arrow.vector.complex.writer.BigIntWriter;
+import org.apache.arrow.vector.complex.writer.IntWriter;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Round-trip test: fills a map vector with an int and a bigInt column, unloads it into an
+ * {@link ArrowRecordBatch} with {@link VectorUnloader}, loads that batch into a fresh map
+ * vector with {@link VectorLoader}, and checks that every value reads back unchanged.
+ */
+public class TestVectorUnloadLoad {
+
+ // Root allocator shared by the test; closed once in afterClass().
+ static final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
+
+ @Test
+ public void test() throws IOException {
+ int count = 10000;
+ Schema schema;
+
+ // Source side: a child allocator and the vector that receives the written data.
+ try (
+ BufferAllocator originalVectorsAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+ MapVector parent = new MapVector("parent", originalVectorsAllocator, null)) {
+ ComplexWriter writer = new ComplexWriterImpl("root", parent);
+ MapWriter rootWriter = writer.rootAsMap();
+ IntWriter intWriter = rootWriter.integer("int");
+ BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt");
+ // Row i holds the value i in both columns.
+ for (int i = 0; i < count; i++) {
+ intWriter.setPosition(i);
+ intWriter.writeInt(i);
+ bigIntWriter.setPosition(i);
+ bigIntWriter.writeBigInt(i);
+ }
+ writer.setValueCount(count);
+
+ // Unload the "root" child that the writer created under parent.
+ VectorUnloader vectorUnloader = new VectorUnloader((MapVector)parent.getChild("root"));
+ schema = vectorUnloader.getSchema();
+
+ // Destination side: the record batch plus a separate allocator and vector to load into.
+ try (
+ ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
+ BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
+ MapVector newParent = new MapVector("parent", finalVectorsAllocator, null)) {
+ MapVector root = newParent.addOrGet("root", MinorType.MAP, MapVector.class);
+ VectorLoader vectorLoader = new VectorLoader(schema, root);
+
+ vectorLoader.load(recordBatch);
+
+ // Every row of the loaded vector must match what was written.
+ MapReader rootReader = new SingleMapReaderImpl(newParent).reader("root");
+ for (int i = 0; i < count; i++) {
+ rootReader.setPosition(i);
+ Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue());
+ Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue());
+ }
+ }
+ }
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ allocator.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/test/java/org/apache/arrow/vector/file/ByteArrayReadableSeekableByteChannel.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/ByteArrayReadableSeekableByteChannel.java b/java/vector/src/test/java/org/apache/arrow/vector/file/ByteArrayReadableSeekableByteChannel.java
new file mode 100644
index 0000000..7c423d5
--- /dev/null
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/ByteArrayReadableSeekableByteChannel.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+
+/**
+ * Read-only {@link SeekableByteChannel} backed by an in-memory byte array.
+ *
+ * <p>The array is used directly (it is not copied). Closing the channel drops the reference
+ * to it. The class performs no synchronization.
+ */
+public class ByteArrayReadableSeekableByteChannel implements SeekableByteChannel {
+  /** Backing data; {@code null} once the channel has been closed. */
+  private byte[] byteArray;
+  /** Current position; kept as a long so positions past the end of the array are representable. */
+  private long position = 0;
+
+  /**
+   * @param byteArray the bytes to expose through the channel
+   * @throws NullPointerException if {@code byteArray} is null
+   */
+  public ByteArrayReadableSeekableByteChannel(byte[] byteArray) {
+    if (byteArray == null) {
+      throw new NullPointerException();
+    }
+    this.byteArray = byteArray;
+  }
+
+  @Override
+  public boolean isOpen() {
+    return byteArray != null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    byteArray = null;
+  }
+
+  /**
+   * Copies as many bytes as fit from the current position into {@code dst}.
+   *
+   * @return the number of bytes copied, or -1 if the position is at or past the end of the data
+   * @throws java.nio.channels.ClosedChannelException if the channel has been closed
+   */
+  @Override
+  public int read(final ByteBuffer dst) throws IOException {
+    ensureOpen();
+    // Signal end-of-stream with -1: returning 0 forever makes "read until -1" loops spin,
+    // and a position past the end must not produce a negative copy length.
+    if (position >= byteArray.length) {
+      return -1;
+    }
+    int offset = (int) position;
+    int length = Math.min(dst.remaining(), byteArray.length - offset);
+    dst.put(byteArray, offset, length);
+    position += length;
+    return length;
+  }
+
+  @Override
+  public long position() throws IOException {
+    ensureOpen();
+    return position;
+  }
+
+  /**
+   * Moves the channel to {@code newPosition}. A position beyond the end of the data is
+   * accepted; subsequent reads then report end-of-stream.
+   *
+   * @throws IllegalArgumentException if {@code newPosition} is negative
+   * @throws java.nio.channels.ClosedChannelException if the channel has been closed
+   */
+  @Override
+  public SeekableByteChannel position(final long newPosition) throws IOException {
+    ensureOpen();
+    if (newPosition < 0) {
+      throw new IllegalArgumentException("negative position: " + newPosition);
+    }
+    this.position = newPosition;
+    return this;
+  }
+
+  @Override
+  public long size() throws IOException {
+    ensureOpen();
+    return byteArray.length;
+  }
+
+  @Override
+  public int write(final ByteBuffer src) throws IOException {
+    throw new UnsupportedOperationException("Read only");
+  }
+
+  @Override
+  public SeekableByteChannel truncate(final long size) throws IOException {
+    throw new UnsupportedOperationException("Read only");
+  }
+
+  /** Fails with ClosedChannelException instead of a NullPointerException after close(). */
+  private void ensureOpen() throws IOException {
+    if (byteArray == null) {
+      throw new java.nio.channels.ClosedChannelException();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
new file mode 100644
index 0000000..11de0a2
--- /dev/null
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ValueVector.Accessor;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
+import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl;
+import org.apache.arrow.vector.complex.reader.BaseReader.MapReader;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
+import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
+import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.arrow.vector.complex.writer.BigIntWriter;
+import org.apache.arrow.vector.complex.writer.IntWriter;
+import org.apache.arrow.vector.schema.ArrowBuffer;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Round-trip tests for the Arrow file format: vectors are unloaded into record batches and written
+ * to files under target/ with ArrowWriter, then read back with ArrowReader and loaded into fresh
+ * vectors for validation.
+ */
+public class TestArrowFile {
+ // number of rows written to (and expected back from) each record batch
+ private static final int COUNT = 10;
+ // root allocator, recreated for every test; every other allocator in this class is a child of it
+ private BufferAllocator allocator;
+
+ @Before
+ public void init() {
+ allocator = new RootAllocator(Integer.MAX_VALUE);
+ }
+
+ // NOTE(review): presumably closing the root allocator fails the test when buffers are still
+ // outstanding -- confirm against RootAllocator.close()
+ @After
+ public void tearDown() {
+ allocator.close();
+ }
+
+ /** Writes the flat (int, bigInt) data set to a file; nothing is read back. */
+ @Test
+ public void testWrite() throws IOException {
+ File file = new File("target/mytest_write.arrow");
+ int count = COUNT;
+ try (
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+ MapVector parent = new MapVector("parent", vectorAllocator, null)) {
+ writeData(count, parent);
+ write((MapVector)parent.getChild("root"), file);
+ }
+ }
+
+ /** Writes the nested data set to a file after validating it in memory; nothing is read back. */
+ @Test
+ public void testWriteComplex() throws IOException {
+ File file = new File("target/mytest_write_complex.arrow");
+ int count = COUNT;
+ try (
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+ MapVector parent = new MapVector("parent", vectorAllocator, null)) {
+ writeComplexData(count, parent);
+ validateComplexContent(count, parent);
+ write((MapVector)parent.getChild("root"), file);
+ }
+ }
+
+ /**
+ * Populates a "root" map under parent with count rows of: "int" = i, "bigInt" = i,
+ * "list" = (i % 3) copies of the varchar "abc", and "map" = { "timestamp": i }.
+ */
+ private void writeComplexData(int count, MapVector parent) {
+ // 3-byte scratch buffer holding "abc"; taken from the root allocator (not the vectors' child
+ // allocator) and released at the end of this method
+ ArrowBuf varchar = allocator.buffer(3);
+ varchar.readerIndex(0);
+ varchar.setByte(0, 'a');
+ varchar.setByte(1, 'b');
+ varchar.setByte(2, 'c');
+ varchar.writerIndex(3);
+ ComplexWriter writer = new ComplexWriterImpl("root", parent);
+ MapWriter rootWriter = writer.rootAsMap();
+ IntWriter intWriter = rootWriter.integer("int");
+ BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt");
+ ListWriter listWriter = rootWriter.list("list");
+ MapWriter mapWriter = rootWriter.map("map");
+ for (int i = 0; i < count; i++) {
+ intWriter.setPosition(i);
+ intWriter.writeInt(i);
+ bigIntWriter.setPosition(i);
+ bigIntWriter.writeBigInt(i);
+ listWriter.setPosition(i);
+ listWriter.startList();
+ // list length cycles 0, 1, 2, 0, ... ; validateComplexContent checks size == i % 3
+ for (int j = 0; j < i % 3; j++) {
+ listWriter.varChar().writeVarChar(0, 3, varchar);
+ }
+ listWriter.endList();
+ mapWriter.setPosition(i);
+ mapWriter.start();
+ mapWriter.timeStamp("timestamp").writeTimeStamp(i);
+ mapWriter.end();
+ }
+ writer.setValueCount(count);
+ varchar.release();
+ }
+
+
+ /** Populates a "root" map under parent with count rows of: "int" = i and "bigInt" = i. */
+ private void writeData(int count, MapVector parent) {
+ ComplexWriter writer = new ComplexWriterImpl("root", parent);
+ MapWriter rootWriter = writer.rootAsMap();
+ IntWriter intWriter = rootWriter.integer("int");
+ BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt");
+ for (int i = 0; i < count; i++) {
+ intWriter.setPosition(i);
+ intWriter.writeInt(i);
+ bigIntWriter.setPosition(i);
+ bigIntWriter.writeBigInt(i);
+ }
+ writer.setValueCount(count);
+ }
+
+ /**
+ * Writes the flat data set to a file, reads it back, and checks both the values and that block
+ * offsets, metadata lengths and buffer offsets are multiples of 8.
+ */
+ @Test
+ public void testWriteRead() throws IOException {
+ File file = new File("target/mytest.arrow");
+ int count = COUNT;
+
+ // write
+ try (
+ BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+ MapVector parent = new MapVector("parent", originalVectorAllocator, null)) {
+ writeData(count, parent);
+ write((MapVector)parent.getChild("root"), file);
+ }
+
+ // read
+ try (
+ BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+ FileInputStream fileInputStream = new FileInputStream(file);
+ ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
+ MapVector parent = new MapVector("parent", vectorAllocator, null)
+ ) {
+ ArrowFooter footer = arrowReader.readFooter();
+ Schema schema = footer.getSchema();
+ System.out.println("reading schema: " + schema);
+
+ // initialize vectors
+
+ MapVector root = parent.addOrGet("root", MinorType.MAP, MapVector.class);
+
+ VectorLoader vectorLoader = new VectorLoader(schema, root);
+
+ List<ArrowBlock> recordBatches = footer.getRecordBatches();
+ for (ArrowBlock rbBlock : recordBatches) {
+ // alignment checks on the block as recorded in the footer
+ Assert.assertEquals(0, rbBlock.getOffset() % 8);
+ Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
+ try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
+ List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
+ for (ArrowBuffer arrowBuffer : buffersLayout) {
+ Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+ }
+ vectorLoader.load(recordBatch);
+ }
+
+ // validated once per block, after the record batch has been closed
+ validateContent(count, parent);
+ }
+ }
+ }
+
+ /** Asserts that row i of parent's "root" map holds int == i and bigInt == i, for i < count. */
+ private void validateContent(int count, MapVector parent) {
+ MapReader rootReader = new SingleMapReaderImpl(parent).reader("root");
+ for (int i = 0; i < count; i++) {
+ rootReader.setPosition(i);
+ Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue());
+ Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue());
+ }
+ }
+
+ /** Writes the nested data set to a file, reads it back, and validates the loaded vectors. */
+ @Test
+ public void testWriteReadComplex() throws IOException {
+ File file = new File("target/mytest_complex.arrow");
+ int count = COUNT;
+
+ // write
+ try (
+ BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+ MapVector parent = new MapVector("parent", originalVectorAllocator, null)) {
+ writeComplexData(count, parent);
+ write((MapVector)parent.getChild("root"), file);
+ }
+
+ // read
+ try (
+ BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+ FileInputStream fileInputStream = new FileInputStream(file);
+ ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
+ MapVector parent = new MapVector("parent", vectorAllocator, null)
+ ) {
+ ArrowFooter footer = arrowReader.readFooter();
+ Schema schema = footer.getSchema();
+ System.out.println("reading schema: " + schema);
+
+ // initialize vectors
+
+ MapVector root = parent.addOrGet("root", MinorType.MAP, MapVector.class);
+
+ VectorLoader vectorLoader = new VectorLoader(schema, root);
+
+ List<ArrowBlock> recordBatches = footer.getRecordBatches();
+ for (ArrowBlock rbBlock : recordBatches) {
+ try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
+ vectorLoader.load(recordBatch);
+ }
+ validateComplexContent(count, parent);
+ }
+ }
+ }
+
+ /** Debug helper: prints each vector's field name followed by every value to stdout. */
+ public void printVectors(List<FieldVector> vectors) {
+ for (FieldVector vector : vectors) {
+ System.out.println(vector.getField().getName());
+ Accessor accessor = vector.getAccessor();
+ int valueCount = accessor.getValueCount();
+ for (int i = 0; i < valueCount; i++) {
+ System.out.println(accessor.getObject(i));
+ }
+ }
+ }
+
+ /**
+ * Prints parent's child vectors, then asserts the content produced by writeComplexData for
+ * rows 0 .. count-1.
+ */
+ private void validateComplexContent(int count, MapVector parent) {
+ printVectors(parent.getChildrenFromFields());
+
+ MapReader rootReader = new SingleMapReaderImpl(parent).reader("root");
+ for (int i = 0; i < count; i++) {
+ rootReader.setPosition(i);
+ Assert.assertEquals(i, rootReader.reader("int").readInteger().intValue());
+ Assert.assertEquals(i, rootReader.reader("bigInt").readLong().longValue());
+ Assert.assertEquals(i % 3, rootReader.reader("list").size());
+ // compared modulo COUNT, so this only pins the millis value up to a multiple of COUNT
+ Assert.assertEquals(i, rootReader.reader("map").reader("timestamp").readDateTime().getMillis() % COUNT);
+ }
+ }
+
+ /** Unloads parent into a single record batch and writes schema plus batch to file. */
+ private void write(MapVector parent, File file) throws FileNotFoundException, IOException {
+ VectorUnloader vectorUnloader = new VectorUnloader(parent);
+ Schema schema = vectorUnloader.getSchema();
+ System.out.println("writing schema: " + schema);
+ try (
+ FileOutputStream fileOutputStream = new FileOutputStream(file);
+ ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
+ ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
+ ) {
+ arrowWriter.writeRecordBatch(recordBatch);
+ }
+ }
+
+ /**
+ * Writes two record batches with the same flat content to one file, then reads the file back
+ * and checks the footer lists exactly two blocks, each aligned and holding the expected values.
+ */
+ @Test
+ public void testWriteReadMultipleRBs() throws IOException {
+ File file = new File("target/mytest_multiple.arrow");
+ int count = COUNT;
+
+ // write
+ try (
+ BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
+ MapVector parent = new MapVector("parent", originalVectorAllocator, null);
+ FileOutputStream fileOutputStream = new FileOutputStream(file);) {
+ writeData(count, parent);
+ // the same unloader is reused for both batches
+ VectorUnloader vectorUnloader = new VectorUnloader(parent.getChild("root"));
+ Schema schema = vectorUnloader.getSchema();
+ Assert.assertEquals(2, schema.getFields().size());
+ try (ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);) {
+ try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch()) {
+ arrowWriter.writeRecordBatch(recordBatch);
+ }
+ // re-allocate the vectors and write the same rows again to produce the second batch
+ parent.allocateNew();
+ writeData(count, parent);
+ try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch()) {
+ arrowWriter.writeRecordBatch(recordBatch);
+ }
+ }
+ }
+
+ // read
+ try (
+ BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+ FileInputStream fileInputStream = new FileInputStream(file);
+ ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
+ MapVector parent = new MapVector("parent", vectorAllocator, null);
+ ) {
+ ArrowFooter footer = arrowReader.readFooter();
+ Schema schema = footer.getSchema();
+ System.out.println("reading schema: " + schema);
+ MapVector root = parent.addOrGet("root", MinorType.MAP, MapVector.class);
+ VectorLoader vectorLoader = new VectorLoader(schema, root);
+ List<ArrowBlock> recordBatches = footer.getRecordBatches();
+ Assert.assertEquals(2, recordBatches.size());
+ for (ArrowBlock rbBlock : recordBatches) {
+ Assert.assertEquals(0, rbBlock.getOffset() % 8);
+ Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
+ try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
+ List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
+ for (ArrowBuffer arrowBuffer : buffersLayout) {
+ Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+ }
+ vectorLoader.load(recordBatch);
+ // here validation happens while the record batch is still open
+ validateContent(count, parent);
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java
new file mode 100644
index 0000000..707dba2
--- /dev/null
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.apache.arrow.flatbuf.Footer;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Test;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+/**
+ * Checks that an {@link ArrowFooter} survives serialization to its flatbuffer form and back.
+ */
+public class TestArrowFooter {
+
+  /** A footer with a one-field schema and two empty block lists must equal its round-tripped copy. */
+  @Test
+  public void test() {
+    Field onlyField = new Field("a", true, new ArrowType.Int(8, true), Collections.<Field>emptyList());
+    Schema schema = new Schema(asList(onlyField));
+    ArrowFooter original =
+        new ArrowFooter(schema, Collections.<ArrowBlock>emptyList(), Collections.<ArrowBlock>emptyList());
+    assertEquals(original, roundTrip(original));
+  }
+
+  /** Serializes {@code source} with a FlatBufferBuilder and rebuilds a footer from the bytes. */
+  private ArrowFooter roundTrip(ArrowFooter source) {
+    FlatBufferBuilder fbb = new FlatBufferBuilder();
+    fbb.finish(source.writeTo(fbb));
+    return new ArrowFooter(Footer.getRootAsFooter(fbb.dataBuffer()));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
new file mode 100644
index 0000000..f90329a
--- /dev/null
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.file.ArrowBlock;
+import org.apache.arrow.vector.file.ArrowFooter;
+import org.apache.arrow.vector.file.ArrowReader;
+import org.apache.arrow.vector.file.ArrowWriter;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Writes one hand-built record batch through {@link ArrowWriter} into memory and checks that
+ * {@link ArrowReader} returns the same schema, field node and buffer contents.
+ */
+public class TestArrowReaderWriter {
+
+  private BufferAllocator allocator;
+
+  @Before
+  public void init() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  /** Allocates a buffer from the test allocator and writes {@code bytes} into it. */
+  ArrowBuf buf(byte[] bytes) {
+    ArrowBuf filled = allocator.buffer(bytes.length);
+    filled.writeBytes(bytes);
+    return filled;
+  }
+
+  /** Reads all readable bytes of {@code buf} into a new array. */
+  byte[] array(ArrowBuf buf) {
+    byte[] contents = new byte[buf.readableBytes()];
+    buf.readBytes(contents);
+    return contents;
+  }
+
+  @Test
+  public void test() throws IOException {
+    Field testField = new Field("testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList());
+    Schema schema = new Schema(asList(testField));
+    byte[] validity = {(byte) 255, 0};
+    // second half is "undefined"
+    byte[] values = new byte[16];
+    for (int i = 0; i < values.length; i++) {
+      values[i] = (byte) (i + 1);
+    }
+
+    // write a single batch (16 values, 8 of them null) into an in-memory stream
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), schema)) {
+      List<ArrowBuf> buffers = asList(buf(validity), buf(values));
+      List<ArrowFieldNode> fieldNodes = asList(new ArrowFieldNode(16, 8));
+      ArrowRecordBatch batch = new ArrowRecordBatch(16, fieldNodes, buffers);
+      writer.writeRecordBatch(batch);
+    }
+
+    byte[] written = out.toByteArray();
+
+    // read it back from the captured bytes
+    try (ArrowReader reader = new ArrowReader(new ByteArrayReadableSeekableByteChannel(written), allocator)) {
+      ArrowFooter footer = reader.readFooter();
+      Schema readSchema = footer.getSchema();
+      assertEquals(schema, readSchema);
+      Field readField = readSchema.getFields().get(0);
+      assertTrue(readField.getTypeLayout().getVectorTypes().toString(), readField.getTypeLayout().getVectors().size() > 0);
+      // TODO: dictionaries
+      List<ArrowBlock> blocks = footer.getRecordBatches();
+      assertEquals(1, blocks.size());
+      ArrowRecordBatch readBatch = reader.readRecordBatch(blocks.get(0));
+      List<ArrowFieldNode> nodes = readBatch.getNodes();
+      assertEquals(1, nodes.size());
+      ArrowFieldNode onlyNode = nodes.get(0);
+      assertEquals(16, onlyNode.getLength());
+      assertEquals(8, onlyNode.getNullCount());
+      List<ArrowBuf> readBuffers = readBatch.getBuffers();
+      assertEquals(2, readBuffers.size());
+      assertArrayEquals(validity, array(readBuffers.get(0)));
+      assertArrayEquals(values, array(readBuffers.get(1)));
+
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/test/java/org/apache/arrow/vector/pojo/TestConvert.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/pojo/TestConvert.java b/java/vector/src/test/java/org/apache/arrow/vector/pojo/TestConvert.java
index 06a1149..61327f1 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/pojo/TestConvert.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/pojo/TestConvert.java
@@ -17,19 +17,24 @@
*/
package org.apache.arrow.vector.pojo;
-import com.google.common.collect.ImmutableList;
-import com.google.flatbuffers.FlatBufferBuilder;
+import static org.apache.arrow.flatbuf.Precision.DOUBLE;
+import static org.apache.arrow.flatbuf.Precision.SINGLE;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.arrow.flatbuf.UnionMode;
import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint;
import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.arrow.vector.types.pojo.ArrowType.List;
+import org.apache.arrow.vector.types.pojo.ArrowType.Timestamp;
import org.apache.arrow.vector.types.pojo.ArrowType.Tuple;
+import org.apache.arrow.vector.types.pojo.ArrowType.Union;
import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Test;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
+import com.google.common.collect.ImmutableList;
+import com.google.flatbuffers.FlatBufferBuilder;
/**
* Test conversion between Flatbuf and Pojo field representations
@@ -46,7 +51,7 @@ public class TestConvert {
public void complex() {
ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
childrenBuilder.add(new Field("child1", true, Utf8.INSTANCE, null));
- childrenBuilder.add(new Field("child2", true, new FloatingPoint(0), ImmutableList.<Field>of()));
+ childrenBuilder.add(new Field("child2", true, new FloatingPoint(SINGLE), ImmutableList.<Field>of()));
Field initialField = new Field("a", true, Tuple.INSTANCE, childrenBuilder.build());
run(initialField);
@@ -56,10 +61,29 @@ public class TestConvert {
public void schema() {
ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
childrenBuilder.add(new Field("child1", true, Utf8.INSTANCE, null));
- childrenBuilder.add(new Field("child2", true, new FloatingPoint(0), ImmutableList.<Field>of()));
+ childrenBuilder.add(new Field("child2", true, new FloatingPoint(SINGLE), ImmutableList.<Field>of()));
Schema initialSchema = new Schema(childrenBuilder.build());
run(initialSchema);
+ }
+ @Test
+ public void nestedSchema() {
+ ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+ childrenBuilder.add(new Field("child1", true, Utf8.INSTANCE, null));
+ childrenBuilder.add(new Field("child2", true, new FloatingPoint(SINGLE), ImmutableList.<Field>of()));
+ childrenBuilder.add(new Field("child3", true, new Tuple(), ImmutableList.<Field>of(
+ new Field("child3.1", true, Utf8.INSTANCE, null),
+ new Field("child3.2", true, new FloatingPoint(DOUBLE), ImmutableList.<Field>of())
+ )));
+ childrenBuilder.add(new Field("child4", true, new List(), ImmutableList.<Field>of(
+ new Field("child4.1", true, Utf8.INSTANCE, null)
+ )));
+ childrenBuilder.add(new Field("child5", true, new Union(UnionMode.Sparse), ImmutableList.<Field>of(
+ new Field("child5.1", true, new Timestamp("UTC"), null),
+ new Field("child5.2", true, new FloatingPoint(DOUBLE), ImmutableList.<Field>of())
+ )));
+ Schema initialSchema = new Schema(childrenBuilder.build());
+ run(initialSchema);
}
private void run(Field initialField) {
[2/2] arrow git commit: ARROW-264: File format
Posted by ju...@apache.org.
ARROW-264: File format
This is work in progress
Author: Julien Le Dem <ju...@dremio.com>
Closes #123 from julienledem/arrow_264_file_format and squashes the following commits:
252de6d [Julien Le Dem] remove outdated comment
04d797f [Julien Le Dem] maps are not nullable yet
e8359b3 [Julien Le Dem] align on 8 byte boundaries; more tests
8b8b823 [Julien Le Dem] refactoring
31e95e6 [Julien Le Dem] fix list vector
b824938 [Julien Le Dem] fix types; add licenses; more tests; more complex
2fd3bc1 [Julien Le Dem] cleanup
50fe680 [Julien Le Dem] nested support
b0bf6bc [Julien Le Dem] cleanup
4247b1a [Julien Le Dem] fix whitespace
d6a1788 [Julien Le Dem] refactoring
81863c5 [Julien Le Dem] fixed loader
aa1b766 [Julien Le Dem] better test
2067e01 [Julien Le Dem] update format
aacf61e [Julien Le Dem] fix pom
b907aa5 [Julien Le Dem] simplify
e43f26b [Julien Le Dem] add layout spec
0cc9718 [Julien Le Dem] add vector type
ac6902a [Julien Le Dem] ARROW-264: File format
807db51 [Julien Le Dem] move information to schema
f2f0596 [Julien Le Dem] Update FieldNode structure to be more explicit and reflect schema
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/803afeb5
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/803afeb5
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/803afeb5
Branch: refs/heads/master
Commit: 803afeb502dcdd802fada2ed0d66c145546b8a78
Parents: ec51d56
Author: Julien Le Dem <ju...@dremio.com>
Authored: Fri Aug 26 08:20:13 2016 -0700
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Fri Aug 26 08:20:13 2016 -0700
----------------------------------------------------------------------
cpp/src/arrow/ipc/metadata-internal.cc | 1 +
format/File.fbs | 28 ++
format/Message.fbs | 21 +-
java/format/pom.xml | 1 +
.../src/main/java/io/netty/buffer/ArrowBuf.java | 71 ++--
.../vector/src/main/codegen/data/ArrowTypes.tdd | 4 +-
.../src/main/codegen/templates/ArrowType.java | 29 +-
.../codegen/templates/NullableValueVectors.java | 49 ++-
.../src/main/codegen/templates/UnionVector.java | 40 ++-
.../arrow/vector/BaseDataValueVector.java | 38 ++-
.../org/apache/arrow/vector/BufferBacked.java | 31 ++
.../org/apache/arrow/vector/FieldVector.java | 65 ++++
.../org/apache/arrow/vector/ValueVector.java | 6 +-
.../org/apache/arrow/vector/VectorLoader.java | 99 ++++++
.../org/apache/arrow/vector/VectorUnloader.java | 78 +++++
.../org/apache/arrow/vector/ZeroVector.java | 39 ++-
.../vector/complex/AbstractContainerVector.java | 21 +-
.../arrow/vector/complex/AbstractMapVector.java | 42 ++-
.../vector/complex/BaseRepeatedValueVector.java | 21 +-
.../apache/arrow/vector/complex/ListVector.java | 58 +++-
.../apache/arrow/vector/complex/MapVector.java | 59 +++-
.../vector/complex/impl/ComplexWriterImpl.java | 2 +-
.../vector/complex/impl/PromotableWriter.java | 3 +-
.../apache/arrow/vector/file/ArrowBlock.java | 82 +++++
.../apache/arrow/vector/file/ArrowFooter.java | 144 ++++++++
.../apache/arrow/vector/file/ArrowReader.java | 151 +++++++++
.../apache/arrow/vector/file/ArrowWriter.java | 179 ++++++++++
.../vector/file/InvalidArrowFileException.java | 27 ++
.../apache/arrow/vector/schema/ArrowBuffer.java | 81 +++++
.../arrow/vector/schema/ArrowFieldNode.java | 53 +++
.../arrow/vector/schema/ArrowRecordBatch.java | 127 +++++++
.../arrow/vector/schema/ArrowVectorType.java | 47 +++
.../arrow/vector/schema/FBSerializable.java | 24 ++
.../arrow/vector/schema/FBSerializables.java | 37 +++
.../apache/arrow/vector/schema/TypeLayout.java | 208 ++++++++++++
.../arrow/vector/schema/VectorLayout.java | 93 ++++++
.../org/apache/arrow/vector/types/Types.java | 70 ++--
.../apache/arrow/vector/types/pojo/Field.java | 42 ++-
.../apache/arrow/vector/types/pojo/Schema.java | 13 +-
.../arrow/vector/TestVectorUnloadLoad.java | 89 +++++
.../ByteArrayReadableSeekableByteChannel.java | 80 +++++
.../apache/arrow/vector/file/TestArrowFile.java | 331 +++++++++++++++++++
.../arrow/vector/file/TestArrowFooter.java | 56 ++++
.../vector/file/TestArrowReaderWriter.java | 106 ++++++
.../apache/arrow/vector/pojo/TestConvert.java | 38 ++-
45 files changed, 2722 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index 50db730..c921e4d 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -219,6 +219,7 @@ static Status FieldToFlatbuffer(
RETURN_NOT_OK(TypeToFlatbuffer(fbb, field->type, &children, &type_enum, &type_data));
auto fb_children = fbb.CreateVector(children);
+ // TODO: produce the list of VectorTypes
*offset = flatbuf::CreateField(
fbb, fb_name, field->nullable, type_enum, type_data, field->dictionary,
fb_children);
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/format/File.fbs
----------------------------------------------------------------------
diff --git a/format/File.fbs b/format/File.fbs
new file mode 100644
index 0000000..f7ad1e1
--- /dev/null
+++ b/format/File.fbs
@@ -0,0 +1,28 @@
+include "Message.fbs";
+
+namespace org.apache.arrow.flatbuf;
+
+/// ----------------------------------------------------------------------
+/// Arrow File metadata
+///
+
+// Root table of the file metadata (declared as root_type at the end of this schema).
+table Footer {
+
+ // schema shared by the record batches of the file
+ schema: org.apache.arrow.flatbuf.Schema;
+
+ // one Block per dictionary
+ // NOTE(review): presumably the location of each dictionary within the file -- confirm with the writer
+ dictionaries: [ Block ];
+
+ // one Block per record batch
+ // NOTE(review): presumably in the order the batches were written -- confirm with the writer
+ recordBatches: [ Block ];
+}
+
+// Fixed-size record describing one entry listed in the Footer (a dictionary or a record batch).
+struct Block {
+
+ // NOTE(review): presumably the byte offset of the entry from the start of the file -- confirm
+ offset: long;
+
+ // NOTE(review): presumably the size in bytes of the entry's flatbuffer metadata -- confirm
+ metaDataLength: int;
+
+ // NOTE(review): presumably the size in bytes of the buffers that follow the metadata -- confirm
+ bodyLength: long;
+
+}
+
+root_type Footer;
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/format/Message.fbs
----------------------------------------------------------------------
diff --git a/format/Message.fbs b/format/Message.fbs
index a78009b..b02f3fa 100644
--- a/format/Message.fbs
+++ b/format/Message.fbs
@@ -17,7 +17,7 @@ table Tuple {
table List {
}
-enum UnionMode:int { Sparse, Dense }
+enum UnionMode:short { Sparse, Dense }
table Union {
mode: UnionMode;
@@ -28,7 +28,7 @@ table Int {
is_signed: bool;
}
-enum Precision:int {SINGLE, DOUBLE}
+enum Precision:short {SINGLE, DOUBLE}
table FloatingPoint {
precision: Precision;
@@ -91,6 +91,17 @@ union Type {
JSONScalar
}
+/// The kinds of buffers a field can produce (listed per field in Field.buffers).
+enum VectorType: short {
+ /// offsets; used in the List type, in Dense Union, and in variable-length primitive types (String, Binary)
+ OFFSET,
+ /// fixed-length primitive values
+ VALUES,
+ /// bit vector indicating whether each value is null
+ VALIDITY,
+ /// type vector used in the Union type
+ TYPE
+}
+
/// ----------------------------------------------------------------------
/// A field represents a named column in a record / row batch or child of a
/// nested type.
@@ -109,12 +120,16 @@ table Field {
dictionary: long;
// children apply only to Nested data types like Struct, List and Union
children: [Field];
+ /// the buffers produced for this type (as derived from the Type)
+ /// does not include children
+ /// each recordbatch will return instances of those Buffers.
+ buffers: [ VectorType ];
}
/// ----------------------------------------------------------------------
/// Endianness of the platform that produces the RecordBatch
-enum Endianness:int { Little, Big }
+enum Endianness:short { Little, Big }
/// ----------------------------------------------------------------------
/// A Schema describes the columns in a row batch
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/format/pom.xml
----------------------------------------------------------------------
diff --git a/java/format/pom.xml b/java/format/pom.xml
index cb11b5f..dc58975 100644
--- a/java/format/pom.xml
+++ b/java/format/pom.xml
@@ -106,6 +106,7 @@
<argument>-o</argument>
<argument>target/generated-sources/</argument>
<argument>../../format/Message.fbs</argument>
+ <argument>../../format/File.fbs</argument>
</arguments>
</configuration>
</execution>
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
----------------------------------------------------------------------
diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
index bbec26a..d10f002 100644
--- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
+++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
@@ -17,8 +17,6 @@
*/
package io.netty.buffer;
-import io.netty.util.internal.PlatformDependent;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -30,16 +28,18 @@ import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.arrow.memory.AllocationManager.BufferLedger;
import org.apache.arrow.memory.BaseAllocator;
+import org.apache.arrow.memory.BaseAllocator.Verbosity;
import org.apache.arrow.memory.BoundsChecking;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.BufferManager;
-import org.apache.arrow.memory.AllocationManager.BufferLedger;
-import org.apache.arrow.memory.BaseAllocator.Verbosity;
import org.apache.arrow.memory.util.HistoricalLog;
import com.google.common.base.Preconditions;
+import io.netty.util.internal.PlatformDependent;
+
public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ArrowBuf.class);
@@ -307,7 +307,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public ByteBuf order(ByteOrder endianness) {
+ public ArrowBuf order(ByteOrder endianness) {
return this;
}
@@ -344,7 +344,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public ByteBuf slice() {
+ public ArrowBuf slice() {
return slice(readerIndex(), readableBytes());
}
@@ -467,7 +467,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public ByteBuf retain(int increment) {
+ public ArrowBuf retain(int increment) {
Preconditions.checkArgument(increment > 0, "retain(%d) argument is not positive", increment);
if (isEmpty) {
@@ -484,7 +484,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public ByteBuf retain() {
+ public ArrowBuf retain() {
return retain(1);
}
@@ -535,49 +535,49 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public ByteBuf setShort(int index, int value) {
+ public ArrowBuf setShort(int index, int value) {
chk(index, 2);
PlatformDependent.putShort(addr(index), (short) value);
return this;
}
@Override
- public ByteBuf setInt(int index, int value) {
+ public ArrowBuf setInt(int index, int value) {
chk(index, 4);
PlatformDependent.putInt(addr(index), value);
return this;
}
@Override
- public ByteBuf setLong(int index, long value) {
+ public ArrowBuf setLong(int index, long value) {
chk(index, 8);
PlatformDependent.putLong(addr(index), value);
return this;
}
@Override
- public ByteBuf setChar(int index, int value) {
+ public ArrowBuf setChar(int index, int value) {
chk(index, 2);
PlatformDependent.putShort(addr(index), (short) value);
return this;
}
@Override
- public ByteBuf setFloat(int index, float value) {
+ public ArrowBuf setFloat(int index, float value) {
chk(index, 4);
PlatformDependent.putInt(addr(index), Float.floatToRawIntBits(value));
return this;
}
@Override
- public ByteBuf setDouble(int index, double value) {
+ public ArrowBuf setDouble(int index, double value) {
chk(index, 8);
PlatformDependent.putLong(addr(index), Double.doubleToRawLongBits(value));
return this;
}
@Override
- public ByteBuf writeShort(int value) {
+ public ArrowBuf writeShort(int value) {
ensure(2);
PlatformDependent.putShort(addr(writerIndex), (short) value);
writerIndex += 2;
@@ -585,7 +585,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public ByteBuf writeInt(int value) {
+ public ArrowBuf writeInt(int value) {
ensure(4);
PlatformDependent.putInt(addr(writerIndex), value);
writerIndex += 4;
@@ -593,7 +593,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public ByteBuf writeLong(long value) {
+ public ArrowBuf writeLong(long value) {
ensure(8);
PlatformDependent.putLong(addr(writerIndex), value);
writerIndex += 8;
@@ -601,7 +601,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public ByteBuf writeChar(int value) {
+ public ArrowBuf writeChar(int value) {
ensure(2);
PlatformDependent.putShort(addr(writerIndex), (short) value);
writerIndex += 2;
@@ -609,7 +609,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public ByteBuf writeFloat(float value) {
+ public ArrowBuf writeFloat(float value) {
ensure(4);
PlatformDependent.putInt(addr(writerIndex), Float.floatToRawIntBits(value));
writerIndex += 4;
@@ -617,7 +617,7 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public ByteBuf writeDouble(double value) {
+ public ArrowBuf writeDouble(double value) {
ensure(8);
PlatformDependent.putLong(addr(writerIndex), Double.doubleToRawLongBits(value));
writerIndex += 8;
@@ -625,19 +625,19 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+ public ArrowBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
udle.getBytes(index + offset, dst, dstIndex, length);
return this;
}
@Override
- public ByteBuf getBytes(int index, ByteBuffer dst) {
+ public ArrowBuf getBytes(int index, ByteBuffer dst) {
udle.getBytes(index + offset, dst);
return this;
}
@Override
- public ByteBuf setByte(int index, int value) {
+ public ArrowBuf setByte(int index, int value) {
chk(index, 1);
PlatformDependent.putByte(addr(index), (byte) value);
return this;
@@ -699,13 +699,13 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+ public ArrowBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
udle.getBytes(index + offset, dst, dstIndex, length);
return this;
}
@Override
- public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+ public ArrowBuf getBytes(int index, OutputStream out, int length) throws IOException {
udle.getBytes(index + offset, out, length);
return this;
}
@@ -724,12 +724,12 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+ public ArrowBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
udle.setBytes(index + offset, src, srcIndex, length);
return this;
}
- public ByteBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) {
+ public ArrowBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) {
if (src.isDirect()) {
checkIndex(index, length);
PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcIndex, this.memoryAddress() + index,
@@ -749,13 +749,13 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+ public ArrowBuf setBytes(int index, byte[] src, int srcIndex, int length) {
udle.setBytes(index + offset, src, srcIndex, length);
return this;
}
@Override
- public ByteBuf setBytes(int index, ByteBuffer src) {
+ public ArrowBuf setBytes(int index, ByteBuffer src) {
udle.setBytes(index + offset, src);
return this;
}
@@ -860,4 +860,17 @@ public final class ArrowBuf extends AbstractByteBuf implements AutoCloseable {
}
}
+ @Override
+ public ArrowBuf readerIndex(int readerIndex) {
+ super.readerIndex(readerIndex);
+ return this;
+ }
+
+ @Override
+ public ArrowBuf writerIndex(int writerIndex) {
+ super.writerIndex(writerIndex);
+ return this;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/codegen/data/ArrowTypes.tdd
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/data/ArrowTypes.tdd b/java/vector/src/main/codegen/data/ArrowTypes.tdd
index 4ab7f85..2ecad3d 100644
--- a/java/vector/src/main/codegen/data/ArrowTypes.tdd
+++ b/java/vector/src/main/codegen/data/ArrowTypes.tdd
@@ -30,7 +30,7 @@
},
{
name: "Union",
- fields: []
+ fields: [{name: "mode", type: short}]
},
{
name: "Int",
@@ -38,7 +38,7 @@
},
{
name: "FloatingPoint",
- fields: [{name: precision, type: int}]
+ fields: [{name: precision, type: short}]
},
{
name: "Utf8",
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/codegen/templates/ArrowType.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/ArrowType.java b/java/vector/src/main/codegen/templates/ArrowType.java
index 6dfaf21..29dee20 100644
--- a/java/vector/src/main/codegen/templates/ArrowType.java
+++ b/java/vector/src/main/codegen/templates/ArrowType.java
@@ -24,9 +24,8 @@ import java.util.Objects;
<@pp.dropOutputFile />
<@pp.changeOutputFile name="/org/apache/arrow/vector/types/pojo/ArrowType.java" />
-
-
<#include "/@includes/license.ftl" />
+
package org.apache.arrow.vector.types.pojo;
import com.google.flatbuffers.FlatBufferBuilder;
@@ -38,7 +37,13 @@ public abstract class ArrowType {
public abstract byte getTypeType();
public abstract int getType(FlatBufferBuilder builder);
+ public abstract <T> T accept(ArrowTypeVisitor<T> visitor);
+ public static interface ArrowTypeVisitor<T> {
+ <#list arrowTypes.types as type>
+ T visit(${type.name} type);
+ </#list>
+ }
<#list arrowTypes.types as type>
<#assign name = type.name>
@@ -70,9 +75,14 @@ public abstract class ArrowType {
@Override
public int getType(FlatBufferBuilder builder) {
+ <#list type.fields as field>
+ <#if field.type == "String">
+ int ${field.name} = builder.createString(this.${field.name});
+ </#if>
+ </#list>
org.apache.arrow.flatbuf.${type.name}.start${type.name}(builder);
<#list type.fields as field>
- org.apache.arrow.flatbuf.${type.name}.add${field.name?cap_first}(builder, <#if field.type == "String">builder.createString(${field.name})<#else>${field.name}</#if>);
+ org.apache.arrow.flatbuf.${type.name}.add${field.name?cap_first}(builder, ${field.name});
</#list>
return org.apache.arrow.flatbuf.${type.name}.end${type.name}(builder);
}
@@ -83,6 +93,14 @@ public abstract class ArrowType {
}
</#list>
+ public String toString() {
+ return "${name}{" // rendered as Name{field1, field2}
+ <#list fields as field><#-- the separator goes between fields only, never before the first one -->
+ + ${field.name}<#if field_has_next> + ", "</#if>
+ </#list>
+ + "}";
+ }
+
@Override
public int hashCode() {
return Objects.hash(<#list type.fields as field>${field.name}<#if field_has_next>, </#if></#list>);
@@ -102,6 +120,11 @@ public abstract class ArrowType {
</#list>
</#if>
}
+
+ @Override
+ public <T> T accept(ArrowTypeVisitor<T> visitor) {
+ return visitor.visit(this);
+ }
}
</#list>
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/NullableValueVectors.java b/java/vector/src/main/codegen/templates/NullableValueVectors.java
index df50897..6b1aa04 100644
--- a/java/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/java/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -29,6 +29,9 @@
package org.apache.arrow.vector;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import java.util.Collections;
+
<#include "/@includes/vv_imports.ftl" />
/**
@@ -39,7 +42,7 @@ package org.apache.arrow.vector;
* NB: this class is automatically generated from ${.template_name} and ValueVectorTypes.tdd using FreeMarker.
*/
@SuppressWarnings("unused")
-public final class ${className} extends BaseDataValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector{
+public final class ${className} extends BaseDataValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector, FieldVector {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${className}.class);
private final FieldReader reader = new ${minor.class}ReaderImpl(Nullable${minor.class}Vector.this);
@@ -54,6 +57,8 @@ public final class ${className} extends BaseDataValueVector implements <#if type
private final Mutator mutator;
private final Accessor accessor;
+ private final List<BufferBacked> innerVectors;
+
<#if minor.class == "Decimal">
private final int precision;
private final int scale;
@@ -66,6 +71,10 @@ public final class ${className} extends BaseDataValueVector implements <#if type
mutator = new Mutator();
accessor = new Accessor();
field = new Field(name, true, new Decimal(precision, scale), null);
+ innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(
+ bits,
+ values
+ ));
}
<#else>
public ${className}(String name, BufferAllocator allocator) {
@@ -88,9 +97,9 @@ public final class ${className} extends BaseDataValueVector implements <#if type
<#elseif minor.class == "Time">
field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Time(), null);
<#elseif minor.class == "Float4">
- field = new Field(name, true, new FloatingPoint(0), null);
+ field = new Field(name, true, new FloatingPoint(org.apache.arrow.flatbuf.Precision.SINGLE), null);
<#elseif minor.class == "Float8">
- field = new Field(name, true, new FloatingPoint(1), null);
+ field = new Field(name, true, new FloatingPoint(org.apache.arrow.flatbuf.Precision.DOUBLE), null);
<#elseif minor.class == "TimeStamp">
field = new Field(name, true, new org.apache.arrow.vector.types.pojo.ArrowType.Timestamp(""), null);
<#elseif minor.class == "IntervalDay">
@@ -104,10 +113,44 @@ public final class ${className} extends BaseDataValueVector implements <#if type
<#elseif minor.class == "Bit">
field = new Field(name, true, new Bool(), null);
</#if>
+ innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(
+ bits,
+ <#if type.major = "VarLen">
+ values.offsetVector,
+ </#if>
+ values
+ ));
}
</#if>
@Override
+ public List<BufferBacked> getFieldInnerVectors() {
+ return innerVectors;
+ }
+
+ @Override
+ public void initializeChildrenFromFields(List<Field> children) {
+ if (!children.isEmpty()) {
+ throw new IllegalArgumentException("primitive type vector ${className} can not have children: " + children);
+ }
+ }
+
+ @Override
+ public List<FieldVector> getChildrenFromFields() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
+ org.apache.arrow.vector.BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers);
+ // TODO: do something with the sizes in fieldNode?
+ }
+
+ public List<ArrowBuf> getFieldBuffers() {
+ return org.apache.arrow.vector.BaseDataValueVector.unload(getFieldInnerVectors());
+ }
+
+ @Override
public Field getField() {
return field;
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/codegen/templates/UnionVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java
index 1fef490..72125fa 100644
--- a/java/vector/src/main/codegen/templates/UnionVector.java
+++ b/java/vector/src/main/codegen/templates/UnionVector.java
@@ -42,6 +42,10 @@ import java.util.ArrayList;
import java.util.Iterator;
import org.apache.arrow.vector.complex.impl.ComplexCopier;
import org.apache.arrow.vector.util.CallBack;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+
+import static org.apache.arrow.flatbuf.UnionMode.Sparse;
+
/*
* This class is generated using freemarker and the ${.template_name} template.
@@ -57,7 +61,7 @@ import org.apache.arrow.vector.util.CallBack;
* For performance reasons, UnionVector stores a cached reference to each subtype vector, to avoid having to do the map lookup
* each time the vector is accessed.
*/
-public class UnionVector implements ValueVector {
+public class UnionVector implements FieldVector {
private String name;
private BufferAllocator allocator;
@@ -95,6 +99,34 @@ public class UnionVector implements ValueVector {
return MinorType.UNION;
}
+ @Override
+ public void initializeChildrenFromFields(List<Field> children) {
+ getMap().initializeChildrenFromFields(children);
+ }
+
+ @Override
+ public List<FieldVector> getChildrenFromFields() {
+ return getMap().getChildrenFromFields();
+ }
+
+ @Override
+ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
+ // TODO
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<ArrowBuf> getFieldBuffers() {
+ // TODO
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<BufferBacked> getFieldInnerVectors() {
+ // TODO
+ throw new UnsupportedOperationException();
+ }
+
public MapVector getMap() {
if (mapVector == null) {
int vectorCount = internalMap.size();
@@ -203,7 +235,7 @@ public class UnionVector implements ValueVector {
for (ValueVector v : internalMap.getChildren()) {
childFields.add(v.getField());
}
- return new Field(name, true, new ArrowType.Union(), childFields);
+ return new Field(name, true, new ArrowType.Union(Sparse), childFields);
}
@Override
@@ -237,10 +269,10 @@ public class UnionVector implements ValueVector {
copyFrom(inIndex, outIndex, from);
}
- public ValueVector addVector(ValueVector v) {
+ public FieldVector addVector(FieldVector v) {
String name = v.getMinorType().name().toLowerCase();
Preconditions.checkState(internalMap.getChild(name) == null, String.format("%s vector already exists", name));
- final ValueVector newVector = internalMap.addOrGet(name, v.getMinorType(), v.getClass());
+ final FieldVector newVector = internalMap.addOrGet(name, v.getMinorType(), v.getClass());
v.makeTransferPair(newVector).transfer();
internalMap.putChild(name, newVector);
if (callBack != null) {
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java
index 05b7cf1..c22258d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java
@@ -17,15 +17,38 @@
*/
package org.apache.arrow.vector;
-import io.netty.buffer.ArrowBuf;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
+import io.netty.buffer.ArrowBuf;
+
-public abstract class BaseDataValueVector extends BaseValueVector {
+public abstract class BaseDataValueVector extends BaseValueVector implements BufferBacked {
protected final static byte[] emptyByteArray = new byte[]{}; // Nullable vectors use this
+ public static void load(List<BufferBacked> vectors, List<ArrowBuf> buffers) {
+ int expectedSize = vectors.size();
+ if (buffers.size() != expectedSize) {
+ throw new IllegalArgumentException("Illegal buffer count, expected " + expectedSize + ", got: " + buffers.size());
+ }
+ for (int i = 0; i < expectedSize; i++) {
+ vectors.get(i).load(buffers.get(i));
+ }
+ }
+
+ public static List<ArrowBuf> unload(List<BufferBacked> vectors) {
+ List<ArrowBuf> result = new ArrayList<>(vectors.size());
+ for (BufferBacked vector : vectors) {
+ result.add(vector.unLoad());
+ }
+ return result;
+ }
+
+ // TODO: Nullable vectors extend BaseDataValueVector but do not use the data field
+ // We should fix the inheritance tree
protected ArrowBuf data;
public BaseDataValueVector(String name, BufferAllocator allocator) {
@@ -82,6 +105,17 @@ public abstract class BaseDataValueVector extends BaseValueVector {
return data;
}
+ @Override
+ public void load(ArrowBuf data) {
+ this.data.release();
+ this.data = data.retain(allocator);
+ }
+
+ @Override
+ public ArrowBuf unLoad() {
+ return this.data.readerIndex(0);
+ }
+
/**
* This method has a similar effect of allocateNew() without actually clearing and reallocating
* the value vector. The purpose is to move the value vector to a "mutate" state
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java b/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java
new file mode 100644
index 0000000..d1c262d
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Content is backed by a buffer and can be loaded/unloaded
+ */
+public interface BufferBacked {
+
+ void load(ArrowBuf data);
+
+ ArrowBuf unLoad();
+
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
new file mode 100644
index 0000000..b28433c
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector;
+
+import java.util.List;
+
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * A vector corresponding to a Field in the schema
+ * It has inner vectors backed by buffers (validity, offsets, data, ...)
+ */
+public interface FieldVector extends ValueVector {
+
+ /**
+ * Initializes the child vectors
+ * to be later loaded with loadFieldBuffers
+ * @param children the schema
+ */
+ void initializeChildrenFromFields(List<Field> children);
+
+ /**
+ * the returned list is the same size as the list passed to initializeChildrenFromFields
+ * @return the children according to schema (empty for primitive types)
+ */
+ List<FieldVector> getChildrenFromFields();
+
+ /**
+ * loads data in the vectors
+ * (ownBuffers must be the same size as getFieldInnerVectors())
+ * @param fieldNode the fieldNode
+ * @param ownBuffers the buffers for this Field (own buffers only, children not included)
+ */
+ void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers);
+
+ /**
+ * (same size as getFieldInnerVectors() since it is their content)
+ * @return the buffers containing the data for this vector (ready for reading)
+ */
+ List<ArrowBuf> getFieldBuffers();
+
+ /**
+ * @return the inner vectors for this field as defined by the TypeLayout
+ */
+ List<BufferBacked> getFieldInnerVectors();
+
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java
index 35321c9..ba7790e 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ValueVector.java
@@ -19,14 +19,14 @@ package org.apache.arrow.vector;
import java.io.Closeable;
-import io.netty.buffer.ArrowBuf;
-
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.types.Types.MinorType;
-import org.apache.arrow.vector.util.TransferPair;
import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.util.TransferPair;
+
+import io.netty.buffer.ArrowBuf;
/**
* An abstraction that is used to store a sequence of values in an individual column.
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
new file mode 100644
index 0000000..58ac68b
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.schema.VectorLayout;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import com.google.common.collect.Iterators;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Loads buffers into vectors
+ */
+public class VectorLoader {
+ private final List<FieldVector> fieldVectors;
+ private final List<Field> fields;
+
+ /**
+ * will create children in root based on schema
+ * @param schema the expected schema
+ * @param root the root to add vectors to based on schema
+ */
+ public VectorLoader(Schema schema, FieldVector root) {
+ super();
+ this.fields = schema.getFields();
+ root.initializeChildrenFromFields(fields);
+ this.fieldVectors = root.getChildrenFromFields();
+ if (this.fieldVectors.size() != fields.size()) {
+ throw new IllegalArgumentException("The root vector did not create the right number of children. found " + fieldVectors.size() + " expected " + fields.size());
+ }
+ }
+
+ /**
+ * Loads the record batch in the vectors
+ * will not close the record batch
+ * @param recordBatch the record batch whose field nodes and buffers are loaded into the vectors
+ */
+ public void load(ArrowRecordBatch recordBatch) {
+ Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator();
+ Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator();
+ for (int i = 0; i < fields.size(); ++i) {
+ Field field = fields.get(i);
+ FieldVector fieldVector = fieldVectors.get(i);
+ loadBuffers(fieldVector, field, buffers, nodes);
+ }
+ if (nodes.hasNext() || buffers.hasNext()) { // leftovers mean the batch does not match the schema
+ throw new IllegalArgumentException("not all nodes and buffers were consumed. nodes: " + Iterators.toString(nodes) + " buffers: " + Iterators.toString(buffers));
+ }
+ }
+
+ private void loadBuffers(FieldVector vector, Field field, Iterator<ArrowBuf> buffers, Iterator<ArrowFieldNode> nodes) {
+ ArrowFieldNode fieldNode = nodes.next(); // one node per field, consumed before the field's children
+ List<VectorLayout> typeLayout = field.getTypeLayout().getVectors();
+ List<ArrowBuf> ownBuffers = new ArrayList<>(typeLayout.size());
+ for (int j = 0; j < typeLayout.size(); j++) {
+ ownBuffers.add(buffers.next());
+ }
+ try {
+ vector.loadFieldBuffers(fieldNode, ownBuffers);
+ } catch (RuntimeException e) {
+ throw new IllegalArgumentException("Could not load buffers for field " + field, e); // keep the cause so the underlying failure stays visible
+ }
+ List<Field> children = field.getChildren();
+ if (children.size() > 0) {
+ List<FieldVector> childrenFromFields = vector.getChildrenFromFields();
+ checkArgument(children.size() == childrenFromFields.size(), "should have as many children as in the schema: found " + childrenFromFields.size() + " expected " + children.size());
+ for (int i = 0; i < childrenFromFields.size(); i++) {
+ Field child = children.get(i);
+ FieldVector fieldVector = childrenFromFields.get(i);
+ loadBuffers(fieldVector, child, buffers, nodes);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
new file mode 100644
index 0000000..e4d37bf
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.arrow.vector.ValueVector.Accessor;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.schema.ArrowVectorType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import io.netty.buffer.ArrowBuf;
+
+public class VectorUnloader {
+
+ private final Schema schema;
+ private final int valueCount;
+ private final List<FieldVector> vectors;
+
+ public VectorUnloader(FieldVector parent) {
+ super();
+ this.schema = new Schema(parent.getField().getChildren());
+ this.valueCount = parent.getAccessor().getValueCount();
+ this.vectors = parent.getChildrenFromFields();
+ }
+
+ public Schema getSchema() {
+ return schema;
+ }
+
+ public ArrowRecordBatch getRecordBatch() {
+ List<ArrowFieldNode> nodes = new ArrayList<>();
+ List<ArrowBuf> buffers = new ArrayList<>();
+ for (FieldVector vector : vectors) {
+ appendNodes(vector, nodes, buffers);
+ }
+ return new ArrowRecordBatch(valueCount, nodes, buffers);
+ }
+
+ private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
+ Accessor accessor = vector.getAccessor();
+ int nullCount = 0;
+ // TODO: should not have to do that
+ // we can do that a lot more efficiently (for example with Long.bitCount(i))
+ for (int i = 0, count = accessor.getValueCount(); i < count; i++) { // value count read once, not on every iteration
+ if (accessor.isNull(i)) {
+ nullCount++;
+ }
+ }
+ nodes.add(new ArrowFieldNode(accessor.getValueCount(), nullCount));
+ List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
+ List<ArrowVectorType> expectedBuffers = vector.getField().getTypeLayout().getVectorTypes();
+ if (fieldBuffers.size() != expectedBuffers.size()) { // the vector must produce exactly the buffers its type layout declares
+ throw new IllegalArgumentException("wrong number of buffers for field " + vector.getField() + ". expected " + expectedBuffers.size() + ", found " + fieldBuffers.size() + ": " + fieldBuffers);
+ }
+ buffers.addAll(fieldBuffers);
+ for (FieldVector child : vector.getChildrenFromFields()) {
+ appendNodes(child, nodes, buffers);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
index 705a24b..c2482ad 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
@@ -17,25 +17,23 @@
*/
package org.apache.arrow.vector;
-import com.google.flatbuffers.FlatBufferBuilder;
-import io.netty.buffer.ArrowBuf;
-
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
-import org.apache.arrow.flatbuf.Type;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.vector.complex.impl.NullReader;
import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType.Null;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.util.TransferPair;
-import com.google.common.collect.Iterators;
+import io.netty.buffer.ArrowBuf;
-public class ZeroVector implements ValueVector {
+public class ZeroVector implements FieldVector {
public final static ZeroVector INSTANCE = new ZeroVector();
private final String name = "[DEFAULT]";
@@ -175,4 +173,33 @@ public class ZeroVector implements ValueVector {
public FieldReader getReader() {
return NullReader.INSTANCE;
}
+
+ /**
+  * Accepts only an empty list of children.
+  *
+  * @throws IllegalArgumentException if any child field is passed
+  */
+ @Override
+ public void initializeChildrenFromFields(List<Field> children) {
+   if (children.isEmpty()) {
+     return;
+   }
+   throw new IllegalArgumentException("Zero vector has no children");
+ }
+
+ /**
+  * @return always an empty list
+  */
+ @Override
+ public List<FieldVector> getChildrenFromFields() {
+   final List<FieldVector> noChildren = Collections.emptyList();
+   return noChildren;
+ }
+
+ /**
+  * Accepts only an empty list of buffers; the field node is not used.
+  *
+  * @throws IllegalArgumentException if any buffer is passed
+  */
+ @Override
+ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
+   if (ownBuffers.isEmpty()) {
+     return;
+   }
+   throw new IllegalArgumentException("Zero vector has no buffers");
+ }
+
+ /**
+  * @return always an empty list
+  */
+ @Override
+ public List<ArrowBuf> getFieldBuffers() {
+   final List<ArrowBuf> noBuffers = Collections.emptyList();
+   return noBuffers;
+ }
+
+ /**
+  * @return always an empty list
+  */
+ @Override
+ public List<BufferBacked> getFieldInnerVectors() {
+   final List<BufferBacked> noInnerVectors = Collections.emptyList();
+   return noInnerVectors;
+ }
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
index ed77975..2f68886 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractContainerVector.java
@@ -17,22 +17,13 @@
*/
package org.apache.arrow.vector.complex;
-import java.util.Collection;
-
-import javax.annotation.Nullable;
-
-import org.apache.arrow.flatbuf.Field;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.util.CallBack;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-
/**
* Base class for composite vectors.
*
@@ -65,8 +56,8 @@ public abstract class AbstractContainerVector implements ValueVector {
/**
* Returns a {@link org.apache.arrow.vector.ValueVector} corresponding to the given field name if exists or null.
*/
- public ValueVector getChild(String name) {
- return getChild(name, ValueVector.class);
+ public FieldVector getChild(String name) {
+ return getChild(name, FieldVector.class);
}
/**
@@ -81,7 +72,7 @@ public abstract class AbstractContainerVector implements ValueVector {
protected <T extends ValueVector> T typeify(ValueVector v, Class<T> clazz) {
if (clazz.isAssignableFrom(v.getClass())) {
- return (T) v;
+ return clazz.cast(v);
}
throw new IllegalStateException(String.format("Vector requested [%s] was different than type stored [%s]. Arrow doesn't yet support hetergenous types.", clazz.getSimpleName(), v.getClass().getSimpleName()));
}
@@ -94,10 +85,10 @@ public abstract class AbstractContainerVector implements ValueVector {
public abstract int size();
// add a new vector with the input MajorType or return the existing vector if we already added one with the same type
- public abstract <T extends ValueVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale);
+ public abstract <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale);
// return the child vector with the input name
- public abstract <T extends ValueVector> T getChild(String name, Class<T> clazz);
+ public abstract <T extends FieldVector> T getChild(String name, Class<T> clazz);
// return the child vector's ordinal in the composite container
public abstract VectorWithOrdinal getChildVectorWithOrdinal(String name);
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
index 5964f80..23b4997 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java
@@ -17,23 +17,24 @@
*/
package org.apache.arrow.vector.complex;
-import com.google.common.collect.ImmutableList;
-import io.netty.buffer.ArrowBuf;
-
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import org.apache.arrow.flatbuf.Field;
import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.MapWithOrdinal;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import io.netty.buffer.ArrowBuf;
+
/*
* Base class for MapVectors. Currently used by RepeatedMapVector and MapVector
*/
@@ -41,7 +42,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractContainerVector.class);
// Maintains a map with key as field name and value is the vector itself
- private final MapWithOrdinal<String, ValueVector> vectors = new MapWithOrdinal<>();
+ private final MapWithOrdinal<String, FieldVector> vectors = new MapWithOrdinal<>();
protected AbstractMapVector(String name, BufferAllocator allocator, CallBack callBack) {
super(name, allocator, callBack);
@@ -109,19 +110,19 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
* @return resultant {@link org.apache.arrow.vector.ValueVector}
*/
@Override
- public <T extends ValueVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale) {
+ public <T extends FieldVector> T addOrGet(String name, MinorType minorType, Class<T> clazz, int... precisionScale) {
final ValueVector existing = getChild(name);
boolean create = false;
if (existing == null) {
create = true;
} else if (clazz.isAssignableFrom(existing.getClass())) {
- return (T) existing;
+ return clazz.cast(existing);
} else if (nullFilled(existing)) {
existing.clear();
create = true;
}
if (create) {
- final T vector = (T) minorType.getNewVector(name, allocator, callBack, precisionScale);
+ final T vector = clazz.cast(minorType.getNewVector(name, allocator, callBack, precisionScale));
putChild(name, vector);
if (callBack!=null) {
callBack.doWork();
@@ -153,7 +154,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
* field name if exists or null.
*/
@Override
- public <T extends ValueVector> T getChild(String name, Class<T> clazz) {
+ public <T extends FieldVector> T getChild(String name, Class<T> clazz) {
final ValueVector v = vectors.get(name.toLowerCase());
if (v == null) {
return null;
@@ -161,12 +162,25 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
return typeify(v, clazz);
}
+ /**
+  * Creates a new child vector of the given minor type under the given name and
+  * registers it, notifying the callback (when one is set) afterwards.
+  *
+  * @param name child field name
+  * @param minorType type of the vector to create
+  * @param precisionScale optional precision/scale arguments forwarded to the vector factory
+  * @return the newly created vector
+  * @throws IllegalStateException if a child with that name already exists
+  */
+ protected ValueVector add(String name, MinorType minorType, int... precisionScale) {
+   final ValueVector current = getChild(name);
+   if (current != null) {
+     final String existingType = current.getClass().getSimpleName();
+     throw new IllegalStateException(
+         String.format("Vector already exists: Existing[%s], Requested[%s] ", existingType, minorType));
+   }
+   final FieldVector created = minorType.getNewVector(name, allocator, callBack, precisionScale);
+   putChild(name, created);
+   if (callBack != null) {
+     callBack.doWork();
+   }
+   return created;
+ }
+
/**
* Inserts the vector with the given name if it does not exist else replaces it with the new value.
*
* Note that this method does not enforce any vector type check nor throws a schema change exception.
*/
- protected void putChild(String name, ValueVector vector) {
+ protected void putChild(String name, FieldVector vector) {
putVector(name, vector);
}
@@ -175,7 +189,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
* @param name field name
* @param vector vector to be inserted
*/
- protected void putVector(String name, ValueVector vector) {
+ protected void putVector(String name, FieldVector vector) {
final ValueVector old = vectors.put(
Preconditions.checkNotNull(name, "field name cannot be null").toLowerCase(),
Preconditions.checkNotNull(vector, "vector cannot be null")
@@ -189,9 +203,9 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
/**
* Returns a sequence of underlying child vectors.
*/
- protected List<ValueVector> getChildren() {
+ protected List<FieldVector> getChildren() {
int size = vectors.size();
- List<ValueVector> children = new ArrayList<>();
+ List<FieldVector> children = new ArrayList<>();
for (int i = 0; i < size; i++) {
children.add(vectors.getByOrdinal(i));
}
@@ -216,7 +230,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
@Override
public Iterator<ValueVector> iterator() {
- return vectors.values().iterator();
+ return Collections.<ValueVector>unmodifiableCollection(vectors.values()).iterator();
}
/**
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
index 4226274..517d20c 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java
@@ -17,8 +17,6 @@
*/
package org.apache.arrow.vector.complex;
-import io.netty.buffer.ArrowBuf;
-
import java.util.Collections;
import java.util.Iterator;
@@ -26,29 +24,32 @@ import org.apache.arrow.flatbuf.Type;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.AddOrGetResult;
import org.apache.arrow.vector.BaseValueVector;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.util.SchemaChangeRuntimeException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ObjectArrays;
-import org.apache.arrow.vector.types.Types.MinorType;
-import org.apache.arrow.vector.util.SchemaChangeRuntimeException;
+
+import io.netty.buffer.ArrowBuf;
public abstract class BaseRepeatedValueVector extends BaseValueVector implements RepeatedValueVector {
- public final static ValueVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE;
+ public final static FieldVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE;
public final static String OFFSETS_VECTOR_NAME = "$offsets$";
public final static String DATA_VECTOR_NAME = "$data$";
protected final UInt4Vector offsets;
- protected ValueVector vector;
+ protected FieldVector vector;
protected BaseRepeatedValueVector(String name, BufferAllocator allocator) {
this(name, allocator, DEFAULT_DATA_VECTOR);
}
- protected BaseRepeatedValueVector(String name, BufferAllocator allocator, ValueVector vector) {
+ protected BaseRepeatedValueVector(String name, BufferAllocator allocator, FieldVector vector) {
super(name, allocator);
this.offsets = new UInt4Vector(OFFSETS_VECTOR_NAME, allocator);
this.vector = Preconditions.checkNotNull(vector, "data vector cannot be null");
@@ -83,7 +84,7 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
}
@Override
- public ValueVector getDataVector() {
+ public FieldVector getDataVector() {
return vector;
}
@@ -121,7 +122,7 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
@Override
public Iterator<ValueVector> iterator() {
- return Collections.singleton(getDataVector()).iterator();
+ return Collections.<ValueVector>singleton(getDataVector()).iterator();
}
@Override
@@ -167,7 +168,7 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements
return new AddOrGetResult<>((T)vector, created);
}
- protected void replaceDataVector(ValueVector v) {
+ protected void replaceDataVector(FieldVector v) {
vector.clear();
vector = v;
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
index c6c6b09..2984c36 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
@@ -18,15 +18,18 @@
******************************************************************************/
package org.apache.arrow.vector.complex;
-import com.google.common.collect.ImmutableList;
-import com.google.flatbuffers.FlatBufferBuilder;
-import io.netty.buffer.ArrowBuf;
+import static java.util.Collections.singletonList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.vector.AddOrGetResult;
+import org.apache.arrow.vector.BaseDataValueVector;
+import org.apache.arrow.vector.BufferBacked;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.UInt1Vector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.ValueVector;
@@ -36,18 +39,24 @@ import org.apache.arrow.vector.complex.impl.UnionListReader;
import org.apache.arrow.vector.complex.impl.UnionListWriter;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.complex.writer.FieldWriter;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.util.CallBack;
import org.apache.arrow.vector.util.JsonStringArrayList;
import org.apache.arrow.vector.util.TransferPair;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ObjectArrays;
-public class ListVector extends BaseRepeatedValueVector {
+import io.netty.buffer.ArrowBuf;
+
+public class ListVector extends BaseRepeatedValueVector implements FieldVector {
- UInt4Vector offsets;
+ final UInt4Vector offsets;
final UInt1Vector bits;
+ private final List<BufferBacked> innerVectors;
private Mutator mutator = new Mutator();
private Accessor accessor = new Accessor();
private UnionListWriter writer;
@@ -57,12 +66,46 @@ public class ListVector extends BaseRepeatedValueVector {
public ListVector(String name, BufferAllocator allocator, CallBack callBack) {
super(name, allocator);
this.bits = new UInt1Vector("$bits$", allocator);
- offsets = getOffsetVector();
+ this.offsets = getOffsetVector();
+ this.innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(bits, offsets));
this.writer = new UnionListWriter(this);
this.reader = new UnionListReader(this);
this.callBack = callBack;
}
+ /**
+  * Creates the single data vector of this list from the given child field and
+  * lets that vector initialize its own children from the field's children.
+  *
+  * @param children the child fields; must contain exactly one element
+  * @throws IllegalArgumentException if there is not exactly one child field, or if
+  *         the data vector had already been created
+  */
+ @Override
+ public void initializeChildrenFromFields(List<Field> children) {
+   if (children.size() != 1) {
+     throw new IllegalArgumentException("Lists have only one child. Found: " + children);
+   }
+   Field field = children.get(0);
+   MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
+   AddOrGetResult<FieldVector> addOrGetVector = addOrGetVector(minorType);
+   if (!addOrGetVector.isCreated()) {
+     throw new IllegalArgumentException("Child vector already existed: " + addOrGetVector.getVector());
+   }
+   // recurse, as MapVector does, so that nested element types (list of maps, list of lists)
+   // get their own children created as well
+   addOrGetVector.getVector().initializeChildrenFromFields(field.getChildren());
+ }
+
+ /**
+  * @return a one-element list holding the data vector
+  */
+ @Override
+ public List<FieldVector> getChildrenFromFields() {
+   final FieldVector dataVector = getDataVector();
+   return singletonList(dataVector);
+ }
+
+ /**
+  * Loads the given buffers into the inner vectors of this list; the field node is not used.
+  */
+ @Override
+ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
+   final List<BufferBacked> targets = getFieldInnerVectors();
+   BaseDataValueVector.load(targets, ownBuffers);
+ }
+
+ /**
+  * @return the buffers unloaded from the inner vectors of this list
+  */
+ @Override
+ public List<ArrowBuf> getFieldBuffers() {
+   final List<BufferBacked> sources = getFieldInnerVectors();
+   return BaseDataValueVector.unload(sources);
+ }
+
+ /**
+  * @return the inner vectors of this list
+  */
+ @Override
+ public List<BufferBacked> getFieldInnerVectors() {
+   return this.innerVectors;
+ }
+
public UnionListWriter getWriter() {
return writer;
}
@@ -86,7 +129,7 @@ public class ListVector extends BaseRepeatedValueVector {
}
@Override
- public ValueVector getDataVector() {
+ public FieldVector getDataVector() {
return vector;
}
@@ -298,4 +341,5 @@ public class ListVector extends BaseRepeatedValueVector {
bits.getMutator().setValueCount(valueCount);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
index 0cb613e..e369658 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java
@@ -17,10 +17,10 @@
*/
package org.apache.arrow.vector.complex;
-import io.netty.buffer.ArrowBuf;
-
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -28,13 +28,17 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.BaseDataValueVector;
import org.apache.arrow.vector.BaseValueVector;
+import org.apache.arrow.vector.BufferBacked;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.ComplexHolder;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.Types.MinorType;
-import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.ArrowType.Tuple;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.util.CallBack;
@@ -45,7 +49,9 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
-public class MapVector extends AbstractMapVector {
+import io.netty.buffer.ArrowBuf;
+
+public class MapVector extends AbstractMapVector implements FieldVector {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class);
private final SingleMapReaderImpl reader = new SingleMapReaderImpl(MapVector.this);
@@ -53,6 +59,9 @@ public class MapVector extends AbstractMapVector {
private final Mutator mutator = new Mutator();
int valueCount;
+ // TODO: validity vector
+ private final List<BufferBacked> innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList());
+
public MapVector(String name, BufferAllocator allocator, CallBack callBack){
super(name, allocator, callBack);
}
@@ -120,7 +129,7 @@ public class MapVector extends AbstractMapVector {
int expectedSize = getBufferSize();
int actualSize = super.getBufferSize();
- Preconditions.checkArgument(expectedSize == actualSize);
+ Preconditions.checkArgument(expectedSize == actualSize, expectedSize + " != " + actualSize);
return super.getBuffers(clear);
}
@@ -159,7 +168,7 @@ public class MapVector extends AbstractMapVector {
this.to.ephPair = null;
int i = 0;
- ValueVector vector;
+ FieldVector vector;
for (String child:from.getChildFieldNames()) {
int preSize = to.size();
vector = from.getChild(child);
@@ -175,7 +184,7 @@ public class MapVector extends AbstractMapVector {
// (This is similar to what happens in ScanBatch where the children cannot be added till they are
// read). To take care of this, we ensure that the hashCode of the MaterializedField does not
// include the hashCode of the children but is based only on MaterializedField$key.
- final ValueVector newVector = to.addOrGet(child, vector.getMinorType(), vector.getClass());
+ final FieldVector newVector = to.addOrGet(child, vector.getMinorType(), vector.getClass());
if (allocate && to.size() != preSize) {
newVector.allocateNew();
}
@@ -315,13 +324,45 @@ public class MapVector extends AbstractMapVector {
@Override
public void close() {
- final Collection<ValueVector> vectors = getChildren();
- for (final ValueVector v : vectors) {
+ final Collection<FieldVector> vectors = getChildren();
+ for (final FieldVector v : vectors) {
v.close();
}
vectors.clear();
+
valueCount = 0;
super.close();
}
+
+ /**
+  * Adds one child vector per given field, named and typed after the field, and
+  * lets each new child initialize its own children from the field's children.
+  */
+ @Override
+ public void initializeChildrenFromFields(List<Field> children) {
+   for (final Field childField : children) {
+     final MinorType childType = Types.getMinorTypeForArrowType(childField.getType());
+     final ValueVector added = add(childField.getName(), childType);
+     ((FieldVector) added).initializeChildrenFromFields(childField.getChildren());
+   }
+ }
+
+ /**
+  * @return the child vectors of this map, as returned by getChildren()
+  */
+ @Override
+ public List<FieldVector> getChildrenFromFields() {
+   final List<FieldVector> childVectors = getChildren();
+   return childVectors;
+ }
+
+ /**
+  * Loads the given buffers into the inner vectors of this map; the field node is not used yet.
+  */
+ @Override
+ public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
+   final List<BufferBacked> targets = getFieldInnerVectors();
+   BaseDataValueVector.load(targets, ownBuffers);
+   // TODO: something with fieldNode?
+ }
+
+ /**
+  * @return the buffers unloaded from the inner vectors of this map
+  */
+ @Override
+ public List<ArrowBuf> getFieldBuffers() {
+   final List<BufferBacked> sources = getFieldInnerVectors();
+   return BaseDataValueVector.unload(sources);
+ }
+
+ /**
+  * @return the inner vectors of this map
+  */
+ @Override
+ public List<BufferBacked> getFieldInnerVectors() {
+   return this.innerVectors;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
index 4d2adfb..89bfefc 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java
@@ -22,9 +22,9 @@ import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StateTool;
import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.Field;
import com.google.common.base.Preconditions;
-import org.apache.arrow.vector.types.pojo.Field;
public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWriter {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexWriterImpl.class);
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
index 586b128..c282688 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/PromotableWriter.java
@@ -17,6 +17,7 @@
*/
package org.apache.arrow.vector.complex.impl;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.complex.AbstractMapVector;
@@ -129,7 +130,7 @@ public class PromotableWriter extends AbstractPromotableFieldWriter {
} else if (listVector != null) {
unionVector = listVector.promoteToUnion();
}
- unionVector.addVector(tp.getTo());
+ unionVector.addVector((FieldVector)tp.getTo());
writer = new UnionWriter(unionVector);
writer.setPosition(idx());
for (int i = 0; i < idx(); i++) {
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java
new file mode 100644
index 0000000..90fb02b
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import org.apache.arrow.flatbuf.Block;
+import org.apache.arrow.vector.schema.FBSerializable;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+ /**
+  * Immutable holder for an offset, a metadata length and a body length,
+  * serialized as a flatbuffer {@link Block} struct.
+  */
+ public class ArrowBlock implements FBSerializable {
+
+   private final long offset;
+   private final int metadataLength;
+   private final long bodyLength;
+
+   /**
+    * @param offset value stored as the block offset
+    * @param metadataLength value stored as the metadata length
+    * @param bodyLength value stored as the body length
+    */
+   public ArrowBlock(long offset, int metadataLength, long bodyLength) {
+     this.offset = offset;
+     this.metadataLength = metadataLength;
+     this.bodyLength = bodyLength;
+   }
+
+   public long getOffset() {
+     return this.offset;
+   }
+
+   public int getMetadataLength() {
+     return this.metadataLength;
+   }
+
+   public long getBodyLength() {
+     return this.bodyLength;
+   }
+
+   /**
+    * Writes this block into the builder as a {@link Block} struct.
+    *
+    * @return the value returned by {@code Block.createBlock}
+    */
+   @Override
+   public int writeTo(FlatBufferBuilder builder) {
+     return Block.createBlock(builder, offset, metadataLength, bodyLength);
+   }
+
+   @Override
+   public int hashCode() {
+     // same 31-based combination as before, folding each long into an int
+     final int bodyHash = (int) (bodyLength ^ (bodyLength >>> 32));
+     final int offsetHash = (int) (offset ^ (offset >>> 32));
+     int hash = 31 + bodyHash;
+     hash = 31 * hash + metadataLength;
+     hash = 31 * hash + offsetHash;
+     return hash;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+     if (this == obj) {
+       return true;
+     }
+     if (obj == null || getClass() != obj.getClass()) {
+       return false;
+     }
+     final ArrowBlock that = (ArrowBlock) obj;
+     return bodyLength == that.bodyLength
+         && metadataLength == that.metadataLength
+         && offset == that.offset;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
new file mode 100644
index 0000000..01e175b
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.arrow.flatbuf.Block;
+import org.apache.arrow.flatbuf.Footer;
+import org.apache.arrow.vector.schema.FBSerializable;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+ /**
+  * In-memory form of the flatbuffer {@link Footer}: a schema plus the lists of
+  * dictionary blocks and record batch blocks.
+  */
+ public class ArrowFooter implements FBSerializable {
+
+   private final Schema schema;
+
+   private final List<ArrowBlock> dictionaries;
+
+   private final List<ArrowBlock> recordBatches;
+
+   /**
+    * @param schema the schema stored in the footer
+    * @param dictionaries the dictionary blocks, in order
+    * @param recordBatches the record batch blocks, in order
+    */
+   public ArrowFooter(Schema schema, List<ArrowBlock> dictionaries, List<ArrowBlock> recordBatches) {
+     super();
+     this.schema = schema;
+     this.dictionaries = dictionaries;
+     this.recordBatches = recordBatches;
+   }
+
+   /**
+    * Builds the footer from its flatbuffer representation.
+    *
+    * @param footer the flatbuffer footer to convert
+    */
+   public ArrowFooter(Footer footer) {
+     this(
+         Schema.convertSchema(footer.schema()),
+         dictionaries(footer),
+         recordBatches(footer)
+         );
+   }
+
+   /** Copies the record batch blocks of the flatbuffer footer into a list, in stored order. */
+   private static List<ArrowBlock> recordBatches(Footer footer) {
+     List<ArrowBlock> recordBatches = new ArrayList<>();
+     Block tempBlock = new Block();
+     int recordBatchesLength = footer.recordBatchesLength();
+     for (int i = 0; i < recordBatchesLength; i++) {
+       Block block = footer.recordBatches(tempBlock, i);
+       recordBatches.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength()));
+     }
+     return recordBatches;
+   }
+
+   /** Copies the dictionary blocks of the flatbuffer footer into a list, in stored order. */
+   private static List<ArrowBlock> dictionaries(Footer footer) {
+     List<ArrowBlock> dictionaries = new ArrayList<>();
+     Block tempBlock = new Block();
+     int dictionariesLength = footer.dictionariesLength();
+     for (int i = 0; i < dictionariesLength; i++) {
+       Block block = footer.dictionaries(tempBlock, i);
+       dictionaries.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength()));
+     }
+     return dictionaries;
+   }
+
+   public Schema getSchema() {
+     return schema;
+   }
+
+   public List<ArrowBlock> getDictionaries() {
+     return dictionaries;
+   }
+
+   public List<ArrowBlock> getRecordBatches() {
+     return recordBatches;
+   }
+
+   /**
+    * Serializes the schema, then both block vectors, then the footer table itself.
+    *
+    * @return the value returned by {@code Footer.endFooter}
+    */
+   @Override
+   public int writeTo(FlatBufferBuilder builder) {
+     int schemaIndex = schema.getSchema(builder);
+     Footer.startDictionariesVector(builder, dictionaries.size());
+     int dicsOffset = endVector(builder, dictionaries);
+     Footer.startRecordBatchesVector(builder, recordBatches.size());
+     int rbsOffset = endVector(builder, recordBatches);
+     Footer.startFooter(builder);
+     Footer.addSchema(builder, schemaIndex);
+     Footer.addDictionaries(builder, dicsOffset);
+     Footer.addRecordBatches(builder, rbsOffset);
+     return Footer.endFooter(builder);
+   }
+
+   /**
+    * Writes the blocks into the vector that was just started on the builder and ends it.
+    * The builder fills its buffer back to front, so the structs are added from the last
+    * block to the first; that way a reader gets them back in list order.
+    */
+   private int endVector(FlatBufferBuilder builder, List<ArrowBlock> blocks) {
+     for (int i = blocks.size() - 1; i >= 0; i--) {
+       blocks.get(i).writeTo(builder);
+     }
+     return builder.endVector();
+   }
+
+   @Override
+   public int hashCode() {
+     final int prime = 31;
+     int result = 1;
+     result = prime * result + ((dictionaries == null) ? 0 : dictionaries.hashCode());
+     result = prime * result + ((recordBatches == null) ? 0 : recordBatches.hashCode());
+     result = prime * result + ((schema == null) ? 0 : schema.hashCode());
+     return result;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+     if (this == obj) {
+       return true;
+     }
+     if (obj == null) {
+       return false;
+     }
+     if (getClass() != obj.getClass()) {
+       return false;
+     }
+     ArrowFooter other = (ArrowFooter) obj;
+     if (dictionaries == null) {
+       if (other.dictionaries != null) {
+         return false;
+       }
+     } else if (!dictionaries.equals(other.dictionaries)) {
+       return false;
+     }
+     if (recordBatches == null) {
+       if (other.recordBatches != null) {
+         return false;
+       }
+     } else if (!recordBatches.equals(other.recordBatches)) {
+       return false;
+     }
+     if (schema == null) {
+       if (other.schema != null) {
+         return false;
+       }
+     } else if (!schema.equals(other.schema)) {
+       return false;
+     }
+     return true;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/arrow/blob/803afeb5/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
new file mode 100644
index 0000000..bbcd3e9
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.arrow.flatbuf.Buffer;
+import org.apache.arrow.flatbuf.FieldNode;
+import org.apache.arrow.flatbuf.Footer;
+import org.apache.arrow.flatbuf.RecordBatch;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ArrowBuf;
+
+ /**
+  * Reads the footer and individual record batches of an Arrow file from a seekable channel.
+  *
+  * <p>As read by {@link #readFooter()}, the file ends with the serialized footer, followed by the
+  * footer length as a 4-byte little-endian integer, followed by the {@code ARROW1} magic bytes.
+  * Closing the reader closes the underlying channel.
+  */
+ public class ArrowReader implements AutoCloseable {
+   private static final Logger LOGGER = LoggerFactory.getLogger(ArrowReader.class);
+
+   // Explicit charset so the magic bytes never depend on the platform default encoding.
+   private static final byte[] MAGIC = "ARROW1".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+
+   private final SeekableByteChannel in;
+
+   private final BufferAllocator allocator;
+
+   // Cached by readFooter() after the first successful read.
+   private ArrowFooter footer;
+
+   /**
+    * @param in the channel to read the file from; closed by {@link #close()}
+    * @param allocator the allocator used for the buffers backing the record batches
+    */
+   public ArrowReader(SeekableByteChannel in, BufferAllocator allocator) {
+     this.in = in;
+     this.allocator = allocator;
+   }
+
+   /**
+    * Reads exactly {@code l} bytes from the channel into {@code buffer}, starting at its current
+    * writer index, and advances the writer index by the number of bytes read.
+    *
+    * @return the number of bytes read, always {@code l}
+    * @throws IllegalStateException if the channel ended before {@code l} bytes were read
+    */
+   private int readFully(ArrowBuf buffer, int l) throws IOException {
+     int n = readFully(buffer.nioBuffer(buffer.writerIndex(), l));
+     if (n != l) {
+       throw new IllegalStateException(n + " != " + l);
+     }
+     // Advance relative to the current position rather than overwriting it with the count.
+     buffer.writerIndex(buffer.writerIndex() + n);
+     return n;
+   }
+
+   /**
+    * Reads from the channel until {@code buffer} is full or the channel reaches end of stream,
+    * then flips the buffer so it is ready to be consumed.
+    *
+    * @return the number of bytes actually read (the end-of-stream marker is not counted)
+    */
+   private int readFully(ByteBuffer buffer) throws IOException {
+     int total = 0;
+     while (buffer.hasRemaining()) {
+       int n = in.read(buffer);
+       if (n < 0) {
+         // End of stream: stop without adding the -1 marker to the byte count.
+         break;
+       }
+       total += n;
+     }
+     buffer.flip();
+     return total;
+   }
+
+   /**
+    * Fills {@code buffer} from the channel or fails if the channel ends first.
+    *
+    * @param what description of the section being read, used in the error message
+    * @throws InvalidArrowFileException if fewer bytes than the buffer's capacity to fill were read
+    */
+   private void readExactly(ByteBuffer buffer, String what) throws IOException {
+     int expected = buffer.remaining();
+     int actual = readFully(buffer);
+     if (actual != expected) {
+       throw new InvalidArrowFileException(
+           "truncated " + what + ": read " + actual + " of " + expected + " bytes");
+     }
+   }
+
+   /** Decodes the first four bytes of {@code bytes} as a little-endian int. */
+   private static int bytesToInt(byte[] bytes) {
+     return ((bytes[3] & 255) << 24)
+         | ((bytes[2] & 255) << 16)
+         | ((bytes[1] & 255) << 8)
+         | (bytes[0] & 255);
+   }
+
+   /**
+    * Reads the footer from the end of the file. The result is cached, so later calls return the
+    * same instance without touching the channel.
+    *
+    * @return the footer of the file
+    * @throws InvalidArrowFileException if the file is too small, does not end with the magic
+    *     bytes, declares an impossible footer length, or ends before the expected bytes were read
+    * @throws IOException if reading from the channel fails
+    */
+   public ArrowFooter readFooter() throws IOException {
+     if (footer == null) {
+       final long fileSize = in.size();
+       if (fileSize <= (MAGIC.length * 2 + 4)) {
+         throw new InvalidArrowFileException("file too small: " + fileSize);
+       }
+       // The file ends with: <footer length: 4 bytes, little endian><MAGIC>
+       ByteBuffer buffer = ByteBuffer.allocate(4 + MAGIC.length);
+       long footerLengthOffset = fileSize - buffer.remaining();
+       in.position(footerLengthOffset);
+       readExactly(buffer, "footer length and magic number");
+       byte[] array = buffer.array();
+       if (!Arrays.equals(MAGIC, Arrays.copyOfRange(array, 4, array.length))) {
+         throw new InvalidArrowFileException("missing Magic number " + Arrays.toString(buffer.array()));
+       }
+       int footerLength = bytesToInt(array);
+       // Compare in long arithmetic: the int sum could overflow and wrongly pass the check.
+       if (footerLength <= 0 || (long) footerLength + MAGIC.length * 2 + 4 > fileSize) {
+         throw new InvalidArrowFileException("invalid footer length: " + footerLength);
+       }
+       long footerOffset = footerLengthOffset - footerLength;
+       LOGGER.debug("Footer starts at {}, length: {}", footerOffset, footerLength);
+       ByteBuffer footerBuffer = ByteBuffer.allocate(footerLength);
+       in.position(footerOffset);
+       readExactly(footerBuffer, "footer");
+       Footer footerFB = Footer.getRootAsFooter(footerBuffer);
+       this.footer = new ArrowFooter(footerFB);
+     }
+     return footer;
+   }
+
+   // TODO: read dictionaries
+
+   /**
+    * Reads the record batch located by {@code recordBatchBlock}: the metadata and the body are
+    * read into a single buffer, and the body is sliced into one buffer per entry of the metadata.
+    *
+    * @param recordBatchBlock offset, metadata length and body length of the record batch
+    * @return the record batch with its field nodes and buffers
+    * @throws InvalidArrowFileException if the lengths in the block are negative or do not fit in
+    *     a single buffer
+    * @throws IllegalStateException if the channel ends before the whole block was read
+    * @throws IOException if reading from the channel fails
+    */
+   public ArrowRecordBatch readRecordBatch(ArrowBlock recordBatchBlock) throws IOException {
+     final int metadataLength = recordBatchBlock.getMetadataLength();
+     final long bodyLength = recordBatchBlock.getBodyLength();
+     LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}",
+         recordBatchBlock.getOffset(), metadataLength, bodyLength);
+     // Validate before narrowing to int: a large sum could otherwise wrap to a positive value.
+     final long totalLength = metadataLength + bodyLength;
+     if (metadataLength < 0 || bodyLength < 0 || totalLength > Integer.MAX_VALUE) {
+       throw new InvalidArrowFileException("block invalid: " + recordBatchBlock);
+     }
+     final int l = (int) totalLength;
+     final ArrowBuf buffer = allocator.buffer(l);
+     LOGGER.debug("allocated buffer {}", buffer);
+     try {
+       in.position(recordBatchBlock.getOffset());
+       readFully(buffer, l);
+       RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(buffer.nioBuffer().asReadOnlyBuffer());
+       final ArrowBuf body = buffer.slice(metadataLength, (int) bodyLength);
+       int nodesLength = recordBatchFB.nodesLength();
+       List<ArrowFieldNode> nodes = new ArrayList<>(nodesLength);
+       for (int i = 0; i < nodesLength; ++i) {
+         FieldNode node = recordBatchFB.nodes(i);
+         nodes.add(new ArrowFieldNode(node.length(), node.nullCount()));
+       }
+       int buffersLength = recordBatchFB.buffersLength();
+       List<ArrowBuf> buffers = new ArrayList<>(buffersLength);
+       for (int i = 0; i < buffersLength; ++i) {
+         Buffer bufferFB = recordBatchFB.buffers(i);
+         LOGGER.debug("Buffer in RecordBatch at {}, length: {}", bufferFB.offset(), bufferFB.length());
+         buffers.add(body.slice((int) bufferFB.offset(), (int) bufferFB.length()));
+       }
+       return new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers);
+     } finally {
+       // Drop this method's reference on success and on failure alike, so a failed read does not
+       // leak the allocation. NOTE(review): as before, this presumes ArrowRecordBatch keeps its
+       // own references to the slices it is given - confirm.
+       LOGGER.debug("released buffer {}", buffer);
+       buffer.release();
+     }
+   }
+
+   /** Closes the underlying channel. */
+   @Override
+   public void close() throws IOException {
+     in.close();
+   }
+
+ }