Posted to commits@hive.apache.org by na...@apache.org on 2009/11/02 20:09:27 UTC

svn commit: r832062 - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/ serde/src/java/org/apache/hadoop/hive/serde2/columnar/

Author: namit
Date: Mon Nov  2 19:09:27 2009
New Revision: 832062

URL: http://svn.apache.org/viewvc?rev=832062&view=rev
Log:
HIVE-819. Add lazy decompress ability to rcfile
(He Yongqiang via namit)


Added:
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/rcfile_lazydecompress.q
    hadoop/hive/trunk/ql/src/test/results/clientpositive/rcfile_lazydecompress.q.out
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyDecompressionCallback.java
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/BytesRefWritable.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=832062&r1=832061&r2=832062&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Nov  2 19:09:27 2009
@@ -231,6 +231,9 @@
     HIVE-796. RCFile results missing columns from UNION ALL
     (He Yongqiang via namit)
 
+    HIVE-819. Add lazy decompress ability to rcfile
+    (He Yongqiang via namit)
+
 Release 0.4.0 -  Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=832062&r1=832061&r2=832062&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Mon Nov  2 19:09:27 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hive.ql.io.CodecPool;
 import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
 import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.hive.serde2.columnar.LazyDecompressionCallback;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -54,7 +55,6 @@
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * <code>RCFile</code>s, short for Record Columnar File, are flat files
@@ -291,8 +291,41 @@
    * </ul>
    */
   static class ValueBuffer implements Writable {
+    
+    class LazyDecompressionCallbackImpl implements LazyDecompressionCallback {
+      
+      int index = -1;
+      int colIndex = -1;
+      
+      public LazyDecompressionCallbackImpl(int index, int colIndex) {
+        super();
+        this.index = index;
+        this.colIndex = colIndex;
+      }
+
+      @Override
+      public byte[] decompress() throws IOException {
+        
+        if (decompressedFlag[index] || codec == null)
+          return loadedColumnsValueBuffer[index].getData();
+        
+        NonSyncDataOutputBuffer compressedData = loadedColumnsValueBuffer[index];
+        NonSyncDataOutputBuffer decompressedData = new NonSyncDataOutputBuffer();
+        decompressBuffer.reset();
+        DataInputStream valueIn = new DataInputStream(deflatFilter);
+        deflatFilter.resetState();
+        decompressBuffer.reset(compressedData.getData(), keyBuffer.eachColumnValueLen[colIndex]);
+        decompressedData.write(valueIn, keyBuffer.eachColumnUncompressedValueLen[colIndex]);
+        loadedColumnsValueBuffer[index] = decompressedData;
+        decompressedFlag[index] = true;
+        return decompressedData.getData();
+      }
+    }
+    
     // used to load columns' value into memory
     private NonSyncDataOutputBuffer[] loadedColumnsValueBuffer = null;
+    private boolean[] decompressedFlag = null;
+    private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null; 
 
     boolean inited = false;
 
@@ -339,6 +372,8 @@
             skipped++;
       }
       loadedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber - skipped];
+      decompressedFlag = new boolean[columnNumber - skipped];
+      lazyDecompressCallbackObjs = new LazyDecompressionCallbackImpl[columnNumber - skipped];
       this.codec = codec;
       if (codec != null) {
         valDecompressor = CodecPool.getDecompressor(codec);
@@ -350,6 +385,12 @@
         if (skippedColIDs[k])
           continue;
         loadedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
+        if(codec != null) {
+          decompressedFlag[readIndex] = false;
+          lazyDecompressCallbackObjs[readIndex] = new LazyDecompressionCallbackImpl(readIndex, k);
+        } else {
+          decompressedFlag[readIndex] = true;
+        }
         readIndex++;
       }
     }
@@ -358,8 +399,6 @@
       loadedColumnsValueBuffer[addIndex] = valBuffer;
     }
 
-    NonSyncDataOutputBuffer compressedData = new NonSyncDataOutputBuffer();
-
     @Override
     public void readFields(DataInput in) throws IOException {
       int addIndex = 0;
@@ -379,17 +418,9 @@
 
         NonSyncDataOutputBuffer valBuf = loadedColumnsValueBuffer[addIndex];
         valBuf.reset();
-        if (codec != null) {
-          decompressBuffer.reset();
-          DataInputStream valueIn = new DataInputStream(deflatFilter);
-          deflatFilter.resetState();
-          compressedData.reset();
-          compressedData.write(in, vaRowsLen);
-          decompressBuffer.reset(compressedData.getData(), vaRowsLen);
-          valBuf.write(valueIn, keyBuffer.eachColumnUncompressedValueLen[i]);
-        } else {
-          valBuf.write(in, vaRowsLen);
-        }
+        valBuf.write(in, vaRowsLen);
+        if(codec != null)
+          decompressedFlag[addIndex] = false;
         addIndex++;
       }
 
@@ -1199,8 +1230,10 @@
         int length = WritableUtils.readVInt(fetchColumnTempBuf);
 
         BytesRefWritable currentCell = rest.get(i);
-        currentCell.set(currentValue.loadedColumnsValueBuffer[columnID]
-            .getData(), columnNextRowStart, length);
+        if (currentValue.decompressedFlag[columnID])
+          currentCell.set(currentValue.loadedColumnsValueBuffer[columnID].getData(), columnNextRowStart, length);
+        else
+          currentCell.set(currentValue.lazyDecompressCallbackObjs[columnID], columnNextRowStart, length);
         columnNextRowStart = columnNextRowStart + length;
       }
       return rest;
@@ -1285,8 +1318,10 @@
         int length = (int) WritableUtils.readVLong(colValLenBufferReadIn[i]);
         columnRowReadIndex[i] = columnCurrentRowStart + length;
 
-        ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
-            columnCurrentRowStart, length);
+        if (currentValue.decompressedFlag[j])
+          ref.set(currentValue.loadedColumnsValueBuffer[j].getData(), columnCurrentRowStart, length);
+        else
+          ref.set(currentValue.lazyDecompressCallbackObjs[j], columnCurrentRowStart, length);
       }
       rowFetched = true;
     }
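
For context, the RCFile.java change above defers per-column decompression:
readFields() now stores each column's bytes exactly as they were read (still
compressed when a codec is in use) and sets decompressedFlag to false, and
LazyDecompressionCallbackImpl.decompress() inflates the column only when it is
first requested, caching the result back into loadedColumnsValueBuffer. A
minimal standalone sketch of this decompress-on-first-access idea, with a
hypothetical class name and plain java.util.zip GZIP standing in for the
Hadoop CompressionCodec machinery:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPInputStream;

    // Sketch only: keep the compressed bytes around and inflate them at most
    // once, the first time somebody actually asks for the column's data.
    class LazilyDecompressedColumn {
      private final byte[] compressed;  // bytes exactly as read from the file
      private byte[] uncompressed;      // filled in by the first getData() call

      LazilyDecompressedColumn(byte[] compressed) {
        this.compressed = compressed;
      }

      byte[] getData() throws IOException {
        if (uncompressed == null) {     // decompress at most once, then cache
          GZIPInputStream in =
              new GZIPInputStream(new ByteArrayInputStream(compressed));
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          byte[] buf = new byte[4096];
          int n;
          while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
          }
          in.close();
          uncompressed = out.toByteArray();
        }
        return uncompressed;
      }
    }

A column buffer whose data is never requested (for example, because a
predicate on another column already rejected the rows) is therefore never
decompressed, which is the point of the change.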

Added: hadoop/hive/trunk/ql/src/test/queries/clientpositive/rcfile_lazydecompress.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientpositive/rcfile_lazydecompress.q?rev=832062&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientpositive/rcfile_lazydecompress.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientpositive/rcfile_lazydecompress.q Mon Nov  2 19:09:27 2009
@@ -0,0 +1,28 @@
+DROP TABLE rcfileTableLazyDecompress;
+CREATE table rcfileTableLazyDecompress (key STRING, value STRING) STORED AS RCFile;
+
+FROM src
+INSERT OVERWRITE TABLE rcfileTableLazyDecompress SELECT src.key, src.value LIMIT 10;
+
+SELECT key, value FROM rcfileTableLazyDecompress where key > 238;
+
+SELECT key, value FROM rcfileTableLazyDecompress where key > 238 and key < 400;
+
+SELECT key, count(1) FROM rcfileTableLazyDecompress where key > 238 group by key;
+
+set mapred.output.compress=true;
+set hive.exec.compress.output=true;
+
+FROM src
+INSERT OVERWRITE TABLE rcfileTableLazyDecompress SELECT src.key, src.value LIMIT 10;
+
+SELECT key, value FROM rcfileTableLazyDecompress where key > 238;
+
+SELECT key, value FROM rcfileTableLazyDecompress where key > 238 and key < 400;
+
+SELECT key, count(1) FROM rcfileTableLazyDecompress where key > 238 group by key;
+
+set mapred.output.compress=false;
+set hive.exec.compress.output=false;
+
+DROP TABLE rcfileTableLazyDecompress;
\ No newline at end of file

Added: hadoop/hive/trunk/ql/src/test/results/clientpositive/rcfile_lazydecompress.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/rcfile_lazydecompress.q.out?rev=832062&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/rcfile_lazydecompress.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/rcfile_lazydecompress.q.out Mon Nov  2 19:09:27 2009
@@ -0,0 +1,108 @@
+PREHOOK: query: DROP TABLE rcfileTableLazyDecompress
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE rcfileTableLazyDecompress
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE table rcfileTableLazyDecompress (key STRING, value STRING) STORED AS RCFile
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE table rcfileTableLazyDecompress (key STRING, value STRING) STORED AS RCFile
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@rcfileTableLazyDecompress
+PREHOOK: query: FROM src
+INSERT OVERWRITE TABLE rcfileTableLazyDecompress SELECT src.key, src.value LIMIT 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@rcfiletablelazydecompress
+POSTHOOK: query: FROM src
+INSERT OVERWRITE TABLE rcfileTableLazyDecompress SELECT src.key, src.value LIMIT 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@rcfiletablelazydecompress
+PREHOOK: query: SELECT key, value FROM rcfileTableLazyDecompress where key > 238
+PREHOOK: type: QUERY
+PREHOOK: Input: default@rcfiletablelazydecompress
+PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Test/build/ql/tmp/1560474607/10000
+POSTHOOK: query: SELECT key, value FROM rcfileTableLazyDecompress where key > 238
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@rcfiletablelazydecompress
+POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Test/build/ql/tmp/1560474607/10000
+311	val_311
+409	val_409
+255	val_255
+278	val_278
+484	val_484
+PREHOOK: query: SELECT key, value FROM rcfileTableLazyDecompress where key > 238 and key < 400
+PREHOOK: type: QUERY
+PREHOOK: Input: default@rcfiletablelazydecompress
+PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Test/build/ql/tmp/664979983/10000
+POSTHOOK: query: SELECT key, value FROM rcfileTableLazyDecompress where key > 238 and key < 400
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@rcfiletablelazydecompress
+POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Test/build/ql/tmp/664979983/10000
+311	val_311
+255	val_255
+278	val_278
+PREHOOK: query: SELECT key, count(1) FROM rcfileTableLazyDecompress where key > 238 group by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@rcfiletablelazydecompress
+PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Test/build/ql/tmp/364436641/10000
+POSTHOOK: query: SELECT key, count(1) FROM rcfileTableLazyDecompress where key > 238 group by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@rcfiletablelazydecompress
+POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Test/build/ql/tmp/364436641/10000
+255	1
+278	1
+311	1
+409	1
+484	1
+PREHOOK: query: FROM src
+INSERT OVERWRITE TABLE rcfileTableLazyDecompress SELECT src.key, src.value LIMIT 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@rcfiletablelazydecompress
+POSTHOOK: query: FROM src
+INSERT OVERWRITE TABLE rcfileTableLazyDecompress SELECT src.key, src.value LIMIT 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@rcfiletablelazydecompress
+PREHOOK: query: SELECT key, value FROM rcfileTableLazyDecompress where key > 238
+PREHOOK: type: QUERY
+PREHOOK: Input: default@rcfiletablelazydecompress
+PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Test/build/ql/tmp/1876736453/10000
+POSTHOOK: query: SELECT key, value FROM rcfileTableLazyDecompress where key > 238
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@rcfiletablelazydecompress
+POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Test/build/ql/tmp/1876736453/10000
+311	val_311
+409	val_409
+255	val_255
+278	val_278
+484	val_484
+PREHOOK: query: SELECT key, value FROM rcfileTableLazyDecompress where key > 238 and key < 400
+PREHOOK: type: QUERY
+PREHOOK: Input: default@rcfiletablelazydecompress
+PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Test/build/ql/tmp/2041315134/10000
+POSTHOOK: query: SELECT key, value FROM rcfileTableLazyDecompress where key > 238 and key < 400
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@rcfiletablelazydecompress
+POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Test/build/ql/tmp/2041315134/10000
+311	val_311
+255	val_255
+278	val_278
+PREHOOK: query: SELECT key, count(1) FROM rcfileTableLazyDecompress where key > 238 group by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@rcfiletablelazydecompress
+PREHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Test/build/ql/tmp/798009292/10000
+POSTHOOK: query: SELECT key, count(1) FROM rcfileTableLazyDecompress where key > 238 group by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@rcfiletablelazydecompress
+POSTHOOK: Output: file:/Users/heyongqiang/Documents/workspace/Hive-Test/build/ql/tmp/798009292/10000
+255	1
+278	1
+311	1
+409	1
+484	1
+PREHOOK: query: DROP TABLE rcfileTableLazyDecompress
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE rcfileTableLazyDecompress
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: default@rcfiletablelazydecompress

Modified: hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/BytesRefWritable.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/BytesRefWritable.java?rev=832062&r1=832061&r2=832062&view=diff
==============================================================================
--- hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/BytesRefWritable.java (original)
+++ hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/BytesRefWritable.java Mon Nov  2 19:09:27 2009
@@ -39,6 +39,9 @@
   int start = 0;
   int length = 0;
   byte[] bytes = null;
+  
+  LazyDecompressionCallback lazyDecompressObj;
+  
 
   /**
    * Create a zero-size bytes.
@@ -75,13 +78,34 @@
     start = offset;
     length = len;
   }
+  
+  /**
+   * Create a BytesRefWritable that references one section of the given bytes.
+   * The argument <tt>lazyDecompressData</tt> refers to a LazyDecompressionCallback
+   * object. The arguments <tt>offset</tt> and <tt>len</tt> refer to positions in
+   * the uncompressed bytes of <tt>lazyDecompressData</tt>, so use them only
+   * after the data has been uncompressed.
+   */
+  public BytesRefWritable(LazyDecompressionCallback lazyDecompressData, int offset, int len) {
+    lazyDecompressObj = lazyDecompressData;
+    start = offset;
+    length = len;
+  }
+
+  private void lazyDecompress() throws IOException {
+    if (bytes == null && lazyDecompressObj != null) {
+      bytes = lazyDecompressObj.decompress();
+    }
+  }
 
   /**
    * Returns a copy of the underlying bytes referenced by this instance.
    * 
    * @return a new copied byte array
+   * @throws IOException 
    */
-  public byte[] getBytesCopy() {
+  public byte[] getBytesCopy() throws IOException {
+    lazyDecompress();
     byte[] bb = new byte[length];
     System.arraycopy(bytes, start, bb, 0, length);
     return bb;
@@ -89,8 +113,10 @@
 
   /**
    * Returns the underlying bytes.
+   * @throws IOException 
    */
-  public byte[] getData() {
+  public byte[] getData() throws IOException {
+    lazyDecompress();
     return bytes;
   }
 
@@ -104,9 +130,24 @@
     bytes = newData;
     start = offset;
     length = len;
+    lazyDecompressObj = null;
+  }
+  
+  /**
+   * readFields() will corrupt the array. So use the set method whenever
+   * possible.
+   * 
+   * @see #readFields(DataInput)
+   */
+  public void set(LazyDecompressionCallback newData, int offset, int len) {
+    bytes = null;
+    start = offset;
+    length = len;
+    lazyDecompressObj = newData;
   }
 
   public void writeDataTo(DataOutput out) throws IOException {
+    lazyDecompress();
     out.write(bytes, start, length);
   }
 
@@ -129,6 +170,7 @@
 
   /** {@inheritDoc} */
   public void write(DataOutput out) throws IOException {
+    lazyDecompress();
     out.writeInt(length);
     out.write(bytes, start, length);
   }
@@ -163,8 +205,12 @@
       throw new IllegalArgumentException("Argument can not be null.");
     if (this == other)
       return 0;
-    return WritableComparator.compareBytes(getData(), start, getLength(), other
-        .getData(), other.start, other.getLength());
+    try {
+      return WritableComparator.compareBytes(getData(), start, getLength(), other
+          .getData(), other.start, other.getLength());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   /** {@inheritDoc} */
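
With the new constructor and set() overload above, a BytesRefWritable can
carry a LazyDecompressionCallback instead of a byte array; getBytesCopy(),
getData(), writeDataTo() and write() all go through lazyDecompress(), so the
callback runs at most once, the first time the bytes are needed. A small usage
sketch under these assumptions (the callback here is a hypothetical stand-in
that simply hands back a pre-built array; a real one, such as
LazyDecompressionCallbackImpl in RCFile.ValueBuffer, would inflate the
column's compressed buffer):

    import java.io.IOException;

    import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
    import org.apache.hadoop.hive.serde2.columnar.LazyDecompressionCallback;

    public class LazyRefExample {
      public static void main(String[] args) throws IOException {
        final byte[] uncompressed = "hello world".getBytes();

        // Stand-in callback; nothing is really decompressed here, it only
        // shows when the callback is invoked.
        LazyDecompressionCallback cb = new LazyDecompressionCallback() {
          public byte[] decompress() throws IOException {
            return uncompressed;
          }
        };

        // offset and length are positions in the *uncompressed* bytes.
        BytesRefWritable ref = new BytesRefWritable(cb, 6, 5);

        // The callback is only invoked once the bytes are actually needed.
        byte[] copy = ref.getBytesCopy();      // triggers decompress()
        System.out.println(new String(copy));  // prints "world"
      }
    }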

Modified: hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java?rev=832062&r1=832061&r2=832062&view=diff
==============================================================================
--- hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java (original)
+++ hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java Mon Nov  2 19:09:27 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.serde2.columnar;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -63,11 +64,14 @@
     int num = fieldRefs.size();
     fields = new LazyObject[num];
     cachedByteArrayRef = new ByteArrayRef[num];
+    rawBytesField = new BytesRefWritable[num];
     fieldIsNull = new boolean[num];
+    inited = new boolean[num];
     for (int i = 0; i < num; i++) {
       fields[i] = LazyFactory.createLazyObject(fieldRefs.get(i).getFieldObjectInspector());
       cachedByteArrayRef[i] = new ByteArrayRef();
       fieldIsNull[i] = false;
+      inited[i] = false;
     }
   }
 
@@ -98,6 +102,8 @@
    * the byte copy.
    */
   ByteArrayRef[] cachedByteArrayRef = null;
+  BytesRefWritable[] rawBytesField = null;
+  boolean[] inited = null;
   boolean[] fieldIsNull = null;
 
   /**
@@ -113,9 +119,22 @@
   protected Object uncheckedGetField(int fieldID, Text nullSequence) {
     if (fieldIsNull[fieldID])
       return null;
-    int fieldLen = cachedByteArrayRef[fieldID].getData().length;
+    if (!inited[fieldID]) {
+      BytesRefWritable passedInField = rawBytesField[fieldID];
+      try {
+        cachedByteArrayRef[fieldID].setData(passedInField.getData());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      fields[fieldID].init(cachedByteArrayRef[fieldID], passedInField
+          .getStart(), passedInField.getLength());
+      inited[fieldID] = true;
+    }
+    
+    byte[] data = cachedByteArrayRef[fieldID].getData();
+    int fieldLen = data.length;
     if (fieldLen == nullSequence.getLength()
-        && LazyUtils.compare(cachedByteArrayRef[fieldID].getData(), 0,
+        && LazyUtils.compare(data, 0,
             fieldLen, nullSequence.getBytes(), 0, nullSequence.getLength()) == 0) {
       return null;
     }
@@ -134,11 +153,8 @@
     if (initialized) { // short cut for non-first calls
       for (int i = 0; i < prjColIDs.length; ++i ) {
         int fieldIndex = prjColIDs[i];
-        BytesRefWritable passedInField = cols.unCheckedGet(fieldIndex);
-        cachedByteArrayRef[fieldIndex].setData(passedInField.getData());
-        fields[fieldIndex].init(cachedByteArrayRef[fieldIndex], 
-                                passedInField.getStart(), 
-                                passedInField.getLength());
+        rawBytesField[fieldIndex] = cols.unCheckedGet(fieldIndex);
+        inited[fieldIndex] = false;
       }
     } else { // first time call init()
       int fieldIndex = 0;
@@ -157,13 +173,12 @@
           // fields[fieldIndex] = LazyFactory.createLazyObject(fieldTypeInfos
           // .get(fieldIndex));
           tmp_sel_cols.add(fieldIndex);
-          cachedByteArrayRef[fieldIndex].setData(passedInField.getData());
-          fields[fieldIndex].init(cachedByteArrayRef[fieldIndex], 
-                                  passedInField.getStart(), 
-                                  passedInField.getLength());
+          rawBytesField[fieldIndex] = passedInField;
           fieldIsNull[fieldIndex] = false;
         } else
           fieldIsNull[fieldIndex] = true;
+        
+        inited[fieldIndex] = false;
       }
       for (; fieldIndex < fields.length; fieldIndex++)
         fieldIsNull[fieldIndex] = true;
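
The ColumnarStruct change follows the same idea one level up: init() no longer
calls passedInField.getData() eagerly (which would now force decompression of
every projected column on every row), it only remembers the BytesRefWritable
in rawBytesField and resets inited, and uncheckedGetField() does the
getData()/init work on first access. A condensed, hypothetical sketch of that
deferred-initialization pattern:

    import java.io.IOException;

    import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;

    // Sketch only: remember the reference cheaply on every row, do the
    // (possibly decompressing) getData() work only when the field is read.
    class LazyField {
      private BytesRefWritable raw; // set on every row, cheap
      private byte[] data;          // materialized on first read
      private boolean inited;

      void setRow(BytesRefWritable ref) {
        raw = ref;
        inited = false;             // invalidate any previously cached bytes
      }

      byte[] get() throws IOException {
        if (!inited) {
          data = raw.getData();     // may trigger lazy decompression
          inited = true;
        }
        return data;
      }
    }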

Added: hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyDecompressionCallback.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyDecompressionCallback.java?rev=832062&view=auto
==============================================================================
--- hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyDecompressionCallback.java (added)
+++ hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyDecompressionCallback.java Mon Nov  2 19:09:27 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.columnar;
+
+import java.io.IOException;
+
+
+/**
+ * Callback used to lazily decompress a column's data on first access.
+ * 
+ * @see BytesRefWritable
+ */
+public interface LazyDecompressionCallback {
+
+  public byte[] decompress() throws IOException;
+
+}