You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2012/07/31 19:29:16 UTC
svn commit: r1367692 -
/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
Author: hashutosh
Date: Tue Jul 31 17:29:16 2012
New Revision: 1367692
URL: http://svn.apache.org/viewvc?rev=1367692&view=rev
Log:
HIVE-3153 : Release codecs and output streams between flushes of RCFile (Owen Omalley via Ashutosh Chauhan)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1367692&r1=1367691&r2=1367692&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Tue Jul 31 17:29:16 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.io;
-import java.io.BufferedOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
@@ -209,7 +208,7 @@ public class RCFile {
* </ul>
*/
public static class KeyBuffer implements WritableComparable {
- // each column's value length in a split
+ // each column's length in the value
private int[] eachColumnValueLen = null;
private int[] eachColumnUncompressedValueLen = null;
// stores each cell's length of a column in one DataOutputBuffer element
@@ -224,18 +223,22 @@ public class RCFile {
return columnNumber;
}
+ @SuppressWarnings("unused")
+ @Deprecated
public KeyBuffer(){
}
- KeyBuffer(int columnNumber) {
- this(0, columnNumber);
- }
-
- KeyBuffer(int numberRows, int columnNum) {
+ KeyBuffer(int columnNum) {
columnNumber = columnNum;
eachColumnValueLen = new int[columnNumber];
eachColumnUncompressedValueLen = new int[columnNumber];
allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber];
+ }
+
+ @SuppressWarnings("unused")
+ @Deprecated
+ KeyBuffer(int numberRows, int columnNum) {
+ this(columnNum);
this.numberRows = numberRows;
}
@@ -388,18 +391,26 @@ public class RCFile {
NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
CompressionInputStream deflatFilter = null;
+ @SuppressWarnings("unused")
+ @Deprecated
public ValueBuffer() throws IOException {
}
+ @SuppressWarnings("unused")
+ @Deprecated
public ValueBuffer(KeyBuffer keyBuffer) throws IOException {
- this(keyBuffer, null);
+ this(keyBuffer, keyBuffer.columnNumber, null, null, true);
}
+ @SuppressWarnings("unused")
+ @Deprecated
public ValueBuffer(KeyBuffer keyBuffer, boolean[] skippedColIDs)
throws IOException {
- this(keyBuffer, keyBuffer.columnNumber, skippedColIDs, null);
+ this(keyBuffer, keyBuffer.columnNumber, skippedColIDs, null, true);
}
+ @SuppressWarnings("unused")
+ @Deprecated
public ValueBuffer(KeyBuffer currentKey, int columnNumber,
boolean[] skippedCols, CompressionCodec codec) throws IOException {
this(currentKey, columnNumber, skippedCols, codec, true);
@@ -422,11 +433,9 @@ public class RCFile {
}
int skipped = 0;
- if (skippedColIDs != null) {
- for (boolean currentSkip : skippedColIDs) {
- if (currentSkip) {
- skipped++;
- }
+ for (boolean currentSkip : skippedColIDs) {
+ if (currentSkip) {
+ skipped++;
}
}
loadedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber
@@ -464,6 +473,8 @@ public class RCFile {
}
}
+ @SuppressWarnings("unused")
+ @Deprecated
public void setColumnValueBuffer(NonSyncDataOutputBuffer valBuffer,
int addIndex) {
loadedColumnsValueBuffer[addIndex] = valBuffer;
@@ -576,7 +587,6 @@ public class RCFile {
CompressionCodec codec = null;
Metadata metadata = null;
- Compressor compressor = null;
// Insert a globally unique 16-byte value every few entries, so that one
// can seek into the middle of a file and then synchronize with record
@@ -604,22 +614,13 @@ public class RCFile {
// how many records already buffered
private int bufferedRecords = 0;
- NonSyncDataOutputBuffer[] compressionBuffer;
- CompressionOutputStream[] deflateFilter = null;
- DataOutputStream[] deflateOut = null;
private final ColumnBuffer[] columnBuffers;
- NonSyncDataOutputBuffer keyCompressionBuffer;
- CompressionOutputStream keyDeflateFilter;
- DataOutputStream keyDeflateOut;
- Compressor keyCompressor;
-
private int columnNumber = 0;
private final int[] columnValuePlainLength;
KeyBuffer key = null;
- ValueBuffer value = null;
private final int[] plainTotalColumnLength;
private final int[] comprTotalColumnLength;
@@ -669,7 +670,6 @@ public class RCFile {
private void startNewGroup(int currentLen) {
prevValueLength = currentLen;
runLength = 0;
- return;
}
public void clear() throws IOException {
@@ -713,7 +713,7 @@ public class RCFile {
*/
public Writer(FileSystem fs, Configuration conf, Path name,
Progressable progress, CompressionCodec codec) throws IOException {
- this(fs, conf, name, null, new Metadata(), codec);
+ this(fs, conf, name, progress, new Metadata(), codec);
}
/**
@@ -725,8 +725,8 @@ public class RCFile {
* the configuration file
* @param name
* the file name
- * @param progress
- * @param metadata
+ * @param progress a progress meter to update as the file is written
+ * @param metadata a string to string map in the file header
* @throws IOException
*/
public Writer(FileSystem fs, Configuration conf, Path name,
@@ -746,11 +746,11 @@ public class RCFile {
* the configuration file
* @param name
* the file name
- * @param bufferSize
- * @param replication
- * @param blockSize
- * @param progress
- * @param metadata
+ * @param bufferSize the size of the file buffer
+ * @param replication the number of replicas for the file
+ * @param blockSize the block size of the file
+ * @param progress the progress meter for writing the file
+ * @param metadata a string to string map in the file header
* @throws IOException
*/
public Writer(FileSystem fs, Configuration conf, Path name, int bufferSize,
@@ -775,13 +775,12 @@ public class RCFile {
columnBuffers[i] = new ColumnBuffer();
}
- init(name, conf, fs.create(name, true, bufferSize, replication,
- blockSize, progress), codec, metadata);
+ init(conf, fs.create(name, true, bufferSize, replication,
+ blockSize, progress), codec, metadata);
initializeFileHeader();
writeFileHeader();
finalizeFileHeader();
key = new KeyBuffer(columnNumber);
- value = new ValueBuffer(key);
plainTotalColumnLength = new int[columnNumber];
comprTotalColumnLength = new int[columnNumber];
@@ -824,41 +823,19 @@ public class RCFile {
metadata.write(out);
}
- void init(Path name, Configuration conf, FSDataOutputStream out,
+ void init(Configuration conf, FSDataOutputStream out,
CompressionCodec codec, Metadata metadata) throws IOException {
this.conf = conf;
this.out = out;
this.codec = codec;
this.metadata = metadata;
- if (this.codec != null) {
- ReflectionUtils.setConf(codec, this.conf);
- compressor = CodecPool.getCompressor(codec);
-
- compressionBuffer = new NonSyncDataOutputBuffer[columnNumber];
- deflateFilter = new CompressionOutputStream[columnNumber];
- deflateOut = new DataOutputStream[columnNumber];
- for (int i = 0; i < columnNumber; i++) {
- compressionBuffer[i] = new NonSyncDataOutputBuffer();
- deflateFilter[i] = codec.createOutputStream(compressionBuffer[i],
- compressor);
- if (deflateFilter[i] instanceof SchemaAwareCompressionOutputStream) {
- ((SchemaAwareCompressionOutputStream)deflateFilter[i]).setColumnIndex(i);
- }
- deflateOut[i] = new DataOutputStream(new BufferedOutputStream(
- deflateFilter[i]));
- }
- keyCompressor = CodecPool.getCompressor(codec);
- keyCompressionBuffer = new NonSyncDataOutputBuffer();
- keyDeflateFilter = codec.createOutputStream(keyCompressionBuffer,
- keyCompressor);
- keyDeflateOut = new DataOutputStream(new BufferedOutputStream(
- keyDeflateFilter));
- }
this.useNewMagic =
conf.getBoolean(HiveConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, true);
}
/** Returns the compression codec of data in this file. */
+ @SuppressWarnings("unused")
+ @Deprecated
public CompressionCodec getCompressionCodec() {
return codec;
}
@@ -873,6 +850,8 @@ public class RCFile {
}
/** Returns the configuration of this file. */
+ @SuppressWarnings("unused")
+ @Deprecated
Configuration getConf() {
return conf;
}
@@ -892,7 +871,7 @@ public class RCFile {
* If its size() is greater than the column number in the file, the exceeded
* columns' bytes are ignored.
*
- * @param val
+ * @param val a BytesRefArrayWritable with the list of serialized columns
* @throws IOException
*/
public void append(Writable val) throws IOException {
@@ -928,39 +907,48 @@ public class RCFile {
private void flushRecords() throws IOException {
key.numberRows = bufferedRecords;
- value.keyBuffer = key;
+ Compressor compressor = null;
+ NonSyncDataOutputBuffer valueBuffer = null;
+ CompressionOutputStream deflateFilter = null;
+ DataOutputStream deflateOut = null;
+ boolean isCompressed = isCompressed();
int valueLength = 0;
+ if (isCompressed) {
+ ReflectionUtils.setConf(codec, this.conf);
+ compressor = CodecPool.getCompressor(codec);
+ valueBuffer = new NonSyncDataOutputBuffer();
+ deflateFilter = codec.createOutputStream(valueBuffer, compressor);
+ deflateOut = new DataOutputStream(deflateFilter);
+ }
+
for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
ColumnBuffer currentBuf = columnBuffers[columnIndex];
currentBuf.flushGroup();
NonSyncDataOutputBuffer columnValue = currentBuf.columnValBuffer;
+ int colLen;
+ int plainLen = columnValuePlainLength[columnIndex];
- if (isCompressed()) {
- compressionBuffer[columnIndex].reset();
- deflateFilter[columnIndex].resetState();
- deflateOut[columnIndex].write(columnValue.getData(), 0, columnValue
- .getLength());
- deflateOut[columnIndex].flush();
- deflateFilter[columnIndex].finish();
- int colLen = compressionBuffer[columnIndex].getLength();
- key.setColumnLenInfo(colLen, currentBuf.valLenBuffer,
- columnValuePlainLength[columnIndex], columnIndex);
- value.setColumnValueBuffer(compressionBuffer[columnIndex],
- columnIndex);
- valueLength += colLen;
- plainTotalColumnLength[columnIndex] += columnValuePlainLength[columnIndex];
- comprTotalColumnLength[columnIndex] += colLen;
+ if (isCompressed) {
+ if (deflateFilter instanceof SchemaAwareCompressionOutputStream) {
+ ((SchemaAwareCompressionOutputStream)deflateFilter).
+ setColumnIndex(columnIndex);
+ }
+ deflateFilter.resetState();
+ deflateOut.write(columnValue.getData(), 0, columnValue.getLength());
+ deflateOut.flush();
+ deflateFilter.finish();
+ // find how much compressed data was added for this column
+ colLen = valueBuffer.getLength() - valueLength;
} else {
- int colLen = columnValuePlainLength[columnIndex];
- key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, colLen,
- columnIndex);
- value.setColumnValueBuffer(columnValue, columnIndex);
- valueLength += colLen;
- plainTotalColumnLength[columnIndex] += colLen;
- comprTotalColumnLength[columnIndex] += colLen;
+ colLen = columnValuePlainLength[columnIndex];
}
+ valueLength += colLen;
+ key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, plainLen,
+ columnIndex);
+ plainTotalColumnLength[columnIndex] += plainLen;
+ comprTotalColumnLength[columnIndex] += colLen;
columnValuePlainLength[columnIndex] = 0;
}
@@ -968,25 +956,22 @@ public class RCFile {
if (keyLength < 0) {
throw new IOException("negative length keys not allowed: " + key);
}
+ if (compressor != null) {
+ CodecPool.returnCompressor(compressor);
+ }
- // Write the record out
- checkAndWriteSync(); // sync
- out.writeInt(keyLength + valueLength); // total record length
- out.writeInt(keyLength); // key portion length
- if (!isCompressed()) {
- out.writeInt(keyLength);
- key.write(out); // key
+ // Write the key out
+ writeKey(key, keyLength + valueLength, keyLength);
+ // write the value out
+ if (isCompressed) {
+ out.write(valueBuffer.getData(), 0, valueBuffer.getLength());
} else {
- keyCompressionBuffer.reset();
- keyDeflateFilter.resetState();
- key.write(keyDeflateOut);
- keyDeflateOut.flush();
- keyDeflateFilter.finish();
- int compressedKeyLen = keyCompressionBuffer.getLength();
- out.writeInt(compressedKeyLen);
- out.write(keyCompressionBuffer.getData(), 0, compressedKeyLen);
+ for(int columnIndex=0; columnIndex < columnNumber; ++columnIndex) {
+ NonSyncDataOutputBuffer buf =
+ columnBuffers[columnIndex].columnValBuffer;
+ out.write(buf.getData(), 0, buf.getLength());
+ }
}
- value.write(out); // value
// clear the columnBuffers
clearColumnBuffers();
@@ -999,27 +984,38 @@ public class RCFile {
* flush a block out without doing anything except compressing the key part.
*/
public void flushBlock(KeyBuffer keyBuffer, ValueBuffer valueBuffer,
- int recordLen, int keyLength, int compressedKeyLen) throws IOException {
+ int recordLen, int keyLength,
+ @SuppressWarnings("unused") int compressedKeyLen) throws IOException {
+ writeKey(keyBuffer, recordLen, keyLength);
+ valueBuffer.write(out);
+ }
+
+ private void writeKey(KeyBuffer keyBuffer, int recordLen,
+ int keyLength) throws IOException {
checkAndWriteSync(); // sync
out.writeInt(recordLen); // total record length
out.writeInt(keyLength); // key portion length
if(this.isCompressed()) {
+ Compressor compressor = CodecPool.getCompressor(codec);
+ NonSyncDataOutputBuffer compressionBuffer =
+ new NonSyncDataOutputBuffer();
+ CompressionOutputStream deflateFilter =
+ codec.createOutputStream(compressionBuffer, compressor);
+ DataOutputStream deflateOut = new DataOutputStream(deflateFilter);
//compress key and write key out
- keyCompressionBuffer.reset();
- keyDeflateFilter.resetState();
- keyBuffer.write(keyDeflateOut);
- keyDeflateOut.flush();
- keyDeflateFilter.finish();
- compressedKeyLen = keyCompressionBuffer.getLength();
+ compressionBuffer.reset();
+ deflateFilter.resetState();
+ keyBuffer.write(deflateOut);
+ deflateOut.flush();
+ deflateFilter.finish();
+ int compressedKeyLen = compressionBuffer.getLength();
out.writeInt(compressedKeyLen);
- out.write(keyCompressionBuffer.getData(), 0, compressedKeyLen);
+ out.write(compressionBuffer.getData(), 0, compressedKeyLen);
} else {
- out.writeInt(compressedKeyLen);
+ out.writeInt(keyLength);
keyBuffer.write(out);
}
-
- valueBuffer.write(out); // value
}
private void clearColumnBuffers() throws IOException {
@@ -1034,19 +1030,6 @@ public class RCFile {
}
clearColumnBuffers();
- if (isCompressed()) {
- for (int i = 0; i < columnNumber; i++) {
- deflateFilter[i].close();
- IOUtils.closeStream(deflateOut[i]);
- }
- keyDeflateFilter.close();
- IOUtils.closeStream(keyDeflateOut);
- CodecPool.returnCompressor(keyCompressor);
- keyCompressor = null;
- CodecPool.returnCompressor(compressor);
- compressor = null;
- }
-
if (out != null) {
// Close the underlying stream if we own it...
@@ -1055,8 +1038,9 @@ public class RCFile {
out = null;
}
for (int i = 0; i < columnNumber; i++) {
- LOG.info("Column#" + i + " : Plain Total Column Value Length: " + plainTotalColumnLength[i]
- + ", Compr Total Column Value Length: " + comprTotalColumnLength[i]);
+ LOG.info("Column#" + i + " : Plain Total Column Value Length: "
+ + plainTotalColumnLength[i]
+ + ", Compr Total Column Value Length: " + comprTotalColumnLength[i]);
}
}
}
@@ -1094,8 +1078,6 @@ public class RCFile {
private final ValueBuffer currentValue;
- private final boolean[] skippedColIDs = null;
-
private int readRowsIndexInBuffer = 0;
private int recordsNumInValBuffer = 0;
@@ -1189,7 +1171,7 @@ public class RCFile {
}
loadColumnNum = columnNumber;
- if (skippedColIDs != null && skippedColIDs.length > 0) {
+ if (skippedColIDs.length > 0) {
for (boolean skippedColID : skippedColIDs) {
if (skippedColID) {
loadColumnNum -= 1;
@@ -1302,8 +1284,7 @@ public class RCFile {
try {
Class<? extends CompressionCodec> codecClass = conf.getClassByName(
codecClassname).asSubclass(CompressionCodec.class);
- codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,
- conf);
+ codec = ReflectionUtils.newInstance(codecClass, conf);
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException(
"Unknown codec: " + codecClassname, cnfe);
@@ -1526,7 +1507,7 @@ public class RCFile {
* {@link #next(LongWritable)} and
* {@link #getCurrentRow(BytesRefArrayWritable)}.
*
- * @param columnID
+ * @param columnID the index of the column to get, from 0 to N-1
* @throws IOException
*/
public BytesRefArrayWritable getColumn(int columnID,
@@ -1584,6 +1565,8 @@ public class RCFile {
* @return whether there are still records or not
* @throws IOException
*/
+ @SuppressWarnings("unused")
+ @Deprecated
public synchronized boolean nextColumnsBatch() throws IOException {
passedRowsNum += (recordsNumInValBuffer - readRowsIndexInBuffer);
return nextKeyBuffer() > 0;
@@ -1620,15 +1603,12 @@ public class RCFile {
eof.printStackTrace();
}
}
- if (ret > 0) {
- return next(readRows);
- }
- return false;
+ return (ret > 0) && next(readRows);
}
private int nextKeyValueTolerateCorruptions() throws IOException {
long currentOffset = in.getPos();
- int ret = -1;
+ int ret;
try {
ret = nextKeyBuffer();
this.currentValueBuffer();
@@ -1747,6 +1727,7 @@ public class RCFile {
}
/** Returns true iff the previous call to next passed a sync mark. */
+ @SuppressWarnings("unused")
public boolean syncSeen() {
return syncSeen;
}
@@ -1762,6 +1743,7 @@ public class RCFile {
return file.toString();
}
+ @SuppressWarnings("unused")
public boolean isCompressedRCFile() {
return this.decompress;
}