You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2015/01/05 20:03:49 UTC

svn commit: r1649612 - in /hive/trunk: data/files/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org/apache/had...

Author: hashutosh
Date: Mon Jan  5 19:03:49 2015
New Revision: 1649612

URL: http://svn.apache.org/r1649612
Log:
HIVE-7977 : Avoid creating serde for partitions if possible in FetchTask (Navis via Ashutosh Chauhan)

Removed:
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedListObjectInspector.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedMapObjectInspector.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedObjectInspectorFactory.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedStructObjectInspector.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/DelegatedUnionObjectInspector.java
Modified:
    hive/trunk/data/files/datatypes.txt
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java

Modified: hive/trunk/data/files/datatypes.txt
URL: http://svn.apache.org/viewvc/hive/trunk/data/files/datatypes.txt?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/data/files/datatypes.txt (original)
+++ hive/trunk/data/files/datatypes.txt Mon Jan  5 19:03:49 2015
@@ -1,3 +1,3 @@
-\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N
--1false-1.1\N\N\N-1-1-1.0-1\N\N\N\N\N\N\N\N
+\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N\N
+-1false-1.1\N\N\N-1-1-1.0-1\N\N\N\N\N\N\N\N\N
 1true1.11121x2ykva92.2111.01abcd1111213142212212x1abcd22012-04-22 09:00:00.123456789123456789.0123456YWJjZA==2013-01-01abc123abc123X'01FF'

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Mon Jan  5 19:03:49 2015
@@ -222,7 +222,7 @@ public class Driver implements CommandPr
         String tableName = "result";
         List<FieldSchema> lst = null;
         try {
-          lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer());
+          lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(conf));
         } catch (Exception e) {
           LOG.warn("Error getting schema: "
               + org.apache.hadoop.util.StringUtils.stringifyException(e));

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Mon Jan  5 19:03:49 2015
@@ -26,8 +26,10 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.lang3.StringEscapeUtils;
+import com.google.common.collect.Iterators;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
@@ -49,14 +51,13 @@ import org.apache.hadoop.hive.ql.plan.Pa
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.DelegatedObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -70,74 +71,81 @@ import org.apache.hadoop.mapred.JobConfi
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.AnnotationUtils;
 
 /**
  * FetchTask implementation.
  **/
 public class FetchOperator implements Serializable {
 
-  static Log LOG = LogFactory.getLog(FetchOperator.class.getName());
-  static LogHelper console = new LogHelper(LOG);
+  static final Log LOG = LogFactory.getLog(FetchOperator.class.getName());
+  static final LogHelper console = new LogHelper(LOG);
 
   public static final String FETCH_OPERATOR_DIRECTORY_LIST =
       "hive.complete.dir.list";
 
-  private boolean isNativeTable;
   private FetchWork work;
-  protected Operator<?> operator;    // operator tree for processing row further (option)
-  private int splitNum;
-  private PartitionDesc currPart;
-  private TableDesc currTbl;
-  private boolean tblDataDone;
-  private FooterBuffer footerBuffer = null;
-  private int headerCount = 0;
-  private int footerCount = 0;
+  private Operator<?> operator;    // operator tree for processing row further (optional)
 
-  private boolean hasVC;
-  private boolean isPartitioned;
+  private final boolean hasVC;
+  private final boolean isStatReader;
+  private final boolean isPartitioned;
+  private final boolean isNonNativeTable;
   private StructObjectInspector vcsOI;
   private List<VirtualColumn> vcCols;
   private ExecMapperContext context;
 
+  private transient Deserializer tableSerDe;
+  private transient StructObjectInspector tableOI;
+  private transient StructObjectInspector partKeyOI;
+  private transient StructObjectInspector convertedOI;
+
+  private transient Iterator<Path> iterPath;
+  private transient Iterator<PartitionDesc> iterPartDesc;
+  private transient Iterator<FetchInputFormatSplit> iterSplits = Iterators.emptyIterator();
+
+  private transient Path currPath;
+  private transient PartitionDesc currDesc;
+  private transient Deserializer currSerDe;
+  private transient Converter ObjectConverter;
   private transient RecordReader<WritableComparable, Writable> currRecReader;
-  private transient FetchInputFormatSplit[] inputSplits;
-  private transient InputFormat inputFormat;
+
   private transient JobConf job;
   private transient WritableComparable key;
   private transient Writable value;
   private transient Object[] vcValues;
-  private transient Deserializer serde;
-  private transient Deserializer tblSerde;
-  private transient Converter partTblObjectInspectorConverter;
 
-  private transient Iterator<Path> iterPath;
-  private transient Iterator<PartitionDesc> iterPartDesc;
-  private transient Path currPath;
-  private transient StructObjectInspector objectInspector;
-  private transient StructObjectInspector rowObjectInspector;
-  private transient ObjectInspector partitionedTableOI;
+  private transient int headerCount;
+  private transient int footerCount;
+  private transient FooterBuffer footerBuffer;
+
+  private transient StructObjectInspector outputOI;
   private transient Object[] row;
 
-  public FetchOperator() {
-  }
-
-  public FetchOperator(FetchWork work, JobConf job) {
-    this.job = job;
-    this.work = work;
-    initialize();
+  public FetchOperator(FetchWork work, JobConf job) throws HiveException {
+    this(work, job, null, null);
   }
 
   public FetchOperator(FetchWork work, JobConf job, Operator<?> operator,
-      List<VirtualColumn> vcCols) {
+      List<VirtualColumn> vcCols) throws HiveException {
     this.job = job;
     this.work = work;
     this.operator = operator;
     this.vcCols = vcCols;
+    this.hasVC = vcCols != null && !vcCols.isEmpty();
+    this.isStatReader = work.getTblDesc() == null;
+    this.isPartitioned = !isStatReader && work.isPartitioned();
+    this.isNonNativeTable = !isStatReader && work.getTblDesc().isNonNative();
     initialize();
   }
 
-  private void initialize() {
-    if (hasVC = vcCols != null && !vcCols.isEmpty()) {
+  private void initialize() throws HiveException {
+    if (isStatReader) {
+      outputOI = work.getStatRowOI();
+      return;
+    }
+    if (hasVC) {
       List<String> names = new ArrayList<String>(vcCols.size());
       List<ObjectInspector> inspectors = new ArrayList<ObjectInspector>(vcCols.size());
       for (VirtualColumn vc : vcCols) {
@@ -147,8 +155,6 @@ public class FetchOperator implements Se
       vcsOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
       vcValues = new Object[vcCols.size()];
     }
-    isPartitioned = work.isPartitioned();
-    tblDataDone = false;
     if (hasVC && isPartitioned) {
       row = new Object[3];
     } else if (hasVC || isPartitioned) {
@@ -156,21 +162,27 @@ public class FetchOperator implements Se
     } else {
       row = new Object[1];
     }
-    if (work.getTblDesc() != null) {
-      isNativeTable = !work.getTblDesc().isNonNative();
+    if (isPartitioned) {
+      iterPath = work.getPartDir().iterator();
+      iterPartDesc = work.getPartDesc().iterator();
     } else {
-      isNativeTable = true;
+      iterPath = Arrays.asList(work.getTblDir()).iterator();
+      iterPartDesc = Iterators.cycle(new PartitionDesc(work.getTblDesc(), null));
     }
-    setupExecContext();
+    outputOI = setupOutputObjectInspector();
+    context = setupExecContext(operator, work.getPathLists());
   }
 
-  private void setupExecContext() {
+  private ExecMapperContext setupExecContext(Operator operator, List<Path> paths) {
+    ExecMapperContext context = null;
     if (hasVC || work.getSplitSample() != null) {
       context = new ExecMapperContext(job);
       if (operator != null) {
         operator.setExecContext(context);
       }
     }
+    setFetchOperatorContext(job, paths);
+    return context;
   }
 
   public FetchWork getWork() {
@@ -181,42 +193,6 @@ public class FetchOperator implements Se
     this.work = work;
   }
 
-  public int getSplitNum() {
-    return splitNum;
-  }
-
-  public void setSplitNum(int splitNum) {
-    this.splitNum = splitNum;
-  }
-
-  public PartitionDesc getCurrPart() {
-    return currPart;
-  }
-
-  public void setCurrPart(PartitionDesc currPart) {
-    this.currPart = currPart;
-  }
-
-  public TableDesc getCurrTbl() {
-    return currTbl;
-  }
-
-  public void setCurrTbl(TableDesc currTbl) {
-    this.currTbl = currTbl;
-  }
-
-  public boolean isTblDataDone() {
-    return tblDataDone;
-  }
-
-  public void setTblDataDone(boolean tblDataDone) {
-    this.tblDataDone = tblDataDone;
-  }
-
-  public boolean isEmptyTable() {
-    return work.getTblDir() == null && (work.getPartDir() == null || work.getPartDir().isEmpty());
-  }
-
   /**
    * A cache of InputFormat instances.
    */
@@ -243,146 +219,54 @@ public class FetchOperator implements Se
     return format;
   }
 
-  private StructObjectInspector getRowInspectorFromTable(TableDesc table) throws Exception {
-    Deserializer serde = table.getDeserializerClass().newInstance();
-    SerDeUtils.initializeSerDeWithoutErrorCheck(serde, job, table.getProperties(), null);
-    return createRowInspector(getStructOIFrom(serde.getObjectInspector()));
-  }
-
-  private StructObjectInspector getRowInspectorFromPartition(PartitionDesc partition,
-      ObjectInspector partitionOI) throws Exception {
-
-    String pcols = partition.getTableDesc().getProperties().getProperty(
+  private StructObjectInspector getPartitionKeyOI(TableDesc tableDesc) throws Exception {
+    String pcols = tableDesc.getProperties().getProperty(
         org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-    String[] partKeys = pcols.trim().split("/");
-    String pcolTypes = partition.getTableDesc().getProperties().getProperty(
-        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES); 
-    String[] partKeyTypes = pcolTypes.trim().split(":");
-    row[1] = createPartValue(partKeys, partition.getPartSpec(), partKeyTypes);
+    String pcolTypes = tableDesc.getProperties().getProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
 
-    return createRowInspector(getStructOIFrom(partitionOI), partKeys, partKeyTypes);
-  }
-
-  private StructObjectInspector getRowInspectorFromPartitionedTable(TableDesc table)
-      throws Exception {
-    Deserializer serde = table.getDeserializerClass().newInstance();
-    SerDeUtils.initializeSerDe(serde, job, table.getProperties(), null);
-    String pcols = table.getProperties().getProperty(
-        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
     String[] partKeys = pcols.trim().split("/");
-    String pcolTypes = table.getProperties().getProperty(
-        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES); 
-    String[] partKeyTypes = pcolTypes.trim().split(":");    
-    row[1] = null;
-    return createRowInspector(getStructOIFrom(serde.getObjectInspector()), partKeys, partKeyTypes);
-  }
-
-  private StructObjectInspector getStructOIFrom(ObjectInspector current) throws SerDeException {
-    if (objectInspector != null) {
-      current = DelegatedObjectInspectorFactory.reset(objectInspector, current);
-    } else {
-      current = DelegatedObjectInspectorFactory.wrap(current);
-    }
-    return objectInspector = (StructObjectInspector) current;
-  }
-
-  private StructObjectInspector createRowInspector(StructObjectInspector current)
-      throws SerDeException {
-    return hasVC ? ObjectInspectorFactory.getUnionStructObjectInspector(
-        Arrays.asList(current, vcsOI)) : current;
-  }
-
-  private StructObjectInspector createRowInspector(StructObjectInspector current, String[] partKeys, String[] partKeyTypes)
-      throws SerDeException {
-    List<String> partNames = new ArrayList<String>();
-    List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>();
+    String[] partKeyTypes = pcolTypes.trim().split(":");
+    ObjectInspector[] inspectors = new ObjectInspector[partKeys.length];
     for (int i = 0; i < partKeys.length; i++) {
-      String key = partKeys[i];
-      partNames.add(key);    
-      ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+      inspectors[i] = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
           TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
-      partObjectInspectors.add(oi);
     }
-    StructObjectInspector partObjectInspector = ObjectInspectorFactory
-        .getStandardStructObjectInspector(partNames, partObjectInspectors);
-
-    return ObjectInspectorFactory.getUnionStructObjectInspector(
-        hasVC ? Arrays.asList(current, partObjectInspector, vcsOI) :
-            Arrays.asList(current, partObjectInspector));
+    return ObjectInspectorFactory.getStandardStructObjectInspector(
+        Arrays.asList(partKeys), Arrays.asList(inspectors));
   }
 
-  private Object[] createPartValue(String[] partKeys, Map<String, String> partSpec, String[] partKeyTypes) {
-    Object[] partValues = new Object[partKeys.length];
-    for (int i = 0; i < partKeys.length; i++) {
-      String key = partKeys[i];
-      ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
-          TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
-      partValues[i] = 
-          ObjectInspectorConverters.
-          getConverter(PrimitiveObjectInspectorFactory.
-              javaStringObjectInspector, oi).convert(partSpec.get(key));   
+  private Object[] createPartValue(PartitionDesc partDesc, StructObjectInspector partOI) {
+    Map<String, String> partSpec = partDesc.getPartSpec();
+    List<? extends StructField> fields = partOI.getAllStructFieldRefs();
+    Object[] partValues = new Object[fields.size()];
+    for (int i = 0; i < partValues.length; i++) {
+      StructField field = fields.get(i);
+      String value = partSpec.get(field.getFieldName());
+      ObjectInspector oi = field.getFieldObjectInspector();
+      partValues[i] = ObjectInspectorConverters.getConverter(
+          PrimitiveObjectInspectorFactory.javaStringObjectInspector, oi).convert(value);
     }
     return partValues;
   }
 
-  private void getNextPath() throws Exception {
-    // first time
-    if (iterPath == null) {
-      if (work.isNotPartitioned()) {
-        if (!tblDataDone) {
-          currPath = work.getTblDir();
-          currTbl = work.getTblDesc();
-          if (isNativeTable) {
-            FileSystem fs = currPath.getFileSystem(job);
-            if (fs.exists(currPath)) {
-              FileStatus[] fStats = listStatusUnderPath(fs, currPath);
-              for (FileStatus fStat : fStats) {
-                if (fStat.getLen() > 0) {
-                  tblDataDone = true;
-                  break;
-                }
-              }
-            }
-          } else {
-            tblDataDone = true;
-          }
-
-          if (!tblDataDone) {
-            currPath = null;
-          }
-          return;
-        } else {
-          currTbl = null;
-          currPath = null;
-        }
-        return;
-      } else {
-        setFetchOperatorContext(job, work.getPartDir());
-        iterPath = work.getPartDir().iterator();
-        iterPartDesc = work.getPartDesc().iterator();
-      }
-    }
-
+  private boolean getNextPath() throws Exception {
     while (iterPath.hasNext()) {
-      Path nxt = iterPath.next();
-      PartitionDesc prt = null;
-      if (iterPartDesc != null) {
-        prt = iterPartDesc.next();
-      }
-      FileSystem fs = nxt.getFileSystem(job);
-      if (fs.exists(nxt)) {
-        FileStatus[] fStats = listStatusUnderPath(fs, nxt);
-        for (FileStatus fStat : fStats) {
+      currPath = iterPath.next();
+      currDesc = iterPartDesc.next();
+      if (isNonNativeTable) {
+        return true;
+      }
+      FileSystem fs = currPath.getFileSystem(job);
+      if (fs.exists(currPath)) {
+        for (FileStatus fStat : listStatusUnderPath(fs, currPath)) {
           if (fStat.getLen() > 0) {
-            currPath = nxt;
-            if (iterPartDesc != null) {
-              currPart = prt;
-            }
-            return;
+            return true;
           }
         }
       }
     }
+    return false;
   }
 
   /**
@@ -390,119 +274,53 @@ public class FetchOperator implements Se
    * This helps InputFormats make decisions based on the scope of the complete
    * operation.
    * @param conf the configuration to modify
-   * @param partDirs the list of partition directories
+   * @param paths the list of input directories
    */
-  static void setFetchOperatorContext(JobConf conf,
-                                      ArrayList<Path> partDirs) {
-    if (partDirs != null) {
+  static void setFetchOperatorContext(JobConf conf, List<Path> paths) {
+    if (paths != null) {
       StringBuilder buff = new StringBuilder();
-      boolean first = true;
-      for(Path p: partDirs) {
-        if (first) {
-          first = false;
-        } else {
+      for (Path path : paths) {
+        if (buff.length() > 0) {
           buff.append('\t');
         }
-        buff.append(StringEscapeUtils.escapeJava(p.toString()));
+        buff.append(StringEscapeUtils.escapeJava(path.toString()));
       }
       conf.set(FETCH_OPERATOR_DIRECTORY_LIST, buff.toString());
     }
   }
 
-  /**
-   * A cache of Object Inspector Settable Properties.
-   */
-  private static Map<ObjectInspector, Boolean> oiSettableProperties = new HashMap<ObjectInspector, Boolean>();
-
   private RecordReader<WritableComparable, Writable> getRecordReader() throws Exception {
-    if (currPath == null) {
-      getNextPath();
-      if (currPath == null) {
+    if (!iterSplits.hasNext()) {
+      FetchInputFormatSplit[] splits = getNextSplits();
+      if (splits == null) {
         return null;
       }
-
-      // not using FileInputFormat.setInputPaths() here because it forces a
-      // connection
-      // to the default file system - which may or may not be online during pure
-      // metadata
-      // operations
-      job.set("mapred.input.dir", org.apache.hadoop.util.StringUtils.escapeString(currPath
-          .toString()));
-
-      // Fetch operator is not vectorized and as such turn vectorization flag off so that
-      // non-vectorized record reader is created below.
-      if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
-        HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
-      }
-
-      PartitionDesc partDesc;
-      if (currTbl == null) {
-        partDesc = currPart;
+      if (!isPartitioned || convertedOI == null) {
+        currSerDe = tableSerDe;
+        ObjectConverter = null;
       } else {
-        partDesc = new PartitionDesc(currTbl, null);
-      }
-
-      Class<? extends InputFormat> formatter = partDesc.getInputFileFormatClass();
-      inputFormat = getInputFormatFromCache(formatter, job);
-      Utilities.copyTableJobPropertiesToConf(partDesc.getTableDesc(), job);
-      InputSplit[] splits = inputFormat.getSplits(job, 1);
-      FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
-      for (int i = 0; i < splits.length; i++) {
-        inputSplits[i] = new FetchInputFormatSplit(splits[i], formatter.getName());
+        currSerDe = needConversion(currDesc) ? currDesc.getDeserializer(job) : tableSerDe;
+        ObjectInspector inputOI = currSerDe.getObjectInspector();
+        ObjectConverter = ObjectInspectorConverters.getConverter(inputOI, convertedOI);
       }
-      if (work.getSplitSample() != null) {
-        inputSplits = splitSampling(work.getSplitSample(), inputSplits);
+      if (isPartitioned) {
+        row[1] = createPartValue(currDesc, partKeyOI);
       }
-      this.inputSplits = inputSplits;
-
-      splitNum = 0;
-      serde = partDesc.getDeserializer(job);
-      SerDeUtils.initializeSerDe(serde, job, partDesc.getTableDesc().getProperties(),
-                                 partDesc.getProperties());
-
-      if (currTbl != null) {
-        tblSerde = serde;
-      }
-      else {
-        tblSerde = currPart.getTableDesc().getDeserializerClass().newInstance();
-        SerDeUtils.initializeSerDe(tblSerde, job, currPart.getTableDesc().getProperties(), null);
-      }
-
-      ObjectInspector outputOI = ObjectInspectorConverters.getConvertedOI(
-          serde.getObjectInspector(),
-          partitionedTableOI == null ? tblSerde.getObjectInspector() : partitionedTableOI,
-          oiSettableProperties);
-
-      partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
-          serde.getObjectInspector(), outputOI);
+      iterSplits = Arrays.asList(splits).iterator();
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("Creating fetchTask with deserializer typeinfo: "
-            + serde.getObjectInspector().getTypeName());
+            + currSerDe.getObjectInspector().getTypeName());
         LOG.debug("deserializer properties:\ntable properties: " +
-            partDesc.getTableDesc().getProperties() + "\npartition properties: " +
-            partDesc.getProperties());
-      }
-
-      if (currPart != null) {
-        getRowInspectorFromPartition(currPart, outputOI);
-      }
-    }
-
-    if (splitNum >= inputSplits.length) {
-      if (currRecReader != null) {
-        currRecReader.close();
-        currRecReader = null;
+            currDesc.getTableDesc().getProperties() + "\npartition properties: " +
+            currDesc.getProperties());
       }
-      currPath = null;
-      return getRecordReader();
     }
 
-    final FetchInputFormatSplit target = inputSplits[splitNum];
+    final FetchInputFormatSplit target = iterSplits.next();
 
     @SuppressWarnings("unchecked")
-    final RecordReader<WritableComparable, Writable> reader =
-        inputFormat.getRecordReader(target.getInputSplit(), job, Reporter.NULL);
+    final RecordReader<WritableComparable, Writable> reader = target.getRecordReader(job);
     if (hasVC || work.getSplitSample() != null) {
       currRecReader = new HiveRecordReader<WritableComparable, Writable>(reader, job) {
         @Override
@@ -517,23 +335,52 @@ public class FetchOperator implements Se
         }
       };
       ((HiveContextAwareRecordReader)currRecReader).
-          initIOContext(target, job, inputFormat.getClass(), reader);
+          initIOContext(target, job, target.inputFormat.getClass(), reader);
     } else {
       currRecReader = reader;
     }
-    splitNum++;
     key = currRecReader.createKey();
     value = currRecReader.createValue();
+    headerCount = footerCount = 0;
     return currRecReader;
   }
 
+  protected FetchInputFormatSplit[] getNextSplits() throws Exception {
+    while (getNextPath()) {
+      // not using FileInputFormat.setInputPaths() here because it forces a connection to the
+      // default file system - which may or may not be online during pure metadata operations
+      job.set("mapred.input.dir", StringUtils.escapeString(currPath.toString()));
+
+      // Fetch operator is not vectorized and as such turn vectorization flag off so that
+      // non-vectorized record reader is created below.
+      HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+
+      Class<? extends InputFormat> formatter = currDesc.getInputFileFormatClass();
+      Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job);
+      InputFormat inputFormat = getInputFormatFromCache(formatter, job);
+
+      InputSplit[] splits = inputFormat.getSplits(job, 1);
+      FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
+      for (int i = 0; i < splits.length; i++) {
+        inputSplits[i] = new FetchInputFormatSplit(splits[i], inputFormat);
+      }
+      if (work.getSplitSample() != null) {
+        inputSplits = splitSampling(work.getSplitSample(), inputSplits);
+      }
+      if (inputSplits.length > 0) {
+        return inputSplits;
+      }
+    }
+    return null;
+  }
+
   private FetchInputFormatSplit[] splitSampling(SplitSample splitSample,
       FetchInputFormatSplit[] splits) {
     long totalSize = 0;
     for (FetchInputFormatSplit split: splits) {
         totalSize += split.getLength();
     }
-    List<FetchInputFormatSplit> result = new ArrayList<FetchInputFormatSplit>();
+    List<FetchInputFormatSplit> result = new ArrayList<FetchInputFormatSplit>(splits.length);
     long targetSize = splitSample.getTargetSize(totalSize);
     int startIndex = splitSample.getSeedNum() % splits.length;
     long size = 0;
@@ -557,18 +404,18 @@ public class FetchOperator implements Se
    * Currently only used by FetchTask.
    **/
   public boolean pushRow() throws IOException, HiveException {
-    if(work.getRowsComputedUsingStats() != null) {
+    if (work.getRowsComputedUsingStats() != null) {
       for (List<Object> row : work.getRowsComputedUsingStats()) {
         operator.processOp(row, 0);
       }
-      operator.flush();
+      flushRow();
       return true;
     }
     InspectableObject row = getNextRow();
     if (row != null) {
       pushRow(row);
     } else {
-      operator.flush();
+      flushRow();
     }
     return row != null;
   }
@@ -577,6 +424,10 @@ public class FetchOperator implements Se
     operator.processOp(row.o, 0);
   }
 
+  protected void flushRow() throws HiveException {
+    operator.flush();
+  }
+
   private transient final InspectableObject inspectable = new InspectableObject();
 
   /**
@@ -602,28 +453,16 @@ public class FetchOperator implements Se
            * If file contains footer, used FooterBuffer to cache and remove footer
            * records at the end of the file.
            */
-          headerCount = 0;
-          footerCount = 0;
-          TableDesc table = null;
-          if (currTbl != null) {
-            table = currTbl;
-          } else if (currPart != null) {
-            table = currPart.getTableDesc();
-          }
-          if (table != null) {
-            headerCount = Utilities.getHeaderCount(table);
-            footerCount = Utilities.getFooterCount(table, job);
-          }
+          headerCount = Utilities.getHeaderCount(currDesc.getTableDesc());
+          footerCount = Utilities.getFooterCount(currDesc.getTableDesc(), job);
 
           // Skip header lines.
           opNotEOF = Utilities.skipHeader(currRecReader, headerCount, key, value);
 
           // Initialize footer buffer.
-          if (opNotEOF) {
-            if (footerCount > 0) {
-              footerBuffer = new FooterBuffer();
-              opNotEOF = footerBuffer.initializeBuffer(job, currRecReader, footerCount, key, value);
-            }
+          if (opNotEOF && footerCount > 0) {
+            footerBuffer = new FooterBuffer();
+            opNotEOF = footerBuffer.initializeBuffer(job, currRecReader, footerCount, key, value);
           }
         }
 
@@ -640,25 +479,24 @@ public class FetchOperator implements Se
         if (opNotEOF) {
           if (operator != null && context != null && context.inputFileChanged()) {
             // The child operators cleanup if input file has changed
-            try {
-              operator.cleanUpInputFileChanged();
-            } catch (HiveException e) {
-              throw new IOException(e);
-            }
+            operator.cleanUpInputFileChanged();
           }
           if (hasVC) {
-            vcValues = MapOperator.populateVirtualColumnValues(context, vcCols, vcValues, serde);
-            row[isPartitioned ? 2 : 1] = vcValues;
+            row[isPartitioned ? 2 : 1] =
+                MapOperator.populateVirtualColumnValues(context, vcCols, vcValues, currSerDe);
+          }
+          Object deserialized = currSerDe.deserialize(value);
+          if (ObjectConverter != null) {
+            deserialized = ObjectConverter.convert(deserialized);
           }
-          row[0] = partTblObjectInspectorConverter.convert(serde.deserialize(value));
 
           if (hasVC || isPartitioned) {
+            row[0] = deserialized;
             inspectable.o = row;
-            inspectable.oi = rowObjectInspector;
-            return inspectable;
+          } else {
+            inspectable.o = deserialized;
           }
-          inspectable.o = row[0];
-          inspectable.oi = tblSerde.getObjectInspector();
+          inspectable.oi = currSerDe.getObjectInspector();
           return inspectable;
         } else {
           currRecReader.close();
@@ -688,13 +526,13 @@ public class FetchOperator implements Se
         context.clear();
         context = null;
       }
-      this.currTbl = null;
       this.currPath = null;
       this.iterPath = null;
       this.iterPartDesc = null;
+      this.iterSplits = Iterators.emptyIterator();
     } catch (Exception e) {
       throw new HiveException("Failed with exception " + e.getMessage()
-          + org.apache.hadoop.util.StringUtils.stringifyException(e));
+          + StringUtils.stringifyException(e));
     }
   }
 
@@ -703,25 +541,33 @@ public class FetchOperator implements Se
    */
   public void setupContext(List<Path> paths) {
     this.iterPath = paths.iterator();
-    if (work.isNotPartitioned()) {
-      this.currTbl = work.getTblDesc();
+    List<PartitionDesc> partitionDescs;
+    if (!isPartitioned) {
+      this.iterPartDesc = Iterators.cycle(new PartitionDesc(work.getTblDesc(), null));
     } else {
       this.iterPartDesc = work.getPartDescs(paths).iterator();
     }
-    setupExecContext();
+    this.context = setupExecContext(operator, paths);
   }
 
   /**
    * returns output ObjectInspector, never null
    */
-  public ObjectInspector getOutputObjectInspector() throws HiveException {
-    if(null != work.getStatRowOI()) {
-      return work.getStatRowOI();
-    }
+  public ObjectInspector getOutputObjectInspector() {
+    return outputOI;
+  }
+
+  private StructObjectInspector setupOutputObjectInspector() throws HiveException {
+    TableDesc tableDesc = work.getTblDesc();
     try {
-      if (work.isNotPartitioned()) {
-        return getRowInspectorFromTable(work.getTblDesc());
+      tableSerDe = tableDesc.getDeserializer(job, true);
+      tableOI = (StructObjectInspector) tableSerDe.getObjectInspector();
+      if (!isPartitioned) {
+        return getTableRowOI(tableOI);
       }
+      partKeyOI = getPartitionKeyOI(tableDesc);
+
+      PartitionDesc partDesc = new PartitionDesc(tableDesc, null);
       List<PartitionDesc> listParts = work.getPartDesc();
       // Chose the table descriptor if none of the partitions is present.
       // For eg: consider the query:
@@ -729,39 +575,50 @@ public class FetchOperator implements Se
       // Both T1 and T2 and partitioned tables, but T1 does not have any partitions
       // FetchOperator is invoked for T1, and listParts is empty. In that case,
       // use T1's schema to get the ObjectInspector.
-      if (listParts == null || listParts.isEmpty()) {
-        return getRowInspectorFromPartitionedTable(work.getTblDesc());
+      if (listParts == null || listParts.isEmpty() || !needConversion(tableDesc, listParts)) {
+        return getPartitionedRowOI(tableOI);
       }
+      convertedOI = (StructObjectInspector) ObjectInspectorConverters.getConvertedOI(
+          tableOI, tableOI, null, false);
+      return getPartitionedRowOI(convertedOI);
+    } catch (Exception e) {
+      throw new HiveException("Failed with exception " + e.getMessage()
+          + StringUtils.stringifyException(e));
+    }
+  }
 
-      // Choose any partition. It's OI needs to be converted to the table OI
-      // Whenever a new partition is being read, a new converter is being created
-      PartitionDesc partition = listParts.get(0);
-      Deserializer tblSerde = partition.getTableDesc().getDeserializerClass().newInstance();
-      SerDeUtils.initializeSerDe(tblSerde, job, partition.getTableDesc().getProperties(), null);
-
-      partitionedTableOI = null;
-      ObjectInspector tableOI = tblSerde.getObjectInspector();
-
-      // Get the OI corresponding to all the partitions
-      for (PartitionDesc listPart : listParts) {
-        partition = listPart;
-        Deserializer partSerde = listPart.getDeserializer(job);
-        SerDeUtils.initializeSerDe(partSerde, job, partition.getTableDesc().getProperties(),
-                                   listPart.getProperties());
-
-        partitionedTableOI = ObjectInspectorConverters.getConvertedOI(
-            partSerde.getObjectInspector(), tableOI, oiSettableProperties);
-        if (!partitionedTableOI.equals(tableOI)) {
-          break;
+  private StructObjectInspector getTableRowOI(StructObjectInspector valueOI) {
+    return hasVC ? ObjectInspectorFactory.getUnionStructObjectInspector(
+        Arrays.asList(valueOI, vcsOI)) : valueOI;
+  }
+
+  private StructObjectInspector getPartitionedRowOI(StructObjectInspector valueOI) {
+    return ObjectInspectorFactory.getUnionStructObjectInspector(
+        hasVC ? Arrays.asList(valueOI, partKeyOI, vcsOI) : Arrays.asList(valueOI, partKeyOI));
+  }
+
+  private boolean needConversion(PartitionDesc partitionDesc) {
+    return needConversion(partitionDesc.getTableDesc(), Arrays.asList(partitionDesc));
+  }
+
+  // if table and all partitions have the same schema and serde, no need to convert
+  private boolean needConversion(TableDesc tableDesc, List<PartitionDesc> partDescs) {
+    Class<?> tableSerDe = tableDesc.getDeserializerClass();
+    String[] schemaProps = AnnotationUtils.getAnnotation(tableSerDe, SerDeSpec.class).schemaProps();
+    Properties tableProps = tableDesc.getProperties();
+    for (PartitionDesc partitionDesc : partDescs) {
+      if (!tableSerDe.getName().equals(partitionDesc.getDeserializerClassName())) {
+        return true;
+      }
+      Properties partProps = partitionDesc.getProperties();
+      for (String schemaProp : schemaProps) {
+        if (!org.apache.commons.lang3.StringUtils.equals(
+            tableProps.getProperty(schemaProp), partProps.getProperty(schemaProp))) {
+          return true;
         }
       }
-      return getRowInspectorFromPartition(partition, partitionedTableOI);
-    } catch (Exception e) {
-      throw new HiveException("Failed with exception " + e.getMessage()
-          + org.apache.hadoop.util.StringUtils.stringifyException(e));
-    } finally {
-      currPart = null;
     }
+    return false;
   }
 
   /**
@@ -797,11 +654,17 @@ public class FetchOperator implements Se
     // shrinked size for this split. counter part of this in normal mode is
     // InputSplitShim.shrinkedLength.
     // what's different is that this is evaluated by unit of row using RecordReader.getPos()
-    // and that is evaluated by unit of split using InputSplt.getLength().
+    // and that is evaluated by unit of split using InputSplit.getLength().
     private long shrinkedLength = -1;
+    private InputFormat inputFormat;
+
+    public FetchInputFormatSplit(InputSplit split, InputFormat inputFormat) {
+      super(split, inputFormat.getClass().getName());
+      this.inputFormat = inputFormat;
+    }
 
-    public FetchInputFormatSplit(InputSplit split, String name) {
-      super(split, name);
+    public RecordReader<WritableComparable, Writable> getRecordReader(JobConf job) throws IOException {
+      return inputFormat.getRecordReader(getInputSplit(), job, Reporter.NULL);
     }
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java Mon Jan  5 19:03:49 2015
@@ -133,20 +133,20 @@ public class PartitionKeySampler impleme
   }
 
   // random sampling
-  public static FetchSampler createSampler(FetchWork work, HiveConf conf, JobConf job,
-      Operator<?> operator) {
+  public static FetchOperator createSampler(FetchWork work, HiveConf conf, JobConf job,
+      Operator<?> operator) throws HiveException {
     int sampleNum = conf.getIntVar(HiveConf.ConfVars.HIVESAMPLINGNUMBERFORORDERBY);
     float samplePercent = conf.getFloatVar(HiveConf.ConfVars.HIVESAMPLINGPERCENTFORORDERBY);
     if (samplePercent < 0.0 || samplePercent > 1.0) {
       throw new IllegalArgumentException("Percentile value must be within the range of 0 to 1.");
     }
-    FetchSampler sampler = new FetchSampler(work, job, operator);
+    RandomSampler sampler = new RandomSampler(work, job, operator);
     sampler.setSampleNum(sampleNum);
     sampler.setSamplePercent(samplePercent);
     return sampler;
   }
 
-  private static class FetchSampler extends FetchOperator {
+  private static class RandomSampler extends FetchOperator {
 
     private int sampleNum = 1000;
     private float samplePercent = 0.1f;
@@ -154,7 +154,8 @@ public class PartitionKeySampler impleme
 
     private int sampled;
 
-    public FetchSampler(FetchWork work, JobConf job, Operator<?> operator) {
+    public RandomSampler(FetchWork work, JobConf job, Operator<?> operator)
+        throws HiveException {
       super(work, job, operator, null);
     }
 
@@ -174,7 +175,7 @@ public class PartitionKeySampler impleme
       if (sampled < sampleNum) {
         return true;
       }
-      operator.flush();
+      flushRow();
       return false;
     }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Mon Jan  5 19:03:49 2015
@@ -19,8 +19,6 @@
 package org.apache.hadoop.hive.ql.exec.mr;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
 import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.List;
@@ -30,8 +28,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
@@ -54,7 +50,7 @@ import org.apache.hadoop.util.StringUtil
 /**
  * ExecMapper is the generic Map class for Hive. Together with ExecReducer it is
  * the bridge between the map-reduce framework and the Hive operator pipeline at
- * execution time. It's main responsabilities are:
+ * execution time. Its main responsibilities are:
  *
  * - Load and setup the operator pipeline from XML
  * - Run the pipeline by transforming key value pairs to records and forwarding them to the operators
@@ -66,7 +62,6 @@ public class ExecMapper extends MapReduc
 
   private static final String PLAN_KEY = "__MAP_PLAN__";
   private MapOperator mo;
-  private Map<String, FetchOperator> fetchOperators;
   private OutputCollector oc;
   private JobConf jc;
   private boolean abort = false;
@@ -74,7 +69,6 @@ public class ExecMapper extends MapReduc
   public static final Log l4j = LogFactory.getLog(ExecMapper.class);
   private static boolean done;
 
-  // used to log memory usage periodically
   private MapredLocalWork localWork = null;
   private boolean isLogInfoEnabled = false;
 
@@ -213,15 +207,6 @@ public class ExecMapper extends MapReduc
         }
       }
 
-      if (fetchOperators != null) {
-        MapredLocalWork localWork = mo.getConf().getMapRedLocalWork();
-        for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-          Operator<? extends OperatorDesc> forwardOp = localWork
-              .getAliasToWork().get(entry.getKey());
-          forwardOp.close(abort);
-        }
-      }
-
       ReportStats rps = new ReportStats(rp, jc);
       mo.preorderMap(rps);
       return;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Mon Jan  5 19:03:49 2015
@@ -6263,7 +6263,7 @@ public class SemanticAnalyzer extends Ba
     } else {
       try {
         StructObjectInspector rowObjectInspector = (StructObjectInspector) table_desc
-            .getDeserializer().getObjectInspector();
+            .getDeserializer(conf).getObjectInspector();
         List<? extends StructField> fields = rowObjectInspector
             .getAllStructFieldRefs();
         for (int i = 0; i < fields.size(); i++) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java Mon Jan  5 19:03:49 2015
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.TreeMap;
 
@@ -28,7 +29,7 @@ import org.apache.hadoop.hive.ql.exec.Li
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 /**
  * FetchWork.
@@ -53,7 +54,7 @@ public class FetchWork implements Serial
   private SplitSample splitSample;
 
   private transient List<List<Object>> rowsComputedFromStats;
-  private transient ObjectInspector statRowOI;
+  private transient StructObjectInspector statRowOI;
 
   /**
    * Serialization Null Format for the serde used to fetch data.
@@ -63,12 +64,12 @@ public class FetchWork implements Serial
   public FetchWork() {
   }
 
-  public FetchWork(List<List<Object>> rowsComputedFromStats,ObjectInspector statRowOI) {
+  public FetchWork(List<List<Object>> rowsComputedFromStats, StructObjectInspector statRowOI) {
     this.rowsComputedFromStats = rowsComputedFromStats;
     this.statRowOI = statRowOI;
   }
 
-  public ObjectInspector getStatRowOI() {
+  public StructObjectInspector getStatRowOI() {
     return statRowOI;
   }
 
@@ -174,6 +175,11 @@ public class FetchWork implements Serial
     return partDesc;
   }
 
+  public List<Path> getPathLists() {
+    return isPartitioned() ? partDir == null ?
+        null : new ArrayList<Path>(partDir) : Arrays.asList(tblDir);
+  }
+
   /**
    * Get Partition descriptors in sorted (ascending) order of partition directory
    *

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java Mon Jan  5 19:03:49 2015
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.serde2.Des
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hive.common.util.HiveStringUtils;
 
 /**
@@ -84,8 +85,17 @@ public class TableDesc implements Serial
    * Return a deserializer object corresponding to the tableDesc.
    */
   public Deserializer getDeserializer(Configuration conf) throws Exception {
-    Deserializer de = getDeserializerClass().newInstance();
-    SerDeUtils.initializeSerDe(de, conf, properties, null);
+    return getDeserializer(conf, false);
+  }
+
+  public Deserializer getDeserializer(Configuration conf, boolean ignoreError) throws Exception {
+    Deserializer de = ReflectionUtils.newInstance(
+        getDeserializerClass().asSubclass(Deserializer.class), conf);
+    if (ignoreError) {
+      SerDeUtils.initializeSerDeWithoutErrorCheck(de, conf, properties, null);
+    } else {
+      SerDeUtils.initializeSerDe(de, conf, properties, null);
+    }
     return de;
   }
 

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorConverters.java Mon Jan  5 19:03:49 2015
@@ -209,7 +209,7 @@ public final class ObjectInspectorConver
    *           can contain non-settable fields only if inputOI equals outputOI and equalsCheck is
    *           true.
    */
-  private static ObjectInspector getConvertedOI(
+  public static ObjectInspector getConvertedOI(
       ObjectInspector inputOI,
       ObjectInspector outputOI,
       Map<ObjectInspector, Boolean> oiSettableProperties,

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java?rev=1649612&r1=1649611&r2=1649612&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/PrimitiveObjectInspectorConverter.java Mon Jan  5 19:03:49 2015
@@ -478,6 +478,9 @@ public class PrimitiveObjectInspectorCon
 
     @Override
     public Object convert(Object input) {
+      if (input == null) {
+        return null;
+      }
       switch (inputOI.getPrimitiveCategory()) {
         case BOOLEAN:
           return outputOI.set(hc,
@@ -504,6 +507,9 @@ public class PrimitiveObjectInspectorCon
 
     @Override
     public Object convert(Object input) {
+      if (input == null) {
+        return null;
+      }
       switch (inputOI.getPrimitiveCategory()) {
       case BOOLEAN:
         return outputOI.set(hc,