Posted to commits@hive.apache.org by br...@apache.org on 2014/09/08 06:38:26 UTC

svn commit: r1623263 [16/28] - in /hive/branches/spark: ./ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/ ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/ beeline/src/test/org/apache/hive/beeline/ bin/...

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Mon Sep  8 04:38:17 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -29,12 +30,15 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -62,6 +66,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -100,6 +105,10 @@ public class FileSinkOperator extends Te
   private transient List<Object> keyWritables;
   private transient List<String> keys;
   private transient int numKeyColToRead;
+  private StructField recIdField; // field to find record identifier in
+  private StructField bucketField; // field bucket is in in record id
+  private StructObjectInspector recIdInspector; // OI for inspecting record id
+  private IntObjectInspector bucketInspector; // OI for inspecting bucket id
 
   /**
    * RecordWriter.
@@ -117,7 +126,10 @@ public class FileSinkOperator extends Te
     Path[] outPaths;
     Path[] finalPaths;
     RecordWriter[] outWriters;
+    RecordUpdater[] updaters;
     Stat stat;
+    int acidLastBucket = -1;
+    int acidFileOffset = -1;
 
     public FSPaths() {
     }
@@ -128,6 +140,8 @@ public class FileSinkOperator extends Te
       outPaths = new Path[numFiles];
       finalPaths = new Path[numFiles];
       outWriters = new RecordWriter[numFiles];
+      updaters = new RecordUpdater[numFiles];
+      LOG.debug("Created slots for  " + numFiles);
       stat = new Stat();
     }
 
@@ -168,6 +182,15 @@ public class FileSinkOperator extends Te
           }
         }
       }
+      try {
+        for (int i = 0; i < updaters.length; i++) {
+          if (updaters[i] != null) {
+            updaters[i].close(abort);
+          }
+        }
+      } catch (IOException e) {
+        throw new HiveException(e);
+      }
     }
 
     private void commit(FileSystem fs) throws HiveException {
@@ -177,7 +200,21 @@ public class FileSinkOperator extends Te
               && !fs.exists(finalPaths[idx].getParent())) {
             fs.mkdirs(finalPaths[idx].getParent());
           }
-          if (!fs.rename(outPaths[idx], finalPaths[idx])) {
+          boolean needToRename = true;
+          if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+              conf.getWriteType() == AcidUtils.Operation.DELETE) {
+            // If we're updating or deleting, there may be no file to close.  This can happen
+            // because the where clause filtered out all of the records for a given bucket.  So
+            // before attempting the rename below, check if our file exists.  If it doesn't,
+            // then skip the rename; if it does, try it.  We could just blindly try the rename
+            // and avoid the extra stat, but that would mask other errors.
+            try {
+              FileStatus stat = fs.getFileStatus(outPaths[idx]);
+            } catch (FileNotFoundException fnfe) {
+              needToRename = false;
+            }
+          }
+          if (needToRename && !fs.rename(outPaths[idx], finalPaths[idx])) {
             throw new HiveException("Unable to rename output from: " +
                 outPaths[idx] + " to: " + finalPaths[idx]);
           }
@@ -350,6 +387,16 @@ public class FileSinkOperator extends Te
           valToPaths.put("", fsp); // special entry for non-DP case
         }
       }
+
+      if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+          conf.getWriteType() == AcidUtils.Operation.DELETE) {
+        // ROW__ID is always in the first field
+        recIdField = ((StructObjectInspector)outputObjInspector).getAllStructFieldRefs().get(0);
+        recIdInspector = (StructObjectInspector)recIdField.getFieldObjectInspector();
+        // bucket is the second field in the record id
+        bucketField = recIdInspector.getAllStructFieldRefs().get(1);
+        bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
+      }
       initializeChildren(hconf);
     } catch (HiveException e) {
       throw e;
@@ -420,6 +467,7 @@ public class FileSinkOperator extends Te
           assert totalFiles == 1;
         }
 
+        int bucketNum = 0;
         if (multiFileSpray) {
           key.setHashCode(idx);
 
@@ -436,7 +484,7 @@ public class FileSinkOperator extends Te
             }
           }
 
-          int bucketNum = prtner.getBucket(key, null, totalFiles);
+          bucketNum = prtner.getBucket(key, null, totalFiles);
           if (seenBuckets.contains(bucketNum)) {
             continue;
           }
@@ -462,7 +510,8 @@ public class FileSinkOperator extends Te
     filesCreated = true;
   }
 
-  protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveException {
+  protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
+      throws HiveException {
     try {
       if (isNativeTable) {
         fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null);
@@ -493,11 +542,21 @@ public class FileSinkOperator extends Te
       Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
       // only create bucket files only if no dynamic partitions,
       // buckets of dynamic partitions will be created for each newly created partition
-      fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(),
-          outputClass, conf, fsp.outPaths[filesIdx], reporter);
-      // If the record writer provides stats, get it from there instead of the serde
-      statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter;
-      // increment the CREATED_FILES counter
+      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+        fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(),
+            outputClass, conf, fsp.outPaths[filesIdx], reporter);
+        // If the record writer provides stats, get it from there instead of the serde
+        statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof
+            StatsProvidingRecordWriter;
+        // increment the CREATED_FILES counter
+      } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
+        // Only set up the updater for insert.  For update and delete we don't know until we see
+        // the row.
+        ObjectInspector inspector = bDynParts ? subSetOI : outputObjInspector;
+        int acidBucketNum = Integer.valueOf(Utilities.getTaskIdFromFilename(taskId));
+        fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(),
+            acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1);
+      }
       if (reporter != null) {
         reporter.incrCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP),
             Operator.HIVECOUNTERCREATEDFILES, 1);
@@ -598,27 +657,47 @@ public class FileSinkOperator extends Te
       }
 
 
-      RecordWriter rowOutWriter = null;
-
       if (row_count != null) {
         row_count.set(row_count.get() + 1);
       }
 
-      if (!multiFileSpray) {
-        rowOutWriter = rowOutWriters[0];
+      int writerOffset = findWriterOffset(row);
+      // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same
+      // for a given operator, branch prediction should work quite nicely on it.
+      // RecordUpdater expects to get the actual row, not a serialized version of it.  Thus we
+      // pass the row rather than recordValue.
+      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+        rowOutWriters[writerOffset].write(recordValue);
+      } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
+        fpaths.updaters[writerOffset].insert(conf.getTransactionId(), row);
       } else {
-        int keyHashCode = 0;
-        for (int i = 0; i < partitionEval.length; i++) {
-          Object o = partitionEval[i].evaluate(row);
-          keyHashCode = keyHashCode * 31
-              + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
-        }
-        key.setHashCode(keyHashCode);
-        int bucketNum = prtner.getBucket(key, null, totalFiles);
-        int idx = bucketMap.get(bucketNum);
-        rowOutWriter = rowOutWriters[idx];
+        // TODO I suspect we could skip much of the code above this point in the method for
+        // update and delete, but I don't understand all of the side effects of that code
+        // and don't want to skip over it yet.
+
+        // Find the bucket id, and switch buckets if need to
+        ObjectInspector rowInspector = bDynParts ? subSetOI : outputObjInspector;
+        Object recId = ((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField);
+        int bucketNum =
+            bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField));
+        if (fpaths.acidLastBucket != bucketNum) {
+          fpaths.acidLastBucket = bucketNum;
+          // Switch files
+          fpaths.updaters[++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
+              jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[fpaths.acidFileOffset],
+              rowInspector, reporter, 0);
+          LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
+              fpaths.outPaths[fpaths.acidFileOffset]);
+        }
+
+        if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
+          fpaths.updaters[fpaths.acidFileOffset].update(conf.getTransactionId(), row);
+        } else if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
+          fpaths.updaters[fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
+        } else {
+          throw new HiveException("Unknown write type " + conf.getWriteType().toString());
+        }
       }
-      rowOutWriter.write(recordValue);
     } catch (IOException e) {
       throw new HiveException(e);
     } catch (SerDeException e) {
@@ -627,6 +706,11 @@ public class FileSinkOperator extends Te
   }
 
   protected boolean areAllTrue(boolean[] statsFromRW) {
+    // If we are doing an acid operation, they will always all be true, as RecordUpdaters
+    // always collect stats
+    if (conf.getWriteType() != AcidUtils.Operation.NOT_ACID) {
+      return true;
+    }
     for(boolean b : statsFromRW) {
       if (!b) {
         return false;
@@ -635,6 +719,23 @@ public class FileSinkOperator extends Te
     return true;
   }
 
+  private int findWriterOffset(Object row) throws HiveException {
+    if (!multiFileSpray) {
+      return 0;
+    } else {
+      int keyHashCode = 0;
+      for (int i = 0; i < partitionEval.length; i++) {
+        Object o = partitionEval[i].evaluate(row);
+        keyHashCode = keyHashCode * 31
+            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+      }
+      key.setHashCode(keyHashCode);
+      int bucketNum = prtner.getBucket(key, null, totalFiles);
+      return bucketMap.get(bucketNum);
+    }
+
+  }
+
   /**
    * Lookup list bucketing path.
    * @param lbDirName
@@ -727,14 +828,16 @@ public class FileSinkOperator extends Te
     FSPaths fp;
 
     // get the path corresponding to the dynamic partition columns,
-    String dpDir = getDynPartDirectory(row, dpColNames, numDynParts);
+    String dpDir = getDynPartDirectory(row, dpColNames);
 
     String pathKey = null;
     if (dpDir != null) {
       dpDir = appendToSource(lbDirName, dpDir);
       pathKey = dpDir;
+      int numericBucketNum = 0;
       if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
         String buckNum = row.get(row.size() - 1);
+        numericBucketNum = Integer.valueOf(buckNum);
         taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), buckNum);
         pathKey = appendToSource(taskId, dpDir);
       }
@@ -756,13 +859,18 @@ public class FileSinkOperator extends Te
           // since we are closing the previous fsp's record writers, we need to see if we can get
           // stats from the record writer and store in the previous fsp that is cached
           if (conf.isGatherStats() && isCollectRWStats) {
-            RecordWriter outWriter = prevFsp.outWriters[0];
-            if (outWriter != null) {
-              SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
-              if (stats != null) {
+            SerDeStats stats = null;
+            if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+              RecordWriter outWriter = prevFsp.outWriters[0];
+              if (outWriter != null) {
+                stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+              }
+            } else if (prevFsp.updaters[0] != null) {
+              stats = prevFsp.updaters[0].getStats();
+            }
+            if (stats != null) {
                 prevFsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
                 prevFsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
-              }
             }
           }
 
@@ -805,8 +913,7 @@ public class FileSinkOperator extends Te
   // given the current input row, the mapping for input col info to dp columns, and # of dp cols,
   // return the relative path corresponding to the row.
   // e.g., ds=2008-04-08/hr=11
-  private String getDynPartDirectory(List<String> row, List<String> dpColNames, int numDynParts) {
-    assert row.size() == numDynParts && numDynParts == dpColNames.size() : "data length is different from num of DP columns";
+  private String getDynPartDirectory(List<String> row, List<String> dpColNames) {
     return FileUtils.makePartName(dpColNames, row);
   }
 
@@ -832,6 +939,7 @@ public class FileSinkOperator extends Te
 
   @Override
   public void closeOp(boolean abort) throws HiveException {
+
     if (!bDynParts && !filesCreated) {
       createBucketFiles(fsp);
     }
@@ -849,13 +957,25 @@ public class FileSinkOperator extends Te
         // record writer already gathers the statistics, it can simply return the
         // accumulated statistics which will be aggregated in case of spray writers
         if (conf.isGatherStats() && isCollectRWStats) {
-          for (int idx = 0; idx < fsp.outWriters.length; idx++) {
-            RecordWriter outWriter = fsp.outWriters[idx];
-            if (outWriter != null) {
-              SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
-              if (stats != null) {
-                fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
-                fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+          if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+            for (int idx = 0; idx < fsp.outWriters.length; idx++) {
+              RecordWriter outWriter = fsp.outWriters[idx];
+              if (outWriter != null) {
+                SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+                if (stats != null) {
+                  fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+                  fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+                }
+              }
+            }
+          } else {
+            for (int i = 0; i < fsp.updaters.length; i++) {
+              if (fsp.updaters[i] != null) {
+                SerDeStats stats = fsp.updaters[i].getStats();
+                if (stats != null) {
+                  fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+                  fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+                }
               }
             }
           }

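Editor's note: the FileSinkOperator changes above add an ACID write path. For INSERT the operator creates a RecordUpdater up front; for UPDATE and DELETE it waits until it sees the bucket id inside each row's ROW__ID and switches updaters when the bucket changes. The condensed sketch below illustrates only the per-row dispatch on AcidUtils.Operation, using the RecordUpdater and RecordWriter calls that appear in the patch; the class, method signature, and parameter names are simplified stand-ins for illustration, not the actual operator code (in the patch the transaction id comes from conf.getTransactionId()).

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.io.Writable;

public class AcidRowDispatchSketch {
  // Route one row either to a plain RecordWriter (non-ACID) or to a RecordUpdater
  // (ACID insert/update/delete), mirroring the dispatch FileSinkOperator now does.
  static void writeRow(AcidUtils.Operation writeType, long txnId,
      RecordWriter plainWriter, RecordUpdater updater,
      Writable recordValue, Object row) throws IOException {
    if (writeType == AcidUtils.Operation.NOT_ACID) {
      // Non-ACID path: the serialized record goes to the ordinary record writer.
      plainWriter.write(recordValue);
    } else if (writeType == AcidUtils.Operation.INSERT) {
      // RecordUpdater expects the actual row object, not the serialized value.
      updater.insert(txnId, row);
    } else if (writeType == AcidUtils.Operation.UPDATE) {
      updater.update(txnId, row);
    } else if (writeType == AcidUtils.Operation.DELETE) {
      updater.delete(txnId, row);
    } else {
      throw new IllegalArgumentException("Unknown write type " + writeType);
    }
  }
}
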
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Mon Sep  8 04:38:17 2014
@@ -39,7 +39,6 @@ import javax.xml.parsers.DocumentBuilder
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Function;
@@ -562,7 +561,7 @@ public final class FunctionRegistry {
           return null;
         }
 
-        Class<?> udfClass = Class.forName(func.getClassName(), true, JavaUtils.getClassLoader());
+        Class<?> udfClass = Class.forName(func.getClassName(), true, Utilities.getSessionSpecifiedClassLoader());
         if (registerTemporaryFunction(functionName, udfClass)) {
           ret = mFunctions.get(functionName);
         } else {
@@ -610,7 +609,7 @@ public final class FunctionRegistry {
     // Even if we have a reference to the class (which will be the case for GenericUDFs),
     // the classloader may not be able to resolve the class, which would mean reflection-based
     // methods would fail such as for plan deserialization. Make sure this works too.
-    Class.forName(udfClass.getName(), true, JavaUtils.getClassLoader());
+    Class.forName(udfClass.getName(), true, Utilities.getSessionSpecifiedClassLoader());
   }
 
   private static void loadFunctionResourcesIfNecessary(String functionName, CommonFunctionInfo cfi) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java Mon Sep  8 04:38:17 2014
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.exec;
 import static org.apache.hadoop.util.StringUtils.stringifyException;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -33,10 +32,8 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.ResourceType;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
-import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.FunctionUtils.FunctionType;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils.UDFClassType;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -47,10 +44,6 @@ import org.apache.hadoop.hive.ql.plan.Dr
 import org.apache.hadoop.hive.ql.plan.FunctionWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -308,9 +301,10 @@ public class FunctionTask extends Task<F
     }
   }
 
-  @SuppressWarnings("unchecked")
   private Class<?> getUdfClass(CreateFunctionDesc desc) throws ClassNotFoundException {
-    return Class.forName(desc.getClassName(), true, JavaUtils.getClassLoader());
+    // get the session specified class loader from SessionState
+    ClassLoader classLoader = Utilities.getSessionSpecifiedClassLoader();
+    return Class.forName(desc.getClassName(), true, classLoader);
   }
 
   @Override

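Editor's note: both FunctionRegistry and FunctionTask now resolve UDF classes through Utilities.getSessionSpecifiedClassLoader() instead of JavaUtils.getClassLoader(), so classes added through session-level resources are visible when functions are registered or deserialized. A minimal sketch of that lookup follows; the expectation that the helper falls back to a default class loader when no session loader is configured is an assumption, not something stated in this patch.

import org.apache.hadoop.hive.ql.exec.Utilities;

public class UdfClassResolverSketch {
  static Class<?> resolveUdfClass(String className) throws ClassNotFoundException {
    // Ask Hive for the class loader that also sees session-level jar resources;
    // assumed to fall back to a default loader when no session is active.
    ClassLoader loader = Utilities.getSessionSpecifiedClassLoader();
    return Class.forName(className, true, loader);
  }
}
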
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java Mon Sep  8 04:38:17 2014
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.lockmgr
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Class to handle heartbeats for MR and Tez tasks.
@@ -64,7 +65,8 @@ public class Heartbeater {
     if (heartbeatInterval == 0) {
       // Multiply the heartbeat interval by 1000 to convert to milliseconds,
       // but divide by 2 to give us a safety factor.
-      heartbeatInterval = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT) * 500;
+      heartbeatInterval = HiveConf.getTimeVar(
+          conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2;
       if (heartbeatInterval == 0) {
         LOG.warn(HiveConf.ConfVars.HIVE_TXN_MANAGER.toString() + " not set, heartbeats won't be sent");
         dontHeartbeat = true;

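Editor's note: the Heartbeater change reads HIVE_TXN_TIMEOUT as a time value directly in milliseconds and halves it, instead of multiplying a seconds value by 500. The arithmetic is equivalent, but the new form honors time-unit suffixes on the setting. A minimal sketch of the computation, assuming only the HiveConf.getTimeVar call shown in the patch:

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.conf.HiveConf;

public class HeartbeatIntervalSketch {
  static long heartbeatIntervalMs(HiveConf conf) {
    // Read the transaction timeout in milliseconds and halve it as a safety factor;
    // the old code's "seconds * 500" computed the same value.
    long timeoutMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT,
        TimeUnit.MILLISECONDS);
    return timeoutMs / 2;  // e.g. a 300s timeout yields a 150000 ms heartbeat interval
  }
}
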
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveTotalOrderPartitioner.java Mon Sep  8 04:38:17 2014
@@ -20,24 +20,50 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
 
-public class HiveTotalOrderPartitioner implements Partitioner<HiveKey, Object> {
+public class HiveTotalOrderPartitioner implements Partitioner<HiveKey, Object>, Configurable {
 
-  private Partitioner<BytesWritable, Object> partitioner
-      = new TotalOrderPartitioner<BytesWritable, Object>();
+  private static final Log LOG = LogFactory.getLog(HiveTotalOrderPartitioner.class);
 
+  private Partitioner<BytesWritable, Object> partitioner;
+
+  @Override
   public void configure(JobConf job) {
-    JobConf newconf = new JobConf(job);
-    newconf.setMapOutputKeyClass(BytesWritable.class);
-    partitioner.configure(newconf);
+    if (partitioner == null) {
+      configurePartitioner(new JobConf(job));
+    }
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    // workaround for TEZ-1403
+    if (partitioner == null) {
+      configurePartitioner(new JobConf(conf));
+    }
   }
 
   public int getPartition(HiveKey key, Object value, int numPartitions) {
     return partitioner.getPartition(key, value, numPartitions);
   }
+
+  @Override
+  public Configuration getConf() {
+    return null;
+  }
+
+  private void configurePartitioner(JobConf conf) {
+    LOG.info(TotalOrderPartitioner.getPartitionFile(conf));
+    conf.setMapOutputKeyClass(BytesWritable.class);
+    partitioner = new TotalOrderPartitioner<BytesWritable, Object>();
+    partitioner.configure(conf);
+  }
 }

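Editor's note: HiveTotalOrderPartitioner now also implements Configurable because, per the TEZ-1403 comment, Tez initializes partitioners through setConf(Configuration) rather than configure(JobConf); both entry points now funnel into the same lazy, idempotent setup. The sketch below shows one plausible way such a partitioner is wired into a JobConf. The setPartitionFile call is Hadoop's standard TotalOrderPartitioner API; using it here is an assumption about how the sampling path hands over the key file, not something shown in this hunk.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class TotalOrderPartitionerWiringSketch {
  static void configureJob(JobConf job, Path partitionKeyFile) {
    // Tell the underlying TotalOrderPartitioner where the sampled partition keys live.
    TotalOrderPartitioner.setPartitionFile(job, partitionKeyFile);
    // Hive's wrapper forces the map output key class to BytesWritable internally,
    // whether it is initialized via configure(JobConf) or setConf(Configuration).
    job.setPartitionerClass(HiveTotalOrderPartitioner.class);
  }
}
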
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java Mon Sep  8 04:38:17 2014
@@ -57,7 +57,7 @@ public class ListSinkOperator extends Op
     FetchFormatter fetcher;
     if (formatterName != null && !formatterName.isEmpty()) {
       Class<? extends FetchFormatter> fetcherClass = Class.forName(formatterName, true,
-          JavaUtils.getClassLoader()).asSubclass(FetchFormatter.class);
+          Utilities.getSessionSpecifiedClassLoader()).asSubclass(FetchFormatter.class);
       fetcher = ReflectionUtils.newInstance(fetcherClass, null);
     } else {
       fetcher = new DefaultFetchFormatter();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Mon Sep  8 04:38:17 2014
@@ -104,6 +104,8 @@ public class MapJoinOperator extends Abs
     cache = ObjectCacheFactory.getCache(hconf);
     loader = HashTableLoaderFactory.getLoader(hconf);
 
+    hashMapRowGetters = null;
+
     mapJoinTables = (MapJoinTableContainer[]) cache.retrieve(tableKey);
     mapJoinTableSerdes = (MapJoinTableContainerSerDe[]) cache.retrieve(serdeKey);
     hashTblInitedOnce = true;
@@ -186,7 +188,7 @@ public class MapJoinOperator extends Abs
        * process different buckets and if the container is reused to join a different bucket,
        * join results can be incorrect. The cache is keyed on operator id and for bucket map join
        * the operator does not change but data needed is different. For a proper fix, this
-       * requires changes in the Tez API with regard to finding bucket id and 
+       * requires changes in the Tez API with regard to finding bucket id and
        * also ability to schedule tasks to re-use containers that have cached the specific bucket.
        */
       LOG.info("This is not bucket map join, so cache");
@@ -237,7 +239,7 @@ public class MapJoinOperator extends Abs
         firstRow = false;
       }
 
-      alias = (byte)tag;
+      alias = (byte) tag;
       if (hashMapRowGetters == null) {
         hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length];
         MapJoinKey refKey = getRefKey(alias);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Mon Sep  8 04:38:17 2014
@@ -348,17 +348,17 @@ public class MoveTask extends Task<MoveW
             // want to isolate any potential issue it may introduce.
             ArrayList<LinkedHashMap<String, String>> dp =
               db.loadDynamicPartitions(
-                  tbd.getSourcePath(),
-                  tbd.getTable().getTableName(),
-                	tbd.getPartitionSpec(),
-                	tbd.getReplace(),
-                	dpCtx.getNumDPCols(),
-                	tbd.getHoldDDLTime(),
-                	isSkewedStoredAsDirs(tbd));
+                tbd.getSourcePath(),
+                tbd.getTable().getTableName(),
+                tbd.getPartitionSpec(),
+                tbd.getReplace(),
+                dpCtx.getNumDPCols(),
+                tbd.getHoldDDLTime(),
+                isSkewedStoredAsDirs(tbd));
 
             if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
               throw new HiveException("This query creates no partitions." +
-              		" To turn off this error, set hive.error.on.empty.partition=false.");
+                  " To turn off this error, set hive.error.on.empty.partition=false.");
             }
 
             // for each partition spec, get the partition
@@ -412,13 +412,13 @@ public class MoveTask extends Task<MoveW
                   numBuckets, sortCols);
             }
 
-          	dc = new DataContainer(table.getTTable(), partn.getTPartition());
-          	// add this partition to post-execution hook
-          	if (work.getOutputs() != null) {
-          	  work.getOutputs().add(new WriteEntity(partn,
+            dc = new DataContainer(table.getTTable(), partn.getTPartition());
+            // add this partition to post-execution hook
+            if (work.getOutputs() != null) {
+              work.getOutputs().add(new WriteEntity(partn,
                   (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE
                       : WriteEntity.WriteType.INSERT)));
-          	}
+            }
          }
         }
         if (SessionState.get() != null && dc != null) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Mon Sep  8 04:38:17 2014
@@ -29,13 +29,15 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
 import org.apache.hadoop.hive.ql.plan.DemuxDesc;
 import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExtractDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
@@ -64,6 +66,7 @@ import org.apache.hadoop.hive.ql.plan.Un
  * OperatorFactory.
  *
  */
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public final class OperatorFactory {
   private static final List<OpTuple> opvec;
   private static final List<OpTuple> vectorOpvec;
@@ -101,6 +104,10 @@ public final class OperatorFactory {
         DemuxOperator.class));
     opvec.add(new OpTuple<MuxDesc>(MuxDesc.class,
         MuxOperator.class));
+    opvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class,
+        AppMasterEventOperator.class));
+    opvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
+        AppMasterEventOperator.class));
   }
 
   static {
@@ -119,9 +126,9 @@ public final class OperatorFactory {
 
   private static final class OpTuple<T extends OperatorDesc> {
     private final Class<T> descClass;
-    private final Class<? extends Operator<T>> opClass;
+    private final Class<? extends Operator<?>> opClass;
 
-    public OpTuple(Class<T> descClass, Class<? extends Operator<T>> opClass) {
+    public OpTuple(Class<T> descClass, Class<? extends Operator<?>> opClass) {
       this.descClass = descClass;
       this.opClass = opClass;
     }

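Editor's note: the OperatorFactory hunk registers two descriptor classes, AppMasterEventDesc and DynamicPruningEventDesc, against the same AppMasterEventOperator, which is why OpTuple's operator class parameter is relaxed from Class<? extends Operator<T>> to Class<? extends Operator<?>>. The toy model below (plain Java, no Hive types; every name in it is made up for illustration) shows the compile-time difference.

import java.util.ArrayList;
import java.util.List;

public class OpRegistrySketch {
  static class Desc {}
  static class EventDesc extends Desc {}
  static class PruningEventDesc extends EventDesc {}

  static class Op<T extends Desc> {}
  static class EventOp extends Op<EventDesc> {}

  static final class OpTuple<T extends Desc> {
    final Class<T> descClass;
    final Class<? extends Op<?>> opClass;   // relaxed bound, as in the patch
    OpTuple(Class<T> descClass, Class<? extends Op<?>> opClass) {
      this.descClass = descClass;
      this.opClass = opClass;
    }
  }

  public static void main(String[] args) {
    List<OpTuple<?>> opvec = new ArrayList<OpTuple<?>>();
    opvec.add(new OpTuple<EventDesc>(EventDesc.class, EventOp.class));
    // EventOp is an Op<EventDesc>, not an Op<PruningEventDesc>; the next line only
    // compiles because the operator class parameter uses the wildcard Op<?>.
    opvec.add(new OpTuple<PruningEventDesc>(PruningEventDesc.class, EventOp.class));
    System.out.println("registered " + opvec.size() + " operator mappings");
  }
}
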
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Mon Sep  8 04:38:17 2014
@@ -45,61 +45,60 @@ import org.apache.hadoop.hive.serde2.obj
 
 public class PTFOperator extends Operator<PTFDesc> implements Serializable {
 
-	private static final long serialVersionUID = 1L;
-	boolean isMapOperator;
+  private static final long serialVersionUID = 1L;
+  boolean isMapOperator;
 
-	transient KeyWrapperFactory keyWrapperFactory;
-	protected transient KeyWrapper currentKeys;
-	protected transient KeyWrapper newKeys;
-	/*
-	 * for map-side invocation of PTFs, we cannot utilize the currentkeys null check
-	 * to decide on invoking startPartition in streaming mode. Hence this extra flag. 
-	 */
-	transient boolean firstMapRow;
-	transient Configuration hiveConf;
-	transient PTFInvocation ptfInvocation;
-
-	/*
-	 * 1. Find out if the operator is invoked at Map-Side or Reduce-side
-	 * 2. Get the deserialized QueryDef
-	 * 3. Reconstruct the transient variables in QueryDef
-	 * 4. Create input partition to store rows coming from previous operator
-	 */
-	@Override
-	protected void initializeOp(Configuration jobConf) throws HiveException {
-		hiveConf = jobConf;
-		// if the parent is ExtractOperator, this invocation is from reduce-side
-		isMapOperator = conf.isMapSide();
-
-		reconstructQueryDef(hiveConf);
-
-		if (isMapOperator) {
-			PartitionedTableFunctionDef tDef = conf.getStartOfChain();
-			outputObjInspector = tDef.getRawInputShape().getOI();
-		} else {
-			outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
-		}
-
-		setupKeysWrapper(inputObjInspectors[0]);
-		
-		ptfInvocation = setupChain();
-		ptfInvocation.initializeStreaming(jobConf, isMapOperator);
-		firstMapRow = true;
-
-		super.initializeOp(jobConf);
-	}
-
-	@Override
-	protected void closeOp(boolean abort) throws HiveException {
-		super.closeOp(abort);
+  transient KeyWrapperFactory keyWrapperFactory;
+  protected transient KeyWrapper currentKeys;
+  protected transient KeyWrapper newKeys;
+  /*
+   * for map-side invocation of PTFs, we cannot utilize the currentkeys null check
+   * to decide on invoking startPartition in streaming mode. Hence this extra flag.
+   */
+  transient boolean firstMapRow;
+  transient Configuration hiveConf;
+  transient PTFInvocation ptfInvocation;
+
+  /*
+   * 1. Find out if the operator is invoked at Map-Side or Reduce-side
+   * 2. Get the deserialized QueryDef
+   * 3. Reconstruct the transient variables in QueryDef
+   * 4. Create input partition to store rows coming from previous operator
+   */
+  @Override
+  protected void initializeOp(Configuration jobConf) throws HiveException {
+    hiveConf = jobConf;
+    // if the parent is ExtractOperator, this invocation is from reduce-side
+    isMapOperator = conf.isMapSide();
+
+    reconstructQueryDef(hiveConf);
+
+    if (isMapOperator) {
+      PartitionedTableFunctionDef tDef = conf.getStartOfChain();
+      outputObjInspector = tDef.getRawInputShape().getOI();
+    } else {
+      outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
+    }
+
+    setupKeysWrapper(inputObjInspectors[0]);
+
+    ptfInvocation = setupChain();
+    ptfInvocation.initializeStreaming(jobConf, isMapOperator);
+    firstMapRow = true;
+
+    super.initializeOp(jobConf);
+  }
+
+  @Override
+  protected void closeOp(boolean abort) throws HiveException {
+    super.closeOp(abort);
     ptfInvocation.finishPartition();
     ptfInvocation.close();
   }
 
-	@Override
-	public void processOp(Object row, int tag) throws HiveException
-	{
-	  if (!isMapOperator ) {
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    if (!isMapOperator ) {
       /*
        * checkif current row belongs to the current accumulated Partition:
        * - If not:
@@ -129,51 +128,51 @@ public class PTFOperator extends Operato
     }
 
     ptfInvocation.processRow(row);
-	}
+  }
+
+  /**
+   * Initialize the visitor to use the QueryDefDeserializer Use the order
+   * defined in QueryDefWalker to visit the QueryDef
+   *
+   * @param hiveConf
+   * @throws HiveException
+   */
+  protected void reconstructQueryDef(Configuration hiveConf) throws HiveException {
+
+    PTFDeserializer dS =
+        new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
+    dS.initializePTFChain(conf.getFuncDef());
+  }
+
+  protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException {
+    PartitionDef pDef = conf.getStartOfChain().getPartition();
+    List<PTFExpressionDef> exprs = pDef.getExpressions();
+    int numExprs = exprs.size();
+    ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numExprs];
+    ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
+    ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];
+
+    for(int i=0; i<numExprs; i++) {
+      PTFExpressionDef exprDef = exprs.get(i);
+      /*
+       * Why cannot we just use the ExprNodeEvaluator on the column?
+       * - because on the reduce-side it is initialized based on the rowOI of the HiveTable
+       *   and not the OI of the ExtractOp ( the parent of this Operator on the reduce-side)
+       */
+      keyFields[i] = ExprNodeEvaluatorFactory.get(exprDef.getExprNode());
+      keyOIs[i] = keyFields[i].initialize(inputOI);
+      currentKeyOIs[i] =
+          ObjectInspectorUtils.getStandardObjectInspector(keyOIs[i],
+              ObjectInspectorCopyOption.WRITABLE);
+    }
+
+    keyWrapperFactory = new KeyWrapperFactory(keyFields, keyOIs, currentKeyOIs);
+    newKeys = keyWrapperFactory.getKeyWrapper();
+  }
 
-	/**
-	 * Initialize the visitor to use the QueryDefDeserializer Use the order
-	 * defined in QueryDefWalker to visit the QueryDef
-	 *
-	 * @param hiveConf
-	 * @throws HiveException
-	 */
-	protected void reconstructQueryDef(Configuration hiveConf) throws HiveException {
-
-	  PTFDeserializer dS =
-	      new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
-	  dS.initializePTFChain(conf.getFuncDef());
-	}
-
-	protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException {
-		PartitionDef pDef = conf.getStartOfChain().getPartition();
-		List<PTFExpressionDef> exprs = pDef.getExpressions();
-		int numExprs = exprs.size();
-		ExprNodeEvaluator[] keyFields = new ExprNodeEvaluator[numExprs];
-		ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
-		ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];
-
-		for(int i=0; i<numExprs; i++) {
-		  PTFExpressionDef exprDef = exprs.get(i);
-			/*
-			 * Why cannot we just use the ExprNodeEvaluator on the column?
-			 * - because on the reduce-side it is initialized based on the rowOI of the HiveTable
-			 *   and not the OI of the ExtractOp ( the parent of this Operator on the reduce-side)
-			 */
-			keyFields[i] = ExprNodeEvaluatorFactory.get(exprDef.getExprNode());
-			keyOIs[i] = keyFields[i].initialize(inputOI);
-			currentKeyOIs[i] =
-			    ObjectInspectorUtils.getStandardObjectInspector(keyOIs[i],
-			        ObjectInspectorCopyOption.WRITABLE);
-		}
-
-		keyWrapperFactory = new KeyWrapperFactory(keyFields, keyOIs, currentKeyOIs);
-	  newKeys = keyWrapperFactory.getKeyWrapper();
-	}
-
-	/**
-	 * @return the name of the operator
-	 */
+  /**
+   * @return the name of the operator
+   */
   @Override
   public String getName() {
     return getOperatorName();
@@ -184,11 +183,11 @@ public class PTFOperator extends Operato
   }
 
 
-	@Override
-	public OperatorType getType() {
-		return OperatorType.PTF;
-	}
-  
+  @Override
+  public OperatorType getType() {
+    return OperatorType.PTF;
+  }
+
   private PTFInvocation setupChain() {
     Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
     PTFInputDef iDef = conf.getFuncDef();
@@ -197,9 +196,9 @@ public class PTFOperator extends Operato
       fnDefs.push((PartitionedTableFunctionDef) iDef);
       iDef = ((PartitionedTableFunctionDef) iDef).getInput();
     }
-    
+
     PTFInvocation curr = null, first = null;
-    
+
     while(!fnDefs.isEmpty()) {
       PartitionedTableFunctionDef currFn = fnDefs.pop();
       curr = new PTFInvocation(curr, currFn.getTFunction());
@@ -222,26 +221,26 @@ public class PTFOperator extends Operato
       llFn.setpItr(pItr);
     }
   }
-  
+
   /*
    * Responsible for the flow of rows through the PTF Chain.
-   * An Invocation wraps a TableFunction. 
-   * The PTFOp hands the chain each row through the processRow call. 
+   * An Invocation wraps a TableFunction.
+   * The PTFOp hands the chain each row through the processRow call.
    * It also notifies the chain of when a Partition starts/finishes.
-   * 
+   *
    * There are several combinations depending
    * whether the TableFunction and its successor support Streaming or Batch mode.
-   * 
+   *
    * Combination 1: Streaming + Streaming
    * - Start Partition: invoke startPartition on tabFn.
-   * - Process Row: invoke process Row on tabFn. 
+   * - Process Row: invoke process Row on tabFn.
    *   Any output rows hand to next tabFn in chain or forward to next Operator.
    * - Finish Partition: invoke finishPartition on tabFn.
    *   Any output rows hand to next tabFn in chain or forward to next Operator.
-   *   
+   *
    * Combination 2: Streaming + Batch
    * same as Combination 1
-   * 
+   *
    * Combination 3: Batch + Batch
    * - Start Partition: create or reset the Input Partition for the tabFn
    *   caveat is: if prev is also batch and it is not providing an Output Iterator
@@ -251,22 +250,22 @@ public class PTFOperator extends Operato
    *   If function gives an Output Partition: set it on next Invocation's Input Partition
    *   If function gives an Output Iterator: iterate and call processRow on next Invocation.
    *   For last Invocation in chain: forward rows to next Operator.
-   *   
+   *
    * Combination 3: Batch + Stream
    * Similar to Combination 3, except Finish Partition behavior slightly different
    * - Finish Partition : invoke evaluate on tabFn on Input Partition
    *   iterate output rows: hand to next tabFn in chain or forward to next Operator.
-   * 
+   *
    */
   class PTFInvocation {
-    
+
     PTFInvocation prev;
     PTFInvocation next;
     TableFunctionEvaluator tabFn;
     PTFPartition inputPart;
     PTFPartition outputPart;
     Iterator<Object> outputPartRowsItr;
-    
+
     public PTFInvocation(PTFInvocation prev, TableFunctionEvaluator tabFn) {
       this.prev = prev;
       this.tabFn = tabFn;
@@ -274,19 +273,19 @@ public class PTFOperator extends Operato
         prev.next = this;
       }
     }
-    
+
     boolean isOutputIterator() {
       return tabFn.canAcceptInputAsStream() || tabFn.canIterateOutput();
     }
-    
+
     boolean isStreaming() {
       return tabFn.canAcceptInputAsStream();
     }
-    
+
     void initializeStreaming(Configuration cfg, boolean isMapSide) throws HiveException {
       PartitionedTableFunctionDef tabDef = tabFn.getTableDef();
       PTFInputDef inputDef = tabDef.getInput();
-      ObjectInspector inputOI = conf.getStartOfChain() == tabDef ? 
+      ObjectInspector inputOI = conf.getStartOfChain() == tabDef ?
           inputObjInspectors[0] : inputDef.getOutputShape().getOI();
 
       tabFn.initializeStreaming(cfg, (StructObjectInspector) inputOI, isMapSide);
@@ -295,7 +294,7 @@ public class PTFOperator extends Operato
         next.initializeStreaming(cfg, isMapSide);
       }
     }
-    
+
     void startPartition() throws HiveException {
       if ( isStreaming() ) {
         tabFn.startPartition();
@@ -312,7 +311,7 @@ public class PTFOperator extends Operato
         next.startPartition();
       }
     }
-    
+
     void processRow(Object row) throws HiveException {
       if ( isStreaming() ) {
         handleOutputRows(tabFn.processRow(row));
@@ -320,7 +319,7 @@ public class PTFOperator extends Operato
         inputPart.append(row);
       }
     }
-    
+
     void handleOutputRows(List<Object> outRows) throws HiveException {
       if ( outRows != null ) {
         for (Object orow : outRows ) {
@@ -332,7 +331,7 @@ public class PTFOperator extends Operato
         }
       }
     }
-    
+
     void finishPartition() throws HiveException {
       if ( isStreaming() ) {
         handleOutputRows(tabFn.finishPartition());
@@ -353,7 +352,7 @@ public class PTFOperator extends Operato
           }
         }
       }
-      
+
       if ( next != null ) {
         next.finishPartition();
       } else {
@@ -364,7 +363,7 @@ public class PTFOperator extends Operato
         }
       }
     }
-    
+
     /**
      * Create a new Partition.
      * A partition has 2 OIs: the OI for the rows being put in and the OI for the rows
@@ -388,7 +387,7 @@ public class PTFOperator extends Operato
     private void createInputPartition() throws HiveException {
       PartitionedTableFunctionDef tabDef = tabFn.getTableDef();
       PTFInputDef inputDef = tabDef.getInput();
-      ObjectInspector inputOI = conf.getStartOfChain() == tabDef ? 
+      ObjectInspector inputOI = conf.getStartOfChain() == tabDef ?
           inputObjInspectors[0] : inputDef.getOutputShape().getOI();
 
       SerDe serde = conf.isMapSide() ? tabDef.getInput().getOutputShape().getSerde() :
@@ -400,7 +399,7 @@ public class PTFOperator extends Operato
           (StructObjectInspector) inputOI,
           outputOI);
     }
-    
+
     void close() {
       if ( inputPart != null ) {
         inputPart.close();
@@ -411,5 +410,5 @@ public class PTFOperator extends Operato
       }
     }
   }
-  
+
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java Mon Sep  8 04:38:17 2014
@@ -27,6 +27,8 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +47,8 @@ import org.apache.hadoop.mapred.OutputCo
 
 public class PartitionKeySampler implements OutputCollector<HiveKey, Object> {
 
+  private static final Log LOG = LogFactory.getLog(PartitionKeySampler.class);
+
   public static final Comparator<byte[]> C = new Comparator<byte[]>() {
     public final int compare(byte[] o1, byte[] o2) {
       return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, o2.length);
@@ -74,32 +78,46 @@ public class PartitionKeySampler impleme
   }
 
   // sort and pick partition keys
-  // copied from org.apache.hadoop.mapred.lib.InputSampler
+  // originally copied from org.apache.hadoop.mapred.lib.InputSampler but seemed to have a bug
   private byte[][] getPartitionKeys(int numReduce) {
     if (sampled.size() < numReduce - 1) {
       throw new IllegalStateException("not enough number of sample");
     }
     byte[][] sorted = sampled.toArray(new byte[sampled.size()][]);
     Arrays.sort(sorted, C);
-    byte[][] partitionKeys = new byte[numReduce - 1][];
-    float stepSize = sorted.length / (float) numReduce;
-    int last = -1;
-    for(int i = 1; i < numReduce; ++i) {
-      int k = Math.round(stepSize * i);
-      while (last >= k && C.compare(sorted[last], sorted[k]) == 0) {
-        k++;
+
+    return toPartitionKeys(sorted, numReduce);
+  }
+
+  static final byte[][] toPartitionKeys(byte[][] sorted, int numPartition) {
+    byte[][] partitionKeys = new byte[numPartition - 1][];
+
+    int last = 0;
+    int current = 0;
+    for(int i = 0; i < numPartition - 1; i++) {
+      current += Math.round((float)(sorted.length - current) / (numPartition - i));
+      while (i > 0 && current < sorted.length && C.compare(sorted[last], sorted[current]) == 0) {
+        current++;
+      }
+      if (current >= sorted.length) {
+        return Arrays.copyOfRange(partitionKeys, 0, i);
       }
-      if (k >= sorted.length) {
-        throw new IllegalStateException("not enough number of sample");
+      if (LOG.isDebugEnabled()) {
+        // print out nth partition key for debugging
+        LOG.debug("Partition key " + current + "th :" + new BytesWritable(sorted[current]));
       }
-      partitionKeys[i - 1] = sorted[k];
-      last = k;
+      partitionKeys[i] = sorted[current];
+      last = current;
     }
     return partitionKeys;
   }
 
-  public void writePartitionKeys(Path path, JobConf job) throws IOException {
+  public void writePartitionKeys(Path path, HiveConf conf, JobConf job) throws IOException {
     byte[][] partitionKeys = getPartitionKeys(job.getNumReduceTasks());
+    int numPartition = partitionKeys.length + 1;
+    if (numPartition != job.getNumReduceTasks()) {
+      job.setNumReduceTasks(numPartition);
+    }
 
     FileSystem fs = path.getFileSystem(job);
     SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, path,

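Editor's note: the rewritten sampler picks at most numPartition - 1 split keys from the sorted samples, skipping samples equal to the previously chosen key and truncating the array when the samples cannot supply enough distinct keys; writePartitionKeys() then lowers the number of reduce tasks to match. The standalone sketch below mirrors that selection logic over strings purely for illustration; it is not the patched method itself.

import java.util.Arrays;

public class PartitionKeySelectionSketch {
  static String[] toPartitionKeys(String[] sorted, int numPartition) {
    String[] keys = new String[numPartition - 1];
    int last = 0;
    int current = 0;
    for (int i = 0; i < numPartition - 1; i++) {
      current += Math.round((float) (sorted.length - current) / (numPartition - i));
      // Skip over samples equal to the previously chosen key to keep split keys distinct.
      while (i > 0 && current < sorted.length && sorted[last].compareTo(sorted[current]) == 0) {
        current++;
      }
      if (current >= sorted.length) {
        return Arrays.copyOfRange(keys, 0, i);   // fewer distinct keys than requested
      }
      keys[i] = sorted[current];
      last = current;
    }
    return keys;
  }

  public static void main(String[] args) {
    String[] samples = {"a", "a", "a", "b", "b", "c", "c", "d", "e", "f"};
    // With 4 partitions requested, 3 split keys are chosen from the 10 sorted samples.
    System.out.println(Arrays.toString(toPartitionKeys(samples, 4)));  // [b, c, e]
  }
}
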
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionTableFunctionDescription.java Mon Sep  8 04:38:17 2014
@@ -27,14 +27,13 @@ import java.lang.annotation.Target;
 import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction;
 
 @Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.TYPE})
+@Target(ElementType.TYPE)
 @Documented
-public @interface PartitionTableFunctionDescription
-{
-	Description description ();
+public @interface PartitionTableFunctionDescription {
+  Description description ();
 
-	/**
-	 * if true it is not usable in the language. {@link WindowingTableFunction} is the only internal function.
-	 */
-	boolean isInternal() default false;
+  /**
+   * if true it is not usable in the language. {@link WindowingTableFunction} is the only internal function.
+   */
+  boolean isInternal() default false;
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Mon Sep  8 04:38:17 2014
@@ -31,6 +31,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -158,12 +159,12 @@ public class ScriptOperator extends Oper
   }
 
   /**
-   * Maps a relative pathname to an absolute pathname using the PATH enviroment.
+   * Maps a relative pathname to an absolute pathname using the PATH environment.
    */
   public class PathFinder {
     String pathenv; // a string of pathnames
-    String pathSep; // the path seperator
-    String fileSep; // the file seperator in a directory
+    String pathSep; // the path separator
+    String fileSep; // the file separator in a directory
 
     /**
      * Construct a PathFinder object using the path from the specified system
@@ -286,7 +287,7 @@ public class ScriptOperator extends Oper
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
-    // initialize the user's process only when you recieve the first row
+    // initialize the user's process only when you receive the first row
     if (firstRow) {
       firstRow = false;
       try {
@@ -365,7 +366,8 @@ public class ScriptOperator extends Oper
             .getBoolVar(hconf, HiveConf.ConfVars.HIVESCRIPTAUTOPROGRESS)) {
           autoProgressor = new AutoProgressor(this.getClass().getName(),
               reporter, Utilities.getDefaultNotificationInterval(hconf),
-              HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT) * 1000);
+              HiveConf.getTimeVar(
+                  hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS));
           autoProgressor.go();
         }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java Mon Sep  8 04:38:17 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,7 +32,6 @@ import org.apache.hadoop.hive.ql.plan.UD
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 import org.apache.hadoop.hive.ql.udf.generic.UDTFCollector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
@@ -86,7 +86,8 @@ public class UDTFOperator extends Operat
     if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEUDTFAUTOPROGRESS)) {
       autoProgressor = new AutoProgressor(this.getClass().getName(), reporter,
           Utilities.getDefaultNotificationInterval(hconf),
-          HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT) * 1000);
+          HiveConf.getTimeVar(
+              hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS));
       autoProgressor.go();
     }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Sep  8 04:38:17 2014
@@ -92,7 +92,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.HiveInterruptCallback;
 import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
@@ -329,7 +328,9 @@ public final class Utilities {
       if (!gWorkMap.containsKey(path) ||
           HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
         Path localPath;
-        if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
+        if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) {
+          localPath = new Path(name);
+        } else if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
           localPath = path;
         } else {
           LOG.info("***************non-local mode***************");
@@ -827,10 +828,12 @@ public final class Utilities {
     }
   }
 
-  public static Set<Operator<?>> cloneOperatorTree(Configuration conf, Set<Operator<?>> roots) {
+  public static List<Operator<?>> cloneOperatorTree(Configuration conf, List<Operator<?>> roots) {
     ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
     serializePlan(roots, baos, conf, true);
-    Set<Operator<?>> result = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+    @SuppressWarnings("unchecked")
+    List<Operator<?>> result =
+        deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
         roots.getClass(), conf, true);
     return result;
   }
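
For context, cloneOperatorTree now accepts and returns a List, presumably so callers can rely
on the cloned roots coming back in the same order as the inputs, which a Set did not guarantee.
A hedged sketch of a call site, assuming nothing beyond the signature shown above (the helper
class and method names are hypothetical):

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.Utilities;

    // Hypothetical helper, shown only to illustrate the new List-based signature.
    public class OperatorTreeCloner {
      // cloneOperatorTree round-trips the roots through the plan serializer, so the
      // clones share no mutable state with the originals and keep the input order.
      public static List<Operator<?>> cloneRoots(Configuration conf, List<Operator<?>> roots) {
        return Utilities.cloneOperatorTree(conf, roots);
      }
    }
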
@@ -1371,8 +1374,8 @@ public final class Utilities {
       codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class);
       codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
     }
-    return (SequenceFile.createWriter(fs, jc, file, keyClass, valClass, compressionType, codec,
-	progressable));
+    return SequenceFile.createWriter(fs, jc, file, keyClass, valClass, compressionType, codec,
+      progressable);
 
   }
 
@@ -1980,6 +1983,26 @@ public final class Utilities {
   }
 
   /**
+   * Get the session-specified class loader, falling back to the thread-based class loader
+   * when the session or its configuration is unavailable.
+   *
+   * @return the class loader to use for loading user-supplied classes
+   */
+  public static ClassLoader getSessionSpecifiedClassLoader() {
+    SessionState state = SessionState.get();
+    if (state == null || state.getConf() == null) {
+      LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead");
+      return JavaUtils.getClassLoader();
+    }
+    ClassLoader sessionCL = state.getConf().getClassLoader();
+    if (sessionCL != null) {
+      LOG.debug("Use session specified class loader");
+      return sessionCL;
+    }
+    LOG.debug("Session specified class loader not found, use thread based class loader");
+    return JavaUtils.getClassLoader();
+  }
+
+  /**
    * Create a URL from a string representing a path to a local file.
    * The path string can be just a path, or can start with file:/, file:///
    * @param onestr  path string
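
The new helper above is used later in this commit (see HadoopJobExecHelper) to load
user-supplied classes with any session-added jars visible. A minimal, hedged sketch of that
pattern; the wrapper class and the class name being loaded are assumptions for the example:

    import org.apache.hadoop.hive.ql.exec.Utilities;

    public class SessionClassLoaderExample {
      public static Object newInstanceOf(String className) throws Exception {
        // Prefer the class loader registered on the current session's HiveConf
        // (which sees session-added jars); otherwise fall back to the thread-based one.
        ClassLoader loader = Utilities.getSessionSpecifiedClassLoader();
        return Class.forName(className, true, loader).newInstance();
      }
    }
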
@@ -1999,6 +2022,33 @@ public final class Utilities {
     return oneurl;
   }
 
+  /**
+   * Get the jar files under the specified directory, or split a comma-separated list of jar paths.
+   *
+   * @param path a directory containing jar files, or a comma-separated list of jar paths
+   * @return the resolved set of jar file paths
+   */
+  public static Set<String> getJarFilesByPath(String path) {
+    Set<String> result = new HashSet<String>();
+    if (path == null || path.isEmpty()) {
+      return result;
+    }
+
+    File paths = new File(path);
+    if (paths.exists() && paths.isDirectory()) {
+      // add all jar files under the reloadable auxiliary jar paths
+      Set<File> jarFiles = new HashSet<File>();
+      jarFiles.addAll(org.apache.commons.io.FileUtils.listFiles(
+          paths, new String[]{"jar"}, true));
+      for (File f : jarFiles) {
+        result.add(f.getAbsolutePath());
+      }
+    } else {
+      String[] files = path.split(",");
+      Collections.addAll(result, files);
+    }
+    return result;
+  }
+
   /**
    * Add new elements to the classpath.
    *
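
A short, hedged illustration of the two input forms the new getJarFilesByPath helper accepts;
the paths and the example class name are placeholders, not anything from this commit:

    import java.util.Set;
    import org.apache.hadoop.hive.ql.exec.Utilities;

    public class JarPathExample {
      public static void main(String[] args) {
        // Directory form: every *.jar found recursively under the directory is returned.
        Set<String> fromDir = Utilities.getJarFilesByPath("/opt/hive/aux-jars");
        // Comma-separated form: the list is split and returned as given.
        Set<String> fromList = Utilities.getJarFilesByPath("/tmp/a.jar,/tmp/b.jar");
        System.out.println(fromDir.size() + " jars from directory, " + fromList.size() + " from list");
      }
    }
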
@@ -2771,7 +2821,7 @@ public final class Utilities {
    * first time it is caught, or SQLTransientException when the maxRetries has reached.
    */
   public static <T> T executeWithRetry(SQLCommand<T> cmd, PreparedStatement stmt,
-      int baseWindow, int maxRetries)  throws SQLException {
+      long baseWindow, int maxRetries)  throws SQLException {
 
     Random r = new Random();
     T result = null;
@@ -2813,7 +2863,7 @@ public final class Utilities {
    * first time it is caught, or SQLTransientException when the maxRetries has reached.
    */
   public static Connection connectWithRetry(String connectionString,
-      int waitWindow, int maxRetries) throws SQLException {
+      long waitWindow, int maxRetries) throws SQLException {
 
     Random r = new Random();
 
@@ -2855,7 +2905,7 @@ public final class Utilities {
    * first time it is caught, or SQLTransientException when the maxRetries has reached.
    */
   public static PreparedStatement prepareWithRetry(Connection conn, String stmt,
-      int waitWindow, int maxRetries) throws SQLException {
+      long waitWindow, int maxRetries) throws SQLException {
 
     Random r = new Random();
 
@@ -2895,7 +2945,7 @@ public final class Utilities {
    * @param r a random generator.
    * @return number of milliseconds for the next wait time.
    */
-  public static long getRandomWaitTime(int baseWindow, int failures, Random r) {
+  public static long getRandomWaitTime(long baseWindow, int failures, Random r) {
     return (long) (
           baseWindow * failures +     // grace period for the last round of attempt
           baseWindow * (failures + 1) * r.nextDouble()); // expanding time window for each failure
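
The widening of baseWindow from int to long lines up with the retry windows now being handled
as millisecond values. To make the formula above concrete: with baseWindow = 1000 ms, the wait
after the first failure falls in [1000, 3000) ms, after the second in [2000, 5000) ms, and so
on. A small self-contained example (the 1000 ms base and the class name are illustrative):

    import java.util.Random;

    public class RetryWaitExample {
      public static void main(String[] args) {
        long baseWindow = 1000L; // ms; in Hive this would come from a HiveConf time variable
        Random r = new Random();
        for (int failures = 1; failures <= 3; failures++) {
          // grace period that grows with the failure count, plus an expanding random window
          long wait = (long) (baseWindow * failures + baseWindow * (failures + 1) * r.nextDouble());
          System.out.println("failure " + failures + " -> wait " + wait + " ms");
        }
      }
    }
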
@@ -3381,7 +3431,6 @@ public final class Utilities {
   private static void createTmpDirs(Configuration conf,
       List<Operator<? extends OperatorDesc>> ops) throws IOException {
 
-    FsPermission fsPermission = new FsPermission((short)00777);
     while (!ops.isEmpty()) {
       Operator<? extends OperatorDesc> op = ops.remove(0);
 
@@ -3391,7 +3440,8 @@ public final class Utilities {
 
         if (tempDir != null) {
           Path tempPath = Utilities.toTempPath(tempDir);
-          createDirsWithPermission(conf, tempPath, fsPermission);
+          FileSystem fs = tempPath.getFileSystem(conf);
+          fs.mkdirs(tempPath);
         }
       }
 
@@ -3404,7 +3454,7 @@ public final class Utilities {
   /**
    * Returns true if a plan is both configured for vectorized execution
    * and vectorization is allowed. The plan may be configured for vectorization
-   * but vectorization dissalowed eg. for FetchOperator execution.
+   * but vectorization disallowed, e.g. for FetchOperator execution.
    */
   public static boolean isVectorMode(Configuration conf) {
     if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) &&
@@ -3527,77 +3577,6 @@ public final class Utilities {
   }
 
   /**
-   * @param conf the configuration used to derive the filesystem to create the path
-   * @param mkdir the path to be created
-   * @param fsPermission ignored if it is hive server session and doAs is enabled
-   * @return true if successfully created the directory else false
-   * @throws IOException if hdfs experiences any error conditions
-   */
-  public static boolean createDirsWithPermission(Configuration conf, Path mkdir,
-      FsPermission fsPermission) throws IOException {
-
-    boolean recursive = false;
-    if (SessionState.get() != null) {
-      recursive = SessionState.get().isHiveServerQuery() &&
-          conf.getBoolean(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname,
-              HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.defaultBoolVal);
-      // we reset the permission in case of hive server and doAs enabled because
-      // currently scratch directory uses /tmp/hive-hive as the scratch directory.
-      // However, with doAs enabled, the first user to create this directory would
-      // own the directory and subsequent users cannot access the scratch directory.
-      // The right fix is to have scratch dir per user.
-      fsPermission = new FsPermission((short)00777);
-    }
-
-    // if we made it so far without exception we are good!
-    return createDirsWithPermission(conf, mkdir, fsPermission, recursive);
-  }
-
-  private static void resetConfAndCloseFS (Configuration conf, boolean unsetUmask, 
-      String origUmask, FileSystem fs) throws IOException {
-    if (unsetUmask) {
-      if (origUmask != null) {
-        conf.set(FsPermission.UMASK_LABEL, origUmask);
-      } else {
-        // TODO HIVE-7831
-        // conf.unset(FsPermission.UMASK_LABEL);
-      }
-    }
-
-    fs.close();
-  }
-
-  public static boolean createDirsWithPermission(Configuration conf, Path mkdirPath,
-      FsPermission fsPermission, boolean recursive) throws IOException {
-    String origUmask = null;
-    LOG.debug("Create dirs " + mkdirPath + " with permission " + fsPermission + " recursive " +
-        recursive);
-
-    if (recursive) {
-      origUmask = conf.get(FsPermission.UMASK_LABEL);
-      // this umask is required because by default the hdfs mask is 022 resulting in
-      // all parents getting the fsPermission & !(022) permission instead of fsPermission
-      conf.set(FsPermission.UMASK_LABEL, "000");
-    }
-
-    FileSystem fs = ShimLoader.getHadoopShims().getNonCachedFileSystem(mkdirPath.toUri(), conf);
-    boolean retval = false;
-    try {
-      retval = fs.mkdirs(mkdirPath, fsPermission);
-      resetConfAndCloseFS(conf, recursive, origUmask, fs);
-    } catch (IOException ioe) {
-      try {
-        resetConfAndCloseFS(conf, recursive, origUmask, fs);
-      }
-      catch (IOException e) {
-        // do nothing - double failure
-      }
-    }
-    return retval;
-  }
-
-
-  /**
    * Convert path to qualified path.
    *
    * @param conf

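With the createDirsWithPermission family removed above, its call site in createTmpDirs
(earlier in this file) now creates temp directories with a plain FileSystem.mkdirs, so they
inherit the filesystem's default permissions/umask instead of a forced 0777. A hedged sketch
of the simplified pattern; the path and class name are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class TmpDirExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path tempPath = new Path("/tmp/hive-example-scratch");  // illustrative path
        // The directory now inherits the filesystem's default permissions,
        // rather than being created with an explicit 0777 and a reset umask.
        FileSystem fs = tempPath.getFileSystem(conf);
        fs.mkdirs(tempPath);
      }
    }
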
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java Mon Sep  8 04:38:17 2014
@@ -28,39 +28,38 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction;
 
 @Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.TYPE})
+@Target(ElementType.TYPE)
 @Documented
-public @interface WindowFunctionDescription
-{
-	Description description ();
-	/**
-	 * controls whether this function can be applied to a Window.
-	 * <p>
-	 * Ranking function: Rank, Dense_Rank, Percent_Rank and Cume_Dist don't operate on Windows.
-	 * Why? a window specification implies a row specific range i.e. every row gets its own set of rows to process the UDAF on.
-	 * For ranking defining a set of rows for every row makes no sense.
-	 * <p>
-	 * All other UDAFs can be computed for a Window.
-	 */
-	boolean supportsWindow() default true;
-	/**
-	 * A WindowFunc is implemented as {@link GenericUDAFResolver2}. It returns only one value.
-	 * If this is true then the function must return a List which is taken to be the column for this function in the Output table returned by the
-	 * {@link WindowingTableFunction}. Otherwise the output is assumed to be a single value, the column of the Output will contain the same value
-	 * for all the rows.
-	 */
-	boolean pivotResult() default false;
+public @interface WindowFunctionDescription {
+  Description description ();
+  /**
+   * Controls whether this function can be applied to a Window.
+   * <p>
+   * Ranking functions (Rank, Dense_Rank, Percent_Rank and Cume_Dist) don't operate on Windows.
+   * Why? A window specification implies a row-specific range, i.e. every row gets its own set of
+   * rows to process the UDAF on. For ranking, defining a set of rows for every row makes no sense.
+   * <p>
+   * All other UDAFs can be computed for a Window.
+   */
+  boolean supportsWindow() default true;
+  /**
+   * A WindowFunc is implemented as a {@link GenericUDAFResolver2}. It returns only one value.
+   * If this is true, the function must return a List, which is taken to be the column for this
+   * function in the output table returned by the {@link WindowingTableFunction}. Otherwise the
+   * output is assumed to be a single value, and the column of the output will contain the same
+   * value for all the rows.
+   */
+  boolean pivotResult() default false;
 
-	/**
-	 * Used in translations process to validate arguments
-	 * @return true if ranking function
-	 */
-	boolean rankingFunction() default false;
+  /**
+   * Used in the translation process to validate arguments.
+   * @return true if this is a ranking function
+   */
+  boolean rankingFunction() default false;
 
-	 /**
-	  * Using in analytical functions to specify that UDF implies an ordering
-	  * @return true if the function implies order
-	  */
-	 boolean impliesOrder() default false;
+  /**
+   * Used in analytical functions to specify that the UDF implies an ordering.
+   * @return true if the function implies order
+   */
+  boolean impliesOrder() default false;
 }
 

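The annotation reformatted above is consumed by WindowFunctionInfo (next file). A hedged
example of how a resolver class would carry it; the function name and class are made up for
illustration, not part of this commit:

    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
    import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;

    @WindowFunctionDescription(
        description = @Description(name = "my_rank", value = "_FUNC_(x) - example ranking UDAF"),
        supportsWindow = false,  // ranking functions do not accept a window frame
        pivotResult = true,      // the evaluator returns a List that is pivoted into one value per row
        rankingFunction = true,
        impliesOrder = true)
    public class GenericUDAFMyRank extends AbstractGenericUDAFResolver {
      // getEvaluator(...) omitted for brevity; any GenericUDAFResolver implementation can carry the annotation
    }
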
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java Mon Sep  8 04:38:17 2014
@@ -22,45 +22,39 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hive.common.util.AnnotationUtils;
 
 @SuppressWarnings("deprecation")
-public class WindowFunctionInfo implements CommonFunctionInfo
-{
-	boolean supportsWindow = true;
-	boolean pivotResult = false;
-	boolean impliesOrder = false;
-	FunctionInfo fInfo;
-
-	WindowFunctionInfo(FunctionInfo fInfo)
-	{
-		assert fInfo.isGenericUDAF();
-		this.fInfo = fInfo;
-		Class<? extends GenericUDAFResolver> wfnCls = fInfo.getGenericUDAFResolver().getClass();
-		WindowFunctionDescription def =
+public class WindowFunctionInfo implements CommonFunctionInfo {
+  boolean supportsWindow = true;
+  boolean pivotResult = false;
+  boolean impliesOrder = false;
+  FunctionInfo fInfo;
+
+  WindowFunctionInfo(FunctionInfo fInfo) {
+    assert fInfo.isGenericUDAF();
+    this.fInfo = fInfo;
+    Class<? extends GenericUDAFResolver> wfnCls = fInfo.getGenericUDAFResolver().getClass();
+    WindowFunctionDescription def =
           AnnotationUtils.getAnnotation(wfnCls, WindowFunctionDescription.class);
-		if ( def != null)
-		{
-			supportsWindow = def.supportsWindow();
-			pivotResult = def.pivotResult();
-			impliesOrder = def.impliesOrder();
-		}
-	}
-
-	public boolean isSupportsWindow()
-	{
-		return supportsWindow;
-	}
-
-	public boolean isPivotResult()
-	{
-		return pivotResult;
-	}
-
-	public boolean isImpliesOrder(){
-	  return impliesOrder;
-	}
-	public FunctionInfo getfInfo()
-	{
-		return fInfo;
-	}
+    if (def != null) {
+      supportsWindow = def.supportsWindow();
+      pivotResult = def.pivotResult();
+      impliesOrder = def.impliesOrder();
+    }
+  }
+
+  public boolean isSupportsWindow() {
+    return supportsWindow;
+  }
+
+  public boolean isPivotResult() {
+    return pivotResult;
+  }
+
+  public boolean isImpliesOrder() {
+    return impliesOrder;
+  }
+  public FunctionInfo getfInfo() {
+    return fInfo;
+  }
 
   @Override
   public Class<?> getFunctionClass() {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java Mon Sep  8 04:38:17 2014
@@ -63,7 +63,7 @@ public class MapJoinMemoryExhaustionHand
     if(maxHeapSize == -1) {
       this.maxHeapSize = 200L * 1024L * 1024L;
       LOG.warn("MemoryMXBean.getHeapMemoryUsage().getMax() returned -1, " +
-      		"defaulting maxHeapSize to 200MB");
+          "defaulting maxHeapSize to 200MB");
     } else {
       this.maxHeapSize = maxHeapSize;
     }
@@ -91,4 +91,4 @@ public class MapJoinMemoryExhaustionHand
       throw new MapJoinMemoryExhaustionException(msg);
     }
    }
-}
\ No newline at end of file
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Mon Sep  8 04:38:17 2014
@@ -371,7 +371,7 @@ public class ExecDriver extends Task<Map
 
       Utilities.setMapRedWork(job, work, ctx.getMRTmpPath());
 
-      if (mWork.getSamplingType() > 0 && rWork != null && rWork.getNumReduceTasks() > 1) {
+      if (mWork.getSamplingType() > 0 && rWork != null && job.getNumReduceTasks() > 1) {
         try {
           handleSampling(driverContext, mWork, job, conf);
           job.setPartitionerClass(HiveTotalOrderPartitioner.class);
@@ -539,7 +539,7 @@ public class ExecDriver extends Task<Map
     } else {
       throw new IllegalArgumentException("Invalid sampling type " + mWork.getSamplingType());
     }
-    sampler.writePartitionKeys(partitionFile, job);
+    sampler.writePartitionKeys(partitionFile, conf, job);
   }
 
   /**

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Mon Sep  8 04:38:17 2014
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -219,8 +220,8 @@ public class HadoopJobExecHelper {
     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
     //DecimalFormat longFormatter = new DecimalFormat("###,###");
     long reportTime = System.currentTimeMillis();
-    long maxReportInterval =
-        HiveConf.getLongVar(job, HiveConf.ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL);
+    long maxReportInterval = HiveConf.getTimeVar(
+        job, HiveConf.ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS);
     boolean fatal = false;
     StringBuilder errMsg = new StringBuilder();
     long pullInterval = HiveConf.getLongVar(job, HiveConf.ConfVars.HIVECOUNTERSPULLINTERVAL);
@@ -634,7 +635,7 @@ public class HadoopJobExecHelper {
     for (String clientStatsPublisherClass : clientStatsPublisherClasses) {
       try {
         clientStatsPublishers.add((ClientStatsPublisher) Class.forName(
-            clientStatsPublisherClass.trim(), true, JavaUtils.getClassLoader()).newInstance());
+            clientStatsPublisherClass.trim(), true, Utilities.getSessionSpecifiedClassLoader()).newInstance());
       } catch (Exception e) {
         LOG.warn(e.getClass().getName() + " occured when trying to create class: "
             + clientStatsPublisherClass.trim() + " implementing ClientStatsPublisher interface");

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Mon Sep  8 04:38:17 2014
@@ -130,7 +130,7 @@ public class MapRedTask extends ExecDriv
 
       runningViaChild = conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
 
-      if(!runningViaChild) {
+      if (!runningViaChild) {
         // we are not running this mapred task via child jvm
         // so directly invoke ExecDriver
         return super.execute(driverContext);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java Mon Sep  8 04:38:17 2014
@@ -14,10 +14,10 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.WriteBuffers;
 import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -57,7 +57,7 @@ public class MapJoinBytesTableContainer 
   private boolean[] sortableSortOrders;
   private KeyValueHelper writeHelper;
 
-  private List<Object> EMPTY_LIST = new ArrayList<Object>(0);
+  private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
 
   public MapJoinBytesTableContainer(Configuration hconf,
       MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException {
@@ -476,6 +476,7 @@ public class MapJoinBytesTableContainer 
       return valueStruct.getFieldsAsList(); // TODO: should we unset bytes after that?
     }
 
+    @Override
     public void addRow(List<Object> t) {
       if (dummyRow != null || !refs.isEmpty()) {
         throw new RuntimeException("Cannot add rows when not empty");
@@ -484,9 +485,11 @@ public class MapJoinBytesTableContainer 
     }
 
     // Various unsupported methods.
+    @Override
     public void addRow(Object[] value) {
       throw new RuntimeException(this.getClass().getCanonicalName() + " cannot add arrays");
     }
+    @Override
     public void write(MapJoinObjectSerDeContext valueContext, ObjectOutputStream out) {
       throw new RuntimeException(this.getClass().getCanonicalName() + " cannot be serialized");
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java Mon Sep  8 04:38:17 2014
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
 import org.apache.hadoop.io.Writable;
 
 /**

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java Mon Sep  8 04:38:17 2014
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.Writable;
 
 @SuppressWarnings("deprecation")
 public class MapJoinTableContainerSerDe {
-  
+
   private final MapJoinObjectSerDeContext keyContext;
   private final MapJoinObjectSerDeContext valueContext;
   public MapJoinTableContainerSerDe(MapJoinObjectSerDeContext keyContext,
@@ -70,7 +70,7 @@ public class MapJoinTableContainerSerDe 
     }
     try {
       Writable keyContainer = keySerDe.getSerializedClass().newInstance();
-      Writable valueContainer = valueSerDe.getSerializedClass().newInstance();    
+      Writable valueContainer = valueSerDe.getSerializedClass().newInstance();
       int numKeys = in.readInt();
       for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
         MapJoinKeyObject key = new MapJoinKeyObject();
@@ -89,7 +89,7 @@ public class MapJoinTableContainerSerDe 
   public void persist(ObjectOutputStream out, MapJoinPersistableTableContainer tableContainer)
       throws HiveException {
     int numKeys = tableContainer.size();
-    try { 
+    try {
       out.writeUTF(tableContainer.getClass().getName());
       out.writeObject(tableContainer.getMetaData());
       out.writeInt(numKeys);
@@ -108,7 +108,7 @@ public class MapJoinTableContainerSerDe 
       throw new ConcurrentModificationException("TableContainer was modified while persisting: " + tableContainer);
     }
   }
-  
+
   public static void persistDummyTable(ObjectOutputStream out) throws IOException {
     MapJoinPersistableTableContainer tableContainer = new HashMapWrapper();
     out.writeUTF(tableContainer.getClass().getName());
@@ -127,8 +127,8 @@ public class MapJoinTableContainerSerDe 
       return constructor.newInstance(metaData);
     } catch (Exception e) {
       String msg = "Error while attemping to create table container" +
-      		" of type: " + name + ", with metaData: " + metaData;
+          " of type: " + name + ", with metaData: " + metaData;
       throw new HiveException(msg, e);
     }
   }
-}
\ No newline at end of file
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java Mon Sep  8 04:38:17 2014
@@ -270,7 +270,7 @@ public class PTFRowContainer<Row extends
 
       FileSystem fs = finalOutPath.getFileSystem(jc);
       final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc, fs, finalOutPath,
-	  BytesWritable.class, valueClass, isCompressed, progress);
+        BytesWritable.class, valueClass, isCompressed, progress);
 
       return new PTFRecordWriter(outStream);
     }