You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/29 21:06:08 UTC

[iceberg] 06/09: Hive: Fix NoSuchMethodError of OrcTail with Hive3.x and Vectorized ORC (#3155)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.12.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 64083f81e4017d3262634aeb4db23cda1e4a4ca1
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Tue Oct 12 11:10:41 2021 +0200

    Hive: Fix NoSuchMethodError of OrcTail with Hive3.x and Vectorized ORC (#3155)
---
 build.gradle                                       |   2 +
 .../org/apache/hadoop/hive/ql/io/orc/OrcSplit.java | 284 +++++++++++++++++++++
 2 files changed, 286 insertions(+)

diff --git a/build.gradle b/build.gradle
index dd99804..8a8812a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -736,6 +736,8 @@ project(':iceberg-hive-runtime') {
     relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
     relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
     relocate 'org.threeten.extra', 'org.apache.iceberg.shaded.org.threeten.extra'
+    // relocate OrcSplit in order to avoid the conflict from Hive's OrcSplit
+    relocate 'org.apache.hadoop.hive.ql.io.orc.OrcSplit', 'org.apache.iceberg.shaded.org.apache.hadoop.hive.ql.io.orc.OrcSplit'
 
     classifier null
   }
diff --git a/hive3/src/main/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/hive3/src/main/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
new file mode 100644
index 0000000..00f389e
--- /dev/null
+++ b/hive3/src/main/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.ColumnarSplit;
+import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.OrcTail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * In order to fix some compatibility issues with ORC support with Hive 3.x and the shaded ORC libraries,
+ * this class has been copied from Hive 3.x source code. However, this class should be removed once
+ * Hive 4 is out.
+ */
+public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit {
+  private static final Logger LOG = LoggerFactory.getLogger(OrcSplit.class);
+  private OrcTail orcTail;
+  private boolean hasFooter;
+  /**
+   * This means {@link AcidUtils.AcidBaseFileType#ORIGINAL_BASE}
+   */
+  private boolean isOriginal;
+  private boolean hasBase;
+  // partition root
+  private Path rootDir;
+  private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
+  private long projColsUncompressedSize;
+  private transient Object fileKey;
+  private long fileLen;
+
+  static final int HAS_SYNTHETIC_FILEID_FLAG = 16;
+  static final int HAS_LONG_FILEID_FLAG = 8;
+  static final int BASE_FLAG = 4;
+  static final int ORIGINAL_FLAG = 2;
+  static final int FOOTER_FLAG = 1;
+
+  protected OrcSplit() {
+    // The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
+    // This constructor is used to create the object and then call readFields()
+    // so just pass nulls to this super constructor.
+    super(null, 0, 0, (String[]) null);
+  }
+
+  public OrcSplit(Path path, Object fileId, long offset, long length, String[] hosts,
+                  OrcTail orcTail, boolean isOriginal, boolean hasBase,
+                  List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize, long fileLen, Path rootDir) {
+    super(path, offset, length, hosts);
+    // For HDFS, we could avoid serializing file ID and just replace the path with inode-based
+    // path. However, that breaks bunch of stuff because Hive later looks up things by split path.
+    this.fileKey = fileId;
+    this.orcTail = orcTail;
+    hasFooter = this.orcTail != null;
+    this.isOriginal = isOriginal;
+    this.hasBase = hasBase;
+    this.rootDir = rootDir;
+    this.deltas.addAll(deltas);
+    this.projColsUncompressedSize = projectedDataSize <= 0 ? length : projectedDataSize;
+    // setting file length to Long.MAX_VALUE will let orc reader read file length from file system
+    this.fileLen = fileLen <= 0 ? Long.MAX_VALUE : fileLen;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    // serialize path, offset, length using FileSplit
+    super.write(dos);
+    int required = bos.size();
+
+    // write addition payload required for orc
+    writeAdditionalPayload(dos);
+    int additional = bos.size() - required;
+
+    out.write(bos.toByteArray());
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Writing additional {} bytes to OrcSplit as payload. Required {} bytes.",
+              additional, required);
+    }
+  }
+
+  private void writeAdditionalPayload(final DataOutputStream out) throws IOException {
+    boolean isFileIdLong = fileKey instanceof Long;
+    boolean isFileIdWritable = fileKey instanceof Writable;
+    int flags = (hasBase ? BASE_FLAG : 0) |
+            (isOriginal ? ORIGINAL_FLAG : 0) |
+            (hasFooter ? FOOTER_FLAG : 0) |
+            (isFileIdLong ? HAS_LONG_FILEID_FLAG : 0) |
+            (isFileIdWritable ? HAS_SYNTHETIC_FILEID_FLAG : 0);
+    out.writeByte(flags);
+    out.writeInt(deltas.size());
+    for (AcidInputFormat.DeltaMetaData delta : deltas) {
+      delta.write(out);
+    }
+    if (hasFooter) {
+      OrcProto.FileTail fileTail = orcTail.getMinimalFileTail();
+      byte[] tailBuffer = fileTail.toByteArray();
+      int tailLen = tailBuffer.length;
+      WritableUtils.writeVInt(out, tailLen);
+      out.write(tailBuffer);
+    }
+    if (isFileIdLong) {
+      out.writeLong(((Long) fileKey).longValue());
+    } else if (isFileIdWritable) {
+      ((Writable) fileKey).write(out);
+    }
+    out.writeLong(fileLen);
+    out.writeUTF(rootDir.toString());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // deserialize path, offset, length using FileSplit
+    super.readFields(in);
+
+    byte flags = in.readByte();
+    hasFooter = (FOOTER_FLAG & flags) != 0;
+    isOriginal = (ORIGINAL_FLAG & flags) != 0;
+    hasBase = (BASE_FLAG & flags) != 0;
+    boolean hasLongFileId = (HAS_LONG_FILEID_FLAG & flags) != 0;
+    boolean hasWritableFileId = (HAS_SYNTHETIC_FILEID_FLAG & flags) != 0;
+    if (hasLongFileId && hasWritableFileId) {
+      throw new IOException("Invalid split - both file ID types present");
+    }
+
+    deltas.clear();
+    int numDeltas = in.readInt();
+    for (int i = 0; i < numDeltas; i++) {
+      AcidInputFormat.DeltaMetaData dmd = new AcidInputFormat.DeltaMetaData();
+      dmd.readFields(in);
+      deltas.add(dmd);
+    }
+    if (hasFooter) {
+      int tailLen = WritableUtils.readVInt(in);
+      byte[] tailBuffer = new byte[tailLen];
+      in.readFully(tailBuffer);
+      OrcProto.FileTail fileTail = OrcProto.FileTail.parseFrom(tailBuffer);
+      orcTail = new OrcTail(fileTail, null);
+    }
+    if (hasLongFileId) {
+      fileKey = in.readLong();
+    } else if (hasWritableFileId) {
+      SyntheticFileId fileId = new SyntheticFileId();
+      fileId.readFields(in);
+      this.fileKey = fileId;
+    }
+    fileLen = in.readLong();
+    rootDir = new Path(in.readUTF());
+  }
+
+  public OrcTail getOrcTail() {
+    return orcTail;
+  }
+
+  public boolean hasFooter() {
+    return hasFooter;
+  }
+
+  /**
+   * @return {@code true} if file schema doesn't have Acid metadata columns
+   * Such file may be in a delta_x_y/ or base_x due to being added via
+   * "load data" command.  It could be at partition|table root due to table having
+   * been converted from non-acid to acid table.  It could even be something like
+   * "warehouse/t/HIVE_UNION_SUBDIR_15/000000_0" if it was written by an
+   * "insert into t select ... from A union all select ... from B"
+   */
+  public boolean isOriginal() {
+    return isOriginal;
+  }
+
+  public boolean hasBase() {
+    return hasBase;
+  }
+
+  public Path getRootDir() {
+    return rootDir;
+  }
+
+  public List<AcidInputFormat.DeltaMetaData> getDeltas() {
+    return deltas;
+  }
+
+  public long getFileLength() {
+    return fileLen;
+  }
+
+  /**
+   * If this method returns true, then for sure it is ACID.
+   * However, if it returns false.. it could be ACID or non-ACID.
+   *
+   * @return true if is ACID
+   */
+  public boolean isAcid() {
+    return hasBase || deltas.size() > 0;
+  }
+
+  public long getProjectedColumnsUncompressedSize() {
+    return projColsUncompressedSize;
+  }
+
+  public Object getFileKey() {
+    return fileKey;
+  }
+
+  @Override
+  public long getColumnarProjectionSize() {
+    return projColsUncompressedSize;
+  }
+
+  @Override
+  public boolean canUseLlapIo(Configuration conf) {
+    final boolean hasDelta = deltas != null && !deltas.isEmpty();
+    final boolean isAcidRead = AcidUtils.isFullAcidScan(conf);
+    final boolean isVectorized = HiveConf.getBoolVar(conf, ConfVars.HIVE_VECTORIZATION_ENABLED);
+    Boolean isSplitUpdate = null;
+    if (isAcidRead) {
+      final AcidUtils.AcidOperationalProperties acidOperationalProperties
+              = AcidUtils.getAcidOperationalProperties(conf);
+      isSplitUpdate = acidOperationalProperties.isSplitUpdate();
+    }
+
+    if (isOriginal) {
+      if (!isAcidRead && !hasDelta) {
+        // Original scan only
+        return true;
+      }
+    } else {
+      boolean isAcidEnabled = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ACID_ENABLED);
+      if (isAcidEnabled && isAcidRead && hasBase && isVectorized) {
+        if (hasDelta) {
+          if (isSplitUpdate) { // Base with delete deltas
+            return true;
+          }
+        } else {
+          // Base scan only
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "OrcSplit [" + getPath() + ", start=" + getStart() + ", length=" + getLength() +
+            ", isOriginal=" + isOriginal + ", fileLength=" + fileLen + ", hasFooter=" + hasFooter +
+            ", hasBase=" + hasBase + ", deltas=" + (deltas == null ? 0 : deltas.size()) + "]";
+  }
+}