You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by he...@apache.org on 2011/12/24 22:43:27 UTC
svn commit: r1223025 - in
/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io: RCFile.java
SchemaAwareCompressionInputStream.java
SchemaAwareCompressionOutputStream.java
Author: heyongqiang
Date: Sat Dec 24 21:43:27 2011
New Revision: 1223025
URL: http://svn.apache.org/viewvc?rev=1223025&view=rev
Log:
HIVE-2600: Enable/Add type-specific compression for rcfile (Krishna Kumar via He Yongqiang)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1223025&r1=1223024&r2=1223025&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Sat Dec 24 21:43:27 2011
@@ -340,6 +340,9 @@ public class RCFile {
decompressBuffer.reset();
DataInputStream valueIn = new DataInputStream(deflatFilter);
deflatFilter.resetState();
+ if (deflatFilter instanceof SchemaAwareCompressionInputStream) {
+ ((SchemaAwareCompressionInputStream)deflatFilter).setColumnIndex(colIndex);
+ }
decompressBuffer.reset(compressedData.getData(),
keyBuffer.eachColumnValueLen[colIndex]);
@@ -591,6 +594,8 @@ public class RCFile {
KeyBuffer key = null;
ValueBuffer value = null;
+ private final int[] plainTotalColumnLength;
+ private final int[] comprTotalColumnLength;
/*
* used for buffering appends before flush them out
@@ -749,6 +754,9 @@ public class RCFile {
finalizeFileHeader();
key = new KeyBuffer(columnNumber);
value = new ValueBuffer(key);
+
+ plainTotalColumnLength = new int[columnNumber];
+ comprTotalColumnLength = new int[columnNumber];
}
/** Write the initial part of file header. */
@@ -797,6 +805,9 @@ public class RCFile {
compressionBuffer[i] = new NonSyncDataOutputBuffer();
deflateFilter[i] = codec.createOutputStream(compressionBuffer[i],
compressor);
+ if (deflateFilter[i] instanceof SchemaAwareCompressionOutputStream) {
+ ((SchemaAwareCompressionOutputStream)deflateFilter[i]).setColumnIndex(i);
+ }
deflateOut[i] = new DataOutputStream(new BufferedOutputStream(
deflateFilter[i]));
}
@@ -901,12 +912,16 @@ public class RCFile {
value.setColumnValueBuffer(compressionBuffer[columnIndex],
columnIndex);
valueLength += colLen;
+ plainTotalColumnLength[columnIndex] += columnValuePlainLength[columnIndex];
+ comprTotalColumnLength[columnIndex] += colLen;
} else {
int colLen = columnValuePlainLength[columnIndex];
key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, colLen,
columnIndex);
value.setColumnValueBuffer(columnValue, columnIndex);
valueLength += colLen;
+ plainTotalColumnLength[columnIndex] += colLen;
+ comprTotalColumnLength[columnIndex] += colLen;
}
columnValuePlainLength[columnIndex] = 0;
}
@@ -1001,6 +1016,10 @@ public class RCFile {
out.close();
out = null;
}
+ for (int i = 0; i < columnNumber; i++) {
+ LOG.info("Column#" + i + " : Plain Total Column Value Length: " + plainTotalColumnLength[i]
+ + ", Compr Total Column Value Length: " + comprTotalColumnLength[i]);
+ }
}
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java?rev=1223025&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionInputStream.java Sat Dec 24 21:43:27 2011
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.InputStream;
+
+import org.apache.hadoop.io.compress.*;
+/**
+ * SchemaAwareCompressionInputStream adds the ability to inform the compression
+ * stream which column is being read. A caller that knows the column layout
+ * (for example RCFile) announces the column through
+ * {@link #setColumnIndex(int)} so the stream can take the column into account.
+ */
+public abstract class SchemaAwareCompressionInputStream extends CompressionInputStream {
+
+ protected SchemaAwareCompressionInputStream(InputStream in) {
+ super(in);
+ }
+
+ /**
+ * Sets the column whose data is about to be read from this stream.
+ *
+ * @param columnIndex the index of the column. Use -1 for non-column data
+ */
+ public abstract void setColumnIndex(int columnIndex);
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java?rev=1223025&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SchemaAwareCompressionOutputStream.java Sat Dec 24 21:43:27 2011
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.compress.*;
+
+/**
+ * SchemaAwareCompressionOutputStream adds the ability to inform the compression
+ * stream which column is currently being compressed. A caller that knows the
+ * column layout (for example RCFile) announces the column through
+ * {@link #setColumnIndex(int)} so the stream can take the column into account.
+ */
+public abstract class SchemaAwareCompressionOutputStream extends CompressionOutputStream {
+
+ protected SchemaAwareCompressionOutputStream(OutputStream out) {
+ super(out);
+ }
+
+ /**
+ * Sets the column whose data is about to be written to this stream.
+ * Implementations decide how the column index influences compression.
+ *
+ * @param columnIndex the index of the column. Use -1 for non-column data
+ */
+ public abstract void setColumnIndex(int columnIndex);
+}