You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2015/01/22 06:05:10 UTC

svn commit: r1653769 [5/14] - in /hive/branches/spark: ./ beeline/src/java/org/apache/hive/beeline/ cli/src/java/org/apache/hadoop/hive/cli/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ data/scripts/ dev-s...

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Thu Jan 22 05:05:05 2015
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.Serializable;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.lang.reflect.Field;
@@ -34,15 +33,12 @@ import java.util.Set;
 
 import javolution.util.FastBitSet;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -72,115 +68,110 @@ import org.apache.hadoop.io.Text;
 /**
  * GroupBy operator implementation.
  */
-public class GroupByOperator extends Operator<GroupByDesc> implements
-    Serializable {
+public class GroupByOperator extends Operator<GroupByDesc> {
 
-  private static final Log LOG = LogFactory.getLog(GroupByOperator.class
-      .getName());
-  private static final boolean isTraceEnabled = LOG.isTraceEnabled();
   private static final long serialVersionUID = 1L;
   private static final int NUMROWSESTIMATESIZE = 1000;
 
-  protected transient ExprNodeEvaluator[] keyFields;
-  protected transient ObjectInspector[] keyObjectInspectors;
+  private transient ExprNodeEvaluator[] keyFields;
+  private transient ObjectInspector[] keyObjectInspectors;
 
-  protected transient ExprNodeEvaluator[][] aggregationParameterFields;
-  protected transient ObjectInspector[][] aggregationParameterObjectInspectors;
-  protected transient ObjectInspector[][] aggregationParameterStandardObjectInspectors;
-  protected transient Object[][] aggregationParameterObjects;
+  private transient ExprNodeEvaluator[][] aggregationParameterFields;
+  private transient ObjectInspector[][] aggregationParameterObjectInspectors;
+  private transient ObjectInspector[][] aggregationParameterStandardObjectInspectors;
+  private transient Object[][] aggregationParameterObjects;
+  
   // so aggregationIsDistinct is a boolean array instead of a single number.
-  protected transient boolean[] aggregationIsDistinct;
+  private transient boolean[] aggregationIsDistinct;
   // Map from integer tag to distinct aggrs
-  transient protected Map<Integer, Set<Integer>> distinctKeyAggrs =
+  private transient Map<Integer, Set<Integer>> distinctKeyAggrs =
     new HashMap<Integer, Set<Integer>>();
   // Map from integer tag to non-distinct aggrs with key parameters.
-  transient protected Map<Integer, Set<Integer>> nonDistinctKeyAggrs =
+  private transient Map<Integer, Set<Integer>> nonDistinctKeyAggrs =
     new HashMap<Integer, Set<Integer>>();
   // List of non-distinct aggrs.
-  transient protected List<Integer> nonDistinctAggrs = new ArrayList<Integer>();
+  private transient List<Integer> nonDistinctAggrs = new ArrayList<Integer>();
   // Union expr for distinct keys
-  transient ExprNodeEvaluator unionExprEval = null;
+  private transient ExprNodeEvaluator unionExprEval;
 
-  transient GenericUDAFEvaluator[] aggregationEvaluators;
-  transient boolean[] estimableAggregationEvaluators;
-
-  protected transient ArrayList<ObjectInspector> objectInspectors;
-  transient ArrayList<String> fieldNames;
+  private transient GenericUDAFEvaluator[] aggregationEvaluators;
+  private transient boolean[] estimableAggregationEvaluators;
 
   // Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2,
   // MERGEPARTIAL
-  protected transient KeyWrapper currentKeys;
-  protected transient KeyWrapper newKeys;
-  protected transient AggregationBuffer[] aggregations;
-  protected transient Object[][] aggregationsParametersLastInvoke;
+  private transient KeyWrapper currentKeys;
+  private transient KeyWrapper newKeys;
+  private transient AggregationBuffer[] aggregations;
+  private transient Object[][] aggregationsParametersLastInvoke;
 
   // Used by hash-based GroupBy: Mode = HASH, PARTIALS
-  protected transient HashMap<KeyWrapper, AggregationBuffer[]> hashAggregations;
+  private transient HashMap<KeyWrapper, AggregationBuffer[]> hashAggregations;
 
   // Used by hash distinct aggregations when hashGrpKeyNotRedKey is true
-  protected transient HashSet<KeyWrapper> keysCurrentGroup;
+  private transient HashSet<KeyWrapper> keysCurrentGroup;
 
-  transient boolean firstRow;
-  transient long totalMemory;
-  protected transient boolean hashAggr;
+  private transient boolean firstRow;
+  private transient boolean hashAggr;
   // The reduction is happening on the reducer, and the grouping key and
   // reduction keys are different.
   // For example: select a, count(distinct b) from T group by a
   // The data is sprayed by 'b' and the reducer is grouping it by 'a'
-  transient boolean groupKeyIsNotReduceKey;
-  transient boolean firstRowInGroup;
-  transient long numRowsInput;
-  transient long numRowsHashTbl;
-  transient int groupbyMapAggrInterval;
-  transient long numRowsCompareHashAggr;
-  transient float minReductionHashAggr;
+  private transient boolean groupKeyIsNotReduceKey;
+  private transient boolean firstRowInGroup;
+  private transient long numRowsInput;
+  private transient long numRowsHashTbl;
+  private transient int groupbyMapAggrInterval;
+  private transient long numRowsCompareHashAggr;
+  private transient float minReductionHashAggr;
 
-  // current Key ObjectInspectors are standard ObjectInspectors
-  protected transient ObjectInspector[] currentKeyObjectInspectors;
-  // new Key ObjectInspectors are objectInspectors from the parent
-  transient StructObjectInspector newKeyObjectInspector;
-  transient StructObjectInspector currentKeyObjectInspector;
-  public static MemoryMXBean memoryMXBean;
+  private transient int outputKeyLength;
 
-  /**
-   * Total amount of memory allowed for JVM heap.
-   */
-  protected long maxMemory;
+  // current Key ObjectInspectors are standard ObjectInspectors
+  private transient ObjectInspector[] currentKeyObjectInspectors;
 
-  /**
-   * configure percent of memory threshold usable by QP.
-   */
-  protected float memoryThreshold;
+  private transient MemoryMXBean memoryMXBean;
 
-  private boolean groupingSetsPresent;
-  private int groupingSetsPosition;
-  private List<Integer> groupingSets;
-  private List<FastBitSet> groupingSetsBitSet;
-  transient private List<Object> newKeysGroupingSets;
+  private transient boolean groupingSetsPresent;      // generates grouping set
+  private transient int groupingSetsPosition;         // position of grouping set, generally the last of keys
+  private transient List<Integer> groupingSets;       // declared grouping set values  
+  private transient FastBitSet[] groupingSetsBitSet;  // bitsets acquired from grouping set values 
+  private transient Text[] newKeysGroupingSets;
 
   // for these positions, some variable primitive type (String) is used, so size
   // cannot be estimated. sample it at runtime.
-  transient List<Integer> keyPositionsSize;
+  private transient List<Integer> keyPositionsSize;
 
   // for these positions, some variable primitive type (String) is used for the
   // aggregation classes
-  transient List<Field>[] aggrPositions;
+  private transient List<Field>[] aggrPositions;
+
+  private transient int fixedRowSize;
+
+  private transient int totalVariableSize;
+  private transient int numEntriesVarSize;
+
+  private transient int countAfterReport;   // report or forward
+  private transient int heartbeatInterval;
 
-  transient int fixedRowSize;
+  /**
+   * Total amount of memory allowed for JVM heap.
+   */
+  protected transient long maxMemory;
 
   /**
    * Max memory usable by the hashtable before it should flush.
    */
   protected transient long maxHashTblMemory;
-  transient int totalVariableSize;
-  transient int numEntriesVarSize;
+
+  /**
+   * configure percent of memory threshold usable by QP.
+   */
+  protected transient float memoryThreshold;
 
   /**
    * Current number of entries in the hash table.
    */
   protected transient int numEntriesHashTable;
-  transient int countAfterReport;   // report or forward
-  transient int heartbeatInterval;
 
   public static FastBitSet groupingSet2BitSet(int value) {
     FastBitSet bits = new FastBitSet();
@@ -197,7 +188,6 @@ public class GroupByOperator extends Ope
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
-    totalMemory = Runtime.getRuntime().totalMemory();
     numRowsInput = 0;
     numRowsHashTbl = 0;
 
@@ -226,16 +216,15 @@ public class GroupByOperator extends Ope
     if (groupingSetsPresent) {
       groupingSets = conf.getListGroupingSets();
       groupingSetsPosition = conf.getGroupingSetPosition();
-      newKeysGroupingSets = new ArrayList<Object>();
-      groupingSetsBitSet = new ArrayList<FastBitSet>();
+      newKeysGroupingSets = new Text[groupingSets.size()];
+      groupingSetsBitSet = new FastBitSet[groupingSets.size()];
 
+      int pos = 0;
       for (Integer groupingSet: groupingSets) {
         // Create the mapping corresponding to the grouping set
-        ExprNodeEvaluator groupingSetValueEvaluator =
-          ExprNodeEvaluatorFactory.get(new ExprNodeConstantDesc(String.valueOf(groupingSet)));
-
-        newKeysGroupingSets.add(groupingSetValueEvaluator.evaluate(null));
-        groupingSetsBitSet.add(groupingSet2BitSet(groupingSet));
+        newKeysGroupingSets[pos] = new Text(String.valueOf(groupingSet));
+        groupingSetsBitSet[pos] = groupingSet2BitSet(groupingSet);
+        pos++;
       }
     }
 
@@ -348,23 +337,12 @@ public class GroupByOperator extends Ope
       aggregationEvaluators[i] = agg.getGenericUDAFEvaluator();
     }
 
-    // init objectInspectors
-    int totalFields = keyFields.length + aggregationEvaluators.length;
-    objectInspectors = new ArrayList<ObjectInspector>(totalFields);
-    for (ExprNodeEvaluator keyField : keyFields) {
-      objectInspectors.add(null);
-    }
     MapredContext context = MapredContext.get();
     if (context != null) {
       for (GenericUDAFEvaluator genericUDAFEvaluator : aggregationEvaluators) {
         context.setup(genericUDAFEvaluator);
       }
     }
-    for (int i = 0; i < aggregationEvaluators.length; i++) {
-      ObjectInspector roi = aggregationEvaluators[i].init(conf.getAggregators()
-          .get(i).getMode(), aggregationParameterObjectInspectors[i]);
-      objectInspectors.add(roi);
-    }
 
     aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
     if ((conf.getMode() != GroupByDesc.Mode.HASH || conf.getBucketGroup()) &&
@@ -390,26 +368,25 @@ public class GroupByOperator extends Ope
       }
     }
 
-    fieldNames = conf.getOutputColumnNames();
+    List<String> fieldNames = new ArrayList<String>(conf.getOutputColumnNames());
 
-    for (int i = 0; i < keyFields.length; i++) {
-      objectInspectors.set(i, currentKeyObjectInspectors[i]);
-    }
+    // When the grouping set id is pruned (see ColumnPrunerGroupByProc), it is dropped from
+    // the output; it is the last of the key columns.
+    outputKeyLength = conf.pruneGroupingSetId() ? keyFields.length - 1 : keyFields.length;
 
-    // Generate key names
-    ArrayList<String> keyNames = new ArrayList<String>(keyFields.length);
-    for (int i = 0; i < keyFields.length; i++) {
-      keyNames.add(fieldNames.get(i));
-    }
-    newKeyObjectInspector = ObjectInspectorFactory
-        .getStandardStructObjectInspector(keyNames, Arrays
-        .asList(keyObjectInspectors));
-    currentKeyObjectInspector = ObjectInspectorFactory
-        .getStandardStructObjectInspector(keyNames, Arrays
-        .asList(currentKeyObjectInspectors));
+    // init objectInspectors
+    ObjectInspector[] objectInspectors = 
+        new ObjectInspector[outputKeyLength + aggregationEvaluators.length];
+    for (int i = 0; i < outputKeyLength; i++) {
+      objectInspectors[i] = currentKeyObjectInspectors[i];
+    }
+    for (int i = 0; i < aggregationEvaluators.length; i++) {
+      objectInspectors[outputKeyLength + i] = aggregationEvaluators[i].init(conf.getAggregators()
+          .get(i).getMode(), aggregationParameterObjectInspectors[i]);
+    }
 
     outputObjInspector = ObjectInspectorFactory
-        .getStandardStructObjectInspector(fieldNames, objectInspectors);
+        .getStandardStructObjectInspector(fieldNames, Arrays.asList(objectInspectors));
 
     KeyWrapperFactory keyWrapperFactory =
       new KeyWrapperFactory(keyFields, keyObjectInspectors, currentKeyObjectInspectors);
@@ -769,7 +746,7 @@ public class GroupByOperator extends Ope
           flushHashTable(true);
           hashAggr = false;
         } else {
-          if (isTraceEnabled) {
+          if (isLogTraceEnabled) {
             LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl
                 + " #total = " + numRowsInput + " reduction = " + 1.0
                 * (numRowsHashTbl / numRowsInput) + " minReduction = "
@@ -795,14 +772,14 @@ public class GroupByOperator extends Ope
             newKeysArray[keyPos] = null;
           }
 
-          FastBitSet bitset = groupingSetsBitSet.get(groupingSetPos);
+          FastBitSet bitset = groupingSetsBitSet[groupingSetPos];
           // Some keys need to be left to null corresponding to that grouping set.
           for (int keyPos = bitset.nextSetBit(0); keyPos >= 0;
             keyPos = bitset.nextSetBit(keyPos+1)) {
             newKeysArray[keyPos] = cloneNewKeysArray[keyPos];
           }
 
-          newKeysArray[groupingSetsPosition] = newKeysGroupingSets.get(groupingSetPos);
+          newKeysArray[groupingSetsPosition] = newKeysGroupingSets[groupingSetPos];
           processKey(row, rowInspector);
         }
       } else {
@@ -972,7 +949,7 @@ public class GroupByOperator extends Ope
       // Update the number of entries that can fit in the hash table
       numEntriesHashTable =
           (int) (maxHashTblMemory / (fixedRowSize + (totalVariableSize / numEntriesVarSize)));
-      if (isTraceEnabled) {
+      if (isLogTraceEnabled) {
         LOG.trace("Hash Aggr: #hash table = " + numEntries
             + " #max in hash table = " + numEntriesHashTable);
       }
@@ -1054,19 +1031,17 @@ public class GroupByOperator extends Ope
    *          The keys in the record
    * @throws HiveException
    */
-  protected void forward(Object[] keys,
-      AggregationBuffer[] aggs) throws HiveException {
+  private void forward(Object[] keys, AggregationBuffer[] aggs) throws HiveException {
 
-    int totalFields = keys.length + aggs.length;
     if (forwardCache == null) {
-      forwardCache = new Object[totalFields];
+      forwardCache = new Object[outputKeyLength + aggs.length];
     }
 
-    for (int i = 0; i < keys.length; i++) {
+    for (int i = 0; i < outputKeyLength; i++) {
       forwardCache[i] = keys[i];
     }
     for (int i = 0; i < aggs.length; i++) {
-      forwardCache[keys.length + i] = aggregationEvaluators[i].evaluate(aggs[i]);
+      forwardCache[outputKeyLength + i] = aggregationEvaluators[i].evaluate(aggs[i]);
     }
 
     forward(forwardCache, outputObjInspector);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Thu Jan 22 05:05:05 2015
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -58,6 +59,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
@@ -99,7 +102,7 @@ public class MoveTask extends Task<MoveW
         if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) {
           deletePath = createTargetPath(targetPath, fs);
         }
-        if (!Hive.renameFile(conf, sourcePath, targetPath, fs, true, false)) {
+        if (!Hive.moveFile(conf, sourcePath, targetPath, fs, true, false)) {
           try {
             if (deletePath != null) {
               fs.delete(deletePath, true);
@@ -158,8 +161,14 @@ public class MoveTask extends Task<MoveW
         actualPath = actualPath.getParent();
       }
       fs.mkdirs(mkDirPath);
+      HadoopShims shims = ShimLoader.getHadoopShims();
       if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS)) {
-        fs.setPermission(mkDirPath, fs.getFileStatus(actualPath).getPermission());
+        try {
+          HadoopShims.HdfsFileStatus status = shims.getFullFileStatus(conf, fs, actualPath);
+          shims.setFullFileStatus(conf, status, fs, actualPath);
+        } catch (Exception e) {
+          LOG.warn("Error setting permissions or group of " + actualPath, e);
+        }
       }
     }
     return deletePath;
@@ -259,7 +268,7 @@ public class MoveTask extends Task<MoveW
             dirs = srcFs.globStatus(tbd.getSourcePath());
             files = new ArrayList<FileStatus>();
             for (int i = 0; (dirs != null && i < dirs.length); i++) {
-              files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath())));
+              files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)));
               // We only check one file, so exit the loop when we have at least
               // one.
               if (files.size() > 0) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Thu Jan 22 05:05:05 2015
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 /**
  * Select operator implementation.
@@ -55,12 +54,12 @@ public class SelectOperator extends Oper
     for (int i = 0; i < colList.size(); i++) {
       assert (colList.get(i) != null);
       eval[i] = ExprNodeEvaluatorFactory.get(colList.get(i));
-      if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEEXPREVALUATIONCACHE)) {
-        eval[i] = ExprNodeEvaluatorFactory.toCachedEval(eval[i]);
-      }
+    }
+    if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEEXPREVALUATIONCACHE)) {
+      eval = ExprNodeEvaluatorFactory.toCachedEvals(eval);
     }
     output = new Object[eval.length];
-    LOG.info("SELECT " + ((StructObjectInspector) inputObjInspectors[0]).getTypeName());
+    LOG.info("SELECT " + inputObjInspectors[0].getTypeName());
     outputObjInspector = initEvaluatorsAndReturnStruct(eval, conf.getOutputColumnNames(),
         inputObjInspectors[0]);
     initializeChildren(hconf);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Thu Jan 22 05:05:05 2015
@@ -93,6 +93,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveInterruptCallback;
 import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
@@ -1068,7 +1069,6 @@ public final class Utilities {
       removeField(kryo, Operator.class, "colExprMap");
       removeField(kryo, ColumnInfo.class, "objectInspector");
       removeField(kryo, MapWork.class, "opParseCtxMap");
-      removeField(kryo, MapWork.class, "joinTree");
       return kryo;
     };
   };
@@ -1803,7 +1803,7 @@ public final class Utilities {
    */
   public static FileStatus[] listStatusIfExists(Path path, FileSystem fs) throws IOException {
     try {
-      return fs.listStatus(path);
+      return fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
     } catch (FileNotFoundException e) {
       // FS in hadoop 2.0 throws FNF instead of returning null
       return null;
@@ -2639,7 +2639,7 @@ public final class Utilities {
     FileSystem inpFs = dirPath.getFileSystem(job);
 
     if (inpFs.exists(dirPath)) {
-      FileStatus[] fStats = inpFs.listStatus(dirPath);
+      FileStatus[] fStats = inpFs.listStatus(dirPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
       if (fStats.length > 0) {
         return false;
       }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Thu Jan 22 05:05:05 2015
@@ -58,8 +58,6 @@ public class HashMapWrapper extends Abst
   private static final float LOADFACTOR = 0.75f;
   private final HashMap<MapJoinKey, MapJoinRowContainer> mHash; // main memory HashMap
   private MapJoinKey lastKey = null;
-  private final boolean useLazyRows;
-  private final boolean useOptimizedKeys;
   private Output output = new Output(0); // Reusable output for serialization
 
   public HashMapWrapper(Map<String, String> metaData) {
@@ -67,30 +65,24 @@ public class HashMapWrapper extends Abst
     int threshold = Integer.parseInt(metaData.get(THESHOLD_NAME));
     float loadFactor = Float.parseFloat(metaData.get(LOAD_NAME));
     mHash = new HashMap<MapJoinKey, MapJoinRowContainer>(threshold, loadFactor);
-    useLazyRows = useOptimizedKeys = false;
   }
 
   public HashMapWrapper() {
     this(HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT.defaultFloatVal,
         HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD.defaultIntVal,
-        HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR.defaultFloatVal, false, false, -1);
+        HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR.defaultFloatVal, -1);
   }
 
   public HashMapWrapper(Configuration hconf, long keyCount) {
     this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
-        HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
-        HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINLAZYHASHTABLE),
-        HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDKEYS), keyCount);
+        HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR), keyCount);
   }
 
-  private HashMapWrapper(float keyCountAdj, int threshold, float loadFactor,
-      boolean useLazyRows, boolean useOptimizedKeys, long keyCount) {
+  private HashMapWrapper(float keyCountAdj, int threshold, float loadFactor, long keyCount) {
     super(createConstructorMetaData(threshold, loadFactor));
     threshold = calculateTableSize(keyCountAdj, threshold, loadFactor, keyCount);
     mHash = new HashMap<MapJoinKey, MapJoinRowContainer>(threshold, loadFactor);
-    this.useLazyRows = useLazyRows;
-    this.useOptimizedKeys = useOptimizedKeys;
   }
 
   public static int calculateTableSize(
@@ -131,21 +123,14 @@ public class HashMapWrapper extends Abst
   public MapJoinKey putRow(MapJoinObjectSerDeContext keyContext, Writable currentKey,
       MapJoinObjectSerDeContext valueContext, Writable currentValue)
           throws SerDeException, HiveException {
-    // We pass key in as reference, to find out quickly if optimized keys can be used.
-    // However, we do not reuse the object since we are putting them into the hashmap.
-    // Later, we don't create optimized keys in MapJoin if hash map doesn't have optimized keys.
-    if (lastKey == null && !useOptimizedKeys) {
-      lastKey = new MapJoinKeyObject();
-    }
-
-    lastKey = MapJoinKey.read(output, lastKey, keyContext, currentKey, false);
-    LazyFlatRowContainer values = (LazyFlatRowContainer)get(lastKey);
+    MapJoinKey key = MapJoinKey.read(output, keyContext, currentKey);
+    FlatRowContainer values = (FlatRowContainer)get(key);
     if (values == null) {
-      values = new LazyFlatRowContainer();
-      put(lastKey, values);
+      values = new FlatRowContainer();
+      put(key, values);
     }
-    values.add(valueContext, (BytesWritable)currentValue, useLazyRows);
-    return lastKey;
+    values.add(valueContext, (BytesWritable)currentValue);
+    return key;
   }
 
   @Override

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java Thu Jan 22 05:05:05 2015
@@ -57,22 +57,11 @@ public abstract class MapJoinKey {
   public abstract boolean hasAnyNulls(int fieldCount, boolean[] nullsafes);
 
   @SuppressWarnings("deprecation")
-  public static MapJoinKey read(Output output, MapJoinKey key,
-      MapJoinObjectSerDeContext context, Writable writable, boolean mayReuseKey)
-      throws SerDeException, HiveException {
+  public static MapJoinKey read(Output output, MapJoinObjectSerDeContext context,
+      Writable writable) throws SerDeException, HiveException {
     SerDe serde = context.getSerDe();
     Object obj = serde.deserialize(writable);
-    boolean useOptimized = useOptimizedKeyBasedOnPrev(key);
-    if (useOptimized || key == null) {
-      byte[] structBytes = serialize(output, obj, serde.getObjectInspector(), !useOptimized);
-      if (structBytes != null) {
-        return MapJoinKeyBytes.fromBytes(key, mayReuseKey, structBytes);
-      } else if (useOptimized) {
-        throw new SerDeException(
-            "Failed to serialize " + obj + " even though optimized keys are used");
-      }
-    }
-    MapJoinKeyObject result = mayReuseKey ? (MapJoinKeyObject)key : new MapJoinKeyObject();
+    MapJoinKeyObject result = new MapJoinKeyObject();
     result.read(serde.getObjectInspector(), obj);
     return result;
   }
@@ -98,35 +87,6 @@ public abstract class MapJoinKey {
     SUPPORTED_PRIMITIVES.add(PrimitiveCategory.CHAR);
   }
 
-  private static byte[] serialize(Output byteStream,
-      Object obj, ObjectInspector oi, boolean checkTypes) throws HiveException {
-    if (null == obj || !(oi instanceof StructObjectInspector)) {
-      return null; // not supported
-    }
-    StructObjectInspector soi = (StructObjectInspector)oi;
-    List<? extends StructField> fields = soi.getAllStructFieldRefs();
-    int size = fields.size();
-    if (size > 8) {
-      return null; // not supported
-    } else if (size == 0) {
-      return EMPTY_BYTE_ARRAY; // shortcut for null keys
-    }
-    Object[] fieldData = new Object[size];
-    List<ObjectInspector> fieldOis = new ArrayList<ObjectInspector>(size);
-    for (int i = 0; i < size; ++i) {
-      StructField field = fields.get(i);
-      ObjectInspector foi = field.getFieldObjectInspector();
-      if (checkTypes && !isSupportedField(foi)) {
-        return null;
-      }
-      fieldData[i] = soi.getStructFieldData(obj, field);
-      fieldOis.add(foi);
-    }
-
-    byteStream = serializeRow(byteStream, fieldData, fieldOis, null);
-    return Arrays.copyOf(byteStream.getData(), byteStream.getLength());
-  }
-
   public static boolean isSupportedField(ObjectInspector foi) {
     if (foi.getCategory() != Category.PRIMITIVE) return false; // not supported
     PrimitiveCategory pc = ((PrimitiveObjectInspector)foi).getPrimitiveCategory();
@@ -136,19 +96,6 @@ public abstract class MapJoinKey {
 
   public static MapJoinKey readFromVector(Output output, MapJoinKey key, Object[] keyObject,
       List<ObjectInspector> keyOIs, boolean mayReuseKey) throws HiveException {
-    boolean useOptimized = useOptimizedKeyBasedOnPrev(key);
-    if (useOptimized || key == null) {
-      if (keyObject.length <= 8) {
-        output = serializeRow(output, keyObject, keyOIs, null);
-        return MapJoinKeyBytes.fromBytes(key, mayReuseKey,
-            Arrays.copyOf(output.getData(), output.getLength()));
-      }
-      if (useOptimized) {
-        throw new HiveException(
-            "Failed to serialize " + Arrays.toString(keyObject) +
-                " even though optimized keys are used");
-      }
-    }
     MapJoinKeyObject result = mayReuseKey ? (MapJoinKeyObject)key : new MapJoinKeyObject();
     result.setKeyObjects(keyObject);
     return result;
@@ -178,32 +125,11 @@ public abstract class MapJoinKey {
 
   public static MapJoinKey readFromRow(Output output, MapJoinKey key, Object[] keyObject,
       List<ObjectInspector> keyFieldsOI, boolean mayReuseKey) throws HiveException {
-    boolean useOptimized = useOptimizedKeyBasedOnPrev(key);
-    if (useOptimized || key == null) {
-      if (keyObject.length <= 8) {
-        byte[] structBytes;
-        if (keyObject.length == 0) {
-          structBytes = EMPTY_BYTE_ARRAY; // shortcut for null keys
-        } else {
-          output = serializeRow(output, keyObject, keyFieldsOI, null);
-          structBytes = Arrays.copyOf(output.getData(), output.getLength());
-        }
-        return MapJoinKeyBytes.fromBytes(key, mayReuseKey, structBytes);
-      }
-      if (useOptimized) {
-        throw new HiveException(
-            "Failed to serialize " + Arrays.toString(keyObject) +
-                " even though optimized keys are used");
-      }
-    }
     MapJoinKeyObject result = mayReuseKey ? (MapJoinKeyObject)key : new MapJoinKeyObject();
     result.readFromRow(keyObject, keyFieldsOI);
     return result;
   }
 
-  private static final Log LOG = LogFactory.getLog(MapJoinKey.class);
-
-
   /**
    * Serializes row to output.
    * @param byteStream Output to reuse. Can be null, in that case a new one would be created.
@@ -228,8 +154,4 @@ public abstract class MapJoinKey {
     }
     return byteStream;
   }
-
-  private static boolean useOptimizedKeyBasedOnPrev(MapJoinKey key) {
-    return (key != null) && (key instanceof MapJoinKeyBytes);
-  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Thu Jan 22 05:05:05 2015
@@ -57,8 +57,6 @@ public class HashTableLoader implements
   private ExecMapperContext context;
   private Configuration hconf;
   private MapJoinDesc desc;
-  private MapJoinKey lastKey = null;
-  private int rowCount = 0;
 
   @Override
   public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
@@ -111,8 +109,7 @@ public class HashTableLoader implements
             : new HashMapWrapper(hconf, keyCount);
 
         while (kvReader.next()) {
-          rowCount++;
-          lastKey = tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(),
+          tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(),
               valCtx, (Writable)kvReader.getCurrentValue());
         }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Thu Jan 22 05:05:05 2015
@@ -70,6 +70,7 @@ public class VectorGroupByOperator exten
    * Key vector expressions.
    */
   private VectorExpression[] keyExpressions;
+  private int outputKeyLength;
 
   private boolean isVectorOutput;
 
@@ -768,9 +769,16 @@ public class VectorGroupByOperator exten
     List<ExprNodeDesc> keysDesc = conf.getKeys();
     try {
 
-      keyOutputWriters = new VectorExpressionWriter[keyExpressions.length];
+      List<String> outputFieldNames = conf.getOutputColumnNames();
+
+      // grouping id should be pruned, which is the last of key columns
+      // see ColumnPrunerGroupByProc
+      outputKeyLength = 
+          conf.pruneGroupingSetId() ? keyExpressions.length - 1 : keyExpressions.length;
+      
+      keyOutputWriters = new VectorExpressionWriter[outputKeyLength];
 
-      for(int i = 0; i < keyExpressions.length; ++i) {
+      for(int i = 0; i < outputKeyLength; ++i) {
         keyOutputWriters[i] = VectorExpressionWriterFactory.
             genVectorExpressionWritable(keysDesc.get(i));
         objectInspectors.add(keyOutputWriters[i].getObjectInspector());
@@ -788,7 +796,6 @@ public class VectorGroupByOperator exten
         aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
       }
       LOG.warn("VectorGroupByOperator is vector output " + isVectorOutput);
-      List<String> outputFieldNames = conf.getOutputColumnNames();
       outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
           outputFieldNames, objectInspectors);
       if (isVectorOutput) {
@@ -807,9 +814,9 @@ public class VectorGroupByOperator exten
 
     initializeChildren(hconf);
 
-    forwardCache = new Object[keyExpressions.length + aggregators.length];
+    forwardCache = new Object[outputKeyLength + aggregators.length];
 
-    if (keyExpressions.length == 0) {
+    if (outputKeyLength == 0) {
         processingMode = this.new ProcessingModeGlobalAggregate();
     } else if (conf.getVectorDesc().isVectorGroupBatches()) {
       // Sorted GroupBy of vector batches where an individual batch has the same group key (e.g. reduce).
@@ -872,7 +879,7 @@ public class VectorGroupByOperator exten
     int fi = 0;
     if (!isVectorOutput) {
       // Output row.
-      for (int i = 0; i < keyExpressions.length; ++i) {
+      for (int i = 0; i < outputKeyLength; ++i) {
         forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (
             kw, i, keyOutputWriters[i]);
       }
@@ -886,7 +893,7 @@ public class VectorGroupByOperator exten
       forward(forwardCache, outputObjInspector);
     } else {
       // Output keys and aggregates into the output batch.
-      for (int i = 0; i < keyExpressions.length; ++i) {
+      for (int i = 0; i < outputKeyLength; ++i) {
         vectorColumnAssign[fi++].assignObjectValue(keyWrappersBatch.getWritableKeyValue (
                   kw, i, keyOutputWriters[i]), outputBatch.size);
       }
@@ -910,7 +917,7 @@ public class VectorGroupByOperator exten
    */
   private void writeGroupRow(VectorAggregationBufferRow agg, DataOutputBuffer buffer)
       throws HiveException {
-    int fi = keyExpressions.length;   // Start after group keys.
+    int fi = outputKeyLength;   // Start after group keys.
     for (int i = 0; i < aggregators.length; ++i) {
       vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput(
                 agg.getAggregationBuffer(i)), outputBatch.size);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java Thu Jan 22 05:05:05 2015
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
@@ -98,7 +99,7 @@ public class HiveIndexResult {
         FileSystem fs = indexFilePath.getFileSystem(conf);
         FileStatus indexStat = fs.getFileStatus(indexFilePath);
         if (indexStat.isDir()) {
-          FileStatus[] fss = fs.listStatus(indexFilePath);
+          FileStatus[] fss = fs.listStatus(indexFilePath, FileUtils.HIDDEN_FILES_PATH_FILTER);
           for (FileStatus f : fss) {
             paths.add(f.getPath());
           }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java Thu Jan 22 05:05:05 2015
@@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -92,14 +91,7 @@ public class BucketizedHiveInputFormat<K
     List<IOException> errors = new ArrayList<IOException>();
 
     FileSystem fs = dir.getFileSystem(job);
-    FileStatus[] matches = fs.globStatus(dir, new PathFilter() {
-
-      @Override
-      public boolean accept(Path p) {
-        String name = p.getName();
-        return !name.startsWith("_") && !name.startsWith(".");
-      }
-    });
+    FileStatus[] matches = fs.globStatus(dir, FileUtils.HIDDEN_FILES_PATH_FILTER);
     if (matches == null) {
       errors.add(new IOException("Input path does not exist: " + dir));
     } else if (matches.length == 0) {
@@ -113,7 +105,8 @@ public class BucketizedHiveInputFormat<K
     if (!errors.isEmpty()) {
       throw new InvalidInputException(errors);
     }
-    LOG.info("Total input paths to process : " + result.size());
+    LOG.debug("Matches for " + dir + ": " + result);
+    LOG.info("Total input paths to process : " + result.size() + " from dir " + dir);
     return result.toArray(new FileStatus[result.size()]);
 
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Thu Jan 22 05:05:05 2015
@@ -30,6 +30,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -74,6 +79,48 @@ public class CombineHiveInputFormat<K ex
   private static final String CLASS_NAME = CombineHiveInputFormat.class.getName();
   public static final Log LOG = LogFactory.getLog(CLASS_NAME);
 
+  // max number of threads we can use to check non-combinable paths
+  private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50;
+  private static final int DEFAULT_NUM_PATH_PER_THREAD = 100;
+
+  /**
+   * Finds non-combinable paths in paths[start, start + length); returns indices into paths.
+   */
+  private class CheckNonCombinablePathCallable implements Callable<Set<Integer>> {
+    private final Path[] paths;
+    private final int start;
+    private final int length;
+    private final JobConf conf;
+
+    public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) {
+      this.paths = paths;
+      this.start = start;
+      this.length = length;
+      this.conf = conf;
+    }
+
+    @Override
+    public Set<Integer> call() throws Exception {
+      Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
+      for (int i = start; i < start + length; i++) {
+        Path path = paths[i];
+        PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+            pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap());
+        // Use HiveInputFormat if any of the paths is not splittable
+        InputFormat<WritableComparable, Writable> inputFormat =
+            getInputFormatFromCache(part.getInputFileFormatClass(), conf);
+        if (inputFormat instanceof AvoidSplitCombination &&
+            ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, conf)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("The path [" + path + "] is being parked for HiveInputFormat.getSplits");
+          }
+          nonCombinablePathIndices.add(i); // absolute index, as getSplits() looks it up
+        }
+      }
+      return nonCombinablePathIndices;
+    }
+  }
+
   /**
    * CombineHiveInputSplit encapsulates an InputSplit with its corresponding
    * inputFormatClassName. A CombineHiveInputSplit comprises of multiple chunks
@@ -278,8 +325,6 @@ public class CombineHiveInputFormat<K ex
   private InputSplit[] getCombineSplits(JobConf job, int numSplits,
       Map<String, PartitionDesc> pathToPartitionInfo)
       throws IOException {
-    PerfLogger perfLogger = PerfLogger.getPerfLogger();
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
     init(job);
     Map<String, ArrayList<String>> pathToAliases = mrwork.getPathToAliases();
     Map<String, Operator<? extends OperatorDesc>> aliasToWork =
@@ -290,7 +335,6 @@ public class CombineHiveInputFormat<K ex
     InputSplit[] splits = null;
     if (combine == null) {
       splits = super.getSplits(job, numSplits);
-      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
       return splits;
     }
 
@@ -349,13 +393,12 @@ public class CombineHiveInputFormat<K ex
           } else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
             //if compresssion codec is set, use HiveInputFormat.getSplits (don't combine)
             splits = super.getSplits(job, numSplits);
-            perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
             return splits;
           }
 
           while (dirs.peek() != null) {
             Path tstPath = dirs.remove();
-            FileStatus[] fStatus = inpFs.listStatus(tstPath);
+            FileStatus[] fStatus = inpFs.listStatus(tstPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
             for (int idx = 0; idx < fStatus.length; idx++) {
               if (fStatus[idx].isDir()) {
                 dirs.offer(fStatus[idx].getPath());
@@ -363,7 +406,6 @@ public class CombineHiveInputFormat<K ex
                   fStatus[idx].getPath()) != null) {
                 //if compresssion codec is set, use HiveInputFormat.getSplits (don't combine)
                 splits = super.getSplits(job, numSplits);
-                perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
                 return splits;
               }
             }
@@ -373,7 +415,6 @@ public class CombineHiveInputFormat<K ex
       //don't combine if inputformat is a SymlinkTextInputFormat
       if (inputFormat instanceof SymlinkTextInputFormat) {
         splits = super.getSplits(job, numSplits);
-        perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
         return splits;
       }
 
@@ -451,7 +492,6 @@ public class CombineHiveInputFormat<K ex
     }
 
     LOG.info("number of splits " + result.size());
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
     return result.toArray(new CombineHiveInputSplit[result.size()]);
   }
 
@@ -460,6 +500,8 @@ public class CombineHiveInputFormat<K ex
    */
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
     init(job);
 
     ArrayList<InputSplit> result = new ArrayList<InputSplit>();
@@ -469,26 +511,37 @@ public class CombineHiveInputFormat<K ex
     List<Path> nonCombinablePaths = new ArrayList<Path>(paths.length / 2);
     List<Path> combinablePaths = new ArrayList<Path>(paths.length / 2);
 
-    for (Path path : paths) {
-
-      PartitionDesc part =
-          HiveFileFormatUtils.getPartitionDescFromPathRecursively(
-              pathToPartitionInfo, path,
-              IOPrepareCache.get().allocatePartitionDescMap());
-
-      // Use HiveInputFormat if any of the paths is not splittable
-      Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
-      InputFormat<WritableComparable, Writable> inputFormat = getInputFormatFromCache(inputFormatClass, job);
-      if (inputFormat instanceof AvoidSplitCombination &&
-          ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, job)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("The split [" + path +
-              "] is being parked for HiveInputFormat.getSplits");
+    int numThreads = Math.max(1, Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM, // at least 1 thread
+        (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD)));
+    int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads);
+    LOG.info("Total number of paths: " + paths.length +
+        ", launching " + numThreads + " threads to check non-combinable ones.");
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads);
+    try {
+      for (int i = 0; i < numThreads; i++) {
+        int start = i * numPathPerThread;
+        int length = i != numThreads - 1 ? numPathPerThread : paths.length - start;
+        futureList.add(executor.submit(
+            new CheckNonCombinablePathCallable(paths, start, length, job)));
+      }
+      Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
+      for (Future<Set<Integer>> future : futureList) {
+        nonCombinablePathIndices.addAll(future.get());
+      }
+      for (int i = 0; i < paths.length; i++) {
+        if (nonCombinablePathIndices.contains(i)) {
+          nonCombinablePaths.add(paths[i]);
+        } else {
+          combinablePaths.add(paths[i]);
         }
-        nonCombinablePaths.add(path);
-      } else {
-        combinablePaths.add(path);
       }
+    } catch (Exception e) {
+      LOG.error("Error checking non-combinable path", e);
+      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
+      throw new IOException(e);
+    } finally {
+      executor.shutdownNow();
     }
 
     // Store the previous value for the path specification
@@ -528,6 +581,7 @@ public class CombineHiveInputFormat<K ex
       job.set(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname, oldPaths);
     }
     LOG.info("Number of all splits " + result.size());
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
     return result.toArray(new InputSplit[result.size()]);
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java Thu Jan 22 05:05:05 2015
@@ -29,6 +29,7 @@ import java.util.Map;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -58,7 +59,7 @@ public class SymbolicInputFormat impleme
         if (!fStatus.isDir()) {
           symlinks = new FileStatus[] { fStatus };
         } else {
-          symlinks = fileSystem.listStatus(symlinkDir);
+          symlinks = fileSystem.listStatus(symlinkDir, FileUtils.HIDDEN_FILES_PATH_FILTER);
         }
         toRemovePaths.add(path);
         ArrayList<String> aliases = pathToAliases.remove(path);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java Thu Jan 22 05:05:05 2015
@@ -23,19 +23,15 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -192,7 +188,7 @@ public class SymlinkTextInputFormat exte
       List<Path> targetPaths, List<Path> symlinkPaths) throws IOException {
     for (Path symlinkDir : symlinksDirs) {
       FileSystem fileSystem = symlinkDir.getFileSystem(conf);
-      FileStatus[] symlinks = fileSystem.listStatus(symlinkDir);
+      FileStatus[] symlinks = fileSystem.listStatus(symlinkDir, FileUtils.HIDDEN_FILES_PATH_FILTER);
 
       // Read paths from each symlink file.
       for (FileStatus symlink : symlinks) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java Thu Jan 22 05:05:05 2015
@@ -254,9 +254,13 @@ class DummyTxnManager extends HiveTxnMan
 
   private HiveLockMode getWriteEntityLockMode (WriteEntity we) {
     HiveLockMode lockMode = we.isComplete() ? HiveLockMode.EXCLUSIVE : HiveLockMode.SHARED;
-    //but the writeEntity is complete in DDL operations, and we need check its writeType to
-    //to determine the lockMode
-    switch (we.getWriteType()) {
+    // A WriteEntity is always complete in DDL operations; DDL sets the writeType instead, so
+    // use that to determine the lock mode, after first checking that a writeType was set.
+    WriteEntity.WriteType writeType = we.getWriteType();
+    if (writeType == null) {
+      return lockMode;
+    }
+    switch (writeType) {
       case DDL_EXCLUSIVE:
         return HiveLockMode.EXCLUSIVE;
       case DDL_SHARED:

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java Thu Jan 22 05:05:05 2015
@@ -27,15 +27,10 @@ import org.apache.hadoop.hive.ql.lockmgr
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
 import org.apache.hadoop.hive.ql.metadata.*;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.curator.framework.CuratorFramework;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
@@ -47,14 +42,11 @@ public class ZooKeeperHiveLockManager im
   public static final Log LOG = LogFactory.getLog("ZooKeeperHiveLockManager");
   static final private LogHelper console = new LogHelper(LOG);
 
-  private ZooKeeper zooKeeper;
+  private static CuratorFramework curatorFramework;
 
   // All the locks are created under this parent
   private String    parent;
 
-  private int sessionTimeout;
-  private String quorumServers;
-
   private long sleepTime;
   private int numRetriesForLock;
   private int numRetriesForUnLock;
@@ -80,8 +72,6 @@ public class ZooKeeperHiveLockManager im
   public void setContext(HiveLockManagerCtx ctx) throws LockException {
     this.ctx = ctx;
     HiveConf conf = ctx.getConf();
-    sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
-    quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf);
 
     sleepTime = conf.getTimeVar(
         HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
@@ -89,20 +79,18 @@ public class ZooKeeperHiveLockManager im
     numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
 
     try {
-      renewZookeeperInstance(sessionTimeout, quorumServers);
+      curatorFramework = CuratorFrameworkSingleton.getInstance(conf);
       parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
-
-      try {
-        zooKeeper.create("/" +  parent, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      } catch (KeeperException e) {
+      try{
+        curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/" +  parent, new byte[0]);
+      } catch (Exception e) {
         // ignore if the parent already exists
-        if (e.code() != KeeperException.Code.NODEEXISTS) {
+        if (!(e instanceof KeeperException) || ((KeeperException)e).code() != KeeperException.Code.NODEEXISTS) {
           LOG.warn("Unexpected ZK exception when creating parent node /" + parent, e);
         }
       }
-
     } catch (Exception e) {
-      LOG.error("Failed to create ZooKeeper object: ", e);
+      LOG.error("Failed to create curatorFramework object: ", e);
       throw new LockException(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg());
     }
   }
@@ -116,15 +104,6 @@ public class ZooKeeperHiveLockManager im
     numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
   }
 
-  private void renewZookeeperInstance(int sessionTimeout, String quorumServers)
-      throws InterruptedException, IOException {
-    if (zooKeeper != null) {
-      return;
-    }
-
-    zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, new ZooKeeperHiveHelper.DummyWatcher());
-  }
-
   /**
    * @param key    object to be locked
    * Get the name of the last string. For eg. if you need to lock db/T/ds=1=/hr=1,
@@ -266,8 +245,8 @@ public class ZooKeeperHiveLockManager im
    * @throws InterruptedException
    **/
   private String createChild(String name, byte[] data, CreateMode mode)
-      throws KeeperException, InterruptedException {
-    return zooKeeper.create(name, data, Ids.OPEN_ACL_UNSAFE, mode);
+      throws Exception {
+    return curatorFramework.create().withMode(mode).forPath(name, data);
   }
 
   private String getLockName(String parent, HiveLockMode mode) {
@@ -347,7 +326,7 @@ public class ZooKeeperHiveLockManager im
   private ZooKeeperHiveLock lockPrimitive(HiveLockObject key,
       HiveLockMode mode, boolean keepAlive, boolean parentCreated,
       Set<String> conflictingLocks)
-      throws KeeperException, InterruptedException {
+      throws Exception {
     String res;
 
     // If the parents have already been created, create the last child only
@@ -369,8 +348,8 @@ public class ZooKeeperHiveLockManager im
     for (String name : names) {
       try {
         res = createChild(name, new byte[0], CreateMode.PERSISTENT);
-      } catch (KeeperException e) {
-        if (e.code() != KeeperException.Code.NODEEXISTS) {
+      } catch (Exception e) {
+        if (!(e instanceof KeeperException) || ((KeeperException)e).code() != KeeperException.Code.NODEEXISTS) {
           //if the exception is not 'NODEEXISTS', re-throw it
           throw e;
         }
@@ -383,11 +362,11 @@ public class ZooKeeperHiveLockManager im
 
     int seqNo = getSequenceNumber(res, getLockName(lastName, mode));
     if (seqNo == -1) {
-      zooKeeper.delete(res, -1);
+      curatorFramework.delete().forPath(res);
       return null;
     }
 
-    List<String> children = zooKeeper.getChildren(lastName, false);
+    List<String> children = curatorFramework.getChildren().forPath(lastName);
 
     String exLock = getLockName(lastName, HiveLockMode.EXCLUSIVE);
     String shLock = getLockName(lastName, HiveLockMode.SHARED);
@@ -407,12 +386,11 @@ public class ZooKeeperHiveLockManager im
 
       if ((childSeq >= 0) && (childSeq < seqNo)) {
         try {
-          zooKeeper.delete(res, -1);
+          curatorFramework.delete().forPath(res);
         } finally {
           if (LOG.isDebugEnabled()) {
-            Stat stat = new Stat();
             try {
-              String data = new String(zooKeeper.getData(child, false, stat));
+              String data = new String(curatorFramework.getData().forPath(child));
               conflictingLocks.add(data);
             } catch (Exception e) {
               //ignored
@@ -428,11 +406,10 @@ public class ZooKeeperHiveLockManager im
 
   /* Remove the lock specified */
   public void unlock(HiveLock hiveLock) throws LockException {
-    unlockWithRetry(ctx.getConf(), zooKeeper, hiveLock, parent);
+    unlockWithRetry(hiveLock, parent);
   }
 
-  private void unlockWithRetry(HiveConf conf, ZooKeeper zkpClient,
-      HiveLock hiveLock, String parent) throws LockException {
+  private void unlockWithRetry(HiveLock hiveLock, String parent) throws LockException {
 
     int tryNum = 0;
     do {
@@ -440,14 +417,13 @@ public class ZooKeeperHiveLockManager im
         tryNum++;
         if (tryNum > 1) {
           Thread.sleep(sleepTime);
-          prepareRetry();
         }
-        unlockPrimitive(conf, zkpClient, hiveLock, parent);
+        unlockPrimitive(hiveLock, parent, curatorFramework);
         break;
       } catch (Exception e) {
         if (tryNum >= numRetriesForUnLock) {
           String name = ((ZooKeeperHiveLock)hiveLock).getPath();
-          LOG.error("Node " + name + " can not be deleted after " + numRetriesForUnLock + " attempts.");  
+          LOG.error("Node " + name + " can not be deleted after " + numRetriesForUnLock + " attempts.");
           throw new LockException(e);
         }
       }
@@ -458,21 +434,20 @@ public class ZooKeeperHiveLockManager im
 
   /* Remove the lock specified */
   @VisibleForTesting
-  static void unlockPrimitive(HiveConf conf, ZooKeeper zkpClient,
-                             HiveLock hiveLock, String parent) throws LockException {
+  static void unlockPrimitive(HiveLock hiveLock, String parent, CuratorFramework curatorFramework) throws LockException {
     ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock;
     HiveLockObject obj = zLock.getHiveLockObject();
     String name  = getLastObjectName(parent, obj);
     try {
-      zkpClient.delete(zLock.getPath(), -1);
+      curatorFramework.delete().forPath(zLock.getPath());
 
       // Delete the parent node if all the children have been deleted
-      List<String> children = zkpClient.getChildren(name, false);
+      List<String> children = curatorFramework.getChildren().forPath(name);
       if (children == null || children.isEmpty()) {
-        zkpClient.delete(name, -1);
+        curatorFramework.delete().forPath(name);
       }
     } catch (KeeperException.NoNodeException nne) {
-      //can happen in retrying deleting the zLock after exceptions like InterruptedException 
+      //can happen in retrying deleting the zLock after exceptions like InterruptedException
       //or in a race condition where parent has already been deleted by other process when it
       //is to be deleted. Both cases should not raise error
       LOG.debug("Node " + zLock.getPath() + " or its parent has already been deleted.");
@@ -480,7 +455,7 @@ public class ZooKeeperHiveLockManager im
       //can happen in a race condition where another process adds a zLock under this parent
       //just before it is about to be deleted. It should not be a problem since this parent
       //can eventually be deleted by the process which hold its last child zLock
-      LOG.debug("Node " + name + " to be deleted is not empty.");  
+      LOG.debug("Node " + name + " to be deleted is not empty.");
     } catch (Exception e) {
       //exceptions including InterruptException and other KeeperException
       LOG.error("Failed to release ZooKeeper lock: ", e);
@@ -490,19 +465,14 @@ public class ZooKeeperHiveLockManager im
 
   /* Release all locks - including PERSISTENT locks */
   public static void releaseAllLocks(HiveConf conf) throws Exception {
-    ZooKeeper zkpClient = null;
     try {
-      int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
-      String quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf);
-      Watcher dummyWatcher = new ZooKeeperHiveHelper.DummyWatcher();
-      zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummyWatcher);
       String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
-      List<HiveLock> locks = getLocks(conf, zkpClient, null, parent, false, false);
+      List<HiveLock> locks = getLocks(conf, null, parent, false, false);
       Exception lastExceptionGot = null;
       if (locks != null) {
         for (HiveLock lock : locks) {
           try {
-            unlockPrimitive(conf, zkpClient, lock, parent);
+            unlockPrimitive(lock, parent, curatorFramework);
           } catch (Exception e) {
             lastExceptionGot = e;
           }
@@ -516,24 +486,19 @@ public class ZooKeeperHiveLockManager im
     } catch (Exception e) {
       LOG.error("Failed to release all locks: ", e);
       throw new Exception(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg());
-    } finally {
-      if (zkpClient != null) {
-        zkpClient.close();
-        zkpClient = null;
-      }
     }
   }
 
   /* Get all locks */
   public List<HiveLock> getLocks(boolean verifyTablePartition, boolean fetchData)
     throws LockException {
-    return getLocks(ctx.getConf(), zooKeeper, null, parent, verifyTablePartition, fetchData);
+    return getLocks(ctx.getConf(), null, parent, verifyTablePartition, fetchData);
   }
 
   /* Get all locks for a particular object */
   public List<HiveLock> getLocks(HiveLockObject key, boolean verifyTablePartitions,
                                  boolean fetchData) throws LockException {
-    return getLocks(ctx.getConf(), zooKeeper, key, parent, verifyTablePartitions, fetchData);
+    return getLocks(ctx.getConf(), key, parent, verifyTablePartitions, fetchData);
   }
 
   /**
@@ -541,7 +506,7 @@ public class ZooKeeperHiveLockManager im
    * @param zkpClient   The ZooKeeper client
    * @param key         The object to be compared against - if key is null, then get all locks
    **/
-  private static List<HiveLock> getLocks(HiveConf conf, ZooKeeper zkpClient,
+  private static List<HiveLock> getLocks(HiveConf conf,
       HiveLockObject key, String parent, boolean verifyTablePartition, boolean fetchData)
       throws LockException {
     List<HiveLock> locks = new ArrayList<HiveLock>();
@@ -552,12 +517,12 @@ public class ZooKeeperHiveLockManager im
     try {
       if (key != null) {
         commonParent = "/" + parent + "/" + key.getName();
-        children = zkpClient.getChildren(commonParent, false);
+        children = curatorFramework.getChildren().forPath(commonParent);
         recurse = false;
       }
       else {
         commonParent = "/" + parent;
-        children = zkpClient.getChildren(commonParent, false);
+        children = curatorFramework.getChildren().forPath(commonParent);
       }
     } catch (Exception e) {
       // no locks present
@@ -579,7 +544,7 @@ public class ZooKeeperHiveLockManager im
 
       if (recurse) {
         try {
-          children = zkpClient.getChildren(curChild, false);
+          children = curatorFramework.getChildren().forPath(curChild);
           for (String child : children) {
             childn.add(curChild + "/" + child);
           }
@@ -588,7 +553,7 @@ public class ZooKeeperHiveLockManager im
         }
       }
 
-      HiveLockMode mode = getLockMode(conf, curChild);
+      HiveLockMode mode = getLockMode(curChild);
       if (mode == null) {
         continue;
       }
@@ -605,8 +570,7 @@ public class ZooKeeperHiveLockManager im
 
         if (fetchData) {
           try {
-            data = new HiveLockObjectData(new String(zkpClient.getData(curChild,
-                new ZooKeeperHiveHelper.DummyWatcher(), null)));
+            data = new HiveLockObjectData(new String(curatorFramework.getData().watched().forPath(curChild)));
             data.setClientIp(clientIp);
           } catch (Exception e) {
             LOG.error("Error in getting data for " + curChild, e);
@@ -623,12 +587,7 @@ public class ZooKeeperHiveLockManager im
   /** Remove all redundant nodes **/
   private void removeAllRedundantNodes() {
     try {
-      renewZookeeperInstance(sessionTimeout, quorumServers);
       checkRedundantNode("/" + parent);
-      if (zooKeeper != null) {
-        zooKeeper.close();
-        zooKeeper = null;
-      }
     } catch (Exception e) {
       LOG.warn("Exception while removing all redundant nodes", e);
     }
@@ -637,19 +596,19 @@ public class ZooKeeperHiveLockManager im
   private void checkRedundantNode(String node) {
     try {
       // Nothing to do if it is a lock mode
-      if (getLockMode(ctx.getConf(), node) != null) {
+      if (getLockMode(node) != null) {
         return;
       }
 
-      List<String> children = zooKeeper.getChildren(node, false);
+      List<String> children = curatorFramework.getChildren().forPath(node);
       for (String child : children) {
         checkRedundantNode(node + "/" + child);
       }
 
-      children = zooKeeper.getChildren(node, false);
+      children = curatorFramework.getChildren().forPath(node);
       if ((children == null) || (children.isEmpty()))
       {
-        zooKeeper.delete(node, -1);
+        curatorFramework.delete().forPath(node);
       }
     } catch (Exception e) {
       LOG.warn("Error in checkRedundantNode for node " + node, e);
@@ -658,12 +617,7 @@ public class ZooKeeperHiveLockManager im
 
   /* Release all transient locks, by simply closing the client */
   public void close() throws LockException {
-    try {
-
-      if (zooKeeper != null) {
-        zooKeeper.close();
-        zooKeeper = null;
-      }
+  try {
 
       if (HiveConf.getBoolVar(ctx.getConf(), HiveConf.ConfVars.HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES)) {
         removeAllRedundantNodes();
@@ -750,7 +704,7 @@ public class ZooKeeperHiveLockManager im
   private static Pattern exMode = Pattern.compile("^.*-(EXCLUSIVE)-([0-9]+)$");
 
   /* Get the mode of the lock encoded in the path */
-  private static HiveLockMode getLockMode(HiveConf conf, String path) {
+  private static HiveLockMode getLockMode(String path) {
 
     Matcher shMatcher = shMode.matcher(path);
     Matcher exMatcher = exMode.matcher(path);
@@ -768,15 +722,6 @@ public class ZooKeeperHiveLockManager im
 
   @Override
   public void prepareRetry() throws LockException {
-    try {
-      if (zooKeeper != null && zooKeeper.getState() == ZooKeeper.States.CLOSED) {
-        // Reconnect if the connection is closed.
-        zooKeeper = null;
-      }
-      renewZookeeperInstance(sessionTimeout, quorumServers);
-    } catch (Exception e) {
-      throw new LockException(e);
-    }
   }
 
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Thu Jan 22 05:05:05 2015
@@ -29,6 +29,7 @@ import static org.apache.hadoop.hive.ser
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -36,6 +37,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -48,6 +50,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.ObjectPair;
@@ -1352,7 +1355,7 @@ public class Hive {
       }
 
       if (replace) {
-        Hive.replaceFiles(loadPath, newPartPath, oldPartPath, getConf(),
+        Hive.replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
             isSrcLocal);
       } else {
         FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
@@ -1411,7 +1414,7 @@ private void walkDirTree(FileStatus fSta
   }
 
   /* dfs. */
-  FileStatus[] children = fSys.listStatus(fSta.getPath());
+  FileStatus[] children = fSys.listStatus(fSta.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
   if (children != null) {
     for (FileStatus child : children) {
       walkDirTree(child, fSys, skewedColValueLocationMaps, newPartPath, skewedInfo);
@@ -2187,7 +2190,7 @@ private void constructOneLBLocationMap(F
       boolean grantOption) throws HiveException {
     try {
       return getMSC().grant_role(roleName, userName, principalType, grantor,
-          grantorType, grantOption);
+        grantorType, grantOption);
     } catch (Exception e) {
       throw new HiveException(e);
     }
@@ -2282,13 +2285,7 @@ private void constructOneLBLocationMap(F
       for (FileStatus src : srcs) {
         FileStatus[] items;
         if (src.isDir()) {
-          items = srcFs.listStatus(src.getPath(), new PathFilter() {
-            @Override
-            public boolean accept(Path p) {
-              String name = p.getName();
-              return !name.startsWith("_") && !name.startsWith(".");
-            }
-          });
+          items = srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
           Arrays.sort(items);
         } else {
           items = new FileStatus[] {src};
@@ -2308,9 +2305,10 @@ private void constructOneLBLocationMap(F
           }
 
           if (!conf.getBoolVar(HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES) &&
+            !HiveConf.getVar(conf, HiveConf.ConfVars.STAGINGDIR).equals(itemSource.getName()) &&
             item.isDir()) {
             throw new HiveException("checkPaths: " + src.getPath()
-                + " has nested directory" + itemSource);
+                + " has nested directory " + itemSource);
           }
           // Strip off the file type, if any so we don't make:
           // 000000_0.gz -> 000000_0.gz_copy_1
@@ -2361,11 +2359,54 @@ private void constructOneLBLocationMap(F
     return false;
   }
 
+  private static boolean isSubDir(Path srcf, Path destf, FileSystem fs, boolean isSrcLocal){
+    if (srcf == null) {
+      LOG.debug("The source path is null for isSubDir method.");
+      return false;
+    }
+
+    String fullF1 = getQualifiedPathWithoutSchemeAndAuthority(srcf, fs);
+    String fullF2 = getQualifiedPathWithoutSchemeAndAuthority(destf, fs);
+
+    boolean isInTest = Boolean.valueOf(HiveConf.getBoolVar(fs.getConf(), ConfVars.HIVE_IN_TEST));
+    // In automated tests, the data warehouse is based on the local file system.
+    LOG.debug("The source path is " + fullF1 + " and the destination path is " + fullF2);
+    if (isInTest) {
+      return fullF1.startsWith(fullF2);
+    }
+
+    // If the schemes differ, return false.
+    String schemaSrcf = srcf.toUri().getScheme();
+    String schemaDestf = destf.toUri().getScheme();
+
+    // if the schemaDestf is null, it means the destination is not in the local file system
+    if (schemaDestf == null && isSrcLocal) {
+      LOG.debug("The source file is in the local while the dest not.");
+      return false;
+    }
+
+    // If both schema information are provided, they should be the same.
+    if (schemaSrcf != null && schemaDestf != null && !schemaSrcf.equals(schemaDestf)) {
+      LOG.debug("The source path's schema is " + schemaSrcf +
+        " and the destination path's schema is " + schemaDestf + ".");
+      return false;
+    }
+
+    LOG.debug("The source path is " + fullF1 + " and the destination path is " + fullF2);
+    return fullF1.startsWith(fullF2);
+  }
+
+  private static String getQualifiedPathWithoutSchemeAndAuthority(Path srcf, FileSystem fs) {
+    Path currentWorkingDir = fs.getWorkingDirectory();
+    Path path = srcf.makeQualified(srcf.toUri(), currentWorkingDir);
+    return Path.getPathWithoutSchemeAndAuthority(path).toString();
+  }
+
   //it is assumed that parent directory of the destf should already exist when this
   //method is called. when the replace value is true, this method works a little different
   //from mv command if the destf is a directory, it replaces the destf instead of moving under
   //the destf. in this case, the replaced destf still preserves the original destf's permission
-  public static boolean renameFile(HiveConf conf, Path srcf, Path destf,
+  public static boolean moveFile(HiveConf conf, Path srcf, Path destf,
       FileSystem fs, boolean replace, boolean isSrcLocal) throws HiveException {
     boolean success = false;
 
@@ -2374,17 +2415,26 @@ private void constructOneLBLocationMap(F
         HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     HadoopShims shims = ShimLoader.getHadoopShims();
     HadoopShims.HdfsFileStatus destStatus = null;
+    HadoopShims.HdfsEncryptionShim hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim();
 
+    // If source path is a subdirectory of the destination path:
+    //   ex: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT src.value WHERE src.key >= 300;
+    //   where the staging directory is a subdirectory of the destination directory
+    // (1) Do not delete the dest dir before doing the move operation.
+    // (2) It is assumed that subdir and dir are in same encryption zone.
+    // (3) Move individual files from src dir to dest dir.
+    boolean destIsSubDir = isSubDir(srcf, destf, fs, isSrcLocal);
     try {
       if (inheritPerms || replace) {
         try{
-          destStatus = shims.getFullFileStatus(conf, fs, destf);
+          destStatus = shims.getFullFileStatus(conf, fs, destf.getParent());
           //if destf is an existing directory:
           //if replace is true, delete followed by rename(mv) is equivalent to replace
           //if replace is false, rename (mv) actually move the src under dest dir
           //if destf is an existing file, rename is actually a replace, and do not need
           // to delete the file first
-          if (replace && destStatus.getFileStatus().isDir()) {
+          if (replace && !destIsSubDir) {
+            LOG.debug("The path " + destf.toString() + " is deleted");
             fs.delete(destf, true);
           }
         } catch (FileNotFoundException ignore) {
@@ -2396,14 +2446,39 @@ private void constructOneLBLocationMap(F
       }
       if (!isSrcLocal) {
         // For NOT local src file, rename the file
-        success = fs.rename(srcf, destf);
+        if (hdfsEncryptionShim != null && (hdfsEncryptionShim.isPathEncrypted(srcf) || hdfsEncryptionShim.isPathEncrypted(destf))
+            && !hdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf))
+        {
+          LOG.info("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different.");
+          success = FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf,
+              true,    // delete source
+              replace, // overwrite destination
+              conf);
+        } else {
+          if (destIsSubDir) {
+            FileStatus[] srcs = fs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);
+            for (FileStatus status : srcs) {
+              success = FileUtils.copy(srcf.getFileSystem(conf), status.getPath(), destf.getFileSystem(conf), destf,
+                  true,     // delete source
+                  replace,  // overwrite destination
+                  conf);
+
+              if (!success) {
+                throw new HiveException("Unable to move source " + status.getPath() + " to destination " + destf);
+              }
+            }
+          } else {
+            success = fs.rename(srcf, destf);
+          }
+        }
       } else {
         // For local src file, copy to hdfs
         fs.copyFromLocalFile(srcf, destf);
         success = true;
       }
-      LOG.info((replace ? "Replacing src:" : "Renaming src:") + srcf.toString()
-          + ";dest: " + destf.toString()  + ";Status:" + success);
+
+      LOG.info((replace ? "Replacing src:" : "Renaming src: ") + srcf.toString()
+          + ", dest: " + destf.toString()  + ", Status:" + success);
     } catch (IOException ioe) {
       throw new HiveException("Unable to move source " + srcf + " to destination " + destf, ioe);
     }
@@ -2470,7 +2545,7 @@ private void constructOneLBLocationMap(F
       try {
         for (List<Path[]> sdpairs : result) {
           for (Path[] sdpair : sdpairs) {
-            if (!renameFile(conf, sdpair[0], sdpair[1], fs, false, isSrcLocal)) {
+            if (!moveFile(conf, sdpair[0], sdpair[1], fs, false, isSrcLocal)) {
               throw new IOException("Cannot move " + sdpair[0] + " to "
                   + sdpair[1]);
             }
@@ -2563,6 +2638,7 @@ private void constructOneLBLocationMap(F
    * srcf, destf, and tmppath should resident in the same DFS, but the oldPath can be in a
    * different DFS.
    *
+   * @param tablePath path of the table.  Used to identify permission inheritance.
    * @param srcf
    *          Source directory to be renamed to tmppath. It should be a
    *          leaf directory where the final data files reside. However it
@@ -2570,13 +2646,15 @@ private void constructOneLBLocationMap(F
    * @param destf
    *          The directory where the final data needs to go
    * @param oldPath
-   *          The directory where the old data location, need to be cleaned up.
+   *          The directory of the old data location, which needs to be cleaned up.  Most of the time it is
+   *          the same as destf, unless it is across FileSystem boundaries.
    * @param isSrcLocal
    *          If the source directory is LOCAL
    */
-  static protected void replaceFiles(Path srcf, Path destf, Path oldPath,
-      HiveConf conf, boolean isSrcLocal) throws HiveException {
+  protected static void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
+          boolean isSrcLocal) throws HiveException {
     try {
+
       FileSystem destFs = destf.getFileSystem(conf);
       boolean inheritPerms = HiveConf.getBoolVar(conf,
           HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
@@ -2597,15 +2675,24 @@ private void constructOneLBLocationMap(F
       List<List<Path[]>> result = checkPaths(conf, destFs, srcs, srcFs, destf,
           true);
 
+      HadoopShims shims = ShimLoader.getHadoopShims();
       if (oldPath != null) {
         try {
           FileSystem fs2 = oldPath.getFileSystem(conf);
           if (fs2.exists(oldPath)) {
-            FileUtils.trashFilesUnderDir(fs2, oldPath, conf);
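+            // Do not delete oldPath if destf is a subdir of oldPath: only trash the old files
+            // when isSubDir(oldPath, destf, fs2) holds (presumably oldPath under destf; TODO confirm).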
+            //if ( !(fs2.equals(destf.getFileSystem(conf)) && FileUtils.isSubDir(oldPath, destf, fs2)))
+            if (FileUtils.isSubDir(oldPath, destf, fs2)) {
+              FileUtils.trashFilesUnderDir(fs2, oldPath, conf);
+            }
+            if (inheritPerms) {
+              inheritFromTable(tablePath, destf, conf, destFs);
+            }
           }
         } catch (Exception e) {
           //swallow the exception
-          LOG.warn("Directory " + oldPath.toString() + " canot be removed:" + StringUtils.stringifyException(e));
+          LOG.warn("Directory " + oldPath.toString() + " cannot be removed: " + e, e);
         }
       }
 
@@ -2619,15 +2706,30 @@ private void constructOneLBLocationMap(F
             LOG.warn("Error creating directory " + destf.toString());
           }
           if (inheritPerms && success) {
-            destFs.setPermission(destfp, destFs.getFileStatus(destfp.getParent()).getPermission());
+            inheritFromTable(tablePath, destfp, conf, destFs);
           }
         }
 
-        boolean b = renameFile(conf, srcs[0].getPath(), destf, destFs, true,
-            isSrcLocal);
-        if (!b) {
-          throw new HiveException("Unable to move results from " + srcs[0].getPath()
-              + " to destination directory: " + destf);
+        // Copy/move each file under the source directory to avoid deleting the destination
+        // directory if it is the root of an HDFS encryption zone.
+        for (List<Path[]> sdpairs : result) {
+          for (Path[] sdpair : sdpairs) {
+            Path destParent = sdpair[1].getParent();
+            FileSystem destParentFs = destParent.getFileSystem(conf);
+            if (!destParentFs.isDirectory(destParent)) {
+              boolean success = destFs.mkdirs(destParent);
+              if (!success) {
+                LOG.warn("Error creating directory " + destParent);
+              }
+              if (inheritPerms && success) {
+                inheritFromTable(tablePath, destParent, conf, destFs);
+              }
+            }
+            if (!moveFile(conf, sdpair[0], sdpair[1], destFs, true, isSrcLocal)) {
+              throw new IOException("Unable to move file/directory from " + sdpair[0] +
+                  " to " + sdpair[1]);
+            }
+          }
         }
       } else { // srcf is a file or pattern containing wildcards
         if (!destFs.exists(destf)) {
@@ -2636,13 +2738,13 @@ private void constructOneLBLocationMap(F
             LOG.warn("Error creating directory " + destf.toString());
           }
           if (inheritPerms && success) {
-            destFs.setPermission(destf, destFs.getFileStatus(destf.getParent()).getPermission());
+            inheritFromTable(tablePath, destf, conf, destFs);
           }
         }
         // srcs must be a list of files -- ensured by LoadSemanticAnalyzer
         for (List<Path[]> sdpairs : result) {
           for (Path[] sdpair : sdpairs) {
-            if (!renameFile(conf, sdpair[0], sdpair[1], destFs, true,
+            if (!moveFile(conf, sdpair[0], sdpair[1], destFs, true,
                 isSrcLocal)) {
               throw new IOException("Error moving: " + sdpair[0] + " into: " + sdpair[1]);
             }
@@ -2654,6 +2756,38 @@ private void constructOneLBLocationMap(F
     }
   }
 
+  /**
+   * Applies the full file status of tablePath to every path from destf (inclusive) up to tablePath (exclusive).
+   * @param tablePath path of table
+   * @param destf path of table-subdir.
+   * @param conf
+   * @param fs
+   */
+  private static void inheritFromTable(Path tablePath, Path destf, HiveConf conf, FileSystem fs) {
+    if (!FileUtils.isSubDir(destf, tablePath, fs)) {
+      //partition may not be under the parent.
+      return;
+    }
+    HadoopShims shims = ShimLoader.getHadoopShims();
+    //Calculate all the paths from the table dir, to destf
+    //At the end of this loop, currPath is the table dir, and pathsToSet contains all of those paths.
+    Path currPath = destf;
+    List<Path> pathsToSet = new LinkedList<Path>();
+    while (!currPath.equals(tablePath)) {
+      pathsToSet.add(currPath);
+      currPath = currPath.getParent();
+    }
+
+    try {
+      HadoopShims.HdfsFileStatus fullFileStatus = shims.getFullFileStatus(conf, fs, currPath);
+      for (Path pathToSet : pathsToSet) {
+        shims.setFullFileStatus(conf, fullFileStatus, fs, pathToSet);
+      }
+    } catch (Exception e) {
+      LOG.warn("Error setting permissions or group of " + destf, e);
+    }
+  }
+
   public static boolean isHadoop1() {
     return ShimLoader.getMajorVersion().startsWith("0.20");
   }