You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2019/08/01 03:28:10 UTC

[arrow] branch master updated: ARROW-6035: [Java] Avro adapter support convert nullable value

This is an automated email from the ASF dual-hosted git repository.

emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ffb7f8  ARROW-6035: [Java] Avro adapter support convert nullable value
2ffb7f8 is described below

commit 2ffb7f825250b02a7aa65baa2271f74db0a31a48
Author: tianchen <ni...@alibaba-inc.com>
AuthorDate: Wed Jul 31 20:27:33 2019 -0700

    ARROW-6035: [Java] Avro adapter support convert nullable value
    
    Related to [ARROW-6035](https://issues.apache.org/jira/browse/ARROW-6035).
    
    A specific Avro union type (one with exactly two branches, one of which is the null type) can be converted to a nullable Arrow vector.
    For instance, ["null", "string"] can be represented by a VarCharVector that may contain null values.
    
    Closes #4943 from tianchen92/ARROW-6035 and squashes the following commits:
    
    8af1bfe05 <tianchen> rename
    be18c56b3 <tianchen> resolve comments
    0e7adda82 <tianchen> fix writer
    42979b74f <tianchen> fix
    2cc395001 <tianchen> fix nullable check
    92fb616c5 <tianchen> resolve comments
    ffaa138f4 <tianchen> ARROW-6035:  Avro adapter support convert nullable value
    
    Authored-by: tianchen <ni...@alibaba-inc.com>
    Signed-off-by: Micah Kornfield <em...@gmail.com>
---
 .../java/org/apache/arrow/AvroToArrowUtils.java    | 168 ++++++++++++++-------
 .../arrow/consumers/AvroBooleanConsumer.java       |   8 +
 .../apache/arrow/consumers/AvroBytesConsumer.java  |  15 +-
 .../apache/arrow/consumers/AvroDoubleConsumer.java |   8 +
 .../apache/arrow/consumers/AvroFloatConsumer.java  |   8 +
 .../apache/arrow/consumers/AvroIntConsumer.java    |   8 +
 .../apache/arrow/consumers/AvroLongConsumer.java   |   8 +
 .../apache/arrow/consumers/AvroStringConsumer.java |  14 +-
 .../java/org/apache/arrow/consumers/Consumer.java  |  12 +-
 ...leanConsumer.java => NullableTypeConsumer.java} |  34 +++--
 .../java/org/apache/arrow/AvroToArrowTest.java     | 133 ++++++++++++++--
 .../resources/schema/test_nullable_boolean.avsc}   |  19 +--
 .../resources/schema/test_nullable_bytes.avsc}     |  19 +--
 .../resources/schema/test_nullable_double.avsc}    |  19 +--
 .../resources/schema/test_nullable_float.avsc}     |  19 +--
 .../resources/schema/test_nullable_int.avsc}       |  19 +--
 .../resources/schema/test_nullable_long.avsc}      |  19 +--
 .../resources/schema/test_nullable_string.avsc}    |  19 +--
 18 files changed, 383 insertions(+), 166 deletions(-)

diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
index 8ce4939..c5ec765 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
@@ -35,6 +35,7 @@ import org.apache.arrow.consumers.AvroIntConsumer;
 import org.apache.arrow.consumers.AvroLongConsumer;
 import org.apache.arrow.consumers.AvroStringConsumer;
 import org.apache.arrow.consumers.Consumer;
+import org.apache.arrow.consumers.NullableTypeConsumer;
 import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.BaseFixedWidthVector;
 import org.apache.arrow.vector.BigIntVector;
@@ -43,6 +44,7 @@ import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
@@ -59,9 +61,10 @@ import org.apache.avro.io.Decoder;
 public class AvroToArrowUtils {
 
   private static final int DEFAULT_BUFFER_SIZE = 256;
+  public static final String NULL_INDEX = "nullIndex";
 
   /**
-   * Creates an {@link org.apache.arrow.vector.types.pojo.ArrowType} from the {@link Schema.Field}
+   * Creates a {@link Field} from the {@link Schema}
    *
    <p>This method currently performs following type mapping for Avro data types to corresponding Arrow data types.
    *
@@ -75,29 +78,70 @@ public class AvroToArrowUtils {
    *   <li>BYTES --> ArrowType.Binary</li>
    * </ul>
    */
-  private static ArrowType getArrowType(Type type) {
+  private static Field getArrowField(Schema schema, String name, boolean nullable) {
 
-    Preconditions.checkNotNull(type, "Avro type object can't be null");
+    Preconditions.checkNotNull(schema, "Avro schema object can't be null");
+
+    Type type = schema.getType();
+    ArrowType arrowType;
 
     switch (type) {
+      case UNION:
+        return getUnionField(schema, name);
       case STRING:
-        return new ArrowType.Utf8();
+        arrowType = new ArrowType.Utf8();
+        break;
       case INT:
-        return new ArrowType.Int(32, /*signed=*/true);
+        arrowType = new ArrowType.Int(32, /*signed=*/true);
+        break;
       case BOOLEAN:
-        return new ArrowType.Bool();
+        arrowType = new ArrowType.Bool();
+        break;
       case LONG:
-        return new ArrowType.Int(64, /*signed=*/true);
+        arrowType = new ArrowType.Int(64, /*signed=*/true);
+        break;
       case FLOAT:
-        return new ArrowType.FloatingPoint(SINGLE);
+        arrowType =  new ArrowType.FloatingPoint(SINGLE);
+        break;
       case DOUBLE:
-        return new ArrowType.FloatingPoint(DOUBLE);
+        arrowType = new ArrowType.FloatingPoint(DOUBLE);
+        break;
       case BYTES:
-        return new ArrowType.Binary();
+        arrowType = new ArrowType.Binary();
+        break;
       default:
         // no-op, shouldn't get here
         throw new RuntimeException("Can't convert avro type %s to arrow type." + type.getName());
     }
+
+    final FieldType fieldType =  new FieldType(nullable, arrowType, /*dictionary=*/null,
+        getMetaData(schema));
+    return new Field(name, fieldType, null);
+  }
+
+  private static Field getUnionField(Schema schema, String name) {
+    int size = schema.getTypes().size();
+    long nullCount = schema.getTypes().stream().filter(s -> s.getType() == Type.NULL).count();
+
+    // avro schema not allow repeated type, so size == nullCount + 1 indicates nullable type.
+    if (size == nullCount + 1) {
+
+      Schema nullSchema = schema.getTypes().stream().filter(s -> s.getType() == Type.NULL).findFirst().get();
+      String nullIndex = String.valueOf(schema.getTypes().indexOf(nullSchema));
+
+      // if has two field and one is null type, convert to nullable primitive type
+      if (size == 2) {
+        Schema subSchema = schema.getTypes().stream().filter(s -> s.getType() != Type.NULL).findFirst().get();
+        Preconditions.checkNotNull(subSchema);
+        subSchema.addProp(NULL_INDEX, nullIndex);
+        return getArrowField(subSchema, name,true);
+      } else {
+        //TODO convert avro unions type to arrow UnionVector
+        throw new UnsupportedOperationException();
+      }
+    } else {
+      throw new UnsupportedOperationException();
+    }
   }
 
   private static Map<String, String> getMetaData(Schema schema) {
@@ -119,68 +163,70 @@ public class AvroToArrowUtils {
 
     if (type == Type.RECORD) {
       for (Schema.Field field : schema.getFields()) {
-        final ArrowType arrowType = getArrowType(field.schema().getType());
-        final FieldType fieldType = new FieldType(/*nullable=*/false, arrowType, /*dictionary=*/null,
-            /*metadata=*/getMetaData(field.schema()));
-        List<Field> children = null;
-        //TODO support complex type (i.e. nested records, lists, etc )
-        arrowFields.add(new Field(field.name(), fieldType, children));
+        arrowFields.add(getArrowField(field.schema(), field.name(),false));
       }
     } else if (type == Type.MAP) {
       throw new UnsupportedOperationException();
-    } else if (type == Type.UNION) {
-      throw new UnsupportedOperationException();
     } else if (type == Type.ARRAY) {
       throw new UnsupportedOperationException();
     } else if (type == Type.ENUM) {
       throw new UnsupportedOperationException();
-    } else if (type == Type.NULL) {
-      throw new UnsupportedOperationException();
     } else {
-      final FieldType fieldType = new FieldType(true, getArrowType(type), null, null);
-      arrowFields.add(new Field("", fieldType, null));
+      arrowFields.add(getArrowField(schema, "", false));
     }
 
     return new org.apache.arrow.vector.types.pojo.Schema(arrowFields, /*metadata=*/ metadata);
   }
 
   /**
-   * Create consumers to consume avro values from decoder, will reduce boxing/unboxing operations.
+   * Create primitive consumer to read data from decoder, will reduce boxing/unboxing operations.
    */
-  public static Consumer[] createAvroConsumers(VectorSchemaRoot root) {
+  public static Consumer createPrimitiveConsumer(ValueVector vector) {
 
-    Consumer[] consumers = new Consumer[root.getFieldVectors().size()];
-    for (int i = 0; i < root.getFieldVectors().size(); i++) {
-      FieldVector vector = root.getFieldVectors().get(i);
-      Consumer consumer;
-      switch (vector.getMinorType()) {
-        case INT:
-          consumer = new AvroIntConsumer((IntVector) vector);
-          break;
-        case VARBINARY:
-          consumer = new AvroBytesConsumer((VarBinaryVector) vector);
-          break;
-        case VARCHAR:
-          consumer = new AvroStringConsumer((VarCharVector) vector);
-          break;
-        case BIGINT:
-          consumer = new AvroLongConsumer((BigIntVector) vector);
-          break;
-        case FLOAT4:
-          consumer = new AvroFloatConsumer((Float4Vector) vector);
-          break;
-        case FLOAT8:
-          consumer = new AvroDoubleConsumer((Float8Vector) vector);
-          break;
-        case BIT:
-          consumer = new AvroBooleanConsumer((BitVector) vector);
-          break;
-        default:
-          throw new RuntimeException("could not get consumer from type:" + vector.getMinorType());
-      }
-      consumers[i] = consumer;
+    Consumer consumer;
+    switch (vector.getMinorType()) {
+      case INT:
+        consumer = new AvroIntConsumer((IntVector) vector);
+        break;
+      case VARBINARY:
+        consumer = new AvroBytesConsumer((VarBinaryVector) vector);
+        break;
+      case VARCHAR:
+        consumer = new AvroStringConsumer((VarCharVector) vector);
+        break;
+      case BIGINT:
+        consumer = new AvroLongConsumer((BigIntVector) vector);
+        break;
+      case FLOAT4:
+        consumer = new AvroFloatConsumer((Float4Vector) vector);
+        break;
+      case FLOAT8:
+        consumer = new AvroDoubleConsumer((Float8Vector) vector);
+        break;
+      case BIT:
+        consumer = new AvroBooleanConsumer((BitVector) vector);
+        break;
+      default:
+        throw new RuntimeException("could not get consumer from type:" + vector.getMinorType());
+    }
+
+    if (vector.getField().isNullable()) {
+      int nullIndex = getNullFieldIndex(vector.getField());
+      return new NullableTypeConsumer(consumer, nullIndex);
     }
-    return consumers;
+
+    return consumer;
+  }
+
+  /**
+   * Get avro null field index from vector field metadata.
+   */
+  private static int getNullFieldIndex(Field field) {
+    Map<String, String> metadata = field.getMetadata();
+    Preconditions.checkNotNull(metadata, "metadata should not be null when vector is nullable");
+    String index = metadata.get(AvroToArrowUtils.NULL_INDEX);
+    Preconditions.checkNotNull(index, "nullIndex should not be null when vector is nullable");
+    return Integer.parseInt(index);
   }
 
   /**
@@ -195,14 +241,24 @@ public class AvroToArrowUtils {
     Preconditions.checkNotNull(root, "VectorSchemaRoot object can't be null");
 
     allocateVectors(root, DEFAULT_BUFFER_SIZE);
-    Consumer[] consumers = createAvroConsumers(root);
+
+    // create consumers
+    Consumer[] consumers = new Consumer[root.getFieldVectors().size()];
+    for (int i = 0; i < root.getFieldVectors().size(); i++) {
+      FieldVector vector = root.getFieldVectors().get(i);
+      consumers[i] = createPrimitiveConsumer(vector);
+    }
+
+    int valueCount = 0;
     while (true) {
       try {
         for (Consumer consumer : consumers) {
           consumer.consume(decoder);
         }
+        valueCount++;
         //reach end will throw EOFException.
       } catch (EOFException eofException) {
+        root.setRowCount(valueCount);
         break;
       }
     }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
index 7bbfac1..134cc5c 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
@@ -32,6 +32,9 @@ public class AvroBooleanConsumer implements Consumer {
 
   private final BitWriter writer;
 
+  /**
+   * Instantiate a AvroBooleanConsumer.
+   */
   public AvroBooleanConsumer(BitVector vector) {
     this.writer = new BitWriterImpl(vector);
   }
@@ -41,4 +44,9 @@ public class AvroBooleanConsumer implements Consumer {
     writer.writeBit(decoder.readBoolean() ? 1 : 0);
     writer.setPosition(writer.getPosition() + 1);
   }
+
+  @Override
+  public void addNull() {
+    writer.setPosition(writer.getPosition() + 1);
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
index b034ea6..0424641 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
@@ -36,6 +36,9 @@ public class AvroBytesConsumer implements Consumer {
   private final VarBinaryVector vector;
   private ByteBuffer cacheBuffer;
 
+  /**
+   * Instantiate a AvroBytesConsumer.
+   */
   public AvroBytesConsumer(VarBinaryVector vector) {
     this.vector = vector;
     this.writer = new VarBinaryWriterImpl(vector);
@@ -43,6 +46,16 @@ public class AvroBytesConsumer implements Consumer {
 
   @Override
   public void consume(Decoder decoder) throws IOException {
+    writeValue(decoder);
+    writer.setPosition(writer.getPosition() + 1);
+  }
+
+  @Override
+  public void addNull() {
+    writer.setPosition(writer.getPosition() + 1);
+  }
+
+  private void writeValue(Decoder decoder) throws IOException {
     VarBinaryHolder holder = new VarBinaryHolder();
 
     // cacheBuffer is initialized null and create in the first consume,
@@ -53,8 +66,6 @@ public class AvroBytesConsumer implements Consumer {
     holder.end = cacheBuffer.limit();
     holder.buffer = vector.getAllocator().buffer(cacheBuffer.limit());
     holder.buffer.setBytes(0, cacheBuffer, 0,  cacheBuffer.limit());
-
     writer.write(holder);
-    writer.setPosition(writer.getPosition() + 1);
   }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
index 62dc315..9a60c24 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
@@ -32,6 +32,9 @@ public class AvroDoubleConsumer implements Consumer {
 
   private final Float8Writer writer;
 
+  /**
+   * Instantiate a AvroDoubleConsumer.
+   */
   public AvroDoubleConsumer(Float8Vector vector) {
     this.writer = new Float8WriterImpl(vector);
   }
@@ -41,4 +44,9 @@ public class AvroDoubleConsumer implements Consumer {
     writer.writeFloat8(decoder.readDouble());
     writer.setPosition(writer.getPosition() + 1);
   }
+
+  @Override
+  public void addNull() {
+    writer.setPosition(writer.getPosition() + 1);
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
index 2bec2b2..8bfe85f 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
@@ -32,6 +32,9 @@ public class AvroFloatConsumer implements Consumer {
 
   private final Float4Writer writer;
 
+  /**
+   * Instantiate a AvroFloatConsumer.
+   */
   public AvroFloatConsumer(Float4Vector vector) {
     this.writer = new Float4WriterImpl(vector);
   }
@@ -41,4 +44,9 @@ public class AvroFloatConsumer implements Consumer {
     writer.writeFloat4(decoder.readFloat());
     writer.setPosition(writer.getPosition() + 1);
   }
+
+  @Override
+  public void addNull() {
+    writer.setPosition(writer.getPosition() + 1);
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
index 60285f0..ce117e7 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
@@ -32,6 +32,9 @@ public class AvroIntConsumer implements Consumer {
 
   private final IntWriter writer;
 
+  /**
+   * Instantiate a AvroIntConsumer.
+   */
   public AvroIntConsumer(IntVector vector) {
     this.writer = new IntWriterImpl(vector);
   }
@@ -41,4 +44,9 @@ public class AvroIntConsumer implements Consumer {
     writer.writeInt(decoder.readInt());
     writer.setPosition(writer.getPosition() + 1);
   }
+
+  @Override
+  public void addNull() {
+    writer.setPosition(writer.getPosition() + 1);
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
index 15756af..42e3666 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
@@ -32,6 +32,9 @@ public class AvroLongConsumer implements Consumer {
 
   private final BigIntWriter writer;
 
+  /**
+   * Instantiate a AvroLongConsumer.
+   */
   public AvroLongConsumer(BigIntVector vector) {
     this.writer = new BigIntWriterImpl(vector);
   }
@@ -41,4 +44,9 @@ public class AvroLongConsumer implements Consumer {
     writer.writeBigInt(decoder.readLong());
     writer.setPosition(writer.getPosition() + 1);
   }
+
+  @Override
+  public void addNull() {
+    writer.setPosition(writer.getPosition() + 1);
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
index fa8b3f5..7d6d495 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
@@ -36,6 +36,9 @@ public class AvroStringConsumer implements Consumer {
   private final VarCharWriter writer;
   private ByteBuffer cacheBuffer;
 
+  /**
+   * Instantiate a AvroStringConsumer.
+   */
   public AvroStringConsumer(VarCharVector vector) {
     this.vector = vector;
     this.writer = new VarCharWriterImpl(vector);
@@ -43,6 +46,16 @@ public class AvroStringConsumer implements Consumer {
 
   @Override
   public void consume(Decoder decoder) throws IOException {
+    writeValue(decoder);
+    writer.setPosition(writer.getPosition() + 1);
+  }
+
+  @Override
+  public void addNull() {
+    writer.setPosition(writer.getPosition() + 1);
+  }
+
+  private void writeValue(Decoder decoder) throws IOException {
     VarCharHolder holder = new VarCharHolder();
 
     // cacheBuffer is initialized null and create in the first consume,
@@ -55,6 +68,5 @@ public class AvroStringConsumer implements Consumer {
     holder.buffer.setBytes(0, cacheBuffer, 0, cacheBuffer.limit());
 
     writer.write(holder);
-    writer.setPosition(writer.getPosition() + 1);
   }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
index b3c5281..5784242 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
@@ -22,9 +22,19 @@ import java.io.IOException;
 import org.apache.avro.io.Decoder;
 
 /**
- * An abstraction that is used to consume values from avro decoder.
+ * Interface that is used to consume values from avro decoder.
  */
 public interface Consumer {
 
+  /**
+   * Consume a specific type value from avro decoder and write it to vector.
+   * @param decoder avro decoder to read data
+   * @throws IOException on error
+   */
   void consume(Decoder decoder) throws IOException;
+
+  /**
+   * Add null value to vector by making writer position + 1.
+   */
+  void addNull();
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
similarity index 62%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
copy to java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
index 7bbfac1..31fe3b8 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
@@ -19,26 +19,38 @@ package org.apache.arrow.consumers;
 
 import java.io.IOException;
 
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.complex.impl.BitWriterImpl;
-import org.apache.arrow.vector.complex.writer.BitWriter;
 import org.apache.avro.io.Decoder;
 
 /**
- * Consumer which consume boolean type values from avro decoder.
- * Write the data to {@link BitVector}.
+ * Consumer holds a primitive consumer, could consume nullable values from avro decoder.
+ * Write data via writer of delegate consumer.
  */
-public class AvroBooleanConsumer implements Consumer {
+public class NullableTypeConsumer implements Consumer {
 
-  private final BitWriter writer;
 
-  public AvroBooleanConsumer(BitVector vector) {
-    this.writer = new BitWriterImpl(vector);
+  private final Consumer delegate;
+
+  /**
+   * Null field index in avro schema.
+   */
+  protected int nullIndex;
+
+  public NullableTypeConsumer(Consumer delegate, int nullIndex) {
+    this.delegate = delegate;
+    this.nullIndex = nullIndex;
   }
 
   @Override
   public void consume(Decoder decoder) throws IOException {
-    writer.writeBit(decoder.readBoolean() ? 1 : 0);
-    writer.setPosition(writer.getPosition() + 1);
+    if (nullIndex != decoder.readInt()) {
+      delegate.consume(decoder);
+    } else {
+      addNull();
+    }
+  }
+
+  @Override
+  public void addNull() {
+    delegate.addNull();
   }
 }
diff --git a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
index 335ccfb..cce9ea9 100644
--- a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
+++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
@@ -36,6 +36,7 @@ import org.apache.arrow.memory.BaseAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -90,7 +91,22 @@ public class AvroToArrowTest {
     VectorSchemaRoot root = writeAndRead(schema, data);
     FieldVector vector = root.getFieldVectors().get(0);
 
-    checkPrimitiveResult(schema, data, vector);
+    checkPrimitiveResult(data, vector);
+  }
+
+  @Test
+  public void testNullableStringType() throws Exception {
+    Schema schema = getSchema("test_nullable_string.avsc");
+
+    ArrayList<GenericRecord> data = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      GenericRecord record = new GenericData.Record(schema);
+      record.put(0, i % 2 == 0 ? "test" + i : null);
+      data.add(record);
+    }
+
+    VectorSchemaRoot root = writeAndRead(schema, data);
+    checkRecordResult(schema, data, root);
   }
 
   @Test
@@ -102,6 +118,7 @@ public class AvroToArrowTest {
       record.put(0, "test" + i);
       record.put(1, i);
       record.put(2, i % 2 == 0);
+      data.add(record);
     }
 
     VectorSchemaRoot root = writeAndRead(schema, data);
@@ -116,7 +133,22 @@ public class AvroToArrowTest {
     VectorSchemaRoot root = writeAndRead(schema, data);
     FieldVector vector = root.getFieldVectors().get(0);
 
-    checkPrimitiveResult(schema, data, vector);
+    checkPrimitiveResult(data, vector);
+  }
+
+  @Test
+  public void testNullableIntType() throws Exception {
+    Schema schema = getSchema("test_nullable_int.avsc");
+
+    ArrayList<GenericRecord> data = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      GenericRecord record = new GenericData.Record(schema);
+      record.put(0, i % 2 == 0 ? i : null);
+      data.add(record);
+    }
+
+    VectorSchemaRoot root = writeAndRead(schema, data);
+    checkRecordResult(schema, data, root);
   }
 
   @Test
@@ -127,7 +159,22 @@ public class AvroToArrowTest {
     VectorSchemaRoot root = writeAndRead(schema, data);
     FieldVector vector = root.getFieldVectors().get(0);
 
-    checkPrimitiveResult(schema, data, vector);
+    checkPrimitiveResult(data, vector);
+  }
+
+  @Test
+  public void testNullableLongType() throws Exception {
+    Schema schema = getSchema("test_nullable_long.avsc");
+
+    ArrayList<GenericRecord> data = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      GenericRecord record = new GenericData.Record(schema);
+      record.put(0, i % 2 == 0 ? (long) i : null);
+      data.add(record);
+    }
+
+    VectorSchemaRoot root = writeAndRead(schema, data);
+    checkRecordResult(schema, data, root);
   }
 
   @Test
@@ -138,7 +185,22 @@ public class AvroToArrowTest {
     VectorSchemaRoot root = writeAndRead(schema, data);
     FieldVector vector = root.getFieldVectors().get(0);
 
-    checkPrimitiveResult(schema, data, vector);
+    checkPrimitiveResult(data, vector);
+  }
+
+  @Test
+  public void testNullableFloatType() throws Exception {
+    Schema schema = getSchema("test_nullable_float.avsc");
+
+    ArrayList<GenericRecord> data = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      GenericRecord record = new GenericData.Record(schema);
+      record.put(0, i % 2 == 0 ? i + 0.1f : null);
+      data.add(record);
+    }
+
+    VectorSchemaRoot root = writeAndRead(schema, data);
+    checkRecordResult(schema, data, root);
   }
 
   @Test
@@ -149,7 +211,22 @@ public class AvroToArrowTest {
     VectorSchemaRoot root = writeAndRead(schema, data);
     FieldVector vector = root.getFieldVectors().get(0);
 
-    checkPrimitiveResult(schema, data, vector);
+    checkPrimitiveResult(data, vector);
+  }
+
+  @Test
+  public void testNullableDoubleType() throws Exception {
+    Schema schema = getSchema("test_nullable_double.avsc");
+
+    ArrayList<GenericRecord> data = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      GenericRecord record = new GenericData.Record(schema);
+      record.put(0, i % 2 == 0 ? i + 0.1 : null);
+      data.add(record);
+    }
+
+    VectorSchemaRoot root = writeAndRead(schema, data);
+    checkRecordResult(schema, data, root);
   }
 
   @Test
@@ -165,7 +242,22 @@ public class AvroToArrowTest {
     VectorSchemaRoot root = writeAndRead(schema, data);
     FieldVector vector = root.getFieldVectors().get(0);
 
-    checkPrimitiveResult(schema, data, vector);
+    checkPrimitiveResult(data, vector);
+  }
+
+  @Test
+  public void testNullableBytesType() throws Exception {
+    Schema schema = getSchema("test_nullable_bytes.avsc");
+
+    ArrayList<GenericRecord> data = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      GenericRecord record = new GenericData.Record(schema);
+      record.put(0, i % 2 == 0 ? ByteBuffer.wrap(("test" + i).getBytes(StandardCharsets.UTF_8)) : null);
+      data.add(record);
+    }
+
+    VectorSchemaRoot root = writeAndRead(schema, data);
+    checkRecordResult(schema, data, root);
   }
 
   @Test
@@ -176,17 +268,36 @@ public class AvroToArrowTest {
     VectorSchemaRoot root = writeAndRead(schema, data);
     FieldVector vector = root.getFieldVectors().get(0);
 
-    checkPrimitiveResult(schema, data, vector);
+    checkPrimitiveResult(data, vector);
   }
 
-  private void checkPrimitiveResult(Schema schema, ArrayList data, FieldVector vector) {
+  @Test
+  public void testNullableBooleanType() throws Exception {
+    Schema schema = getSchema("test_nullable_boolean.avsc");
+
+    ArrayList<GenericRecord> data = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      GenericRecord record = new GenericData.Record(schema);
+      record.put(0, i % 2 == 0 ? true : null);
+      data.add(record);
+    }
+
+    VectorSchemaRoot root = writeAndRead(schema, data);
+    checkRecordResult(schema, data, root);
+  }
+
+  private void checkPrimitiveResult(ArrayList data, FieldVector vector) {
     assertEquals(data.size(), vector.getValueCount());
     for (int i = 0; i < data.size(); i++) {
       Object value1 = data.get(i);
       Object value2 = vector.getObject(i);
-      if (schema.getType() == Schema.Type.BYTES) {
+      if (value1 == null) {
+        assertTrue(value2 == null);
+        continue;
+      }
+      if (vector.getField().getType().getTypeID() == ArrowType.Binary.TYPE_TYPE) {
         value2 = ByteBuffer.wrap((byte[]) value2);
-      } else if (schema.getType() == Schema.Type.STRING) {
+      } else if (vector.getField().getType().getTypeID() == ArrowType.Utf8.TYPE_TYPE) {
         value2 = value2.toString();
       }
       assertTrue(Objects.equals(value1, value2));
@@ -203,7 +314,7 @@ public class AvroToArrowTest {
         fieldData.add(record.get(i));
       }
 
-      checkPrimitiveResult(schema.getFields().get(i).schema(), fieldData, root.getFieldVectors().get(i));
+      checkPrimitiveResult(fieldData, root.getFieldVectors().get(i));
     }
 
   }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_boolean.avsc
similarity index 75%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_boolean.avsc
index b3c5281..62af1a8 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_boolean.avsc
@@ -15,16 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * An abstraction that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
-  void consume(Decoder decoder) throws IOException;
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableBoolean",
+ "fields": [
+     {"name": "f0", "type": ["null", "boolean"]}
+ ]
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_bytes.avsc
similarity index 75%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_bytes.avsc
index b3c5281..002bc7c 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_bytes.avsc
@@ -15,16 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * An abstraction that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
-  void consume(Decoder decoder) throws IOException;
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableBytes",
+ "fields": [
+     {"name": "f0", "type": ["null", "bytes"]}
+ ]
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_double.avsc
similarity index 75%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_double.avsc
index b3c5281..642b7aa 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_double.avsc
@@ -15,16 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * An abstraction that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
-  void consume(Decoder decoder) throws IOException;
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableDouble",
+ "fields": [
+     {"name": "f0", "type": ["null", "double"]}
+ ]
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_float.avsc
similarity index 75%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_float.avsc
index b3c5281..dff2859 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_float.avsc
@@ -15,16 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * An abstraction that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
-  void consume(Decoder decoder) throws IOException;
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableFloat",
+ "fields": [
+     {"name": "f0", "type": ["null", "float"]}
+ ]
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_int.avsc
similarity index 75%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_int.avsc
index b3c5281..abb2fc4 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_int.avsc
@@ -15,16 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * An abstraction that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
-  void consume(Decoder decoder) throws IOException;
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableInt",
+ "fields": [
+     {"name": "f0", "type": ["null", "int"]}
+ ]
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_long.avsc
similarity index 75%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_long.avsc
index b3c5281..0624d27 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_long.avsc
@@ -15,16 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * An abstraction that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
-  void consume(Decoder decoder) throws IOException;
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableLong",
+ "fields": [
+     {"name": "f0", "type": ["null", "long"]}
+ ]
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_string.avsc
similarity index 75%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_string.avsc
index b3c5281..347808c 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_string.avsc
@@ -15,16 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * An abstraction that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
-  void consume(Decoder decoder) throws IOException;
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "nullableString",
+ "fields": [
+     {"name": "f0", "type": ["null", "string"]}
+ ]
 }