You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2019/08/11 03:56:28 UTC
[arrow] branch master updated: ARROW-6097: [Java] Avro adapter
implement unions type
This is an automated email from the ASF dual-hosted git repository.
emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 34dd3ed ARROW-6097: [Java] Avro adapter implement unions type
34dd3ed is described below
commit 34dd3edece64c2268047db626fbfa967e36370a4
Author: tianchen <ni...@alibaba-inc.com>
AuthorDate: Sat Aug 10 19:58:05 2019 -0700
ARROW-6097: [Java] Avro adapter implement unions type
Related to [ARROW-6097](https://issues.apache.org/jira/browse/ARROW-6097).
Support converting union types such as ["string"], ["string", "int"] and nullable ["string", "int", "null"].
Closes #4984 from tianchen92/ARROW-6097 and squashes the following commits:
0cdca698b <tianchen> fix documents
3cc028fbe <tianchen> add constant INVALID_NULL_INDEX
b4a24637b <tianchen> add comment
656408847 <tianchen> refactor
b4d0fbe3e <tianchen> resolve comments
15e808c80 <tianchen> ARROW-6097: Avro adapter implement unions type
Authored-by: tianchen <ni...@alibaba-inc.com>
Signed-off-by: Micah Kornfield <em...@gmail.com>
---
.../main/java/org/apache/arrow/AvroToArrow.java | 5 +-
.../java/org/apache/arrow/AvroToArrowUtils.java | 220 ++++++++++-----------
.../arrow/consumers/AvroBooleanConsumer.java | 14 ++
.../apache/arrow/consumers/AvroBytesConsumer.java | 11 ++
.../apache/arrow/consumers/AvroDoubleConsumer.java | 13 ++
.../apache/arrow/consumers/AvroFloatConsumer.java | 13 ++
.../apache/arrow/consumers/AvroIntConsumer.java | 13 ++
.../apache/arrow/consumers/AvroLongConsumer.java | 13 ++
.../{Consumer.java => AvroNullConsumer.java} | 39 ++--
.../apache/arrow/consumers/AvroStringConsumer.java | 11 ++
.../apache/arrow/consumers/AvroUnionsConsumer.java | 83 ++++++++
.../java/org/apache/arrow/consumers/Consumer.java | 11 ++
.../arrow/consumers/NullableTypeConsumer.java | 12 ++
.../java/org/apache/arrow/AvroToArrowTest.java | 52 ++++-
.../resources/schema/test_nullable_union.avsc} | 29 +--
.../resources/schema/test_union.avsc} | 29 +--
.../src/main/codegen/templates/UnionVector.java | 12 ++
17 files changed, 398 insertions(+), 182 deletions(-)
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java
index 4801d69..63199fc 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java
@@ -41,9 +41,6 @@ public class AvroToArrow {
throws IOException {
Preconditions.checkNotNull(schema, "Avro schema object can not be null");
- VectorSchemaRoot root = VectorSchemaRoot.create(
- AvroToArrowUtils.avroToArrowSchema(schema), allocator);
- AvroToArrowUtils.avroToArrowVectors(decoder, root);
- return root;
+ return AvroToArrowUtils.avroToArrowVectors(schema, decoder, allocator);
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
index c5ec765..25611a5 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.arrow.consumers.AvroBooleanConsumer;
import org.apache.arrow.consumers.AvroBytesConsumer;
@@ -33,21 +34,26 @@ import org.apache.arrow.consumers.AvroDoubleConsumer;
import org.apache.arrow.consumers.AvroFloatConsumer;
import org.apache.arrow.consumers.AvroIntConsumer;
import org.apache.arrow.consumers.AvroLongConsumer;
+import org.apache.arrow.consumers.AvroNullConsumer;
import org.apache.arrow.consumers.AvroStringConsumer;
+import org.apache.arrow.consumers.AvroUnionsConsumer;
import org.apache.arrow.consumers.Consumer;
import org.apache.arrow.consumers.NullableTypeConsumer;
+import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
-import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
-import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ZeroVector;
+import org.apache.arrow.vector.complex.UnionVector;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.UnionMode;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
@@ -60,11 +66,10 @@ import org.apache.avro.io.Decoder;
*/
public class AvroToArrowUtils {
- private static final int DEFAULT_BUFFER_SIZE = 256;
- public static final String NULL_INDEX = "nullIndex";
+ private static final int INVALID_NULL_INDEX = -1;
/**
- * Creates a {@link Field} from the {@link Schema}
+ * Creates a {@link Consumer} from the {@link Schema}
*
<p>This method currently performs following type mapping for Avro data types to corresponding Arrow data types.
*
@@ -78,69 +83,123 @@ public class AvroToArrowUtils {
* <li>BYTES --> ArrowType.Binary</li>
* </ul>
*/
- private static Field getArrowField(Schema schema, String name, boolean nullable) {
+ private static Consumer createConsumer(Schema schema, String name, BufferAllocator allocator) {
+ return createConsumer(schema, name, false, INVALID_NULL_INDEX, allocator);
+ }
+ private static Consumer createConsumer(
+ Schema schema,
+ String name,
+ boolean nullable,
+ int nullIndex,
+ BufferAllocator allocator) {
Preconditions.checkNotNull(schema, "Avro schema object can't be null");
Type type = schema.getType();
- ArrowType arrowType;
+
+ final ArrowType arrowType;
+ final FieldType fieldType;
+ final FieldVector vector;
+ final Consumer consumer;
switch (type) {
case UNION:
- return getUnionField(schema, name);
+ return createUnionConsumer(schema, name, allocator);
case STRING:
arrowType = new ArrowType.Utf8();
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = fieldType.createNewSingleVector(name, allocator, null);
+ consumer = new AvroStringConsumer((VarCharVector) vector);
break;
case INT:
arrowType = new ArrowType.Int(32, /*signed=*/true);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = fieldType.createNewSingleVector(name, allocator, null);
+ consumer = new AvroIntConsumer((IntVector) vector);
break;
case BOOLEAN:
arrowType = new ArrowType.Bool();
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = fieldType.createNewSingleVector(name, allocator, null);
+ consumer = new AvroBooleanConsumer((BitVector) vector);
break;
case LONG:
arrowType = new ArrowType.Int(64, /*signed=*/true);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = fieldType.createNewSingleVector(name, allocator, null);
+ consumer = new AvroLongConsumer((BigIntVector) vector);
break;
case FLOAT:
arrowType = new ArrowType.FloatingPoint(SINGLE);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = fieldType.createNewSingleVector(name, allocator, null);
+ consumer = new AvroFloatConsumer((Float4Vector) vector);
break;
case DOUBLE:
arrowType = new ArrowType.FloatingPoint(DOUBLE);
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = fieldType.createNewSingleVector(name, allocator, null);
+ consumer = new AvroDoubleConsumer((Float8Vector) vector);
break;
case BYTES:
arrowType = new ArrowType.Binary();
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = fieldType.createNewSingleVector(name, allocator, null);
+ consumer = new AvroBytesConsumer((VarBinaryVector) vector);
+ break;
+ case NULL:
+ arrowType = new ArrowType.Null();
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = fieldType.createNewSingleVector(name, allocator, null);
+ consumer = new AvroNullConsumer((ZeroVector) vector);
break;
default:
// no-op, shouldn't get here
throw new RuntimeException("Can't convert avro type %s to arrow type." + type.getName());
}
- final FieldType fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null,
- getMetaData(schema));
- return new Field(name, fieldType, null);
+ if (nullable) {
+ return new NullableTypeConsumer(consumer, nullIndex);
+ }
+ return consumer;
}
- private static Field getUnionField(Schema schema, String name) {
+ private static Consumer createUnionConsumer(Schema schema, String name, BufferAllocator allocator) {
int size = schema.getTypes().size();
long nullCount = schema.getTypes().stream().filter(s -> s.getType() == Type.NULL).count();
- // avro schema not allow repeated type, so size == nullCount + 1 indicates nullable type.
- if (size == nullCount + 1) {
+ // union only has one type, convert to primitive type
+ if (size == 1) {
+ Schema subSchema = schema.getTypes().get(0);
+ return createConsumer(subSchema, name, allocator);
+ // size == 2 and has null type, convert to nullable primitive type
+ } else if (size == 2 && nullCount == 1) {
Schema nullSchema = schema.getTypes().stream().filter(s -> s.getType() == Type.NULL).findFirst().get();
- String nullIndex = String.valueOf(schema.getTypes().indexOf(nullSchema));
+ int nullIndex = schema.getTypes().indexOf(nullSchema);
+ Schema subSchema = schema.getTypes().stream().filter(s -> s.getType() != Type.NULL).findFirst().get();
+ Preconditions.checkNotNull(subSchema, "schema should not be null.");
+ return createConsumer(subSchema, name, true, nullIndex, allocator);
- // if has two field and one is null type, convert to nullable primitive type
- if (size == 2) {
- Schema subSchema = schema.getTypes().stream().filter(s -> s.getType() != Type.NULL).findFirst().get();
- Preconditions.checkNotNull(subSchema);
- subSchema.addProp(NULL_INDEX, nullIndex);
- return getArrowField(subSchema, name,true);
- } else {
- //TODO convert avro unions type to arrow UnionVector
- throw new UnsupportedOperationException();
- }
+ // real union type
} else {
- throw new UnsupportedOperationException();
+
+ final FieldType fieldType = new FieldType(/*nullable=*/true,
+ new ArrowType.Union(UnionMode.Sparse, null), /*dictionary=*/null, getMetaData(schema));
+ UnionVector unionVector =
+ (UnionVector) fieldType.createNewSingleVector(name, allocator, null);
+
+ Consumer[] delegates = new Consumer[size];
+ Types.MinorType[] types = new Types.MinorType[size];
+
+ for (int i = 0; i < size; i++) {
+ Schema subSchema = schema.getTypes().get(i);
+ Consumer delegate = createConsumer(subSchema, subSchema.getName(), allocator);
+ unionVector.directAddVector(delegate.getVector());
+ delegates[i] = delegate;
+ types[i] = delegate.getVector().getMinorType();
+ }
+ return new AvroUnionsConsumer(unionVector, delegates, types);
}
}
@@ -151,19 +210,22 @@ public class AvroToArrowUtils {
}
/**
- * Create Arrow {@link org.apache.arrow.vector.types.pojo.Schema} object for the given Avro {@link Schema}.
+ * Read data from {@link Decoder} and generate a {@link VectorSchemaRoot}.
+ * @param schema avro schema
+ * @param decoder avro decoder to read data from
*/
- public static org.apache.arrow.vector.types.pojo.Schema avroToArrowSchema(Schema schema) {
+ public static VectorSchemaRoot avroToArrowVectors(Schema schema, Decoder decoder, BufferAllocator allocator)
+ throws IOException {
- Preconditions.checkNotNull(schema, "Avro Schema object can't be null");
- List<Field> arrowFields = new ArrayList<>();
+ List<FieldVector> vectors = new ArrayList<>();
+ List<Consumer> consumers = new ArrayList<>();
Schema.Type type = schema.getType();
- final Map<String, String> metadata = getMetaData(schema);
-
if (type == Type.RECORD) {
for (Schema.Field field : schema.getFields()) {
- arrowFields.add(getArrowField(field.schema(), field.name(),false));
+ Consumer consumer = createConsumer(field.schema(), field.name(), allocator);
+ consumers.add(consumer);
+ vectors.add(consumer.getVector());
}
} else if (type == Type.MAP) {
throw new UnsupportedOperationException();
@@ -172,82 +234,17 @@ public class AvroToArrowUtils {
} else if (type == Type.ENUM) {
throw new UnsupportedOperationException();
} else {
- arrowFields.add(getArrowField(schema, "", false));
+ Consumer consumer = createConsumer(schema, "", allocator);
+ consumers.add(consumer);
+ vectors.add(consumer.getVector());
}
- return new org.apache.arrow.vector.types.pojo.Schema(arrowFields, /*metadata=*/ metadata);
- }
+ Preconditions.checkArgument(vectors.size() == consumers.size(),
+ "vectors size not equals consumers size");
- /**
- * Create primitive consumer to read data from decoder, will reduce boxing/unboxing operations.
- */
- public static Consumer createPrimitiveConsumer(ValueVector vector) {
+ List<Field> fields = vectors.stream().map(t -> t.getField()).collect(Collectors.toList());
- Consumer consumer;
- switch (vector.getMinorType()) {
- case INT:
- consumer = new AvroIntConsumer((IntVector) vector);
- break;
- case VARBINARY:
- consumer = new AvroBytesConsumer((VarBinaryVector) vector);
- break;
- case VARCHAR:
- consumer = new AvroStringConsumer((VarCharVector) vector);
- break;
- case BIGINT:
- consumer = new AvroLongConsumer((BigIntVector) vector);
- break;
- case FLOAT4:
- consumer = new AvroFloatConsumer((Float4Vector) vector);
- break;
- case FLOAT8:
- consumer = new AvroDoubleConsumer((Float8Vector) vector);
- break;
- case BIT:
- consumer = new AvroBooleanConsumer((BitVector) vector);
- break;
- default:
- throw new RuntimeException("could not get consumer from type:" + vector.getMinorType());
- }
-
- if (vector.getField().isNullable()) {
- int nullIndex = getNullFieldIndex(vector.getField());
- return new NullableTypeConsumer(consumer, nullIndex);
- }
-
- return consumer;
- }
-
- /**
- * Get avro null field index from vector field metadata.
- */
- private static int getNullFieldIndex(Field field) {
- Map<String, String> metadata = field.getMetadata();
- Preconditions.checkNotNull(metadata, "metadata should not be null when vector is nullable");
- String index = metadata.get(AvroToArrowUtils.NULL_INDEX);
- Preconditions.checkNotNull(index, "nullIndex should not be null when vector is nullable");
- return Integer.parseInt(index);
- }
-
- /**
- * Iterate the given Avro {@link Decoder} object to fetch the data and transpose it to populate
- * the given Arrow Vector objects.
- * @param decoder avro decoder to read data.
- * @param root Arrow {@link VectorSchemaRoot} object to populate
- */
- public static void avroToArrowVectors(Decoder decoder, VectorSchemaRoot root) throws IOException {
-
- Preconditions.checkNotNull(decoder, "Avro decoder object can't be null");
- Preconditions.checkNotNull(root, "VectorSchemaRoot object can't be null");
-
- allocateVectors(root, DEFAULT_BUFFER_SIZE);
-
- // create consumers
- Consumer[] consumers = new Consumer[root.getFieldVectors().size()];
- for (int i = 0; i < root.getFieldVectors().size(); i++) {
- FieldVector vector = root.getFieldVectors().get(i);
- consumers[i] = createPrimitiveConsumer(vector);
- }
+ VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0);
int valueCount = 0;
while (true) {
@@ -262,17 +259,6 @@ public class AvroToArrowUtils {
break;
}
}
- }
-
- private static void allocateVectors(VectorSchemaRoot root, int size) {
- List<FieldVector> vectors = root.getFieldVectors();
- for (FieldVector fieldVector : vectors) {
- if (fieldVector instanceof BaseFixedWidthVector) {
- ((BaseFixedWidthVector) fieldVector).allocateNew(size);
- } else {
- fieldVector.allocateNew();
- }
- fieldVector.setInitialCapacity(size);
- }
+ return root;
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
index 134cc5c..b2fe704 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
@@ -20,6 +20,7 @@ package org.apache.arrow.consumers;
import java.io.IOException;
import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.complex.impl.BitWriterImpl;
import org.apache.arrow.vector.complex.writer.BitWriter;
import org.apache.avro.io.Decoder;
@@ -31,11 +32,13 @@ import org.apache.avro.io.Decoder;
public class AvroBooleanConsumer implements Consumer {
private final BitWriter writer;
+ private final BitVector vector;
/**
* Instantiate a AvroBooleanConsumer.
*/
public AvroBooleanConsumer(BitVector vector) {
+ this.vector = vector;
this.writer = new BitWriterImpl(vector);
}
@@ -49,4 +52,15 @@ public class AvroBooleanConsumer implements Consumer {
public void addNull() {
writer.setPosition(writer.getPosition() + 1);
}
+
+ @Override
+ public void setPosition(int index) {
+ writer.setPosition(index);
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return this.vector;
+ }
+
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
index 0424641..2c649f9 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
@@ -20,6 +20,7 @@ package org.apache.arrow.consumers;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.complex.impl.VarBinaryWriterImpl;
import org.apache.arrow.vector.complex.writer.VarBinaryWriter;
@@ -68,4 +69,14 @@ public class AvroBytesConsumer implements Consumer {
holder.buffer.setBytes(0, cacheBuffer, 0, cacheBuffer.limit());
writer.write(holder);
}
+
+ @Override
+ public void setPosition(int index) {
+ writer.setPosition(index);
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return vector;
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
index 9a60c24..63b2071 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
@@ -19,6 +19,7 @@ package org.apache.arrow.consumers;
import java.io.IOException;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.complex.impl.Float8WriterImpl;
import org.apache.arrow.vector.complex.writer.Float8Writer;
@@ -31,11 +32,13 @@ import org.apache.avro.io.Decoder;
public class AvroDoubleConsumer implements Consumer {
private final Float8Writer writer;
+ private final Float8Vector vector;
/**
* Instantiate a AvroDoubleConsumer.
*/
public AvroDoubleConsumer(Float8Vector vector) {
+ this.vector = vector;
this.writer = new Float8WriterImpl(vector);
}
@@ -49,4 +52,14 @@ public class AvroDoubleConsumer implements Consumer {
public void addNull() {
writer.setPosition(writer.getPosition() + 1);
}
+
+ @Override
+ public void setPosition(int index) {
+ writer.setPosition(index);
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return vector;
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
index 8bfe85f..ea752e2 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
@@ -19,6 +19,7 @@ package org.apache.arrow.consumers;
import java.io.IOException;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.complex.impl.Float4WriterImpl;
import org.apache.arrow.vector.complex.writer.Float4Writer;
@@ -31,11 +32,13 @@ import org.apache.avro.io.Decoder;
public class AvroFloatConsumer implements Consumer {
private final Float4Writer writer;
+ private final Float4Vector vector;
/**
* Instantiate a AvroFloatConsumer.
*/
public AvroFloatConsumer(Float4Vector vector) {
+ this.vector = vector;
this.writer = new Float4WriterImpl(vector);
}
@@ -49,4 +52,14 @@ public class AvroFloatConsumer implements Consumer {
public void addNull() {
writer.setPosition(writer.getPosition() + 1);
}
+
+ @Override
+ public void setPosition(int index) {
+ writer.setPosition(index);
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return this.vector;
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
index ce117e7..ab830bc 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
@@ -19,6 +19,7 @@ package org.apache.arrow.consumers;
import java.io.IOException;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.complex.impl.IntWriterImpl;
import org.apache.arrow.vector.complex.writer.IntWriter;
@@ -31,11 +32,13 @@ import org.apache.avro.io.Decoder;
public class AvroIntConsumer implements Consumer {
private final IntWriter writer;
+ private final IntVector vector;
/**
* Instantiate a AvroIntConsumer.
*/
public AvroIntConsumer(IntVector vector) {
+ this.vector = vector;
this.writer = new IntWriterImpl(vector);
}
@@ -49,4 +52,14 @@ public class AvroIntConsumer implements Consumer {
public void addNull() {
writer.setPosition(writer.getPosition() + 1);
}
+
+ @Override
+ public void setPosition(int index) {
+ writer.setPosition(index);
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return this.vector;
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
index 42e3666..68acb94 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
@@ -20,6 +20,7 @@ package org.apache.arrow.consumers;
import java.io.IOException;
import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.complex.impl.BigIntWriterImpl;
import org.apache.arrow.vector.complex.writer.BigIntWriter;
import org.apache.avro.io.Decoder;
@@ -31,11 +32,13 @@ import org.apache.avro.io.Decoder;
public class AvroLongConsumer implements Consumer {
private final BigIntWriter writer;
+ private final BigIntVector vector;
/**
* Instantiate a AvroLongConsumer.
*/
public AvroLongConsumer(BigIntVector vector) {
+ this.vector = vector;
this.writer = new BigIntWriterImpl(vector);
}
@@ -49,4 +52,14 @@ public class AvroLongConsumer implements Consumer {
public void addNull() {
writer.setPosition(writer.getPosition() + 1);
}
+
+ @Override
+ public void setPosition(int index) {
+ writer.setPosition(index);
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return this.vector;
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java
similarity index 59%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java
index 5784242..d06e2f5 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java
@@ -19,22 +19,33 @@ package org.apache.arrow.consumers;
import java.io.IOException;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ZeroVector;
import org.apache.avro.io.Decoder;
/**
- * Interface that is used to consume values from avro decoder.
+ * Consumer which consume null type values from avro decoder.
+ * Corresponding to {@link org.apache.arrow.vector.ZeroVector}.
*/
-public interface Consumer {
-
- /**
- * Consume a specific type value from avro decoder and write it to vector.
- * @param decoder avro decoder to read data
- * @throws IOException on error
- */
- void consume(Decoder decoder) throws IOException;
-
- /**
- * Add null value to vector by making writer position + 1.
- */
- void addNull();
+public class AvroNullConsumer implements Consumer {
+
+ private final ZeroVector vector;
+
+ public AvroNullConsumer(ZeroVector vector) {
+ this.vector = vector;
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {}
+
+ @Override
+ public void addNull() {}
+
+ @Override
+ public void setPosition(int index) {}
+
+ @Override
+ public FieldVector getVector() {
+ return this.vector;
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
index 7d6d495..1719bf7 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
@@ -20,6 +20,7 @@ package org.apache.arrow.consumers;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.impl.VarCharWriterImpl;
import org.apache.arrow.vector.complex.writer.VarCharWriter;
@@ -69,4 +70,14 @@ public class AvroStringConsumer implements Consumer {
writer.write(holder);
}
+
+ @Override
+ public void setPosition(int index) {
+ writer.setPosition(index);
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return this.vector;
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
new file mode 100644
index 0000000..5277678
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.complex.UnionVector;
+import org.apache.arrow.vector.complex.impl.UnionWriter;
+import org.apache.arrow.vector.types.Types;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume unions type values from avro decoder.
+ * Write the data to {@link org.apache.arrow.vector.complex.UnionVector}.
+ */
+public class AvroUnionsConsumer implements Consumer {
+
+ private Consumer[] indexDelegates;
+ private Types.MinorType[] types;
+
+ private UnionWriter writer;
+ private UnionVector vector;
+
+ /**
+ * Instantiate a AvroUnionConsumer.
+ */
+ public AvroUnionsConsumer(UnionVector vector, Consumer[] indexDelegates, Types.MinorType[] types) {
+
+ this.writer = new UnionWriter(vector);
+ this.vector = vector;
+ this.indexDelegates = indexDelegates;
+ this.types = types;
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ int fieldIndex = decoder.readInt();
+ int position = writer.getPosition();
+
+ Consumer delegate = indexDelegates[fieldIndex];
+
+ vector.setType(position, types[fieldIndex]);
+ // In UnionVector we need to set sub vector writer position before consume a value
+ // because in the previous iterations we might not have written to the specific union sub vector.
+ delegate.setPosition(position);
+ delegate.consume(decoder);
+
+ writer.setPosition(position + 1);
+
+ }
+
+ @Override
+ public void addNull() {
+ writer.setPosition(writer.getPosition() + 1);
+ }
+
+ @Override
+ public void setPosition(int index) {
+ writer.setPosition(index);
+ }
+
+ @Override
+ public FieldVector getVector() {
+ vector.setValueCount(writer.getPosition());
+ return this.vector;
+ }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
index 5784242..c3a543c 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
@@ -19,6 +19,7 @@ package org.apache.arrow.consumers;
import java.io.IOException;
+import org.apache.arrow.vector.FieldVector;
import org.apache.avro.io.Decoder;
/**
@@ -37,4 +38,14 @@ public interface Consumer {
* Add null value to vector by making writer position + 1.
*/
void addNull();
+
+ /**
+ * Set the position to write value into vector.
+ */
+ void setPosition(int index);
+
+ /**
+ * Get the vector within the consumer.
+ */
+ FieldVector getVector();
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
index 31fe3b8..5ac7bd7 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
@@ -19,6 +19,7 @@ package org.apache.arrow.consumers;
import java.io.IOException;
+import org.apache.arrow.vector.FieldVector;
import org.apache.avro.io.Decoder;
/**
@@ -53,4 +54,15 @@ public class NullableTypeConsumer implements Consumer {
public void addNull() {
delegate.addNull();
}
+
+ @Override
+ public void setPosition(int index) {
+ delegate.setPosition(index);
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return delegate.getVector();
+ }
+
}
diff --git a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
index cce9ea9..47840d6 100644
--- a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
+++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
@@ -36,7 +36,7 @@ import org.apache.arrow.memory.BaseAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.util.Text;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
@@ -286,6 +286,52 @@ public class AvroToArrowTest {
checkRecordResult(schema, data, root);
}
+  @Test
+  public void testUnionType() throws Exception {
+    final Schema schema = getSchema("test_union.avsc");
+    final ArrayList<GenericRecord> data = new ArrayList<>();
+    final ArrayList<Object> expected = new ArrayList<>();
+    // Alternate between the two union branches: even rows hold a string, odd rows an int.
+    for (int row = 0; row < 5; row++) {
+      final Object value = (row % 2 == 0) ? "test" + row : row;
+      final GenericRecord record = new GenericData.Record(schema);
+      record.put(0, value);
+      data.add(record);
+      expected.add(value);
+    }
+
+    final VectorSchemaRoot root = writeAndRead(schema, data);
+    checkPrimitiveResult(expected, root.getFieldVectors().get(0));
+  }
+
+ @Test
+ public void testNullableUnionType() throws Exception {
+ Schema schema = getSchema("test_nullable_union.avsc");
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ ArrayList<Object> expected = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ if (i % 3 == 0) {
+ record.put(0, "test" + i);
+ expected.add("test" + i);
+ data.add(record);
+ } else if (i % 3 == 1) {
+ record.put(0, i);
+ expected.add(i);
+ data.add(record);
+ } else {
+ record.put(0, null);
+ expected.add(null);
+ data.add(record);
+ }
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(expected, vector);
+ }
+
private void checkPrimitiveResult(ArrayList data, FieldVector vector) {
assertEquals(data.size(), vector.getValueCount());
for (int i = 0; i < data.size(); i++) {
@@ -295,9 +341,9 @@ public class AvroToArrowTest {
assertTrue(value2 == null);
continue;
}
- if (vector.getField().getType().getTypeID() == ArrowType.Binary.TYPE_TYPE) {
+ if (value2 instanceof byte[]) {
value2 = ByteBuffer.wrap((byte[]) value2);
- } else if (vector.getField().getType().getTypeID() == ArrowType.Utf8.TYPE_TYPE) {
+ } else if (value2 instanceof Text) {
value2 = value2.toString();
}
assertTrue(Objects.equals(value1, value2));
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_union.avsc
similarity index 60%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_union.avsc
index 5784242..af94812 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_union.avsc
@@ -15,26 +15,11 @@
* limitations under the License.
*/
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * Interface that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
- /**
- * Consume a specific type value from avro decoder and write it to vector.
- * @param decoder avro decoder to read data
- * @throws IOException on error
- */
- void consume(Decoder decoder) throws IOException;
-
- /**
- * Add null value to vector by making writer position + 1.
- */
- void addNull();
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testNullableUnions",
+ "fields": [
+ {"name": "f0", "type": ["string", "int", "null"]}
+ ]
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_union.avsc
similarity index 60%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_union.avsc
index 5784242..f181e36 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_union.avsc
@@ -15,26 +15,11 @@
* limitations under the License.
*/
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * Interface that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
- /**
- * Consume a specific type value from avro decoder and write it to vector.
- * @param decoder avro decoder to read data
- * @throws IOException on error
- */
- void consume(Decoder decoder) throws IOException;
-
- /**
- * Add null value to vector by making writer position + 1.
- */
- void addNull();
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testUnions",
+ "fields": [
+ {"name": "f0", "type": ["string", "int"]}
+ ]
}
diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java
index f288fb6..59cc91f 100644
--- a/java/vector/src/main/codegen/templates/UnionVector.java
+++ b/java/vector/src/main/codegen/templates/UnionVector.java
@@ -410,6 +410,18 @@ public class UnionVector implements FieldVector {
return newVector;
}
+ /**
+ * Directly put a vector to internalStruct without creating a new one with same type.
+ */
+ public void directAddVector(FieldVector v) {
+ String name = v.getMinorType().name().toLowerCase();
+ Preconditions.checkState(internalStruct.getChild(name) == null, String.format("%s vector already exists", name));
+ internalStruct.putChild(name, v);
+ if (callBack != null) {
+ callBack.doWork();
+ }
+ }
+
private class TransferImpl implements TransferPair {
private final TransferPair internalStructVectorTransferPair;
private final UnionVector to;