Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/23 22:41:01 UTC

svn commit: r1525692 [8/8] - in /hive/branches/vectorization: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/test/results/clientpositive/ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ hbase-handler/src/test/results/positive/ h...

Modified: hive/branches/vectorization/ql/src/test/results/compiler/plan/union.q.xml
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/results/compiler/plan/union.q.xml?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/results/compiler/plan/union.q.xml (original)
+++ hive/branches/vectorization/ql/src/test/results/compiler/plan/union.q.xml Mon Sep 23 20:40:54 2013
@@ -163,6 +163,9 @@
                           </void> 
                          </object> 
                         </void> 
+                        <void property="conf"> 
+                         <object class="org.apache.hadoop.hive.ql.plan.TableScanDesc"/> 
+                        </void> 
                         <void property="counterNames"> 
                          <object class="java.util.ArrayList"> 
                           <void method="add"> 
@@ -179,6 +182,26 @@
                           </void> 
                          </object> 
                         </void> 
+                        <void property="neededColumnIDs"> 
+                         <object class="java.util.ArrayList"> 
+                          <void method="add"> 
+                           <int>0</int> 
+                          </void> 
+                          <void method="add"> 
+                           <int>1</int> 
+                          </void> 
+                         </object> 
+                        </void> 
+                        <void property="neededColumns"> 
+                         <object class="java.util.ArrayList"> 
+                          <void method="add"> 
+                           <string>_col0</string> 
+                          </void> 
+                          <void method="add"> 
+                           <string>_col1</string> 
+                          </void> 
+                         </object> 
+                        </void> 
                         <void property="operatorId"> 
                          <string>TS_11</string> 
                         </void> 

Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java Mon Sep 23 20:40:54 2013
@@ -32,87 +32,57 @@ public final class ColumnProjectionUtils
 
   public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids";
   public static final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names";
+  private static final String READ_COLUMN_IDS_CONF_STR_DEFAULT = "";
+  private static final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns";
+  private static final boolean READ_ALL_COLUMNS_DEFAULT = true;
 
   /**
-   * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column
-   * is included in the list, RCFile's reader will not skip its value.
-   * 
+   * Sets the <em>READ_ALL_COLUMNS</em> flag and removes any previously
+   * set column ids.
    */
-  public static void setReadColumnIDs(Configuration conf, List<Integer> ids) {
-    String id = toReadColumnIDString(ids);
-    setReadColumnIDConf(conf, id);
+  public static void setReadAllColumns(Configuration conf) {
+    conf.setBoolean(READ_ALL_COLUMNS, true);
+    setReadColumnIDConf(conf, READ_COLUMN_IDS_CONF_STR_DEFAULT);
   }
 
   /**
-   * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column
-   * is included in the list, RCFile's reader will not skip its value.
-   * 
+   * Returns the <em>READ_ALL_COLUMNS</em> flag.
    */
-  public static void appendReadColumnIDs(Configuration conf, List<Integer> ids) {
-    String id = toReadColumnIDString(ids);
-    if (id != null) {
-      String old = conf.get(READ_COLUMN_IDS_CONF_STR, null);
-      String newConfStr = id;
-      if (old != null) {
-        newConfStr = newConfStr + StringUtils.COMMA_STR + old;
-      }
-
-      setReadColumnIDConf(conf, newConfStr);
-    }
-  }
-
-  public static void appendReadColumnNames(Configuration conf,
-                                           List<String> cols) {
-    if (cols != null) {
-      String old = conf.get(READ_COLUMN_NAMES_CONF_STR, "");
-      StringBuilder result = new StringBuilder(old);
-      boolean first = old.isEmpty();
-      for(String col: cols) {
-        if (first) {
-          first = false;
-        } else {
-          result.append(',');
-        }
-        result.append(col);
-      }
-      conf.set(READ_COLUMN_NAMES_CONF_STR, result.toString());
-    }
+  public static boolean isReadAllColumns(Configuration conf) {
+    return conf.getBoolean(READ_ALL_COLUMNS, READ_ALL_COLUMNS_DEFAULT);
   }
 
-  private static void setReadColumnIDConf(Configuration conf, String id) {
-    if (id == null || id.length() <= 0) {
-      conf.set(READ_COLUMN_IDS_CONF_STR, "");
-      return;
-    }
-
-    conf.set(READ_COLUMN_IDS_CONF_STR, id);
+  /**
+   * Appends read columns' ids (starting from zero). Once a column
+   * is included in the list, an underlying record reader of a columnar file
+   * format (e.g. RCFile or ORC) knows which columns are needed.
+   */
+  public static void appendReadColumns(Configuration conf, List<Integer> ids) {
+    String id = toReadColumnIDString(ids);
+    String old = conf.get(READ_COLUMN_IDS_CONF_STR, null);
+    String newConfStr = id;
+    if (old != null) {
+      newConfStr = newConfStr + StringUtils.COMMA_STR + old;
+    }
+    setReadColumnIDConf(conf, newConfStr);
+    // Set READ_ALL_COLUMNS to false
+    conf.setBoolean(READ_ALL_COLUMNS, false);
   }
 
-  private static String toReadColumnIDString(List<Integer> ids) {
-    String id = null;
-    if (ids != null) {
-      for (int i = 0; i < ids.size(); i++) {
-        if (i == 0) {
-          id = "" + ids.get(i);
-        } else {
-          id = id + StringUtils.COMMA_STR + ids.get(i);
-        }
-      }
-    }
-    return id;
+  public static void appendReadColumns(
+      Configuration conf, List<Integer> ids, List<String> names) {
+    appendReadColumns(conf, ids);
+    appendReadColumnNames(conf, names);
   }
 
   /**
    * Returns an array of column ids(start from zero) which is set in the given
    * parameter <tt>conf</tt>.
    */
-  public static ArrayList<Integer> getReadColumnIDs(Configuration conf) {
-    if (conf == null) {
-      return new ArrayList<Integer>(0);
-    }
-    String skips = conf.get(READ_COLUMN_IDS_CONF_STR, "");
+  public static List<Integer> getReadColumnIDs(Configuration conf) {
+    String skips = conf.get(READ_COLUMN_IDS_CONF_STR, READ_COLUMN_IDS_CONF_STR_DEFAULT);
     String[] list = StringUtils.split(skips);
-    ArrayList<Integer> result = new ArrayList<Integer>(list.length);
+    List<Integer> result = new ArrayList<Integer>(list.length);
     for (String element : list) {
       // it may contain duplicates, remove duplicates
       Integer toAdd = Integer.parseInt(element);
@@ -123,11 +93,39 @@ public final class ColumnProjectionUtils
     return result;
   }
 
-  /**
-   * Clears the read column ids set in the conf, and will read all columns.
-   */
-  public static void setFullyReadColumns(Configuration conf) {
-    conf.set(READ_COLUMN_IDS_CONF_STR, "");
+  private static void setReadColumnIDConf(Configuration conf, String id) {
+    if (id.trim().isEmpty()) {
+      conf.set(READ_COLUMN_IDS_CONF_STR, READ_COLUMN_IDS_CONF_STR_DEFAULT);
+    } else {
+      conf.set(READ_COLUMN_IDS_CONF_STR, id);
+    }
+  }
+
+  private static void appendReadColumnNames(Configuration conf, List<String> cols) {
+    String old = conf.get(READ_COLUMN_NAMES_CONF_STR, "");
+    StringBuilder result = new StringBuilder(old);
+    boolean first = old.isEmpty();
+    for(String col: cols) {
+      if (first) {
+        first = false;
+      } else {
+        result.append(',');
+      }
+      result.append(col);
+    }
+    conf.set(READ_COLUMN_NAMES_CONF_STR, result.toString());
+  }
+
+  private static String toReadColumnIDString(List<Integer> ids) {
+    String id = "";
+    for (int i = 0; i < ids.size(); i++) {
+      if (i == 0) {
+        id = id + ids.get(i);
+      } else {
+        id = id + StringUtils.COMMA_STR + ids.get(i);
+      }
+    }
+    return id;
   }
 
   private ColumnProjectionUtils() {

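For orientation, the reworked ColumnProjectionUtils above is driven entirely through the job Configuration: appendReadColumns records the wanted column ids and flips READ_ALL_COLUMNS to false, while setReadAllColumns clears the id list and restores the flag. A minimal usage sketch against the new API (the column choices here are illustrative, not part of this commit):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;

    public class ProjectionSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Ask for columns 0 and 2 only; this also sets READ_ALL_COLUMNS to false.
        ColumnProjectionUtils.appendReadColumns(conf, Arrays.asList(0, 2));

        List<Integer> ids = ColumnProjectionUtils.getReadColumnIDs(conf); // [0, 2]
        boolean readAll = ColumnProjectionUtils.isReadAllColumns(conf);   // false

        // Reset: clear the id list and read every column again.
        ColumnProjectionUtils.setReadAllColumns(conf);
      }
    }
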
Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java Mon Sep 23 20:40:54 2013
@@ -21,8 +21,10 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.rmi.server.UID;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -52,6 +54,20 @@ import org.apache.hadoop.io.Writable;
 class AvroDeserializer {
   private static final Log LOG = LogFactory.getLog(AvroDeserializer.class);
   /**
+   * Set of already seen and valid record reader IDs whose records do not need re-encoding
+   */
+  private final HashSet<UID> noEncodingNeeded = new HashSet<UID>();
+  /**
+   * Map from record reader ID to the associated re-encoder. It contains only the
+   * record readers whose records need to be re-encoded.
+   */
+  private final HashMap<UID, SchemaReEncoder> reEncoderCache = new HashMap<UID, SchemaReEncoder>();
+  /**
+   * Flag to print the re-encoding warning message only once, avoiding excessive
+   * logging for each record.
+   */
+  private static boolean warnedOnce = false;
+  /**
    * When encountering a record with an older schema than the one we're trying
    * to read, it is necessary to re-encode with a reader against the newer schema.
    * Because Hive doesn't provide a way to pass extra information to the
@@ -64,16 +80,15 @@ class AvroDeserializer {
     private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
     private final GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>();
     private BinaryDecoder binaryDecoder = null;
-    private final InstanceCache<ReaderWriterSchemaPair, GenericDatumReader<GenericRecord>> gdrCache
-        = new InstanceCache<ReaderWriterSchemaPair, GenericDatumReader<GenericRecord>>() {
-            @Override
-            protected GenericDatumReader<GenericRecord> makeInstance(ReaderWriterSchemaPair hv) {
-              return new GenericDatumReader<GenericRecord>(hv.getWriter(), hv.getReader());
-            }
-          };
 
-    public GenericRecord reencode(GenericRecord r, Schema readerSchema)
-            throws AvroSerdeException {
+    GenericDatumReader<GenericRecord> gdr = null;
+
+    public SchemaReEncoder(Schema writer, Schema reader) {
+      gdr = new GenericDatumReader<GenericRecord>(writer, reader);
+    }
+
+    public GenericRecord reencode(GenericRecord r)
+        throws AvroSerdeException {
       baos.reset();
 
       BinaryEncoder be = EncoderFactory.get().directBinaryEncoder(baos, null);
@@ -84,8 +99,6 @@ class AvroDeserializer {
 
         binaryDecoder = DecoderFactory.defaultFactory().createBinaryDecoder(bais, binaryDecoder);
 
-        ReaderWriterSchemaPair pair = new ReaderWriterSchemaPair(r.getSchema(), readerSchema);
-        GenericDatumReader<GenericRecord> gdr = gdrCache.retrieve(pair);
         return gdr.read(r, binaryDecoder);
 
       } catch (IOException e) {
@@ -95,7 +108,6 @@ class AvroDeserializer {
   }
 
   private List<Object> row;
-  private SchemaReEncoder reEncoder;
 
   /**
    * Deserialize an Avro record, recursing into its component fields and
@@ -127,14 +139,31 @@ class AvroDeserializer {
     AvroGenericRecordWritable recordWritable = (AvroGenericRecordWritable) writable;
     GenericRecord r = recordWritable.getRecord();
 
-    // Check if we're working with an evolved schema
-    if(!r.getSchema().equals(readerSchema)) {
-      LOG.warn("Received different schemas.  Have to re-encode: " +
-              r.getSchema().toString(false));
-      if(reEncoder == null) {
-        reEncoder = new SchemaReEncoder();
+    UID recordReaderId = recordWritable.getRecordReaderID();
+    //If the record reader (from which the record originated) has already been seen
+    //and is valid, there is no need to re-encode the record.
+    if(!noEncodingNeeded.contains(recordReaderId)) {
+      SchemaReEncoder reEncoder = null;
+      //Check if the record record is already encoded once. If it does
+      //reuse the encoder.
+      if(reEncoderCache.containsKey(recordReaderId)) {
+        reEncoder = reEncoderCache.get(recordReaderId); //Reuse the re-encoder
+      } else if (!r.getSchema().equals(readerSchema)) { //Evolved schema?
+        //Create and store new encoder in the map for re-use
+        reEncoder = new SchemaReEncoder(r.getSchema(), readerSchema);
+        reEncoderCache.put(recordReaderId, reEncoder);
+      } else {
+        LOG.info("Adding new valid RRID: " + recordReaderId);
+        noEncodingNeeded.add(recordReaderId);
+      }
+      if(reEncoder != null) {
+        if (!warnedOnce) {
+          LOG.warn("Received different schemas.  Have to re-encode: " +
+              r.getSchema().toString(false) + "\nSIZE" + reEncoderCache + " ID " + recordReaderId);
+          warnedOnce = true;
+        }
+        r = reEncoder.reencode(r);
       }
-      r = reEncoder.reencode(r, readerSchema);
     }
 
     workerBase(row, columnNames, columnTypes, r);
@@ -288,4 +317,13 @@ class AvroDeserializer {
 
     return map;
   }
+
+  public HashSet<UID> getNoEncodingNeeded() {
+    return noEncodingNeeded;
+  }
+
+  public HashMap<UID, SchemaReEncoder> getReEncoderCache() {
+    return reEncoderCache;
+  }
+
 }

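The caching above is keyed on java.rmi.server.UID, so it only pays off if every record reader stamps the writables it emits with a stable per-reader ID (see the AvroGenericRecordWritable change below). A hedged sketch of the producing side; the reader class here is hypothetical, not part of this commit:

    import java.rmi.server.UID;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;

    // Hypothetical reader: one UID per reader instance, reused for every
    // record it emits, so AvroDeserializer can cache per-reader decisions.
    public class ExampleAvroRecordSource {
      private final UID readerID = new UID();

      public AvroGenericRecordWritable wrap(GenericRecord record) {
        AvroGenericRecordWritable writable = new AvroGenericRecordWritable(record);
        writable.setRecordReaderID(readerID);
        return writable;
      }
    }
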
Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java Mon Sep 23 20:40:54 2013
@@ -17,6 +17,13 @@
  */
 package org.apache.hadoop.hive.serde2.avro;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.rmi.server.UID;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
@@ -28,12 +35,6 @@ import org.apache.avro.io.DecoderFactory
 import org.apache.avro.io.EncoderFactory;
 import org.apache.hadoop.io.Writable;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
 /**
  * Wrapper around an Avro GenericRecord.  Necessary because Hive's deserializer
  * will happily deserialize any object - as long as it's a writable.
@@ -41,6 +42,10 @@ import java.io.InputStream;
 public class AvroGenericRecordWritable implements Writable{
   GenericRecord record;
   private BinaryDecoder binaryDecoder;
+  /**
+   * Unique ID used to determine which record reader created this record
+   */
+  private UID recordReaderID;
 
   // There are two areas of exploration for optimization here.
   // 1.  We're serializing the schema with every object.  If we assume the schema
@@ -68,6 +73,7 @@ public class AvroGenericRecordWritable i
     // Write schema since we need it to pull the data out. (see point #1 above)
     String schemaString = record.getSchema().toString(false);
     out.writeUTF(schemaString);
+    recordReaderID.write(out);
 
     // Write record to byte buffer
     GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>();
@@ -80,9 +86,18 @@ public class AvroGenericRecordWritable i
   @Override
   public void readFields(DataInput in) throws IOException {
     Schema schema = Schema.parse(in.readUTF());
+    recordReaderID = UID.read(in);
     record = new GenericData.Record(schema);
     binaryDecoder = DecoderFactory.defaultFactory().createBinaryDecoder((InputStream) in, binaryDecoder);
     GenericDatumReader<GenericRecord> gdr = new GenericDatumReader<GenericRecord>(schema);
     record = gdr.read(record, binaryDecoder);
   }
+
+  public UID getRecordReaderID() {
+    return recordReaderID;
+  }
+
+  public void setRecordReaderID(UID recordReaderID) {
+    this.recordReaderID = recordReaderID;
+  }
 }

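Note the wire-format change above: write() now emits the record reader UID right after the schema string, and readFields() consumes it from the same position, so the ID survives serialization. A minimal round-trip sketch, mirroring the pattern used by the test Utils helper at the end of this commit:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;

    // Assumes 'in' already holds a record and a record reader ID.
    static AvroGenericRecordWritable roundTrip(AvroGenericRecordWritable in)
        throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      in.write(new DataOutputStream(baos));
      AvroGenericRecordWritable out = new AvroGenericRecordWritable();
      out.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
      return out; // out.getRecordReaderID() equals in.getRecordReaderID()
    }
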
Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java Mon Sep 23 20:40:54 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.serde2.columnar;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
@@ -78,9 +79,9 @@ public class ColumnarSerDe extends Colum
    * @see SerDe#initialize(Configuration, Properties)
    */
   @Override
-  public void initialize(Configuration job, Properties tbl) throws SerDeException {
+  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
 
-    serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, getClass().getName());
+    serdeParams = LazySimpleSerDe.initSerdeParams(conf, tbl, getClass().getName());
 
     // Create the ObjectInspectors for the fields. Note: Currently
     // ColumnarObject uses same ObjectInpector as LazyStruct
@@ -89,14 +90,20 @@ public class ColumnarSerDe extends Colum
             .getSeparators(), serdeParams.getNullSequence(), serdeParams
             .isEscaped(), serdeParams.getEscapeChar());
 
-    java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(job);
-
-    cachedLazyStruct = new ColumnarStruct(cachedObjectInspector, notSkipIDs,
-        serdeParams.getNullSequence());
-
     int size = serdeParams.getColumnTypes().size();
+    List<Integer> notSkipIDs = new ArrayList<Integer>();
+    if (conf == null || ColumnProjectionUtils.isReadAllColumns(conf)) {
+      for (int i = 0; i < size; i++ ) {
+        notSkipIDs.add(i);
+      }
+    } else {
+      notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(conf);
+    }
+    cachedLazyStruct = new ColumnarStruct(
+        cachedObjectInspector, notSkipIDs, serdeParams.getNullSequence());
+
     super.initialize(size);
-    LOG.debug("ColumnarSerDe initialized with: columnNames="
+    LOG.info("ColumnarSerDe initialized with: columnNames="
         + serdeParams.getColumnNames() + " columnTypes="
         + serdeParams.getColumnTypes() + " separator="
         + Arrays.asList(serdeParams.getSeparators()) + " nullstring="

Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java Mon Sep 23 20:40:54 2013
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.hive.serde2.columnar;
 
-import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,22 +49,10 @@ public class ColumnarStruct extends Colu
    *
    * @param oi
    *          the ObjectInspector representing the type of this LazyStruct.
-   */
-  public ColumnarStruct(ObjectInspector oi) {
-    this(oi, null, null);
-  }
-
-  /**
-   * Construct a ColumnarStruct object with the TypeInfo. It creates the first
-   * level object at the first place
-   *
-   * @param oi
-   *          the ObjectInspector representing the type of this LazyStruct.
    * @param notSkippedColumnIDs
    *          the column ids that should not be skipped
    */
-  public ColumnarStruct(ObjectInspector oi,
-      ArrayList<Integer> notSkippedColumnIDs, Text nullSequence) {
+  public ColumnarStruct(ObjectInspector oi, List<Integer> notSkippedColumnIDs, Text nullSequence) {
     super(oi, notSkippedColumnIDs);
     if (nullSequence != null) {
       this.nullSequence = nullSequence;
@@ -84,7 +72,7 @@ public class ColumnarStruct extends Colu
     }
     return fieldLen;
   }
-  
+
   @Override
   protected LazyObjectBase createLazyObjectBase(ObjectInspector objectInspector) {
     return LazyFactory.createLazyObject(objectInspector);

Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java Mon Sep 23 20:40:54 2013
@@ -122,22 +122,13 @@ public abstract class ColumnarStructBase
   private FieldInfo[] fieldInfoList = null;
   private ArrayList<Object> cachedList;
 
-  public ColumnarStructBase(ObjectInspector oi,
-      ArrayList<Integer> notSkippedColumnIDs) {
+  public ColumnarStructBase(ObjectInspector oi, List<Integer> notSkippedColumnIDs) {
     List<? extends StructField> fieldRefs = ((StructObjectInspector) oi)
         .getAllStructFieldRefs();
     int num = fieldRefs.size();
 
     fieldInfoList = new FieldInfo[num];
 
-    // if no columns is set to be skipped, add all columns in
-    // 'notSkippedColumnIDs'
-    if (notSkippedColumnIDs == null || notSkippedColumnIDs.size() == 0) {
-      for (int i = 0; i < num; i++) {
-        notSkippedColumnIDs.add(i);
-      }
-    }
-
     for (int i = 0; i < num; i++) {
       ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector();
       fieldInfoList[i] = new FieldInfo(

Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java Mon Sep 23 20:40:54 2013
@@ -17,22 +17,23 @@
  */
 package org.apache.hadoop.hive.serde2.columnar;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.Writable;
 
@@ -66,9 +67,17 @@ public class LazyBinaryColumnarSerDe ext
 
     cachedObjectInspector = LazyBinaryFactory.createColumnarStructInspector(
         columnNames, columnTypes);
-    java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(conf);
-    cachedLazyStruct = new LazyBinaryColumnarStruct(cachedObjectInspector, notSkipIDs);
     int size = columnTypes.size();
+    List<Integer> notSkipIDs = new ArrayList<Integer>();
+    if (conf == null || ColumnProjectionUtils.isReadAllColumns(conf)) {
+      for (int i = 0; i < size; i++ ) {
+        notSkipIDs.add(i);
+      }
+    } else {
+      notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(conf);
+    }
+    cachedLazyStruct = new LazyBinaryColumnarStruct(cachedObjectInspector, notSkipIDs);
+
     super.initialize(size);
   }
 

Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java Mon Sep 23 20:40:54 2013
@@ -18,26 +18,22 @@
 
 package org.apache.hadoop.hive.serde2.columnar;
 
-import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
 import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
-import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VInt;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 
 public class LazyBinaryColumnarStruct extends ColumnarStructBase {
 
-  public LazyBinaryColumnarStruct(ObjectInspector oi, ArrayList<Integer> notSkippedColumnIDs) {
+  public LazyBinaryColumnarStruct(ObjectInspector oi, List<Integer> notSkippedColumnIDs) {
     super(oi, notSkippedColumnIDs);
   }
 
-  static VInt vInt = new LazyBinaryUtils.VInt();
-
   @Override
   protected int getLength(ObjectInspector objectInspector, ByteArrayRef cachedByteArrayRef,
       int start, int length) {
@@ -48,8 +44,8 @@ public class LazyBinaryColumnarStruct ex
     if (category.equals(Category.PRIMITIVE)) {
       PrimitiveCategory primitiveCategory = ((PrimitiveObjectInspector) objectInspector)
           .getPrimitiveCategory();
-      if (primitiveCategory.equals(PrimitiveCategory.STRING) && (length == 1) && 
-            (cachedByteArrayRef.getData()[start] 
+      if (primitiveCategory.equals(PrimitiveCategory.STRING) && (length == 1) &&
+            (cachedByteArrayRef.getData()[start]
               == LazyBinaryColumnarSerDe.INVALID_UTF__SINGLE_BYTE[0])) {
         return 0;
       }

Modified: hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java (original)
+++ hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java Mon Sep 23 20:40:54 2013
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.rmi.server.UID;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Hashtable;
@@ -500,4 +501,64 @@ public class TestAvroDeserializer {
       assertEquals(expected, soi.getPrimitiveJavaObject(rowElement));
     }
   }
+
+  @Test
+  public void verifyCaching() throws SerDeException, IOException {
+    Schema s = Schema.parse(TestAvroObjectInspectorGenerator.RECORD_SCHEMA);
+    GenericData.Record record = new GenericData.Record(s);
+    GenericData.Record innerRecord = new GenericData.Record(s.getField("aRecord").schema());
+    innerRecord.put("int1", 42);
+    innerRecord.put("boolean1", true);
+    innerRecord.put("long1", 42432234234l);
+    record.put("aRecord", innerRecord);
+    assertTrue(GENERIC_DATA.validate(s, record));
+
+    AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+    UID recordReaderID = new UID();
+    garw.setRecordReaderID(recordReaderID);
+    AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+    AvroDeserializer de = new AvroDeserializer();
+    ArrayList<Object> row =
+        (ArrayList<Object>) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+
+    assertEquals(1, de.getNoEncodingNeeded().size());
+    assertEquals(0, de.getReEncoderCache().size());
+
+    // Read the record with the same record reader ID
+    row = (ArrayList<Object>) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+
+    //Expecting not to change the size of internal structures
+    assertEquals(1, de.getNoEncodingNeeded().size());
+    assertEquals(0, de.getReEncoderCache().size());
+
+    //Read the record with **different** record reader ID
+    garw.setRecordReaderID(new UID()); //New record reader ID
+    row = (ArrayList<Object>) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+
+    //Expecting to change the size of internal structures
+    assertEquals(2, de.getNoEncodingNeeded().size());
+    assertEquals(0, de.getReEncoderCache().size());
+
+    //Read the record with **different** record reader ID and **evolved** schema
+    Schema evolvedSchema = Schema.parse(s.toString());
+    evolvedSchema.getField("aRecord").schema().addProp("Testing", "meaningless");
+    garw.setRecordReaderID(recordReaderID = new UID()); //New record reader ID
+    row = (ArrayList<Object>)
+        de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, evolvedSchema);
+
+    //Expecting to change the size of internal structures
+    assertEquals(2, de.getNoEncodingNeeded().size());
+    assertEquals(1, de.getReEncoderCache().size());
+
+    //Read the record with existing record reader ID and same **evolved** schema
+    garw.setRecordReaderID(recordReaderID); //Reuse record reader ID
+    row = (ArrayList<Object>)
+        de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, evolvedSchema);
+
+    //Expecting NOT to change the size of internal structures
+    assertEquals(2, de.getNoEncodingNeeded().size());
+    assertEquals(1, de.getReEncoderCache().size());
+
+  }
 }

Modified: hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java (original)
+++ hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java Mon Sep 23 20:40:54 2013
@@ -17,18 +17,19 @@
  */
 package org.apache.hadoop.hive.serde2.avro;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.rmi.server.UID;
 
-import static org.junit.Assert.assertEquals;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
 
 public class TestGenericAvroRecordWritable {
   private static final String schemaJSON = "{\n" +
@@ -59,12 +60,14 @@ public class TestGenericAvroRecordWritab
     assertEquals("Doctor", gr.get("last"));
 
     AvroGenericRecordWritable garw = new AvroGenericRecordWritable(gr);
+    garw.setRecordReaderID(new UID());
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream daos = new DataOutputStream(baos);
     garw.write(daos);
 
     AvroGenericRecordWritable garw2 = new AvroGenericRecordWritable(gr);
+    garw2.setRecordReaderID(new UID());
 
     ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
     DataInputStream dais = new DataInputStream(bais);

Modified: hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java (original)
+++ hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java Mon Sep 23 20:40:54 2013
@@ -17,15 +17,15 @@
  */
 package org.apache.hadoop.hive.serde2.avro;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 public class TestSchemaReEncoder {
   @Test
   public void schemasCanAddFields() throws SerDeException {
@@ -62,8 +62,8 @@ public class TestSchemaReEncoder {
     GenericRecord record = new GenericData.Record(originalSchema);
     record.put("text", "it is a far better thing I do, yadda, yadda");
     assertTrue(GenericData.get().validate(originalSchema, record));
-    AvroDeserializer.SchemaReEncoder schemaReEncoder = new AvroDeserializer.SchemaReEncoder();
-    GenericRecord r2 = schemaReEncoder.reencode(record, evolvedSchema);
+    AvroDeserializer.SchemaReEncoder schemaReEncoder = new AvroDeserializer.SchemaReEncoder(record.getSchema(), evolvedSchema);
+    GenericRecord r2 = schemaReEncoder.reencode(record);
 
     assertTrue(GenericData.get().validate(evolvedSchema, r2));
     assertEquals("Hi!", r2.get("new_kid").toString());
@@ -104,7 +104,8 @@ public class TestSchemaReEncoder {
     record.put("a", 19);
     assertTrue(GenericData.get().validate(originalSchema2, record));
 
-    r2 = schemaReEncoder.reencode(record,  evolvedSchema2);
+    schemaReEncoder = new AvroDeserializer.SchemaReEncoder(record.getSchema(), evolvedSchema2);
+    r2 = schemaReEncoder.reencode(record);
     assertTrue(GenericData.get().validate(evolvedSchema2, r2));
     assertEquals(42l, r2.get("b"));
   }

Modified: hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java?rev=1525692&r1=1525691&r2=1525692&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java (original)
+++ hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java Mon Sep 23 20:40:54 2013
@@ -17,13 +17,14 @@
  */
 package org.apache.hadoop.hive.serde2.avro;
 
-import org.apache.avro.generic.GenericData;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.rmi.server.UID;
+
+import org.apache.avro.generic.GenericData;
 
 class Utils {
   // Force Avro to serialize and de-serialize the record to make sure it has a
@@ -31,6 +32,7 @@ class Utils {
   public static AvroGenericRecordWritable
   serializeAndDeserializeRecord(GenericData.Record record) throws IOException {
     AvroGenericRecordWritable garw = new AvroGenericRecordWritable(record);
+    garw.setRecordReaderID(new UID());
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream daos = new DataOutputStream(baos);
     garw.write(daos);
@@ -39,6 +41,7 @@ class Utils {
     DataInputStream dais = new DataInputStream(bais);
 
     AvroGenericRecordWritable garw2 = new AvroGenericRecordWritable();
+    garw2.setRecordReaderID(new UID());
     garw2.readFields(dais);
     return garw2;
   }