You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by he...@apache.org on 2011/12/24 22:43:27 UTC

svn commit: r1223025 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io: RCFile.java SchemaAwareCompressionInputStream.java SchemaAwareCompressionOutputStream.java

Author: heyongqiang
Date: Sat Dec 24 21:43:27 2011
New Revision: 1223025

URL: http://svn.apache.org/viewvc?rev=1223025&view=rev
Log:
HIVE-2600: Enable/Add type-specific compression for rcfile (Krishna Kumar via He Yongqiang)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1223025&r1=1223024&r2=1223025&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Sat Dec 24 21:43:27 2011
@@ -340,6 +340,9 @@ public class RCFile {
         decompressBuffer.reset();
         DataInputStream valueIn = new DataInputStream(deflatFilter);
         deflatFilter.resetState();
+        if (deflatFilter instanceof SchemaAwareCompressionInputStream) {
+          ((SchemaAwareCompressionInputStream)deflatFilter).setColumnIndex(colIndex);
+        }
         decompressBuffer.reset(compressedData.getData(),
             keyBuffer.eachColumnValueLen[colIndex]);
 
@@ -591,6 +594,8 @@ public class RCFile {
 
     KeyBuffer key = null;
     ValueBuffer value = null;
+    private final int[] plainTotalColumnLength;
+    private final int[] comprTotalColumnLength;
 
     /*
      * used for buffering appends before flush them out
@@ -749,6 +754,9 @@ public class RCFile {
       finalizeFileHeader();
       key = new KeyBuffer(columnNumber);
       value = new ValueBuffer(key);
+
+      plainTotalColumnLength = new int[columnNumber];
+      comprTotalColumnLength = new int[columnNumber];
     }
 
     /** Write the initial part of file header. */
@@ -797,6 +805,9 @@ public class RCFile {
           compressionBuffer[i] = new NonSyncDataOutputBuffer();
           deflateFilter[i] = codec.createOutputStream(compressionBuffer[i],
               compressor);
+          if (deflateFilter[i] instanceof SchemaAwareCompressionOutputStream) {
+            ((SchemaAwareCompressionOutputStream)deflateFilter[i]).setColumnIndex(i);
+          }
           deflateOut[i] = new DataOutputStream(new BufferedOutputStream(
               deflateFilter[i]));
         }
@@ -901,12 +912,16 @@ public class RCFile {
           value.setColumnValueBuffer(compressionBuffer[columnIndex],
               columnIndex);
           valueLength += colLen;
+          plainTotalColumnLength[columnIndex] += columnValuePlainLength[columnIndex];
+          comprTotalColumnLength[columnIndex] += colLen;
         } else {
           int colLen = columnValuePlainLength[columnIndex];
           key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, colLen,
               columnIndex);
           value.setColumnValueBuffer(columnValue, columnIndex);
           valueLength += colLen;
+          plainTotalColumnLength[columnIndex] += colLen;
+          comprTotalColumnLength[columnIndex] += colLen;
         }
         columnValuePlainLength[columnIndex] = 0;
       }
@@ -1001,6 +1016,10 @@ public class RCFile {
         out.close();
         out = null;
       }
+      for (int i = 0; i < columnNumber; i++) {
+        LOG.info("Column#" + i + " : Plain Total Column Value Length: " + plainTotalColumnLength[i]
+              + ",  Compr Total Column Value Length: " + comprTotalColumnLength[i]);
+      }
     }
   }
 

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java?rev=1223025&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java Sat Dec 24 21:43:27 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.InputStream;
+
+import org.apache.hadoop.io.compress.*;
+/**
+ * A {@link CompressionInputStream} that can additionally be told which column
+ * is being read. Subclasses decide how to use that information; this class
+ * only declares the hook.
+ *
+ */
+public abstract class SchemaAwareCompressionInputStream extends CompressionInputStream {
+  /** Passes {@code in} unchanged to the {@link CompressionInputStream} constructor. */
+  protected SchemaAwareCompressionInputStream(InputStream in) {
+    super(in);
+  }
+
+  /**
+   * Sets the column being read.
+   *
+   * @param columnIndex the index of the column. Use -1 for non-column data
+   */
+  public abstract void setColumnIndex(int columnIndex);
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java?rev=1223025&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java Sat Dec 24 21:43:27 2011
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.compress.*;
+
+/**
+ * A {@link CompressionOutputStream} that can additionally be told which column
+ * is currently being compressed. Subclasses decide how to use that
+ * information; this class only declares the hook.
+ *
+ */
+public abstract class SchemaAwareCompressionOutputStream extends CompressionOutputStream {
+  /** Passes {@code out} unchanged to the {@link CompressionOutputStream} constructor. */
+  protected SchemaAwareCompressionOutputStream(OutputStream out) {
+    super(out);
+  }
+
+  /**
+   * Sets the column being output.
+   *
+   * Implementations define what effect, if any, the index has on compression.
+   *
+   * @param columnIndex the index of the column. Use -1 for non-column data
+   */
+  public abstract void setColumnIndex(int columnIndex);
+}