You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2012/07/31 19:29:16 UTC

svn commit: r1367692 - /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java

Author: hashutosh
Date: Tue Jul 31 17:29:16 2012
New Revision: 1367692

URL: http://svn.apache.org/viewvc?rev=1367692&view=rev
Log:
HIVE-3153 : Release codecs and output streams between flushes of RCFile (Owen Omalley via Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1367692&r1=1367691&r2=1367692&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Tue Jul 31 17:29:16 2012
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.io;
 
-import java.io.BufferedOutputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
@@ -209,7 +208,7 @@ public class RCFile {
    * </ul>
    */
   public static class KeyBuffer implements WritableComparable {
-    // each column's value length in a split
+    // each column's length in the value
     private int[] eachColumnValueLen = null;
     private int[] eachColumnUncompressedValueLen = null;
     // stores each cell's length of a column in one DataOutputBuffer element
@@ -224,18 +223,22 @@ public class RCFile {
       return columnNumber;
     }
 
+    @SuppressWarnings("unused")
+    @Deprecated
     public KeyBuffer(){
     }
 
-    KeyBuffer(int columnNumber) {
-      this(0, columnNumber);
-    }
-
-    KeyBuffer(int numberRows, int columnNum) {
+    KeyBuffer(int columnNum) {
       columnNumber = columnNum;
       eachColumnValueLen = new int[columnNumber];
       eachColumnUncompressedValueLen = new int[columnNumber];
       allCellValLenBuffer = new NonSyncDataOutputBuffer[columnNumber];
+    }
+
+    @SuppressWarnings("unused")
+    @Deprecated
+    KeyBuffer(int numberRows, int columnNum) {
+      this(columnNum);
       this.numberRows = numberRows;
     }
 
@@ -388,18 +391,26 @@ public class RCFile {
     NonSyncDataInputBuffer decompressBuffer = new NonSyncDataInputBuffer();
     CompressionInputStream deflatFilter = null;
 
+    @SuppressWarnings("unused")
+    @Deprecated
     public ValueBuffer() throws IOException {
     }
 
+    @SuppressWarnings("unused")
+    @Deprecated
     public ValueBuffer(KeyBuffer keyBuffer) throws IOException {
-      this(keyBuffer, null);
+      this(keyBuffer, keyBuffer.columnNumber, null, null, true);
     }
 
+    @SuppressWarnings("unused")
+    @Deprecated
     public ValueBuffer(KeyBuffer keyBuffer, boolean[] skippedColIDs)
         throws IOException {
-      this(keyBuffer, keyBuffer.columnNumber, skippedColIDs, null);
+      this(keyBuffer, keyBuffer.columnNumber, skippedColIDs, null, true);
     }
 
+    @SuppressWarnings("unused")
+    @Deprecated
     public ValueBuffer(KeyBuffer currentKey, int columnNumber,
         boolean[] skippedCols, CompressionCodec codec) throws IOException {
       this(currentKey, columnNumber, skippedCols, codec, true);
@@ -422,11 +433,9 @@ public class RCFile {
       }
 
       int skipped = 0;
-      if (skippedColIDs != null) {
-        for (boolean currentSkip : skippedColIDs) {
-          if (currentSkip) {
-            skipped++;
-          }
+      for (boolean currentSkip : skippedColIDs) {
+        if (currentSkip) {
+          skipped++;
         }
       }
       loadedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber
@@ -464,6 +473,8 @@ public class RCFile {
       }
     }
 
+    @SuppressWarnings("unused")
+    @Deprecated
     public void setColumnValueBuffer(NonSyncDataOutputBuffer valBuffer,
         int addIndex) {
       loadedColumnsValueBuffer[addIndex] = valBuffer;
@@ -576,7 +587,6 @@ public class RCFile {
 
     CompressionCodec codec = null;
     Metadata metadata = null;
-    Compressor compressor = null;
 
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
@@ -604,22 +614,13 @@ public class RCFile {
     // how many records already buffered
     private int bufferedRecords = 0;
 
-    NonSyncDataOutputBuffer[] compressionBuffer;
-    CompressionOutputStream[] deflateFilter = null;
-    DataOutputStream[] deflateOut = null;
     private final ColumnBuffer[] columnBuffers;
 
-    NonSyncDataOutputBuffer keyCompressionBuffer;
-    CompressionOutputStream keyDeflateFilter;
-    DataOutputStream keyDeflateOut;
-    Compressor keyCompressor;
-
     private int columnNumber = 0;
 
     private final int[] columnValuePlainLength;
 
     KeyBuffer key = null;
-    ValueBuffer value = null;
     private final int[] plainTotalColumnLength;
     private final int[] comprTotalColumnLength;
 
@@ -669,7 +670,6 @@ public class RCFile {
       private void startNewGroup(int currentLen) {
         prevValueLength = currentLen;
         runLength = 0;
-        return;
       }
 
       public void clear() throws IOException {
@@ -713,7 +713,7 @@ public class RCFile {
      */
     public Writer(FileSystem fs, Configuration conf, Path name,
         Progressable progress, CompressionCodec codec) throws IOException {
-      this(fs, conf, name, null, new Metadata(), codec);
+      this(fs, conf, name, progress, new Metadata(), codec);
     }
 
     /**
@@ -725,8 +725,8 @@ public class RCFile {
      *          the configuration file
      * @param name
      *          the file name
-     * @param progress
-     * @param metadata
+     * @param progress a progress meter to update as the file is written
+     * @param metadata a string-to-string map in the file header
      * @throws IOException
      */
     public Writer(FileSystem fs, Configuration conf, Path name,
@@ -746,11 +746,11 @@ public class RCFile {
      *          the configuration file
      * @param name
      *          the file name
-     * @param bufferSize
-     * @param replication
-     * @param blockSize
-     * @param progress
-     * @param metadata
+     * @param bufferSize the size of the file buffer
+     * @param replication the number of replicas for the file
+     * @param blockSize the block size of the file
+     * @param progress the progress meter for writing the file
+     * @param metadata a string-to-string map in the file header
      * @throws IOException
      */
     public Writer(FileSystem fs, Configuration conf, Path name, int bufferSize,
@@ -775,13 +775,12 @@ public class RCFile {
         columnBuffers[i] = new ColumnBuffer();
       }
 
-      init(name, conf, fs.create(name, true, bufferSize, replication,
-          blockSize, progress), codec, metadata);
+      init(conf, fs.create(name, true, bufferSize, replication,
+        blockSize, progress), codec, metadata);
       initializeFileHeader();
       writeFileHeader();
       finalizeFileHeader();
       key = new KeyBuffer(columnNumber);
-      value = new ValueBuffer(key);
 
       plainTotalColumnLength = new int[columnNumber];
       comprTotalColumnLength = new int[columnNumber];
@@ -824,41 +823,19 @@ public class RCFile {
       metadata.write(out);
     }
 
-    void init(Path name, Configuration conf, FSDataOutputStream out,
+    void init(Configuration conf, FSDataOutputStream out,
         CompressionCodec codec, Metadata metadata) throws IOException {
       this.conf = conf;
       this.out = out;
       this.codec = codec;
       this.metadata = metadata;
-      if (this.codec != null) {
-        ReflectionUtils.setConf(codec, this.conf);
-        compressor = CodecPool.getCompressor(codec);
-
-        compressionBuffer = new NonSyncDataOutputBuffer[columnNumber];
-        deflateFilter = new CompressionOutputStream[columnNumber];
-        deflateOut = new DataOutputStream[columnNumber];
-        for (int i = 0; i < columnNumber; i++) {
-          compressionBuffer[i] = new NonSyncDataOutputBuffer();
-          deflateFilter[i] = codec.createOutputStream(compressionBuffer[i],
-              compressor);
-          if (deflateFilter[i] instanceof SchemaAwareCompressionOutputStream) {
-            ((SchemaAwareCompressionOutputStream)deflateFilter[i]).setColumnIndex(i);
-          }
-          deflateOut[i] = new DataOutputStream(new BufferedOutputStream(
-              deflateFilter[i]));
-        }
-        keyCompressor = CodecPool.getCompressor(codec);
-        keyCompressionBuffer = new NonSyncDataOutputBuffer();
-        keyDeflateFilter = codec.createOutputStream(keyCompressionBuffer,
-            keyCompressor);
-        keyDeflateOut = new DataOutputStream(new BufferedOutputStream(
-            keyDeflateFilter));
-      }
       this.useNewMagic =
           conf.getBoolean(HiveConf.ConfVars.HIVEUSEEXPLICITRCFILEHEADER.varname, true);
     }
 
     /** Returns the compression codec of data in this file. */
+    @SuppressWarnings("unused")
+    @Deprecated
     public CompressionCodec getCompressionCodec() {
       return codec;
     }
@@ -873,6 +850,8 @@ public class RCFile {
     }
 
     /** Returns the configuration of this file. */
+    @SuppressWarnings("unused")
+    @Deprecated
     Configuration getConf() {
       return conf;
     }
@@ -892,7 +871,7 @@ public class RCFile {
      * If its size() is greater then the column number in the file, the exceeded
      * columns' bytes are ignored.
      *
-     * @param val
+     * @param val a BytesRefArrayWritable with the list of serialized columns
      * @throws IOException
      */
     public void append(Writable val) throws IOException {
@@ -928,39 +907,48 @@ public class RCFile {
     private void flushRecords() throws IOException {
 
       key.numberRows = bufferedRecords;
-      value.keyBuffer = key;
 
+      Compressor compressor = null;
+      NonSyncDataOutputBuffer valueBuffer = null;
+      CompressionOutputStream deflateFilter = null;
+      DataOutputStream deflateOut = null;
+      boolean isCompressed = isCompressed();
       int valueLength = 0;
+      if (isCompressed) {
+        ReflectionUtils.setConf(codec, this.conf);
+        compressor = CodecPool.getCompressor(codec);
+        valueBuffer = new NonSyncDataOutputBuffer();
+        deflateFilter = codec.createOutputStream(valueBuffer, compressor);
+        deflateOut = new DataOutputStream(deflateFilter);
+      }
+
       for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
         ColumnBuffer currentBuf = columnBuffers[columnIndex];
         currentBuf.flushGroup();
 
         NonSyncDataOutputBuffer columnValue = currentBuf.columnValBuffer;
+        int colLen;
+        int plainLen = columnValuePlainLength[columnIndex];
 
-        if (isCompressed()) {
-          compressionBuffer[columnIndex].reset();
-          deflateFilter[columnIndex].resetState();
-          deflateOut[columnIndex].write(columnValue.getData(), 0, columnValue
-              .getLength());
-          deflateOut[columnIndex].flush();
-          deflateFilter[columnIndex].finish();
-          int colLen = compressionBuffer[columnIndex].getLength();
-          key.setColumnLenInfo(colLen, currentBuf.valLenBuffer,
-              columnValuePlainLength[columnIndex], columnIndex);
-          value.setColumnValueBuffer(compressionBuffer[columnIndex],
-              columnIndex);
-          valueLength += colLen;
-          plainTotalColumnLength[columnIndex] += columnValuePlainLength[columnIndex];
-          comprTotalColumnLength[columnIndex] += colLen;
+        if (isCompressed) {
+          if (deflateFilter instanceof SchemaAwareCompressionOutputStream) {
+            ((SchemaAwareCompressionOutputStream)deflateFilter).
+              setColumnIndex(columnIndex);
+          }
+          deflateFilter.resetState();
+          deflateOut.write(columnValue.getData(), 0, columnValue.getLength());
+          deflateOut.flush();
+          deflateFilter.finish();
+          // find how much compressed data was added for this column
+          colLen = valueBuffer.getLength() - valueLength;
         } else {
-          int colLen = columnValuePlainLength[columnIndex];
-          key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, colLen,
-              columnIndex);
-          value.setColumnValueBuffer(columnValue, columnIndex);
-          valueLength += colLen;
-          plainTotalColumnLength[columnIndex] += colLen;
-          comprTotalColumnLength[columnIndex] += colLen;
+          colLen = columnValuePlainLength[columnIndex];
         }
+        valueLength += colLen;
+        key.setColumnLenInfo(colLen, currentBuf.valLenBuffer, plainLen,
+          columnIndex);
+        plainTotalColumnLength[columnIndex] += plainLen;
+        comprTotalColumnLength[columnIndex] += colLen;
         columnValuePlainLength[columnIndex] = 0;
       }
 
@@ -968,25 +956,22 @@ public class RCFile {
       if (keyLength < 0) {
         throw new IOException("negative length keys not allowed: " + key);
       }
+      if (compressor != null) {
+        CodecPool.returnCompressor(compressor);
+      }
 
-      // Write the record out
-      checkAndWriteSync(); // sync
-      out.writeInt(keyLength + valueLength); // total record length
-      out.writeInt(keyLength); // key portion length
-      if (!isCompressed()) {
-        out.writeInt(keyLength);
-        key.write(out); // key
+      // Write the key out
+      writeKey(key, keyLength + valueLength, keyLength);
+      // write the value out
+      if (isCompressed) {
+        out.write(valueBuffer.getData(), 0, valueBuffer.getLength());
       } else {
-        keyCompressionBuffer.reset();
-        keyDeflateFilter.resetState();
-        key.write(keyDeflateOut);
-        keyDeflateOut.flush();
-        keyDeflateFilter.finish();
-        int compressedKeyLen = keyCompressionBuffer.getLength();
-        out.writeInt(compressedKeyLen);
-        out.write(keyCompressionBuffer.getData(), 0, compressedKeyLen);
+        for(int columnIndex=0; columnIndex < columnNumber; ++columnIndex) {
+          NonSyncDataOutputBuffer buf =
+            columnBuffers[columnIndex].columnValBuffer;
+          out.write(buf.getData(), 0, buf.getLength());
+        }
       }
-      value.write(out); // value
 
       // clear the columnBuffers
       clearColumnBuffers();
@@ -999,27 +984,38 @@ public class RCFile {
      * flush a block out without doing anything except compressing the key part.
      */
     public void flushBlock(KeyBuffer keyBuffer, ValueBuffer valueBuffer,
-        int recordLen, int keyLength, int compressedKeyLen) throws IOException {
+        int recordLen, int keyLength,
+        @SuppressWarnings("unused") int compressedKeyLen) throws IOException {
+      writeKey(keyBuffer, recordLen, keyLength);
+      valueBuffer.write(out);
+    }
+
+    private void writeKey(KeyBuffer keyBuffer, int recordLen,
+                          int keyLength) throws IOException {
       checkAndWriteSync(); // sync
       out.writeInt(recordLen); // total record length
       out.writeInt(keyLength); // key portion length
 
       if(this.isCompressed()) {
+        Compressor compressor = CodecPool.getCompressor(codec);
+        NonSyncDataOutputBuffer compressionBuffer =
+          new NonSyncDataOutputBuffer();
+        CompressionOutputStream deflateFilter =
+          codec.createOutputStream(compressionBuffer, compressor);
+        DataOutputStream deflateOut = new DataOutputStream(deflateFilter);
         //compress key and write key out
-        keyCompressionBuffer.reset();
-        keyDeflateFilter.resetState();
-        keyBuffer.write(keyDeflateOut);
-        keyDeflateOut.flush();
-        keyDeflateFilter.finish();
-        compressedKeyLen = keyCompressionBuffer.getLength();
+        compressionBuffer.reset();
+        deflateFilter.resetState();
+        keyBuffer.write(deflateOut);
+        deflateOut.flush();
+        deflateFilter.finish();
+        int compressedKeyLen = compressionBuffer.getLength();
         out.writeInt(compressedKeyLen);
-        out.write(keyCompressionBuffer.getData(), 0, compressedKeyLen);
+        out.write(compressionBuffer.getData(), 0, compressedKeyLen);
       } else {
-        out.writeInt(compressedKeyLen);
+        out.writeInt(keyLength);
         keyBuffer.write(out);
       }
-
-      valueBuffer.write(out); // value
     }
 
     private void clearColumnBuffers() throws IOException {
@@ -1034,19 +1030,6 @@ public class RCFile {
       }
       clearColumnBuffers();
 
-      if (isCompressed()) {
-        for (int i = 0; i < columnNumber; i++) {
-          deflateFilter[i].close();
-          IOUtils.closeStream(deflateOut[i]);
-        }
-        keyDeflateFilter.close();
-        IOUtils.closeStream(keyDeflateOut);
-        CodecPool.returnCompressor(keyCompressor);
-        keyCompressor = null;
-        CodecPool.returnCompressor(compressor);
-        compressor = null;
-      }
-
       if (out != null) {
 
         // Close the underlying stream if we own it...
@@ -1055,8 +1038,9 @@ public class RCFile {
         out = null;
       }
       for (int i = 0; i < columnNumber; i++) {
-        LOG.info("Column#" + i + " : Plain Total Column Value Length: " + plainTotalColumnLength[i]
-              + ",  Compr Total Column Value Length: " + comprTotalColumnLength[i]);
+        LOG.info("Column#" + i + " : Plain Total Column Value Length: "
+          + plainTotalColumnLength[i]
+          + ",  Compr Total Column Value Length: " + comprTotalColumnLength[i]);
       }
     }
   }
@@ -1094,8 +1078,6 @@ public class RCFile {
 
     private final ValueBuffer currentValue;
 
-    private final boolean[] skippedColIDs = null;
-
     private int readRowsIndexInBuffer = 0;
 
     private int recordsNumInValBuffer = 0;
@@ -1189,7 +1171,7 @@ public class RCFile {
       }
 
       loadColumnNum = columnNumber;
-      if (skippedColIDs != null && skippedColIDs.length > 0) {
+      if (skippedColIDs.length > 0) {
         for (boolean skippedColID : skippedColIDs) {
           if (skippedColID) {
             loadColumnNum -= 1;
@@ -1302,8 +1284,7 @@ public class RCFile {
         try {
           Class<? extends CompressionCodec> codecClass = conf.getClassByName(
               codecClassname).asSubclass(CompressionCodec.class);
-          codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass,
-              conf);
+          codec = ReflectionUtils.newInstance(codecClass, conf);
         } catch (ClassNotFoundException cnfe) {
           throw new IllegalArgumentException(
               "Unknown codec: " + codecClassname, cnfe);
@@ -1526,7 +1507,7 @@ public class RCFile {
      * {@link #next(LongWritable)} and
      * {@link #getCurrentRow(BytesRefArrayWritable)}.
      *
-     * @param columnID
+     * @param columnID the index of the column to get, from 0 to N-1
      * @throws IOException
      */
     public BytesRefArrayWritable getColumn(int columnID,
@@ -1584,6 +1565,8 @@ public class RCFile {
      * @return whether there still has records or not
      * @throws IOException
      */
+    @SuppressWarnings("unused")
+    @Deprecated
     public synchronized boolean nextColumnsBatch() throws IOException {
       passedRowsNum += (recordsNumInValBuffer - readRowsIndexInBuffer);
       return nextKeyBuffer() > 0;
@@ -1620,15 +1603,12 @@ public class RCFile {
           eof.printStackTrace();
         }
       }
-      if (ret > 0) {
-        return next(readRows);
-      }
-      return false;
+      return (ret > 0) && next(readRows);
     }
 
     private int nextKeyValueTolerateCorruptions() throws IOException {
       long currentOffset = in.getPos();
-      int ret = -1;
+      int ret;
       try {
         ret = nextKeyBuffer();
         this.currentValueBuffer();
@@ -1747,6 +1727,7 @@ public class RCFile {
     }
 
     /** Returns true iff the previous call to next passed a sync mark. */
+    @SuppressWarnings("unused")
     public boolean syncSeen() {
       return syncSeen;
     }
@@ -1762,6 +1743,7 @@ public class RCFile {
       return file.toString();
     }
 
+    @SuppressWarnings("unused")
     public boolean isCompressedRCFile() {
       return this.decompress;
     }