You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/09/09 09:08:32 UTC

[02/50] [abbrv] hive git commit: HIVE-11472: ORC StringDirectTreeReader is thrashing the GC due to byte[] allocation per row (Gopal V, reviewed by Ashutosh Chauhan)

HIVE-11472: ORC StringDirectTreeReader is thrashing the GC due to byte[] allocation per row (Gopal V, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bb7153f9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bb7153f9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bb7153f9

Branch: refs/heads/beeline-cli
Commit: bb7153f9b1ee2d7e067341d252667edac593e15e
Parents: 3e63fc4
Author: Gopal V <go...@apache.org>
Authored: Tue Aug 25 14:19:36 2015 -0700
Committer: Gopal V <go...@apache.org>
Committed: Tue Aug 25 14:23:02 2015 -0700

----------------------------------------------------------------------
 .../hive/ql/io/orc/TreeReaderFactory.java       | 18 ++++------
 .../apache/hadoop/hive/shims/Hadoop23Shims.java | 38 ++++++++++++++++++++
 .../apache/hadoop/hive/shims/HadoopShims.java   | 22 ++++++++++++
 .../hadoop/hive/shims/HadoopShimsSecure.java    | 32 +++++++++++++++++
 4 files changed, 99 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bb7153f9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index 9bfe268..6d47532 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.TextReaderShim;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -1486,6 +1488,7 @@ public class TreeReaderFactory {
    */
   protected static class StringDirectTreeReader extends TreeReader {
     protected InStream stream;
+    protected TextReaderShim data;
     protected IntegerReader lengths;
     private final LongColumnVector scratchlcv;
 
@@ -1500,6 +1503,7 @@ public class TreeReaderFactory {
       this.stream = data;
       if (length != null && encoding != null) {
         this.lengths = createIntegerReader(encoding, length, false, false);
+        this.data = ShimLoader.getHadoopShims().getTextReaderShim(this.stream);
       }
     }
 
@@ -1520,6 +1524,7 @@ public class TreeReaderFactory {
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
       stream = streams.get(name);
+      data = ShimLoader.getHadoopShims().getTextReaderShim(this.stream);
       lengths = createIntegerReader(stripeFooter.getColumnsList().get(columnId).getKind(),
           streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
           false, false);
@@ -1534,6 +1539,7 @@ public class TreeReaderFactory {
     public void seek(PositionProvider index) throws IOException {
       super.seek(index);
       stream.seek(index);
+      // data wraps stream without buffering, so seeking stream repositions it too
       lengths.seek(index);
     }
 
@@ -1548,17 +1554,7 @@ public class TreeReaderFactory {
           result = (Text) previous;
         }
         int len = (int) lengths.next();
-        int offset = 0;
-        byte[] bytes = new byte[len];
-        while (len > 0) {
-          int written = stream.read(bytes, offset, len);
-          if (written < 0) {
-            throw new EOFException("Can't finish byte read from " + stream);
-          }
-          len -= written;
-          offset += written;
-        }
-        result.set(bytes);
+        data.read(result, len);
       }
       return result;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/bb7153f9/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 29d0f13..3292cb3 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hive.shims;
 
+import java.io.DataInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
@@ -68,7 +70,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hive.shims.HadoopShims.TextReaderShim;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -110,10 +114,12 @@ public class Hadoop23Shims extends HadoopShimsSecure {
   HadoopShims.MiniDFSShim cluster = null;
   final boolean zeroCopy;
   final boolean storagePolicy;
+  final boolean fastread;
 
   public Hadoop23Shims() {
     boolean zcr = false;
     boolean storage = false;
+    boolean fastread = false;
     try {
       Class.forName("org.apache.hadoop.fs.CacheFlag", false,
           ShimLoader.class.getClassLoader());
@@ -130,8 +136,18 @@ public class Hadoop23Shims extends HadoopShimsSecure {
       } catch (ClassNotFoundException ce) {
       }
     }
+
+    if (storage) {
+      for (Method m : Text.class.getMethods()) {
+        if ("readWithKnownLength".equals(m.getName())) {
+          fastread = true;
+        }
+      }
+    }
+
     this.storagePolicy = storage;
     this.zeroCopy = zcr;
+    this.fastread = fastread;
   }
 
   @Override
@@ -1409,4 +1425,26 @@ public class Hadoop23Shims extends HadoopShimsSecure {
   public long getFileId(FileSystem fs, String path) throws IOException {
     return ensureDfs(fs).getClient().getFileInfo(path).getFileId();
   }
+
+  private static final class FastTextReaderShim implements TextReaderShim {
+    private final DataInputStream din; // thin wrapper over the caller's stream
+
+    public FastTextReaderShim(InputStream in) {
+      this.din = new DataInputStream(in);
+    }
+
+    @Override
+    public void read(Text txt, int len) throws IOException {
+      txt.readWithKnownLength(din, len); // avoids the basic shim's per-row byte[]
+    }
+  }
+
+  @Override
+  public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
+    // Text.readWithKnownLength is used only if the constructor detected it on
+    // this Hadoop version; otherwise defer to the allocating generic reader.
+    return fastread
+        ? new FastTextReaderShim(in) : super.getTextReaderShim(in);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bb7153f9/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 2b6f322..6e2dedb 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.shims;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.URI;
@@ -49,6 +50,7 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobProfile;
@@ -746,4 +748,24 @@ public interface HadoopShims {
    * @return inode ID of the file.
    */
   long getFileId(FileSystem fs, String path) throws IOException;
+
+  /**
+   * Reads data into a Text object in the fastest way available.
+   */
+  public interface TextReaderShim {
+    /**
+     * Reads exactly {@code size} bytes from the wrapped stream into {@code txt}.
+     * @param txt  destination; its previous contents are replaced
+     * @param size number of bytes to read
+     * @throws IOException on read failure or premature end of stream
+     */
+    void read(Text txt, int size) throws IOException;
+  }
+
+  /**
+   * Wraps a {@link TextReaderShim} around an input stream. The shim must not
+   * buffer reads from the underlying stream: it consumes exactly the bytes
+   * requested by {@link TextReaderShim#read}, so the stream position always
+   */
+  public TextReaderShim getTextReaderShim(InputStream input) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bb7153f9/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
index 89d7798..c6b7c9d 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.hive.shims;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Constructor;
 import java.net.URI;
 import java.security.AccessControlException;
@@ -40,6 +42,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -392,4 +395,33 @@ public abstract class HadoopShimsSecure implements HadoopShims {
 
   @Override
   abstract public void addDelegationTokens(FileSystem fs, Credentials cred, String uname) throws IOException;
+
+  private static final class BasicTextReaderShim implements TextReaderShim {
+    private final InputStream in;
+
+    public BasicTextReaderShim(InputStream in) {
+      this.in = in;
+    }
+
+    @Override
+    public void read(Text txt, int len) throws IOException {
+      // Portable fallback: copies into a freshly allocated byte[] on every call.
+      byte[] bytes = new byte[len];
+      int offset = 0;
+      while (offset < len) {
+        int n = in.read(bytes, offset, len - offset);
+        if (n < 0) {
+          throw new EOFException("Can't finish read from " + in + " read "
+              + offset + " bytes out of " + len);
+        }
+        offset += n;
+      }
+      txt.set(bytes);
+    }
+  }
+
+  @Override
+  public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
+    return new BasicTextReaderShim(in); // portable fallback: new byte[] per read
+  }
 }