You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2019/08/01 03:28:10 UTC
[arrow] branch master updated: ARROW-6035: [Java] Avro adapter
support convert nullable value
This is an automated email from the ASF dual-hosted git repository.
emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 2ffb7f8 ARROW-6035: [Java] Avro adapter support convert nullable value
2ffb7f8 is described below
commit 2ffb7f825250b02a7aa65baa2271f74db0a31a48
Author: tianchen <ni...@alibaba-inc.com>
AuthorDate: Wed Jul 31 20:27:33 2019 -0700
ARROW-6035: [Java] Avro adapter support convert nullable value
Related to [ARROW-6035](https://issues.apache.org/jira/browse/ARROW-6035).
A specific kind of Avro union type (one with exactly two types, one of which is the null type) can be converted to a nullable Arrow vector.
For instance, ["null", "string"] can be represented by a VarCharVector, which may contain null values.
Closes #4943 from tianchen92/ARROW-6035 and squashes the following commits:
8af1bfe05 <tianchen> rename
be18c56b3 <tianchen> resolve comments
0e7adda82 <tianchen> fix writer
42979b74f <tianchen> fix
2cc395001 <tianchen> fix nullable check
92fb616c5 <tianchen> resolve comments
ffaa138f4 <tianchen> ARROW-6035: Avro adapter support convert nullable value
Authored-by: tianchen <ni...@alibaba-inc.com>
Signed-off-by: Micah Kornfield <em...@gmail.com>
---
.../java/org/apache/arrow/AvroToArrowUtils.java | 168 ++++++++++++++-------
.../arrow/consumers/AvroBooleanConsumer.java | 8 +
.../apache/arrow/consumers/AvroBytesConsumer.java | 15 +-
.../apache/arrow/consumers/AvroDoubleConsumer.java | 8 +
.../apache/arrow/consumers/AvroFloatConsumer.java | 8 +
.../apache/arrow/consumers/AvroIntConsumer.java | 8 +
.../apache/arrow/consumers/AvroLongConsumer.java | 8 +
.../apache/arrow/consumers/AvroStringConsumer.java | 14 +-
.../java/org/apache/arrow/consumers/Consumer.java | 12 +-
...leanConsumer.java => NullableTypeConsumer.java} | 34 +++--
.../java/org/apache/arrow/AvroToArrowTest.java | 133 ++++++++++++++--
.../resources/schema/test_nullable_boolean.avsc} | 19 +--
.../resources/schema/test_nullable_bytes.avsc} | 19 +--
.../resources/schema/test_nullable_double.avsc} | 19 +--
.../resources/schema/test_nullable_float.avsc} | 19 +--
.../resources/schema/test_nullable_int.avsc} | 19 +--
.../resources/schema/test_nullable_long.avsc} | 19 +--
.../resources/schema/test_nullable_string.avsc} | 19 +--
18 files changed, 383 insertions(+), 166 deletions(-)
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
index 8ce4939..c5ec765 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
@@ -35,6 +35,7 @@ import org.apache.arrow.consumers.AvroIntConsumer;
import org.apache.arrow.consumers.AvroLongConsumer;
import org.apache.arrow.consumers.AvroStringConsumer;
import org.apache.arrow.consumers.Consumer;
+import org.apache.arrow.consumers.NullableTypeConsumer;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.BigIntVector;
@@ -43,6 +44,7 @@ import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -59,9 +61,10 @@ import org.apache.avro.io.Decoder;
public class AvroToArrowUtils {
private static final int DEFAULT_BUFFER_SIZE = 256;
+ public static final String NULL_INDEX = "nullIndex";
/**
- * Creates an {@link org.apache.arrow.vector.types.pojo.ArrowType} from the {@link Schema.Field}
+ * Creates a {@link Field} from the {@link Schema}
*
<p>This method currently performs following type mapping for Avro data types to corresponding Arrow data types.
*
@@ -75,29 +78,70 @@ public class AvroToArrowUtils {
* <li>BYTES --> ArrowType.Binary</li>
* </ul>
*/
- private static ArrowType getArrowType(Type type) {
+ private static Field getArrowField(Schema schema, String name, boolean nullable) {
- Preconditions.checkNotNull(type, "Avro type object can't be null");
+ Preconditions.checkNotNull(schema, "Avro schema object can't be null");
+
+ Type type = schema.getType();
+ ArrowType arrowType;
switch (type) {
+ case UNION:
+ return getUnionField(schema, name);
case STRING:
- return new ArrowType.Utf8();
+ arrowType = new ArrowType.Utf8();
+ break;
case INT:
- return new ArrowType.Int(32, /*signed=*/true);
+ arrowType = new ArrowType.Int(32, /*signed=*/true);
+ break;
case BOOLEAN:
- return new ArrowType.Bool();
+ arrowType = new ArrowType.Bool();
+ break;
case LONG:
- return new ArrowType.Int(64, /*signed=*/true);
+ arrowType = new ArrowType.Int(64, /*signed=*/true);
+ break;
case FLOAT:
- return new ArrowType.FloatingPoint(SINGLE);
+ arrowType = new ArrowType.FloatingPoint(SINGLE);
+ break;
case DOUBLE:
- return new ArrowType.FloatingPoint(DOUBLE);
+ arrowType = new ArrowType.FloatingPoint(DOUBLE);
+ break;
case BYTES:
- return new ArrowType.Binary();
+ arrowType = new ArrowType.Binary();
+ break;
default:
// no-op, shouldn't get here
throw new RuntimeException("Can't convert avro type %s to arrow type." + type.getName());
}
+
+ final FieldType fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null,
+ getMetaData(schema));
+ return new Field(name, fieldType, null);
+ }
+
+ private static Field getUnionField(Schema schema, String name) {
+ int size = schema.getTypes().size();
+ long nullCount = schema.getTypes().stream().filter(s -> s.getType() == Type.NULL).count();
+
+ // avro schema not allow repeated type, so size == nullCount + 1 indicates nullable type.
+ if (size == nullCount + 1) {
+
+ Schema nullSchema = schema.getTypes().stream().filter(s -> s.getType() == Type.NULL).findFirst().get();
+ String nullIndex = String.valueOf(schema.getTypes().indexOf(nullSchema));
+
+ // if has two field and one is null type, convert to nullable primitive type
+ if (size == 2) {
+ Schema subSchema = schema.getTypes().stream().filter(s -> s.getType() != Type.NULL).findFirst().get();
+ Preconditions.checkNotNull(subSchema);
+ subSchema.addProp(NULL_INDEX, nullIndex);
+ return getArrowField(subSchema, name,true);
+ } else {
+ //TODO convert avro unions type to arrow UnionVector
+ throw new UnsupportedOperationException();
+ }
+ } else {
+ throw new UnsupportedOperationException();
+ }
}
private static Map<String, String> getMetaData(Schema schema) {
@@ -119,68 +163,70 @@ public class AvroToArrowUtils {
if (type == Type.RECORD) {
for (Schema.Field field : schema.getFields()) {
- final ArrowType arrowType = getArrowType(field.schema().getType());
- final FieldType fieldType = new FieldType(/*nullable=*/false, arrowType, /*dictionary=*/null,
- /*metadata=*/getMetaData(field.schema()));
- List<Field> children = null;
- //TODO support complex type (i.e. nested records, lists, etc )
- arrowFields.add(new Field(field.name(), fieldType, children));
+ arrowFields.add(getArrowField(field.schema(), field.name(),false));
}
} else if (type == Type.MAP) {
throw new UnsupportedOperationException();
- } else if (type == Type.UNION) {
- throw new UnsupportedOperationException();
} else if (type == Type.ARRAY) {
throw new UnsupportedOperationException();
} else if (type == Type.ENUM) {
throw new UnsupportedOperationException();
- } else if (type == Type.NULL) {
- throw new UnsupportedOperationException();
} else {
- final FieldType fieldType = new FieldType(true, getArrowType(type), null, null);
- arrowFields.add(new Field("", fieldType, null));
+ arrowFields.add(getArrowField(schema, "", false));
}
return new org.apache.arrow.vector.types.pojo.Schema(arrowFields, /*metadata=*/ metadata);
}
/**
- * Create consumers to consume avro values from decoder, will reduce boxing/unboxing operations.
+ * Create primitive consumer to read data from decoder, will reduce boxing/unboxing operations.
*/
- public static Consumer[] createAvroConsumers(VectorSchemaRoot root) {
+ public static Consumer createPrimitiveConsumer(ValueVector vector) {
- Consumer[] consumers = new Consumer[root.getFieldVectors().size()];
- for (int i = 0; i < root.getFieldVectors().size(); i++) {
- FieldVector vector = root.getFieldVectors().get(i);
- Consumer consumer;
- switch (vector.getMinorType()) {
- case INT:
- consumer = new AvroIntConsumer((IntVector) vector);
- break;
- case VARBINARY:
- consumer = new AvroBytesConsumer((VarBinaryVector) vector);
- break;
- case VARCHAR:
- consumer = new AvroStringConsumer((VarCharVector) vector);
- break;
- case BIGINT:
- consumer = new AvroLongConsumer((BigIntVector) vector);
- break;
- case FLOAT4:
- consumer = new AvroFloatConsumer((Float4Vector) vector);
- break;
- case FLOAT8:
- consumer = new AvroDoubleConsumer((Float8Vector) vector);
- break;
- case BIT:
- consumer = new AvroBooleanConsumer((BitVector) vector);
- break;
- default:
- throw new RuntimeException("could not get consumer from type:" + vector.getMinorType());
- }
- consumers[i] = consumer;
+ Consumer consumer;
+ switch (vector.getMinorType()) {
+ case INT:
+ consumer = new AvroIntConsumer((IntVector) vector);
+ break;
+ case VARBINARY:
+ consumer = new AvroBytesConsumer((VarBinaryVector) vector);
+ break;
+ case VARCHAR:
+ consumer = new AvroStringConsumer((VarCharVector) vector);
+ break;
+ case BIGINT:
+ consumer = new AvroLongConsumer((BigIntVector) vector);
+ break;
+ case FLOAT4:
+ consumer = new AvroFloatConsumer((Float4Vector) vector);
+ break;
+ case FLOAT8:
+ consumer = new AvroDoubleConsumer((Float8Vector) vector);
+ break;
+ case BIT:
+ consumer = new AvroBooleanConsumer((BitVector) vector);
+ break;
+ default:
+ throw new RuntimeException("could not get consumer from type:" + vector.getMinorType());
+ }
+
+ if (vector.getField().isNullable()) {
+ int nullIndex = getNullFieldIndex(vector.getField());
+ return new NullableTypeConsumer(consumer, nullIndex);
}
- return consumers;
+
+ return consumer;
+ }
+
+ /**
+ * Get avro null field index from vector field metadata.
+ */
+ private static int getNullFieldIndex(Field field) {
+ Map<String, String> metadata = field.getMetadata();
+ Preconditions.checkNotNull(metadata, "metadata should not be null when vector is nullable");
+ String index = metadata.get(AvroToArrowUtils.NULL_INDEX);
+ Preconditions.checkNotNull(index, "nullIndex should not be null when vector is nullable");
+ return Integer.parseInt(index);
}
/**
@@ -195,14 +241,24 @@ public class AvroToArrowUtils {
Preconditions.checkNotNull(root, "VectorSchemaRoot object can't be null");
allocateVectors(root, DEFAULT_BUFFER_SIZE);
- Consumer[] consumers = createAvroConsumers(root);
+
+ // create consumers
+ Consumer[] consumers = new Consumer[root.getFieldVectors().size()];
+ for (int i = 0; i < root.getFieldVectors().size(); i++) {
+ FieldVector vector = root.getFieldVectors().get(i);
+ consumers[i] = createPrimitiveConsumer(vector);
+ }
+
+ int valueCount = 0;
while (true) {
try {
for (Consumer consumer : consumers) {
consumer.consume(decoder);
}
+ valueCount++;
//reach end will throw EOFException.
} catch (EOFException eofException) {
+ root.setRowCount(valueCount);
break;
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
index 7bbfac1..134cc5c 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
@@ -32,6 +32,9 @@ public class AvroBooleanConsumer implements Consumer {
private final BitWriter writer;
+ /**
+ * Instantiate a AvroBooleanConsumer.
+ */
public AvroBooleanConsumer(BitVector vector) {
this.writer = new BitWriterImpl(vector);
}
@@ -41,4 +44,9 @@ public class AvroBooleanConsumer implements Consumer {
writer.writeBit(decoder.readBoolean() ? 1 : 0);
writer.setPosition(writer.getPosition() + 1);
}
+
+ @Override
+ public void addNull() {
+ writer.setPosition(writer.getPosition() + 1);
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
index b034ea6..0424641 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
@@ -36,6 +36,9 @@ public class AvroBytesConsumer implements Consumer {
private final VarBinaryVector vector;
private ByteBuffer cacheBuffer;
+ /**
+ * Instantiate a AvroBytesConsumer.
+ */
public AvroBytesConsumer(VarBinaryVector vector) {
this.vector = vector;
this.writer = new VarBinaryWriterImpl(vector);
@@ -43,6 +46,16 @@ public class AvroBytesConsumer implements Consumer {
@Override
public void consume(Decoder decoder) throws IOException {
+ writeValue(decoder);
+ writer.setPosition(writer.getPosition() + 1);
+ }
+
+ @Override
+ public void addNull() {
+ writer.setPosition(writer.getPosition() + 1);
+ }
+
+ private void writeValue(Decoder decoder) throws IOException {
VarBinaryHolder holder = new VarBinaryHolder();
// cacheBuffer is initialized null and create in the first consume,
@@ -53,8 +66,6 @@ public class AvroBytesConsumer implements Consumer {
holder.end = cacheBuffer.limit();
holder.buffer = vector.getAllocator().buffer(cacheBuffer.limit());
holder.buffer.setBytes(0, cacheBuffer, 0, cacheBuffer.limit());
-
writer.write(holder);
- writer.setPosition(writer.getPosition() + 1);
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
index 62dc315..9a60c24 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
@@ -32,6 +32,9 @@ public class AvroDoubleConsumer implements Consumer {
private final Float8Writer writer;
+ /**
+ * Instantiate a AvroDoubleConsumer.
+ */
public AvroDoubleConsumer(Float8Vector vector) {
this.writer = new Float8WriterImpl(vector);
}
@@ -41,4 +44,9 @@ public class AvroDoubleConsumer implements Consumer {
writer.writeFloat8(decoder.readDouble());
writer.setPosition(writer.getPosition() + 1);
}
+
+ @Override
+ public void addNull() {
+ writer.setPosition(writer.getPosition() + 1);
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
index 2bec2b2..8bfe85f 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
@@ -32,6 +32,9 @@ public class AvroFloatConsumer implements Consumer {
private final Float4Writer writer;
+ /**
+ * Instantiate a AvroFloatConsumer.
+ */
public AvroFloatConsumer(Float4Vector vector) {
this.writer = new Float4WriterImpl(vector);
}
@@ -41,4 +44,9 @@ public class AvroFloatConsumer implements Consumer {
writer.writeFloat4(decoder.readFloat());
writer.setPosition(writer.getPosition() + 1);
}
+
+ @Override
+ public void addNull() {
+ writer.setPosition(writer.getPosition() + 1);
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
index 60285f0..ce117e7 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
@@ -32,6 +32,9 @@ public class AvroIntConsumer implements Consumer {
private final IntWriter writer;
+ /**
+ * Instantiate a AvroIntConsumer.
+ */
public AvroIntConsumer(IntVector vector) {
this.writer = new IntWriterImpl(vector);
}
@@ -41,4 +44,9 @@ public class AvroIntConsumer implements Consumer {
writer.writeInt(decoder.readInt());
writer.setPosition(writer.getPosition() + 1);
}
+
+ @Override
+ public void addNull() {
+ writer.setPosition(writer.getPosition() + 1);
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
index 15756af..42e3666 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
@@ -32,6 +32,9 @@ public class AvroLongConsumer implements Consumer {
private final BigIntWriter writer;
+ /**
+ * Instantiate a AvroLongConsumer.
+ */
public AvroLongConsumer(BigIntVector vector) {
this.writer = new BigIntWriterImpl(vector);
}
@@ -41,4 +44,9 @@ public class AvroLongConsumer implements Consumer {
writer.writeBigInt(decoder.readLong());
writer.setPosition(writer.getPosition() + 1);
}
+
+ @Override
+ public void addNull() {
+ writer.setPosition(writer.getPosition() + 1);
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
index fa8b3f5..7d6d495 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
@@ -36,6 +36,9 @@ public class AvroStringConsumer implements Consumer {
private final VarCharWriter writer;
private ByteBuffer cacheBuffer;
+ /**
+ * Instantiate a AvroStringConsumer.
+ */
public AvroStringConsumer(VarCharVector vector) {
this.vector = vector;
this.writer = new VarCharWriterImpl(vector);
@@ -43,6 +46,16 @@ public class AvroStringConsumer implements Consumer {
@Override
public void consume(Decoder decoder) throws IOException {
+ writeValue(decoder);
+ writer.setPosition(writer.getPosition() + 1);
+ }
+
+ @Override
+ public void addNull() {
+ writer.setPosition(writer.getPosition() + 1);
+ }
+
+ private void writeValue(Decoder decoder) throws IOException {
VarCharHolder holder = new VarCharHolder();
// cacheBuffer is initialized null and create in the first consume,
@@ -55,6 +68,5 @@ public class AvroStringConsumer implements Consumer {
holder.buffer.setBytes(0, cacheBuffer, 0, cacheBuffer.limit());
writer.write(holder);
- writer.setPosition(writer.getPosition() + 1);
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
index b3c5281..5784242 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
@@ -22,9 +22,19 @@ import java.io.IOException;
import org.apache.avro.io.Decoder;
/**
- * An abstraction that is used to consume values from avro decoder.
+ * Interface that is used to consume values from avro decoder.
*/
public interface Consumer {
+ /**
+ * Consume a specific type value from avro decoder and write it to vector.
+ * @param decoder avro decoder to read data
+ * @throws IOException on error
+ */
void consume(Decoder decoder) throws IOException;
+
+ /**
+ * Add null value to vector by making writer position + 1.
+ */
+ void addNull();
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
similarity index 62%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
copy to java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
index 7bbfac1..31fe3b8 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
@@ -19,26 +19,38 @@ package org.apache.arrow.consumers;
import java.io.IOException;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.complex.impl.BitWriterImpl;
-import org.apache.arrow.vector.complex.writer.BitWriter;
import org.apache.avro.io.Decoder;
/**
- * Consumer which consume boolean type values from avro decoder.
- * Write the data to {@link BitVector}.
+ * Consumer holds a primitive consumer, could consume nullable values from avro decoder.
+ * Write data via writer of delegate consumer.
*/
-public class AvroBooleanConsumer implements Consumer {
+public class NullableTypeConsumer implements Consumer {
- private final BitWriter writer;
- public AvroBooleanConsumer(BitVector vector) {
- this.writer = new BitWriterImpl(vector);
+ private final Consumer delegate;
+
+ /**
+ * Null field index in avro schema.
+ */
+ protected int nullIndex;
+
+ public NullableTypeConsumer(Consumer delegate, int nullIndex) {
+ this.delegate = delegate;
+ this.nullIndex = nullIndex;
}
@Override
public void consume(Decoder decoder) throws IOException {
- writer.writeBit(decoder.readBoolean() ? 1 : 0);
- writer.setPosition(writer.getPosition() + 1);
+ if (nullIndex != decoder.readInt()) {
+ delegate.consume(decoder);
+ } else {
+ addNull();
+ }
+ }
+
+ @Override
+ public void addNull() {
+ delegate.addNull();
}
}
diff --git a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
index 335ccfb..cce9ea9 100644
--- a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
+++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
@@ -36,6 +36,7 @@ import org.apache.arrow.memory.BaseAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
@@ -90,7 +91,22 @@ public class AvroToArrowTest {
VectorSchemaRoot root = writeAndRead(schema, data);
FieldVector vector = root.getFieldVectors().get(0);
- checkPrimitiveResult(schema, data, vector);
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testNullableStringType() throws Exception {
+ Schema schema = getSchema("test_nullable_string.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? "test" + i : null);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
}
@Test
@@ -102,6 +118,7 @@ public class AvroToArrowTest {
record.put(0, "test" + i);
record.put(1, i);
record.put(2, i % 2 == 0);
+ data.add(record);
}
VectorSchemaRoot root = writeAndRead(schema, data);
@@ -116,7 +133,22 @@ public class AvroToArrowTest {
VectorSchemaRoot root = writeAndRead(schema, data);
FieldVector vector = root.getFieldVectors().get(0);
- checkPrimitiveResult(schema, data, vector);
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testNullableIntType() throws Exception {
+ Schema schema = getSchema("test_nullable_int.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? i : null);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
}
@Test
@@ -127,7 +159,22 @@ public class AvroToArrowTest {
VectorSchemaRoot root = writeAndRead(schema, data);
FieldVector vector = root.getFieldVectors().get(0);
- checkPrimitiveResult(schema, data, vector);
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testNullableLongType() throws Exception {
+ Schema schema = getSchema("test_nullable_long.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? (long) i : null);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
}
@Test
@@ -138,7 +185,22 @@ public class AvroToArrowTest {
VectorSchemaRoot root = writeAndRead(schema, data);
FieldVector vector = root.getFieldVectors().get(0);
- checkPrimitiveResult(schema, data, vector);
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testNullableFloatType() throws Exception {
+ Schema schema = getSchema("test_nullable_float.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? i + 0.1f : null);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
}
@Test
@@ -149,7 +211,22 @@ public class AvroToArrowTest {
VectorSchemaRoot root = writeAndRead(schema, data);
FieldVector vector = root.getFieldVectors().get(0);
- checkPrimitiveResult(schema, data, vector);
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testNullableDoubleType() throws Exception {
+ Schema schema = getSchema("test_nullable_double.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? i + 0.1 : null);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
}
@Test
@@ -165,7 +242,22 @@ public class AvroToArrowTest {
VectorSchemaRoot root = writeAndRead(schema, data);
FieldVector vector = root.getFieldVectors().get(0);
- checkPrimitiveResult(schema, data, vector);
+ checkPrimitiveResult(data, vector);
+ }
+
+ @Test
+ public void testNullableBytesType() throws Exception {
+ Schema schema = getSchema("test_nullable_bytes.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? ByteBuffer.wrap(("test" + i).getBytes(StandardCharsets.UTF_8)) : null);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
}
@Test
@@ -176,17 +268,36 @@ public class AvroToArrowTest {
VectorSchemaRoot root = writeAndRead(schema, data);
FieldVector vector = root.getFieldVectors().get(0);
- checkPrimitiveResult(schema, data, vector);
+ checkPrimitiveResult(data, vector);
}
- private void checkPrimitiveResult(Schema schema, ArrayList data, FieldVector vector) {
+ @Test
+ public void testNullableBooleanType() throws Exception {
+ Schema schema = getSchema("test_nullable_boolean.avsc");
+
+ ArrayList<GenericRecord> data = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GenericRecord record = new GenericData.Record(schema);
+ record.put(0, i % 2 == 0 ? true : null);
+ data.add(record);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ checkRecordResult(schema, data, root);
+ }
+
+ private void checkPrimitiveResult(ArrayList data, FieldVector vector) {
assertEquals(data.size(), vector.getValueCount());
for (int i = 0; i < data.size(); i++) {
Object value1 = data.get(i);
Object value2 = vector.getObject(i);
- if (schema.getType() == Schema.Type.BYTES) {
+ if (value1 == null) {
+ assertTrue(value2 == null);
+ continue;
+ }
+ if (vector.getField().getType().getTypeID() == ArrowType.Binary.TYPE_TYPE) {
value2 = ByteBuffer.wrap((byte[]) value2);
- } else if (schema.getType() == Schema.Type.STRING) {
+ } else if (vector.getField().getType().getTypeID() == ArrowType.Utf8.TYPE_TYPE) {
value2 = value2.toString();
}
assertTrue(Objects.equals(value1, value2));
@@ -203,7 +314,7 @@ public class AvroToArrowTest {
fieldData.add(record.get(i));
}
- checkPrimitiveResult(schema.getFields().get(i).schema(), fieldData, root.getFieldVectors().get(i));
+ checkPrimitiveResult(fieldData, root.getFieldVectors().get(i));
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_boolean.avsc
similarity index 75%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_boolean.avsc
index b3c5281..62af1a8 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_boolean.avsc
@@ -15,16 +15,11 @@
* limitations under the License.
*/
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * An abstraction that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
- void consume(Decoder decoder) throws IOException;
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableBoolean",
+ "fields": [
+ {"name": "f0", "type": ["null", "boolean"]}
+ ]
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_bytes.avsc
similarity index 75%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_bytes.avsc
index b3c5281..002bc7c 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_bytes.avsc
@@ -15,16 +15,11 @@
* limitations under the License.
*/
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * An abstraction that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
- void consume(Decoder decoder) throws IOException;
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableBytes",
+ "fields": [
+ {"name": "f0", "type": ["null", "bytes"]}
+ ]
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_double.avsc
similarity index 75%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_double.avsc
index b3c5281..642b7aa 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_double.avsc
@@ -15,16 +15,11 @@
* limitations under the License.
*/
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * An abstraction that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
- void consume(Decoder decoder) throws IOException;
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableDouble",
+ "fields": [
+ {"name": "f0", "type": ["null", "double"]}
+ ]
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_float.avsc
similarity index 75%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_float.avsc
index b3c5281..dff2859 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_float.avsc
@@ -15,16 +15,11 @@
* limitations under the License.
*/
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * An abstraction that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
- void consume(Decoder decoder) throws IOException;
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableFloat",
+ "fields": [
+ {"name": "f0", "type": ["null", "float"]}
+ ]
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_int.avsc
similarity index 75%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_int.avsc
index b3c5281..abb2fc4 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_int.avsc
@@ -15,16 +15,11 @@
* limitations under the License.
*/
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * An abstraction that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
- void consume(Decoder decoder) throws IOException;
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableInt",
+ "fields": [
+ {"name": "f0", "type": ["null", "int"]}
+ ]
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_long.avsc
similarity index 75%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_long.avsc
index b3c5281..0624d27 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_long.avsc
@@ -15,16 +15,11 @@
* limitations under the License.
*/
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * An abstraction that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
- void consume(Decoder decoder) throws IOException;
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableLong",
+ "fields": [
+ {"name": "f0", "type": ["null", "long"]}
+ ]
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_string.avsc
similarity index 75%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_string.avsc
index b3c5281..347808c 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_string.avsc
@@ -15,16 +15,11 @@
* limitations under the License.
*/
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * An abstraction that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
- void consume(Decoder decoder) throws IOException;
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableString",
+ "fields": [
+ {"name": "f0", "type": ["null", "string"]}
+ ]
}