Posted to commits@gobblin.apache.org by hu...@apache.org on 2021/06/11 23:26:45 UTC

[gobblin] branch master updated: [GOBBLIN-1465] Refactor the GobblinOrcWriter to support using a different OrcValueWriter

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 096562c  [GOBBLIN-1465] Refactor the GobblinOrcWriter to support using a different OrcValueWriter
096562c is described below

commit 096562c634c3c8dac63fddd607b5132d9363522f
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Fri Jun 11 16:26:35 2021 -0700

    [GOBBLIN-1465] Refactor the GobblinOrcWriter to support using a different OrcValueWriter
    
    Closes #3305 from htran1/orc_struct_writer
---
 ...linOrcWriter.java => GobblinBaseOrcWriter.java} |  77 +++---
 .../apache/gobblin/writer/GobblinOrcWriter.java    | 297 +--------------------
 2 files changed, 50 insertions(+), 324 deletions(-)
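
The heart of the change: the Avro-specific pieces of GobblinOrcWriter move into a new
abstract base class, GobblinBaseOrcWriter<S, D> (S is the input schema type, D the
record type), which leaves three hooks for subclasses: getOrcSchema(),
getOrcValueWriter(...), and getPropsWithOrcSchema(). As a rough sketch of the extension
point this enables (JsonSchema, JsonRecord, and JsonToOrcValueWriter below are
hypothetical placeholders, not classes in this commit):

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.gobblin.configuration.State;
    import org.apache.hadoop.hive.serde2.SerDeException;
    import org.apache.orc.TypeDescription;

    // Skeleton only: a writer for a hypothetical non-Avro input format.
    public class JsonOrcWriter extends GobblinBaseOrcWriter<JsonSchema, JsonRecord> {

      public JsonOrcWriter(FsDataWriterBuilder<JsonSchema, JsonRecord> builder, State properties)
          throws IOException {
        super(builder, properties);
      }

      @Override
      protected TypeDescription getOrcSchema() {
        // Convert this.inputSchema (a JsonSchema) into an ORC TypeDescription.
        throw new UnsupportedOperationException("TODO: schema conversion");
      }

      @Override
      protected OrcValueWriter<JsonRecord> getOrcValueWriter(TypeDescription typeDescription,
          JsonSchema inputSchema, State state) {
        // Return a converter that appends JsonRecord instances to the shared VectorizedRowBatch.
        return new JsonToOrcValueWriter(typeDescription, inputSchema, state); // hypothetical converter
      }

      @Override
      protected Properties getPropsWithOrcSchema() throws SerDeException {
        // Publish the schema metadata (columns, column types, schema literal) used for data registration.
        throw new UnsupportedOperationException("TODO: schema properties");
      }
    }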

diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
similarity index 88%
copy from gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
copy to gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
index 57c19b6..5599975 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java
@@ -17,12 +17,14 @@
 
 package org.apache.gobblin.writer;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.Properties;
-
+import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.state.ConstructState;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
@@ -42,22 +44,13 @@ import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
 import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
-import com.google.common.annotations.VisibleForTesting;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.state.ConstructState;
-
-import static org.apache.gobblin.configuration.ConfigurationKeys.AVG_RECORD_SIZE;
-import static org.apache.gobblin.writer.AvroOrcSchemaConverter.getOrcSchema;
-
+import static org.apache.gobblin.configuration.ConfigurationKeys.*;
 
 /**
  * A wrapper for the ORC-core writer without a dependency on the Hive SerDe library.
  */
 @Slf4j
-public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
+public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
   static final String ORC_WRITER_PREFIX = "orcWriter.";
   public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize";
   public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
@@ -95,7 +88,7 @@ public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
    */
   private static final String ORC_WRITER_DEEP_CLEAN_EVERY_BATCH = ORC_WRITER_PREFIX + "deepCleanBatch";
 
-  private final GenericRecordToOrcValueWriter valueWriter;
+  private final OrcValueWriter<D> valueWriter;
   @VisibleForTesting
   final VectorizedRowBatch rowBatch;
   private final Writer orcFileWriter;
@@ -105,7 +98,7 @@ public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
   private final boolean deepCleanBatch;
 
   private final int batchSize;
-  private final Schema avroSchema;
+  protected final S inputSchema;
 
   /**
    * There are a couple of parameters in the ORC writer that require manual tuning based on record size, given that the executor
@@ -134,9 +127,9 @@ public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
   /**
    * Calculate the heap size in MB available for ORC writers.
    */
-  protected long availableHeapSize(State Properties) {
+  protected long availableHeapSize(State properties) {
     // Calculate the recommended size as the threshold for memory check
-    long physicalMem = Math.round(Properties.getPropAsLong(CONTAINER_MEMORY_MBS, DEFAULT_CONTAINER_MEMORY_MBS)
+    long physicalMem = Math.round(properties.getPropAsLong(CONTAINER_MEMORY_MBS, DEFAULT_CONTAINER_MEMORY_MBS)
         * properties.getPropAsDouble(CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO_KEY));
     long nonHeap = properties.getPropAsLong(CONTAINER_JVM_MEMORY_OVERHEAD_MBS, DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
     return physicalMem - nonHeap;
@@ -151,7 +144,7 @@ public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
         : EXEMPLIFIED_RECORD_SIZE_IN_BYTES) * estimatedRecordSizeScale;
   }
 
-  public GobblinOrcWriter(FsDataWriterBuilder<Schema, GenericRecord> builder, State properties)
+  public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
       throws IOException {
     super(builder, properties);
     if (properties.getPropAsBoolean(ORC_WRITER_AUTO_TUNE_ENABLED, ORC_WRITER_AUTO_TUNE_DEFAULT)) {
@@ -159,15 +152,15 @@ public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
     }
 
     // Create value-writer which is essentially a record-by-record-converter with buffering in batch.
-    this.avroSchema = builder.getSchema();
-    TypeDescription typeDescription = getOrcSchema(this.avroSchema);
-    this.valueWriter = new GenericRecordToOrcValueWriter(typeDescription, this.avroSchema, properties);
+    this.inputSchema = builder.getSchema();
+    TypeDescription typeDescription = getOrcSchema();
+    this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties);
     this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
     this.rowBatch = typeDescription.createRowBatch(this.batchSize);
     this.deepCleanBatch = properties.getPropAsBoolean(ORC_WRITER_DEEP_CLEAN_EVERY_BATCH, false);
 
     log.info("Start to construct a ORC-Native Writer, with batchSize:" + batchSize + ", enable batchDeepClean:"
-        + deepCleanBatch + "\n, schema in avro format:" + this.avroSchema);
+        + deepCleanBatch + "\n, schema in input format:" + this.inputSchema);
 
     // Create file-writer
     Configuration conf = new Configuration();
@@ -184,6 +177,24 @@ public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
     this.orcFileWriter = OrcFile.createWriter(this.stagingFile, options);
   }
 
+  /**
+   * Get the ORC schema as a {@link TypeDescription}
+   */
+  protected abstract TypeDescription getOrcSchema();
+
+  /**
+   * Get an {@link OrcValueWriter} for the specified schema and configuration.
+   */
+  protected abstract OrcValueWriter<D> getOrcValueWriter(TypeDescription typeDescription, S inputSchema, State state);
+
+  /**
+   * Get the schema properties, including the following:
+   * avro.schema.literal
+   * columns
+   * columns.types
+   */
+  protected abstract Properties getPropsWithOrcSchema() throws SerDeException;
+
   @Override
   public long recordsWritten() {
     return this.orcFileWriter.getNumberOfRows();
@@ -196,7 +207,7 @@ public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
      */
     ConstructState state = new ConstructState(super.getFinalState());
     try {
-      state.addOverwriteProperties(new State(getPropsWithOrcSchema(this.avroSchema)));
+      state.addOverwriteProperties(new State(getPropsWithOrcSchema()));
     } catch (SerDeException e) {
       throw new RuntimeException("Failure to set schema metadata in finalState properly which "
           + "could possible lead to incorrect data registration", e);
@@ -226,7 +237,7 @@ public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
     } else {
       // Throw a fatal exception if there's outstanding buffered data, since there's a risk of losing data if we proceed.
       if (rowBatch.size > 0) {
-        throw new CloseBeforeFlushException(this.avroSchema.getName());
+        throw new CloseBeforeFlushException(this.inputSchema.toString());
       }
     }
   }
@@ -255,7 +266,7 @@ public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
    * one should consider lowering this value to make the memory check more active.
    */
   @Override
-  public void write(GenericRecord record)
+  public void write(D record)
       throws IOException {
     valueWriter.write(record, rowBatch);
     if (rowBatch.size == this.batchSize) {
@@ -322,20 +333,4 @@ public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
       ((DecimalColumnVector) cv).vector = null;
     }
   }
-
-  @Override
-  public boolean isSpeculativeAttemptSafe() {
-    return this.writerAttemptIdOptional.isPresent() && this.getClass() == GobblinOrcWriter.class;
-  }
-
-  public static Properties getPropsWithOrcSchema(Schema avroSchema) throws SerDeException {
-    Properties properties = new Properties();
-    properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), avroSchema.toString());
-    AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(avroSchema);
-
-    properties.setProperty("columns", StringUtils.join(aoig.getColumnNames(), ","));
-    properties.setProperty("columns.types", StringUtils.join(aoig.getColumnTypes(), ","));
-
-    return properties;
-  }
 }
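
For reference, the writer knobs defined above survive the refactor unchanged and are
plain job properties. A minimal sketch of setting them (keys copied verbatim from the
constants above; values shown are the coded defaults except where noted):

    import java.util.Properties;

    public class OrcWriterTuningExample {
      public static Properties tunedWriterProps() {
        Properties props = new Properties();
        props.setProperty("orcWriter.batchSize", "1000");                   // DEFAULT_ORC_WRITER_BATCH_SIZE
        props.setProperty("orcWriter.deepCleanBatch", "true");              // off by default
        props.setProperty("orcWriter.autoTuneEnabled", "true");             // default is false
        props.setProperty("orcWriter.jvm.memory.mbs", "4096");              // DEFAULT_CONTAINER_MEMORY_MBS
        props.setProperty("orcWriter.container.jvmMemoryXmxRatio", "1.0");  // default Xmx ratio
        props.setProperty("orcWriter.container.jvmMemoryOverheadMbs", "0"); // default overhead
        // With auto-tuning on and these values, availableHeapSize() yields
        // round(4096 * 1.0) - 0 = 4096 MB; that heap is then scaled by the ORC
        // memory-pool ratio, split across an assumed 3 concurrent writers, and
        // divided by the scaled average record size to derive
        // orc.rows.between.memory.checks and the row-batch size.
        return props;
      }
    }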
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
index 57c19b6..6c9a190 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
@@ -23,319 +23,50 @@ import java.util.Properties;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-import org.apache.orc.OrcConf;
-import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.Writer;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
-import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.UnionColumnVector;
-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
-
-import com.google.common.annotations.VisibleForTesting;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.state.ConstructState;
-
-import static org.apache.gobblin.configuration.ConfigurationKeys.AVG_RECORD_SIZE;
-import static org.apache.gobblin.writer.AvroOrcSchemaConverter.getOrcSchema;
-
 
 /**
  * A wrapper for the ORC-core writer without a dependency on the Hive SerDe library.
  */
 @Slf4j
-public class GobblinOrcWriter extends FsDataWriter<GenericRecord> {
-  static final String ORC_WRITER_PREFIX = "orcWriter.";
-  public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize";
-  public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;
-
-  private static final String CONTAINER_MEMORY_MBS = ORC_WRITER_PREFIX + "jvm.memory.mbs";
-  private static final int DEFAULT_CONTAINER_MEMORY_MBS = 4096;
-
-  private static final String CONTAINER_JVM_MEMORY_XMX_RATIO_KEY = ORC_WRITER_PREFIX + "container.jvmMemoryXmxRatio";
-  private static final double DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO_KEY = 1.0;
-
-  static final String CONTAINER_JVM_MEMORY_OVERHEAD_MBS = ORC_WRITER_PREFIX + "container.jvmMemoryOverheadMbs";
-  private static final int DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS = 0;
-
-  @VisibleForTesting
-  static final String ORC_WRITER_AUTO_TUNE_ENABLED = ORC_WRITER_PREFIX + "autoTuneEnabled";
-  private static final boolean ORC_WRITER_AUTO_TUNE_DEFAULT = false;
-  private static final long EXEMPLIFIED_RECORD_SIZE_IN_BYTES = 1024;
-
-  /**
-   * This value gives an estimation on how many writers are buffering records at the same time in a container.
-   * Since time-based partition scheme is a commonly used practice, plus the chances for late-arrival data,
-   * usually there would be 2-3 writers running during the hourly boundary. 3 is chosen here for being conservative.
-   */
-  private static final int ESTIMATED_PARALLELISM_WRITERS = 3;
-
-  // The serialized record size passed from AVG_RECORD_SIZE is smaller than the actual in-memory representation
-  // of a record. This is just the number represents how many times that the actual buffer storing record is larger
-  // than the serialized size passed down from upstream constructs.
-  @VisibleForTesting
-  static final String RECORD_SIZE_SCALE_FACTOR = "recordSize.scaleFactor";
-  static final int DEFAULT_RECORD_SIZE_SCALE_FACTOR = 6;
-
-  /**
-   * Check comment of {@link #deepCleanRowBatch} for the usage of this configuration.
-   */
-  private static final String ORC_WRITER_DEEP_CLEAN_EVERY_BATCH = ORC_WRITER_PREFIX + "deepCleanBatch";
-
-  private final GenericRecordToOrcValueWriter valueWriter;
-  @VisibleForTesting
-  final VectorizedRowBatch rowBatch;
-  private final Writer orcFileWriter;
-
-  // the close method may be invoked multiple times, but the underlying writer only supports close being called once
-  private volatile boolean closed = false;
-  private final boolean deepCleanBatch;
-
-  private final int batchSize;
-  private final Schema avroSchema;
-
-  /**
-   * There are couple of parameters in ORC writer that requires manual tuning based on record size given that executor
-   * for running these ORC writers has limited heap space. This helper function wrap them and has side effect for the
-   * argument {@param properties}.
-   *
-   * Assumption for current implementation:
-   * The extractor or source class should set {@link org.apache.gobblin.configuration.ConfigurationKeys#AVG_RECORD_SIZE}
-   */
-  protected void autoTunedOrcWriterParams(State properties) {
-    double writerRatio = properties.getPropAsDouble(OrcConf.MEMORY_POOL.name(), (double) OrcConf.MEMORY_POOL.getDefaultValue());
-    long availableHeapPerWriter = Math.round(availableHeapSize(properties) * writerRatio / ESTIMATED_PARALLELISM_WRITERS);
-
-    // Upstream constructs will need to set this value properly
-    long estimatedRecordSize = getEstimatedRecordSize(properties);
-    long rowsBetweenCheck = availableHeapPerWriter * 1024 / estimatedRecordSize;
-    properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.name(),
-        Math.min(rowsBetweenCheck, (int) OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue()));
-    // Row batch size should be smaller than row_between_check, 4 is just a magic number picked here.
-    long batchSize = Math.min(rowsBetweenCheck / 4, DEFAULT_ORC_WRITER_BATCH_SIZE);
-    properties.setProp(ORC_WRITER_BATCH_SIZE, batchSize);
-    log.info("Tuned the parameter " + OrcConf.ROWS_BETWEEN_CHECKS.name() + " to be:" + rowsBetweenCheck + ","
-        + ORC_WRITER_BATCH_SIZE + " to be:" + batchSize);
-  }
-
-  /**
-   * Calculate the heap size in MB available for ORC writers.
-   */
-  protected long availableHeapSize(State Properties) {
-    // Calculate the recommended size as the threshold for memory check
-    long physicalMem = Math.round(Properties.getPropAsLong(CONTAINER_MEMORY_MBS, DEFAULT_CONTAINER_MEMORY_MBS)
-        * properties.getPropAsDouble(CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO_KEY));
-    long nonHeap = properties.getPropAsLong(CONTAINER_JVM_MEMORY_OVERHEAD_MBS, DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
-    return physicalMem - nonHeap;
-  }
-
-  /**
-   * Calculate the estimated record size in bytes.
-   */
-  protected long getEstimatedRecordSize(State properties) {
-    long estimatedRecordSizeScale = properties.getPropAsInt(RECORD_SIZE_SCALE_FACTOR, DEFAULT_RECORD_SIZE_SCALE_FACTOR);
-    return (properties.contains(AVG_RECORD_SIZE) ? properties.getPropAsLong(AVG_RECORD_SIZE)
-        : EXEMPLIFIED_RECORD_SIZE_IN_BYTES) * estimatedRecordSizeScale;
-  }
-
-  public GobblinOrcWriter(FsDataWriterBuilder<Schema, GenericRecord> builder, State properties)
-      throws IOException {
+public class GobblinOrcWriter extends GobblinBaseOrcWriter<Schema, GenericRecord> {
+  public GobblinOrcWriter(FsDataWriterBuilder<Schema, GenericRecord> builder, State properties) throws IOException {
     super(builder, properties);
-    if (properties.getPropAsBoolean(ORC_WRITER_AUTO_TUNE_ENABLED, ORC_WRITER_AUTO_TUNE_DEFAULT)) {
-      autoTunedOrcWriterParams(properties);
-    }
-
-    // Create value-writer which is essentially a record-by-record-converter with buffering in batch.
-    this.avroSchema = builder.getSchema();
-    TypeDescription typeDescription = getOrcSchema(this.avroSchema);
-    this.valueWriter = new GenericRecordToOrcValueWriter(typeDescription, this.avroSchema, properties);
-    this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE);
-    this.rowBatch = typeDescription.createRowBatch(this.batchSize);
-    this.deepCleanBatch = properties.getPropAsBoolean(ORC_WRITER_DEEP_CLEAN_EVERY_BATCH, false);
-
-    log.info("Start to construct a ORC-Native Writer, with batchSize:" + batchSize + ", enable batchDeepClean:"
-        + deepCleanBatch + "\n, schema in avro format:" + this.avroSchema);
-
-    // Create file-writer
-    Configuration conf = new Configuration();
-    // Populate job Configurations into Conf as well so that configurations related to ORC writer can be tuned easily.
-    for (Object key : properties.getProperties().keySet()) {
-      conf.set((String) key, properties.getProp((String) key));
-    }
-
-    OrcFile.WriterOptions options = OrcFile.writerOptions(properties.getProperties(), conf);
-    options.setSchema(typeDescription);
-
-    // For buffer-writer, flush has to be executed before close so it is better we maintain the life-cycle of fileWriter
-    // instead of delegating it to closer object in FsDataWriter.
-    this.orcFileWriter = OrcFile.createWriter(this.stagingFile, options);
   }
 
   @Override
-  public long recordsWritten() {
-    return this.orcFileWriter.getNumberOfRows();
+  protected TypeDescription getOrcSchema() {
+    return AvroOrcSchemaConverter.getOrcSchema(this.inputSchema);
   }
 
   @Override
-  public State getFinalState() {
-    /**
-     * Creating {@link ConstructState} to provide overwrite of {@link WorkUnitState} from constructs.
-     */
-    ConstructState state = new ConstructState(super.getFinalState());
-    try {
-      state.addOverwriteProperties(new State(getPropsWithOrcSchema(this.avroSchema)));
-    } catch (SerDeException e) {
-      throw new RuntimeException("Failure to set schema metadata in finalState properly which "
-          + "could possible lead to incorrect data registration", e);
-    }
-
-    return state;
+  protected OrcValueWriter<GenericRecord> getOrcValueWriter(TypeDescription typeDescription, Schema inputSchema,
+      State state) {
+    return new GenericRecordToOrcValueWriter(typeDescription, this.inputSchema, this.properties);
   }
 
   @Override
-  public void flush()
-      throws IOException {
-    if (rowBatch.size > 0) {
-      orcFileWriter.addRowBatch(rowBatch);
-      rowBatch.reset();
-      if (deepCleanBatch) {
-        deepCleanRowBatch(rowBatch);
-      }
-    }
-  }
-
-  private synchronized void closeInternal()
-      throws IOException {
-    if (!closed) {
-      this.flush();
-      this.orcFileWriter.close();
-      this.closed = true;
-    } else {
-      // Throw fatal exception if there's outstanding buffered data since there's risk losing data if proceeds.
-      if (rowBatch.size > 0) {
-        throw new CloseBeforeFlushException(this.avroSchema.getName());
-      }
-    }
-  }
-
-  @Override
-  public void close()
-      throws IOException {
-    closeInternal();
-    super.close();
-  }
-
-  /**
-   * Extra careful about the fact: super.commit() invoke closer.close based on its semantics of "commit".
-   * That means close can happen in both commit() and close()
-   */
-  @Override
-  public void commit()
-      throws IOException {
-    closeInternal();
-    super.commit();
-  }
+  protected Properties getPropsWithOrcSchema() throws SerDeException {
+    Properties properties = new Properties();
 
-  /**
-   * Note: orc.rows.between.memory.checks is the configuration available to tune memory-check sensitivity in ORC-Core
-   * library. By default it is set to 5000. If the user-application is dealing with large-row Kafka topics for example,
-   * one should consider lower this value to make memory-check more active.
-   */
-  @Override
-  public void write(GenericRecord record)
-      throws IOException {
-    valueWriter.write(record, rowBatch);
-    if (rowBatch.size == this.batchSize) {
-      orcFileWriter.addRowBatch(rowBatch);
-      rowBatch.reset();
-      if (deepCleanBatch) {
-        log.info("A reset of rowBatch is triggered - releasing holding memory for large object");
-        deepCleanRowBatch(rowBatch);
-      }
-    }
-  }
+    properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), this.inputSchema.toString());
+    AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(this.inputSchema);
 
-  /**
-   * The reset call of {@link VectorizedRowBatch} doesn't release the memory occupied by each {@link ColumnVector}'s child,
-   * which is usually an array of objects, while it only set those value to null.
-   * This method ensure the reference to the child array for {@link ColumnVector} are released and gives a hint of GC,
-   * so that each reset could release the memory pre-allocated by {@link ColumnVector#ensureSize(int, boolean)} method.
-   *
-   * This feature is configurable and should only be turned on if a dataset is:
-   * 1. Has large per-record size.
-   * 2. Has {@link org.apache.hadoop.hive.ql.exec.vector.MultiValuedColumnVector} as part of schema,
-   * like array, map and all nested structures containing these.
-   */
-  @VisibleForTesting
-  void deepCleanRowBatch(VectorizedRowBatch rowBatch) {
-    for(int i = 0; i < rowBatch.cols.length; ++i) {
-      ColumnVector cv = rowBatch.cols[i];
-      if (cv != null) {
-        removeRefOfColumnVectorChild(cv);
-      }
-    }
-  }
+    properties.setProperty("columns", StringUtils.join(aoig.getColumnNames(), ","));
+    properties.setProperty("columns.types", StringUtils.join(aoig.getColumnTypes(), ","));
 
-  /**
-   * Set the child field of {@link ColumnVector} to null, assuming input {@link ColumnVector} is nonNull.
-   */
-  private void removeRefOfColumnVectorChild(ColumnVector cv) {
-    if (cv instanceof StructColumnVector) {
-      StructColumnVector structCv = (StructColumnVector) cv;
-      for (ColumnVector childCv: structCv.fields) {
-        removeRefOfColumnVectorChild(childCv);
-      }
-    } else if (cv instanceof ListColumnVector) {
-      ListColumnVector listCv = (ListColumnVector) cv;
-      removeRefOfColumnVectorChild(listCv.child);
-    } else if (cv instanceof MapColumnVector) {
-      MapColumnVector mapCv = (MapColumnVector) cv;
-      removeRefOfColumnVectorChild(mapCv.keys);
-      removeRefOfColumnVectorChild(mapCv.values);
-    } else if (cv instanceof UnionColumnVector) {
-      UnionColumnVector unionCv = (UnionColumnVector) cv;
-      for (ColumnVector unionChildCv : unionCv.fields) {
-        removeRefOfColumnVectorChild(unionChildCv);
-      }
-    } else if (cv instanceof LongColumnVector) {
-      ((LongColumnVector) cv).vector = null;
-    } else if (cv instanceof DoubleColumnVector) {
-      ((DoubleColumnVector) cv).vector = null;
-    } else if (cv instanceof BytesColumnVector) {
-      ((BytesColumnVector) cv).vector = null;
-      ((BytesColumnVector) cv).start = null;
-      ((BytesColumnVector) cv).length = null;
-    } else if (cv instanceof DecimalColumnVector) {
-      ((DecimalColumnVector) cv).vector = null;
-    }
+    return properties;
   }
 
   @Override
   public boolean isSpeculativeAttemptSafe() {
     return this.writerAttemptIdOptional.isPresent() && this.getClass() == GobblinOrcWriter.class;
   }
-
-  public static Properties getPropsWithOrcSchema(Schema avroSchema) throws SerDeException {
-    Properties properties = new Properties();
-    properties.setProperty(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), avroSchema.toString());
-    AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(avroSchema);
-
-    properties.setProperty("columns", StringUtils.join(aoig.getColumnNames(), ","));
-    properties.setProperty("columns.types", StringUtils.join(aoig.getColumnTypes(), ","));
-
-    return properties;
-  }
 }
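
To make the schema metadata concrete, here is a hedged sketch of what the new
getPropsWithOrcSchema() override publishes for a two-field Avro record. The exact
Hive type strings come from AvroObjectInspectorGenerator, so treat the mapping shown
in the comments as an assumption:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class SchemaPropsExample {
      public static void main(String[] args) {
        Schema schema = SchemaBuilder.record("Example").fields()
            .requiredString("id")
            .requiredLong("count")
            .endRecord();
        // getPropsWithOrcSchema() would publish, approximately:
        //   avro.schema.literal -> schema.toString() (the full schema JSON)
        //   columns             -> "id,count"
        //   columns.types       -> "string,bigint" (assumed Hive names for string/long)
        System.out.println(schema.toString(true));
      }
    }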