You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2019/08/11 03:56:28 UTC

[arrow] branch master updated: ARROW-6097: [Java] Avro adapter implement unions type

This is an automated email from the ASF dual-hosted git repository.

emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 34dd3ed  ARROW-6097: [Java] Avro adapter implement unions type
34dd3ed is described below

commit 34dd3edece64c2268047db626fbfa967e36370a4
Author: tianchen <ni...@alibaba-inc.com>
AuthorDate: Sat Aug 10 19:58:05 2019 -0700

    ARROW-6097: [Java] Avro adapter implement unions type
    
    Related to [ARROW-6097](https://issues.apache.org/jira/browse/ARROW-6097).
    Support converting union types such as ["string"], ["string", "int"] and the nullable ["string", "int", "null"].
    
    Closes #4984 from tianchen92/ARROW-6097 and squashes the following commits:
    
    0cdca698b <tianchen> fix documents
    3cc028fbe <tianchen> add constant INVALID_NULL_INDEX
    b4a24637b <tianchen> add comment
    656408847 <tianchen> refactor
    b4d0fbe3e <tianchen> resolve comments
    15e808c80 <tianchen> ARROW-6097:  Avro adapter implement unions type
    
    Authored-by: tianchen <ni...@alibaba-inc.com>
    Signed-off-by: Micah Kornfield <em...@gmail.com>
---
 .../main/java/org/apache/arrow/AvroToArrow.java    |   5 +-
 .../java/org/apache/arrow/AvroToArrowUtils.java    | 220 ++++++++++-----------
 .../arrow/consumers/AvroBooleanConsumer.java       |  14 ++
 .../apache/arrow/consumers/AvroBytesConsumer.java  |  11 ++
 .../apache/arrow/consumers/AvroDoubleConsumer.java |  13 ++
 .../apache/arrow/consumers/AvroFloatConsumer.java  |  13 ++
 .../apache/arrow/consumers/AvroIntConsumer.java    |  13 ++
 .../apache/arrow/consumers/AvroLongConsumer.java   |  13 ++
 .../{Consumer.java => AvroNullConsumer.java}       |  39 ++--
 .../apache/arrow/consumers/AvroStringConsumer.java |  11 ++
 .../apache/arrow/consumers/AvroUnionsConsumer.java |  83 ++++++++
 .../java/org/apache/arrow/consumers/Consumer.java  |  11 ++
 .../arrow/consumers/NullableTypeConsumer.java      |  12 ++
 .../java/org/apache/arrow/AvroToArrowTest.java     |  52 ++++-
 .../resources/schema/test_nullable_union.avsc}     |  29 +--
 .../resources/schema/test_union.avsc}              |  29 +--
 .../src/main/codegen/templates/UnionVector.java    |  12 ++
 17 files changed, 398 insertions(+), 182 deletions(-)

diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java
index 4801d69..63199fc 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java
@@ -41,9 +41,6 @@ public class AvroToArrow {
       throws IOException {
     Preconditions.checkNotNull(schema, "Avro schema object can not be null");
 
-    VectorSchemaRoot root = VectorSchemaRoot.create(
-        AvroToArrowUtils.avroToArrowSchema(schema), allocator);
-    AvroToArrowUtils.avroToArrowVectors(decoder, root);
-    return root;
+    return AvroToArrowUtils.avroToArrowVectors(schema, decoder, allocator);
   }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
index c5ec765..25611a5 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.arrow.consumers.AvroBooleanConsumer;
 import org.apache.arrow.consumers.AvroBytesConsumer;
@@ -33,21 +34,26 @@ import org.apache.arrow.consumers.AvroDoubleConsumer;
 import org.apache.arrow.consumers.AvroFloatConsumer;
 import org.apache.arrow.consumers.AvroIntConsumer;
 import org.apache.arrow.consumers.AvroLongConsumer;
+import org.apache.arrow.consumers.AvroNullConsumer;
 import org.apache.arrow.consumers.AvroStringConsumer;
+import org.apache.arrow.consumers.AvroUnionsConsumer;
 import org.apache.arrow.consumers.Consumer;
 import org.apache.arrow.consumers.NullableTypeConsumer;
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.util.Preconditions;
-import org.apache.arrow.vector.BaseFixedWidthVector;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
-import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ZeroVector;
+import org.apache.arrow.vector.complex.UnionVector;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.UnionMode;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
@@ -60,11 +66,10 @@ import org.apache.avro.io.Decoder;
  */
 public class AvroToArrowUtils {
 
-  private static final int DEFAULT_BUFFER_SIZE = 256;
-  public static final String NULL_INDEX = "nullIndex";
+  private static final int INVALID_NULL_INDEX = -1;
 
   /**
-   * Creates a {@link Field} from the {@link Schema}
+   * Creates a {@link Consumer} from the {@link Schema}
    *
    <p>This method currently performs following type mapping for Avro data types to corresponding Arrow data types.
    *
@@ -78,69 +83,123 @@ public class AvroToArrowUtils {
    *   <li>BYTES --> ArrowType.Binary</li>
    * </ul>
    */
-  private static Field getArrowField(Schema schema, String name, boolean nullable) {
+  private static Consumer createConsumer(Schema schema, String name, BufferAllocator allocator) {
+    return createConsumer(schema, name, false, INVALID_NULL_INDEX, allocator);
+  }
 
+  private static Consumer createConsumer(
+      Schema schema,
+      String name,
+      boolean nullable,
+      int nullIndex,
+      BufferAllocator allocator) {
     Preconditions.checkNotNull(schema, "Avro schema object can't be null");
 
     Type type = schema.getType();
-    ArrowType arrowType;
+
+    final ArrowType arrowType;
+    final FieldType fieldType;
+    final FieldVector vector;
+    final Consumer consumer;
 
     switch (type) {
       case UNION:
-        return getUnionField(schema, name);
+        return createUnionConsumer(schema, name, allocator);
       case STRING:
         arrowType = new ArrowType.Utf8();
+        fieldType =  new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+        vector = fieldType.createNewSingleVector(name, allocator, null);
+        consumer =  new AvroStringConsumer((VarCharVector) vector);
         break;
       case INT:
         arrowType = new ArrowType.Int(32, /*signed=*/true);
+        fieldType =  new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+        vector = fieldType.createNewSingleVector(name, allocator, null);
+        consumer = new AvroIntConsumer((IntVector) vector);
         break;
       case BOOLEAN:
         arrowType = new ArrowType.Bool();
+        fieldType =  new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+        vector = fieldType.createNewSingleVector(name, allocator, null);
+        consumer = new AvroBooleanConsumer((BitVector) vector);
         break;
       case LONG:
         arrowType = new ArrowType.Int(64, /*signed=*/true);
+        fieldType =  new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+        vector = fieldType.createNewSingleVector(name, allocator, null);
+        consumer =  new AvroLongConsumer((BigIntVector) vector);
         break;
       case FLOAT:
         arrowType =  new ArrowType.FloatingPoint(SINGLE);
+        fieldType =  new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+        vector = fieldType.createNewSingleVector(name, allocator, null);
+        consumer = new AvroFloatConsumer((Float4Vector) vector);
         break;
       case DOUBLE:
         arrowType = new ArrowType.FloatingPoint(DOUBLE);
+        fieldType =  new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+        vector = fieldType.createNewSingleVector(name, allocator, null);
+        consumer = new AvroDoubleConsumer((Float8Vector) vector);
         break;
       case BYTES:
         arrowType = new ArrowType.Binary();
+        fieldType =  new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+        vector = fieldType.createNewSingleVector(name, allocator, null);
+        consumer = new AvroBytesConsumer((VarBinaryVector) vector);
+        break;
+      case NULL:
+        arrowType = new ArrowType.Null();
+        fieldType =  new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+        vector = fieldType.createNewSingleVector(name, allocator, null);
+        consumer = new AvroNullConsumer((ZeroVector) vector);
         break;
       default:
         // no-op, shouldn't get here
         throw new RuntimeException("Can't convert avro type %s to arrow type." + type.getName());
     }
 
-    final FieldType fieldType =  new FieldType(nullable, arrowType, /*dictionary=*/null,
-        getMetaData(schema));
-    return new Field(name, fieldType, null);
+    if (nullable) {
+      return new NullableTypeConsumer(consumer, nullIndex);
+    }
+    return consumer;
   }
 
-  private static Field getUnionField(Schema schema, String name) {
+  private static Consumer createUnionConsumer(Schema schema, String name, BufferAllocator allocator) {
     int size = schema.getTypes().size();
     long nullCount = schema.getTypes().stream().filter(s -> s.getType() == Type.NULL).count();
 
-    // avro schema not allow repeated type, so size == nullCount + 1 indicates nullable type.
-    if (size == nullCount + 1) {
+    // union only has one type, convert to primitive type
+    if (size == 1) {
+      Schema subSchema = schema.getTypes().get(0);
+      return createConsumer(subSchema, name, allocator);
 
+      // size == 2 and has null type, convert to nullable primitive type
+    } else if (size == 2 && nullCount == 1) {
       Schema nullSchema = schema.getTypes().stream().filter(s -> s.getType() == Type.NULL).findFirst().get();
-      String nullIndex = String.valueOf(schema.getTypes().indexOf(nullSchema));
+      int nullIndex = schema.getTypes().indexOf(nullSchema);
+      Schema subSchema = schema.getTypes().stream().filter(s -> s.getType() != Type.NULL).findFirst().get();
+      Preconditions.checkNotNull(subSchema, "schema should not be null.");
+      return createConsumer(subSchema, name, true, nullIndex, allocator);
 
-      // if has two field and one is null type, convert to nullable primitive type
-      if (size == 2) {
-        Schema subSchema = schema.getTypes().stream().filter(s -> s.getType() != Type.NULL).findFirst().get();
-        Preconditions.checkNotNull(subSchema);
-        subSchema.addProp(NULL_INDEX, nullIndex);
-        return getArrowField(subSchema, name,true);
-      } else {
-        //TODO convert avro unions type to arrow UnionVector
-        throw new UnsupportedOperationException();
-      }
+      // real union type
     } else {
-      throw new UnsupportedOperationException();
+
+      final FieldType fieldType =  new FieldType(/*nullable=*/true,
+          new ArrowType.Union(UnionMode.Sparse, null), /*dictionary=*/null, getMetaData(schema));
+      UnionVector unionVector =
+          (UnionVector) fieldType.createNewSingleVector(name, allocator, null);
+
+      Consumer[] delegates = new Consumer[size];
+      Types.MinorType[] types = new Types.MinorType[size];
+
+      for (int i = 0; i < size; i++) {
+        Schema subSchema = schema.getTypes().get(i);
+        Consumer delegate = createConsumer(subSchema, subSchema.getName(), allocator);
+        unionVector.directAddVector(delegate.getVector());
+        delegates[i] = delegate;
+        types[i] = delegate.getVector().getMinorType();
+      }
+      return new AvroUnionsConsumer(unionVector, delegates, types);
     }
   }
 
@@ -151,19 +210,22 @@ public class AvroToArrowUtils {
   }
 
   /**
-   * Create Arrow {@link org.apache.arrow.vector.types.pojo.Schema} object for the given Avro {@link Schema}.
+   * Read data from {@link Decoder} and generate a {@link VectorSchemaRoot}.
+   * @param schema avro schema
+   * @param decoder avro decoder to read data from
    */
-  public static org.apache.arrow.vector.types.pojo.Schema avroToArrowSchema(Schema schema) {
+  public static VectorSchemaRoot avroToArrowVectors(Schema schema, Decoder decoder, BufferAllocator allocator)
+      throws IOException {
 
-    Preconditions.checkNotNull(schema, "Avro Schema object can't be null");
-    List<Field> arrowFields = new ArrayList<>();
+    List<FieldVector> vectors = new ArrayList<>();
+    List<Consumer> consumers = new ArrayList<>();
 
     Schema.Type type = schema.getType();
-    final Map<String, String> metadata = getMetaData(schema);
-
     if (type == Type.RECORD) {
       for (Schema.Field field : schema.getFields()) {
-        arrowFields.add(getArrowField(field.schema(), field.name(),false));
+        Consumer consumer = createConsumer(field.schema(), field.name(), allocator);
+        consumers.add(consumer);
+        vectors.add(consumer.getVector());
       }
     } else if (type == Type.MAP) {
       throw new UnsupportedOperationException();
@@ -172,82 +234,17 @@ public class AvroToArrowUtils {
     } else if (type == Type.ENUM) {
       throw new UnsupportedOperationException();
     } else {
-      arrowFields.add(getArrowField(schema, "", false));
+      Consumer consumer = createConsumer(schema, "", allocator);
+      consumers.add(consumer);
+      vectors.add(consumer.getVector());
     }
 
-    return new org.apache.arrow.vector.types.pojo.Schema(arrowFields, /*metadata=*/ metadata);
-  }
+    Preconditions.checkArgument(vectors.size() == consumers.size(),
+        "vectors size not equals consumers size");
 
-  /**
-   * Create primitive consumer to read data from decoder, will reduce boxing/unboxing operations.
-   */
-  public static Consumer createPrimitiveConsumer(ValueVector vector) {
+    List<Field> fields = vectors.stream().map(t -> t.getField()).collect(Collectors.toList());
 
-    Consumer consumer;
-    switch (vector.getMinorType()) {
-      case INT:
-        consumer = new AvroIntConsumer((IntVector) vector);
-        break;
-      case VARBINARY:
-        consumer = new AvroBytesConsumer((VarBinaryVector) vector);
-        break;
-      case VARCHAR:
-        consumer = new AvroStringConsumer((VarCharVector) vector);
-        break;
-      case BIGINT:
-        consumer = new AvroLongConsumer((BigIntVector) vector);
-        break;
-      case FLOAT4:
-        consumer = new AvroFloatConsumer((Float4Vector) vector);
-        break;
-      case FLOAT8:
-        consumer = new AvroDoubleConsumer((Float8Vector) vector);
-        break;
-      case BIT:
-        consumer = new AvroBooleanConsumer((BitVector) vector);
-        break;
-      default:
-        throw new RuntimeException("could not get consumer from type:" + vector.getMinorType());
-    }
-
-    if (vector.getField().isNullable()) {
-      int nullIndex = getNullFieldIndex(vector.getField());
-      return new NullableTypeConsumer(consumer, nullIndex);
-    }
-
-    return consumer;
-  }
-
-  /**
-   * Get avro null field index from vector field metadata.
-   */
-  private static int getNullFieldIndex(Field field) {
-    Map<String, String> metadata = field.getMetadata();
-    Preconditions.checkNotNull(metadata, "metadata should not be null when vector is nullable");
-    String index = metadata.get(AvroToArrowUtils.NULL_INDEX);
-    Preconditions.checkNotNull(index, "nullIndex should not be null when vector is nullable");
-    return Integer.parseInt(index);
-  }
-
-  /**
-   * Iterate the given Avro {@link Decoder} object to fetch the data and transpose it to populate
-   * the given Arrow Vector objects.
-   * @param decoder avro decoder to read data.
-   * @param root Arrow {@link VectorSchemaRoot} object to populate
-   */
-  public static void avroToArrowVectors(Decoder decoder, VectorSchemaRoot root) throws IOException {
-
-    Preconditions.checkNotNull(decoder, "Avro decoder object can't be null");
-    Preconditions.checkNotNull(root, "VectorSchemaRoot object can't be null");
-
-    allocateVectors(root, DEFAULT_BUFFER_SIZE);
-
-    // create consumers
-    Consumer[] consumers = new Consumer[root.getFieldVectors().size()];
-    for (int i = 0; i < root.getFieldVectors().size(); i++) {
-      FieldVector vector = root.getFieldVectors().get(i);
-      consumers[i] = createPrimitiveConsumer(vector);
-    }
+    VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0);
 
     int valueCount = 0;
     while (true) {
@@ -262,17 +259,6 @@ public class AvroToArrowUtils {
         break;
       }
     }
-  }
-
-  private static void allocateVectors(VectorSchemaRoot root, int size) {
-    List<FieldVector> vectors = root.getFieldVectors();
-    for (FieldVector fieldVector : vectors) {
-      if (fieldVector instanceof BaseFixedWidthVector) {
-        ((BaseFixedWidthVector) fieldVector).allocateNew(size);
-      } else {
-        fieldVector.allocateNew();
-      }
-      fieldVector.setInitialCapacity(size);
-    }
+    return root;
   }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
index 134cc5c..b2fe704 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
@@ -20,6 +20,7 @@ package org.apache.arrow.consumers;
 import java.io.IOException;
 
 import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.complex.impl.BitWriterImpl;
 import org.apache.arrow.vector.complex.writer.BitWriter;
 import org.apache.avro.io.Decoder;
@@ -31,11 +32,13 @@ import org.apache.avro.io.Decoder;
 public class AvroBooleanConsumer implements Consumer {
 
   private final BitWriter writer;
+  private final BitVector vector;
 
   /**
    * Instantiate a AvroBooleanConsumer.
    */
   public AvroBooleanConsumer(BitVector vector) {
+    this.vector = vector;
     this.writer = new BitWriterImpl(vector);
   }
 
@@ -49,4 +52,15 @@ public class AvroBooleanConsumer implements Consumer {
   public void addNull() {
     writer.setPosition(writer.getPosition() + 1);
   }
+
+  @Override
+  public void setPosition(int index) {
+    writer.setPosition(index);
+  }
+
+  @Override
+  public FieldVector getVector() {
+    return this.vector;
+  }
+
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
index 0424641..2c649f9 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
@@ -20,6 +20,7 @@ package org.apache.arrow.consumers;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.complex.impl.VarBinaryWriterImpl;
 import org.apache.arrow.vector.complex.writer.VarBinaryWriter;
@@ -68,4 +69,14 @@ public class AvroBytesConsumer implements Consumer {
     holder.buffer.setBytes(0, cacheBuffer, 0,  cacheBuffer.limit());
     writer.write(holder);
   }
+
+  @Override
+  public void setPosition(int index) {
+    writer.setPosition(index);
+  }
+
+  @Override
+  public FieldVector getVector() {
+    return vector;
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
index 9a60c24..63b2071 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
@@ -19,6 +19,7 @@ package org.apache.arrow.consumers;
 
 import java.io.IOException;
 
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.complex.impl.Float8WriterImpl;
 import org.apache.arrow.vector.complex.writer.Float8Writer;
@@ -31,11 +32,13 @@ import org.apache.avro.io.Decoder;
 public class AvroDoubleConsumer implements Consumer {
 
   private final Float8Writer writer;
+  private final Float8Vector vector;
 
   /**
    * Instantiate a AvroDoubleConsumer.
    */
   public AvroDoubleConsumer(Float8Vector vector) {
+    this.vector = vector;
     this.writer = new Float8WriterImpl(vector);
   }
 
@@ -49,4 +52,14 @@ public class AvroDoubleConsumer implements Consumer {
   public void addNull() {
     writer.setPosition(writer.getPosition() + 1);
   }
+
+  @Override
+  public void setPosition(int index) {
+    writer.setPosition(index);
+  }
+
+  @Override
+  public FieldVector getVector() {
+    return vector;
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
index 8bfe85f..ea752e2 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
@@ -19,6 +19,7 @@ package org.apache.arrow.consumers;
 
 import java.io.IOException;
 
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.complex.impl.Float4WriterImpl;
 import org.apache.arrow.vector.complex.writer.Float4Writer;
@@ -31,11 +32,13 @@ import org.apache.avro.io.Decoder;
 public class AvroFloatConsumer implements Consumer {
 
   private final Float4Writer writer;
+  private final Float4Vector vector;
 
   /**
    * Instantiate a AvroFloatConsumer.
    */
   public AvroFloatConsumer(Float4Vector vector) {
+    this.vector = vector;
     this.writer = new Float4WriterImpl(vector);
   }
 
@@ -49,4 +52,14 @@ public class AvroFloatConsumer implements Consumer {
   public void addNull() {
     writer.setPosition(writer.getPosition() + 1);
   }
+
+  @Override
+  public void setPosition(int index) {
+    writer.setPosition(index);
+  }
+
+  @Override
+  public FieldVector getVector() {
+    return this.vector;
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
index ce117e7..ab830bc 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
@@ -19,6 +19,7 @@ package org.apache.arrow.consumers;
 
 import java.io.IOException;
 
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.complex.impl.IntWriterImpl;
 import org.apache.arrow.vector.complex.writer.IntWriter;
@@ -31,11 +32,13 @@ import org.apache.avro.io.Decoder;
 public class AvroIntConsumer implements Consumer {
 
   private final IntWriter writer;
+  private final IntVector vector;
 
   /**
    * Instantiate a AvroIntConsumer.
    */
   public AvroIntConsumer(IntVector vector) {
+    this.vector = vector;
     this.writer = new IntWriterImpl(vector);
   }
 
@@ -49,4 +52,14 @@ public class AvroIntConsumer implements Consumer {
   public void addNull() {
     writer.setPosition(writer.getPosition() + 1);
   }
+
+  @Override
+  public void setPosition(int index) {
+    writer.setPosition(index);
+  }
+
+  @Override
+  public FieldVector getVector() {
+    return this.vector;
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
index 42e3666..68acb94 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
@@ -20,6 +20,7 @@ package org.apache.arrow.consumers;
 import java.io.IOException;
 
 import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.complex.impl.BigIntWriterImpl;
 import org.apache.arrow.vector.complex.writer.BigIntWriter;
 import org.apache.avro.io.Decoder;
@@ -31,11 +32,13 @@ import org.apache.avro.io.Decoder;
 public class AvroLongConsumer implements Consumer {
 
   private final BigIntWriter writer;
+  private final BigIntVector vector;
 
   /**
    * Instantiate a AvroLongConsumer.
    */
   public AvroLongConsumer(BigIntVector vector) {
+    this.vector = vector;
     this.writer = new BigIntWriterImpl(vector);
   }
 
@@ -49,4 +52,14 @@ public class AvroLongConsumer implements Consumer {
   public void addNull() {
     writer.setPosition(writer.getPosition() + 1);
   }
+
+  @Override
+  public void setPosition(int index) {
+    writer.setPosition(index);
+  }
+
+  @Override
+  public FieldVector getVector() {
+    return this.vector;
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java
similarity index 59%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java
index 5784242..d06e2f5 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroNullConsumer.java
@@ -19,22 +19,33 @@ package org.apache.arrow.consumers;
 
 import java.io.IOException;
 
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.ZeroVector;
 import org.apache.avro.io.Decoder;
 
 /**
- * Interface that is used to consume values from avro decoder.
+ * Consumer which consume null type values from avro decoder.
+ * Corresponding to {@link org.apache.arrow.vector.ZeroVector}.
  */
-public interface Consumer {
-
-  /**
-   * Consume a specific type value from avro decoder and write it to vector.
-   * @param decoder avro decoder to read data
-   * @throws IOException on error
-   */
-  void consume(Decoder decoder) throws IOException;
-
-  /**
-   * Add null value to vector by making writer position + 1.
-   */
-  void addNull();
+public class AvroNullConsumer implements Consumer {
+
+  private final ZeroVector vector;
+
+  public AvroNullConsumer(ZeroVector vector) {
+    this.vector = vector;
+  }
+
+  @Override
+  public void consume(Decoder decoder) throws IOException {}
+
+  @Override
+  public void addNull() {}
+
+  @Override
+  public void setPosition(int index) {}
+
+  @Override
+  public FieldVector getVector() {
+    return this.vector;
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
index 7d6d495..1719bf7 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
@@ -20,6 +20,7 @@ package org.apache.arrow.consumers;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.complex.impl.VarCharWriterImpl;
 import org.apache.arrow.vector.complex.writer.VarCharWriter;
@@ -69,4 +70,14 @@ public class AvroStringConsumer implements Consumer {
 
     writer.write(holder);
   }
+
+  @Override
+  public void setPosition(int index) {
+    writer.setPosition(index);
+  }
+
+  @Override
+  public FieldVector getVector() {
+    return this.vector;
+  }
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
new file mode 100644
index 0000000..5277678
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.complex.UnionVector;
+import org.apache.arrow.vector.complex.impl.UnionWriter;
+import org.apache.arrow.vector.types.Types;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume unions type values from avro decoder.
+ * Write the data to {@link org.apache.arrow.vector.complex.UnionVector}.
+ */
+public class AvroUnionsConsumer implements Consumer {
+
+  private Consumer[] indexDelegates;
+  private Types.MinorType[] types;
+
+  private UnionWriter writer;
+  private UnionVector vector;
+
+  /**
+   * Instantiate a AvroUnionConsumer.
+   */
+  public AvroUnionsConsumer(UnionVector vector, Consumer[] indexDelegates, Types.MinorType[] types) {
+
+    this.writer = new UnionWriter(vector);
+    this.vector = vector;
+    this.indexDelegates = indexDelegates;
+    this.types = types;
+  }
+
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    int fieldIndex = decoder.readInt();
+    int position = writer.getPosition();
+
+    Consumer delegate = indexDelegates[fieldIndex];
+
+    vector.setType(position, types[fieldIndex]);
+    // In UnionVector we need to set sub vector writer position before consume a value
+    // because in the previous iterations we might not have written to the specific union sub vector.
+    delegate.setPosition(position);
+    delegate.consume(decoder);
+
+    writer.setPosition(position + 1);
+
+  }
+
+  @Override
+  public void addNull() {
+    writer.setPosition(writer.getPosition() + 1);
+  }
+
+  @Override
+  public void setPosition(int index) {
+    writer.setPosition(index);
+  }
+
+  @Override
+  public FieldVector getVector() {
+    vector.setValueCount(writer.getPosition());
+    return this.vector;
+  }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
index 5784242..c3a543c 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
@@ -19,6 +19,7 @@ package org.apache.arrow.consumers;
 
 import java.io.IOException;
 
+import org.apache.arrow.vector.FieldVector;
 import org.apache.avro.io.Decoder;
 
 /**
@@ -37,4 +38,14 @@ public interface Consumer {
    * Add null value to vector by making writer position + 1.
    */
   void addNull();
+
+  /**
+   * Set the position ({@code index}) at which this consumer writes its next value into the vector.
+   */
+  void setPosition(int index);
+
+  /**
+   * Get the vector that this consumer writes its values into.
+   */
+  FieldVector getVector();
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
index 31fe3b8..5ac7bd7 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/NullableTypeConsumer.java
@@ -19,6 +19,7 @@ package org.apache.arrow.consumers;
 
 import java.io.IOException;
 
+import org.apache.arrow.vector.FieldVector;
 import org.apache.avro.io.Decoder;
 
 /**
@@ -53,4 +54,15 @@ public class NullableTypeConsumer implements Consumer {
   public void addNull() {
     delegate.addNull();
   }
+
+  @Override
+  public void setPosition(int index) {
+    delegate.setPosition(index); // forward the write position unchanged to the wrapped consumer
+  }
+
+  @Override
+  public FieldVector getVector() {
+    return delegate.getVector(); // expose the wrapped consumer's vector as-is
+  }
+
 }
diff --git a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
index cce9ea9..47840d6 100644
--- a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
+++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
@@ -36,7 +36,7 @@ import org.apache.arrow.memory.BaseAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.util.Text;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -286,6 +286,52 @@ public class AvroToArrowTest {
     checkRecordResult(schema, data, root);
   }
 
+  @Test
+  public void testUnionType() throws Exception {
+    // Even rows take the string branch of the union, odd rows the int branch.
+    Schema schema = getSchema("test_union.avsc");
+    ArrayList<GenericRecord> data = new ArrayList<>();
+    ArrayList<Object> expected = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      Object value = (i % 2 != 0) ? (Object) i : "test" + i;
+      GenericRecord record = new GenericData.Record(schema);
+      record.put(0, value);
+      data.add(record);
+      expected.add(value);
+    }
+
+    VectorSchemaRoot root = writeAndRead(schema, data);
+    checkPrimitiveResult(expected, root.getFieldVectors().get(0));
+  }
+
+  @Test
+  public void testNullableUnionType() throws Exception {
+    Schema schema = getSchema("test_nullable_union.avsc");
+    ArrayList<GenericRecord> data = new ArrayList<>();
+    ArrayList<Object> expected = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      GenericRecord record = new GenericData.Record(schema);
+      if (i % 3 == 0) {
+        record.put(0, "test" + i);
+        expected.add("test" + i);
+        data.add(record);
+      } else if (i % 3 == 1) {
+        record.put(0, i);
+        expected.add(i);
+        data.add(record);
+      } else {
+        record.put(0, null);
+        expected.add(null);
+        data.add(record);
+      }
+    }
+
+    VectorSchemaRoot root = writeAndRead(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(expected, vector);
+  }
+
   private void checkPrimitiveResult(ArrayList data, FieldVector vector) {
     assertEquals(data.size(), vector.getValueCount());
     for (int i = 0; i < data.size(); i++) {
@@ -295,9 +341,9 @@ public class AvroToArrowTest {
         assertTrue(value2 == null);
         continue;
       }
-      if (vector.getField().getType().getTypeID() == ArrowType.Binary.TYPE_TYPE) {
+      if (value2 instanceof byte[]) {
         value2 = ByteBuffer.wrap((byte[]) value2);
-      } else if (vector.getField().getType().getTypeID() == ArrowType.Utf8.TYPE_TYPE) {
+      } else if (value2 instanceof Text) {
         value2 = value2.toString();
       }
       assertTrue(Objects.equals(value1, value2));
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_nullable_union.avsc
similarity index 60%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_nullable_union.avsc
index 5784242..af94812 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_nullable_union.avsc
@@ -15,26 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * Interface that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
-  /**
-   * Consume a specific type value from avro decoder and write it to vector.
-   * @param decoder avro decoder to read data
-   * @throws IOException on error
-   */
-  void consume(Decoder decoder) throws IOException;
-
-  /**
-   * Add null value to vector by making writer position + 1.
-   */
-  void addNull();
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testNullableUnions",
+ "fields": [
+     {"name": "f0", "type": ["string", "int", "null"]}
+ ]
 }
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/test/resources/schema/test_union.avsc
similarity index 60%
copy from java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
copy to java/adapter/avro/src/test/resources/schema/test_union.avsc
index 5784242..f181e36 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
+++ b/java/adapter/avro/src/test/resources/schema/test_union.avsc
@@ -15,26 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.consumers;
-
-import java.io.IOException;
-
-import org.apache.avro.io.Decoder;
-
-/**
- * Interface that is used to consume values from avro decoder.
- */
-public interface Consumer {
-
-  /**
-   * Consume a specific type value from avro decoder and write it to vector.
-   * @param decoder avro decoder to read data
-   * @throws IOException on error
-   */
-  void consume(Decoder decoder) throws IOException;
-
-  /**
-   * Add null value to vector by making writer position + 1.
-   */
-  void addNull();
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "record",
+ "name": "testUnions",
+ "fields": [
+     {"name": "f0", "type": ["string", "int"]}
+ ]
 }
diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java
index f288fb6..59cc91f 100644
--- a/java/vector/src/main/codegen/templates/UnionVector.java
+++ b/java/vector/src/main/codegen/templates/UnionVector.java
@@ -410,6 +410,18 @@ public class UnionVector implements FieldVector {
     return newVector;
   }
 
+  /**
+   * Put {@code v} itself into internalStruct under its lower-cased minor type name, which must be unused.
+   */
+  public void directAddVector(FieldVector v) {
+    final String key = v.getMinorType().name().toLowerCase();
+    Preconditions.checkState(internalStruct.getChild(key) == null, "%s vector already exists", key);
+    internalStruct.putChild(key, v);
+    if (callBack != null) {
+      callBack.doWork();
+    }
+  }
+
   private class TransferImpl implements TransferPair {
     private final TransferPair internalStructVectorTransferPair;
     private final UnionVector to;