Posted to commits@hive.apache.org by sz...@apache.org on 2015/01/22 06:05:10 UTC
svn commit: r1653769 [5/14] - in /hive/branches/spark: ./
beeline/src/java/org/apache/hive/beeline/
cli/src/java/org/apache/hadoop/hive/cli/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ data/scripts/ dev-s...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Thu Jan 22 05:05:05 2015
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.reflect.Field;
@@ -34,15 +33,12 @@ import java.util.Set;
import javolution.util.FastBitSet;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -72,115 +68,110 @@ import org.apache.hadoop.io.Text;
/**
* GroupBy operator implementation.
*/
-public class GroupByOperator extends Operator<GroupByDesc> implements
- Serializable {
+public class GroupByOperator extends Operator<GroupByDesc> {
- private static final Log LOG = LogFactory.getLog(GroupByOperator.class
- .getName());
- private static final boolean isTraceEnabled = LOG.isTraceEnabled();
private static final long serialVersionUID = 1L;
private static final int NUMROWSESTIMATESIZE = 1000;
- protected transient ExprNodeEvaluator[] keyFields;
- protected transient ObjectInspector[] keyObjectInspectors;
+ private transient ExprNodeEvaluator[] keyFields;
+ private transient ObjectInspector[] keyObjectInspectors;
- protected transient ExprNodeEvaluator[][] aggregationParameterFields;
- protected transient ObjectInspector[][] aggregationParameterObjectInspectors;
- protected transient ObjectInspector[][] aggregationParameterStandardObjectInspectors;
- protected transient Object[][] aggregationParameterObjects;
+ private transient ExprNodeEvaluator[][] aggregationParameterFields;
+ private transient ObjectInspector[][] aggregationParameterObjectInspectors;
+ private transient ObjectInspector[][] aggregationParameterStandardObjectInspectors;
+ private transient Object[][] aggregationParameterObjects;
+
// so aggregationIsDistinct is a boolean array instead of a single number.
- protected transient boolean[] aggregationIsDistinct;
+ private transient boolean[] aggregationIsDistinct;
// Map from integer tag to distinct aggrs
- transient protected Map<Integer, Set<Integer>> distinctKeyAggrs =
+ private transient Map<Integer, Set<Integer>> distinctKeyAggrs =
new HashMap<Integer, Set<Integer>>();
// Map from integer tag to non-distinct aggrs with key parameters.
- transient protected Map<Integer, Set<Integer>> nonDistinctKeyAggrs =
+ private transient Map<Integer, Set<Integer>> nonDistinctKeyAggrs =
new HashMap<Integer, Set<Integer>>();
// List of non-distinct aggrs.
- transient protected List<Integer> nonDistinctAggrs = new ArrayList<Integer>();
+ private transient List<Integer> nonDistinctAggrs = new ArrayList<Integer>();
// Union expr for distinct keys
- transient ExprNodeEvaluator unionExprEval = null;
+ private transient ExprNodeEvaluator unionExprEval;
- transient GenericUDAFEvaluator[] aggregationEvaluators;
- transient boolean[] estimableAggregationEvaluators;
-
- protected transient ArrayList<ObjectInspector> objectInspectors;
- transient ArrayList<String> fieldNames;
+ private transient GenericUDAFEvaluator[] aggregationEvaluators;
+ private transient boolean[] estimableAggregationEvaluators;
// Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2,
// MERGEPARTIAL
- protected transient KeyWrapper currentKeys;
- protected transient KeyWrapper newKeys;
- protected transient AggregationBuffer[] aggregations;
- protected transient Object[][] aggregationsParametersLastInvoke;
+ private transient KeyWrapper currentKeys;
+ private transient KeyWrapper newKeys;
+ private transient AggregationBuffer[] aggregations;
+ private transient Object[][] aggregationsParametersLastInvoke;
// Used by hash-based GroupBy: Mode = HASH, PARTIALS
- protected transient HashMap<KeyWrapper, AggregationBuffer[]> hashAggregations;
+ private transient HashMap<KeyWrapper, AggregationBuffer[]> hashAggregations;
// Used by hash distinct aggregations when hashGrpKeyNotRedKey is true
- protected transient HashSet<KeyWrapper> keysCurrentGroup;
+ private transient HashSet<KeyWrapper> keysCurrentGroup;
- transient boolean firstRow;
- transient long totalMemory;
- protected transient boolean hashAggr;
+ private transient boolean firstRow;
+ private transient boolean hashAggr;
// The reduction is happening on the reducer, and the grouping key and
// reduction keys are different.
// For example: select a, count(distinct b) from T group by a
// The data is sprayed by 'b' and the reducer is grouping it by 'a'
- transient boolean groupKeyIsNotReduceKey;
- transient boolean firstRowInGroup;
- transient long numRowsInput;
- transient long numRowsHashTbl;
- transient int groupbyMapAggrInterval;
- transient long numRowsCompareHashAggr;
- transient float minReductionHashAggr;
+ private transient boolean groupKeyIsNotReduceKey;
+ private transient boolean firstRowInGroup;
+ private transient long numRowsInput;
+ private transient long numRowsHashTbl;
+ private transient int groupbyMapAggrInterval;
+ private transient long numRowsCompareHashAggr;
+ private transient float minReductionHashAggr;
- // current Key ObjectInspectors are standard ObjectInspectors
- protected transient ObjectInspector[] currentKeyObjectInspectors;
- // new Key ObjectInspectors are objectInspectors from the parent
- transient StructObjectInspector newKeyObjectInspector;
- transient StructObjectInspector currentKeyObjectInspector;
- public static MemoryMXBean memoryMXBean;
+ private transient int outputKeyLength;
- /**
- * Total amount of memory allowed for JVM heap.
- */
- protected long maxMemory;
+ // current Key ObjectInspectors are standard ObjectInspectors
+ private transient ObjectInspector[] currentKeyObjectInspectors;
- /**
- * configure percent of memory threshold usable by QP.
- */
- protected float memoryThreshold;
+ private transient MemoryMXBean memoryMXBean;
- private boolean groupingSetsPresent;
- private int groupingSetsPosition;
- private List<Integer> groupingSets;
- private List<FastBitSet> groupingSetsBitSet;
- transient private List<Object> newKeysGroupingSets;
+ private transient boolean groupingSetsPresent; // generates grouping set
+ private transient int groupingSetsPosition; // position of grouping set, generally the last of keys
+ private transient List<Integer> groupingSets; // declared grouping set values
+ private transient FastBitSet[] groupingSetsBitSet; // bitsets acquired from grouping set values
+ private transient Text[] newKeysGroupingSets;
// for these positions, some variable primitive type (String) is used, so size
// cannot be estimated. sample it at runtime.
- transient List<Integer> keyPositionsSize;
+ private transient List<Integer> keyPositionsSize;
// for these positions, some variable primitive type (String) is used for the
// aggregation classes
- transient List<Field>[] aggrPositions;
+ private transient List<Field>[] aggrPositions;
+
+ private transient int fixedRowSize;
+
+ private transient int totalVariableSize;
+ private transient int numEntriesVarSize;
+
+ private transient int countAfterReport; // report or forward
+ private transient int heartbeatInterval;
- transient int fixedRowSize;
+ /**
+ * Total amount of memory allowed for JVM heap.
+ */
+ protected transient long maxMemory;
/**
* Max memory usable by the hashtable before it should flush.
*/
protected transient long maxHashTblMemory;
- transient int totalVariableSize;
- transient int numEntriesVarSize;
+
+ /**
+ * configure percent of memory threshold usable by QP.
+ */
+ protected transient float memoryThreshold;
/**
* Current number of entries in the hash table.
*/
protected transient int numEntriesHashTable;
- transient int countAfterReport; // report or forward
- transient int heartbeatInterval;
public static FastBitSet groupingSet2BitSet(int value) {
FastBitSet bits = new FastBitSet();
@@ -197,7 +188,6 @@ public class GroupByOperator extends Ope
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
- totalMemory = Runtime.getRuntime().totalMemory();
numRowsInput = 0;
numRowsHashTbl = 0;
@@ -226,16 +216,15 @@ public class GroupByOperator extends Ope
if (groupingSetsPresent) {
groupingSets = conf.getListGroupingSets();
groupingSetsPosition = conf.getGroupingSetPosition();
- newKeysGroupingSets = new ArrayList<Object>();
- groupingSetsBitSet = new ArrayList<FastBitSet>();
+ newKeysGroupingSets = new Text[groupingSets.size()];
+ groupingSetsBitSet = new FastBitSet[groupingSets.size()];
+ int pos = 0;
for (Integer groupingSet: groupingSets) {
// Create the mapping corresponding to the grouping set
- ExprNodeEvaluator groupingSetValueEvaluator =
- ExprNodeEvaluatorFactory.get(new ExprNodeConstantDesc(String.valueOf(groupingSet)));
-
- newKeysGroupingSets.add(groupingSetValueEvaluator.evaluate(null));
- groupingSetsBitSet.add(groupingSet2BitSet(groupingSet));
+ newKeysGroupingSets[pos] = new Text(String.valueOf(groupingSet));
+ groupingSetsBitSet[pos] = groupingSet2BitSet(groupingSet);
+ pos++;
}
}
@@ -348,23 +337,12 @@ public class GroupByOperator extends Ope
aggregationEvaluators[i] = agg.getGenericUDAFEvaluator();
}
- // init objectInspectors
- int totalFields = keyFields.length + aggregationEvaluators.length;
- objectInspectors = new ArrayList<ObjectInspector>(totalFields);
- for (ExprNodeEvaluator keyField : keyFields) {
- objectInspectors.add(null);
- }
MapredContext context = MapredContext.get();
if (context != null) {
for (GenericUDAFEvaluator genericUDAFEvaluator : aggregationEvaluators) {
context.setup(genericUDAFEvaluator);
}
}
- for (int i = 0; i < aggregationEvaluators.length; i++) {
- ObjectInspector roi = aggregationEvaluators[i].init(conf.getAggregators()
- .get(i).getMode(), aggregationParameterObjectInspectors[i]);
- objectInspectors.add(roi);
- }
aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
if ((conf.getMode() != GroupByDesc.Mode.HASH || conf.getBucketGroup()) &&
@@ -390,26 +368,25 @@ public class GroupByOperator extends Ope
}
}
- fieldNames = conf.getOutputColumnNames();
+ List<String> fieldNames = new ArrayList<String>(conf.getOutputColumnNames());
- for (int i = 0; i < keyFields.length; i++) {
- objectInspectors.set(i, currentKeyObjectInspectors[i]);
- }
+ // grouping id should be pruned, which is the last of key columns
+ // see ColumnPrunerGroupByProc
+ outputKeyLength = conf.pruneGroupingSetId() ? keyFields.length - 1 : keyFields.length;
- // Generate key names
- ArrayList<String> keyNames = new ArrayList<String>(keyFields.length);
- for (int i = 0; i < keyFields.length; i++) {
- keyNames.add(fieldNames.get(i));
- }
- newKeyObjectInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(keyNames, Arrays
- .asList(keyObjectInspectors));
- currentKeyObjectInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(keyNames, Arrays
- .asList(currentKeyObjectInspectors));
+ // init objectInspectors
+ ObjectInspector[] objectInspectors =
+ new ObjectInspector[outputKeyLength + aggregationEvaluators.length];
+ for (int i = 0; i < outputKeyLength; i++) {
+ objectInspectors[i] = currentKeyObjectInspectors[i];
+ }
+ for (int i = 0; i < aggregationEvaluators.length; i++) {
+ objectInspectors[outputKeyLength + i] = aggregationEvaluators[i].init(conf.getAggregators()
+ .get(i).getMode(), aggregationParameterObjectInspectors[i]);
+ }
outputObjInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(fieldNames, objectInspectors);
+ .getStandardStructObjectInspector(fieldNames, Arrays.asList(objectInspectors));
KeyWrapperFactory keyWrapperFactory =
new KeyWrapperFactory(keyFields, keyObjectInspectors, currentKeyObjectInspectors);
@@ -769,7 +746,7 @@ public class GroupByOperator extends Ope
flushHashTable(true);
hashAggr = false;
} else {
- if (isTraceEnabled) {
+ if (isLogTraceEnabled) {
LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl
+ " #total = " + numRowsInput + " reduction = " + 1.0
* (numRowsHashTbl / numRowsInput) + " minReduction = "
@@ -795,14 +772,14 @@ public class GroupByOperator extends Ope
newKeysArray[keyPos] = null;
}
- FastBitSet bitset = groupingSetsBitSet.get(groupingSetPos);
+ FastBitSet bitset = groupingSetsBitSet[groupingSetPos];
// Some keys need to be left to null corresponding to that grouping set.
for (int keyPos = bitset.nextSetBit(0); keyPos >= 0;
keyPos = bitset.nextSetBit(keyPos+1)) {
newKeysArray[keyPos] = cloneNewKeysArray[keyPos];
}
- newKeysArray[groupingSetsPosition] = newKeysGroupingSets.get(groupingSetPos);
+ newKeysArray[groupingSetsPosition] = newKeysGroupingSets[groupingSetPos];
processKey(row, rowInspector);
}
} else {
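Note on the hunk above: the masking logic is unchanged, only the List lookups became array indexing. Every key slot starts out null; the FastBitSet derived from the grouping-set value marks which positions get copied back from the cloned key row, and the grouping-set id Text lands in the slot at groupingSetsPosition. A minimal standalone sketch of that step; the groupingSet2BitSet body is an assumption here, since this diff elides it:

    import javolution.util.FastBitSet;

    // Hypothetical demo of the grouping-set key masking, not operator code.
    // For grouping-set value 5 (binary 101), key positions 0 and 2 survive
    // and position 1 stays null.
    public class GroupingSetMaskDemo {
      // Assumed bit expansion matching GroupByOperator.groupingSet2BitSet.
      static FastBitSet groupingSet2BitSet(int value) {
        FastBitSet bits = new FastBitSet();
        for (int index = 0; value != 0; value /= 2, index++) {
          if (value % 2 != 0) {
            bits.set(index);
          }
        }
        return bits;
      }

      public static void main(String[] args) {
        Object[] cloneNewKeysArray = {"a", "b", "c"}; // the full key row
        Object[] newKeysArray = new Object[3];        // starts all-null
        FastBitSet bitset = groupingSet2BitSet(5);
        for (int keyPos = bitset.nextSetBit(0); keyPos >= 0;
            keyPos = bitset.nextSetBit(keyPos + 1)) {
          newKeysArray[keyPos] = cloneNewKeysArray[keyPos];
        }
        System.out.println(java.util.Arrays.toString(newKeysArray)); // [a, null, c]
      }
    }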
@@ -972,7 +949,7 @@ public class GroupByOperator extends Ope
// Update the number of entries that can fit in the hash table
numEntriesHashTable =
(int) (maxHashTblMemory / (fixedRowSize + (totalVariableSize / numEntriesVarSize)));
- if (isTraceEnabled) {
+ if (isLogTraceEnabled) {
LOG.trace("Hash Aggr: #hash table = " + numEntries
+ " #max in hash table = " + numEntriesHashTable);
}
@@ -1054,19 +1031,17 @@ public class GroupByOperator extends Ope
* The keys in the record
* @throws HiveException
*/
- protected void forward(Object[] keys,
- AggregationBuffer[] aggs) throws HiveException {
+ private void forward(Object[] keys, AggregationBuffer[] aggs) throws HiveException {
- int totalFields = keys.length + aggs.length;
if (forwardCache == null) {
- forwardCache = new Object[totalFields];
+ forwardCache = new Object[outputKeyLength + aggs.length];
}
- for (int i = 0; i < keys.length; i++) {
+ for (int i = 0; i < outputKeyLength; i++) {
forwardCache[i] = keys[i];
}
for (int i = 0; i < aggs.length; i++) {
- forwardCache[keys.length + i] = aggregationEvaluators[i].evaluate(aggs[i]);
+ forwardCache[outputKeyLength + i] = aggregationEvaluators[i].evaluate(aggs[i]);
}
forward(forwardCache, outputObjInspector);
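The forward() change is the payoff of the new outputKeyLength field: when conf.pruneGroupingSetId() is true, outputKeyLength is keyFields.length - 1, so the synthetic grouping-id key (always the last key column, per ColumnPrunerGroupByProc) never reaches the output row. With keys (a, b, grouping__id) and two aggregations, the forwarded row now has four fields instead of five; VectorGroupByOperator further down mirrors the same sizing. A sketch of the repacking, with hypothetical names:

    // Hypothetical sketch of the repacking forward() performs: aggregates are
    // written starting at outputKeyLength, and key slots at or past
    // outputKeyLength (the pruned grouping id) are never copied.
    static Object[] packOutputRow(Object[] keys, Object[] aggValues, int outputKeyLength) {
      Object[] row = new Object[outputKeyLength + aggValues.length];
      for (int i = 0; i < outputKeyLength; i++) {
        row[i] = keys[i];
      }
      for (int i = 0; i < aggValues.length; i++) {
        row[outputKeyLength + i] = aggValues[i];
      }
      return row;
    }

    // packOutputRow(new Object[]{"a", "b", groupingId}, new Object[]{cnt, sum}, 2)
    // yields {"a", "b", cnt, sum}: the grouping id never leaves the operator.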
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Thu Jan 22 05:05:05 2015
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -58,6 +59,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
@@ -99,7 +102,7 @@ public class MoveTask extends Task<MoveW
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) {
deletePath = createTargetPath(targetPath, fs);
}
- if (!Hive.renameFile(conf, sourcePath, targetPath, fs, true, false)) {
+ if (!Hive.moveFile(conf, sourcePath, targetPath, fs, true, false)) {
try {
if (deletePath != null) {
fs.delete(deletePath, true);
@@ -158,8 +161,14 @@ public class MoveTask extends Task<MoveW
actualPath = actualPath.getParent();
}
fs.mkdirs(mkDirPath);
+ HadoopShims shims = ShimLoader.getHadoopShims();
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS)) {
- fs.setPermission(mkDirPath, fs.getFileStatus(actualPath).getPermission());
+ try {
+ HadoopShims.HdfsFileStatus status = shims.getFullFileStatus(conf, fs, actualPath);
+ shims.setFullFileStatus(conf, status, fs, actualPath);
+ } catch (Exception e) {
+ LOG.warn("Error setting permissions or group of " + actualPath, e);
+ }
}
}
return deletePath;
@@ -259,7 +268,7 @@ public class MoveTask extends Task<MoveW
dirs = srcFs.globStatus(tbd.getSourcePath());
files = new ArrayList<FileStatus>();
for (int i = 0; (dirs != null && i < dirs.length); i++) {
- files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath())));
+ files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER)));
// We only check one file, so exit the loop when we have at least
// one.
if (files.size() > 0) {
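The permission hunk above swaps a bare fs.setPermission() for the shim pair getFullFileStatus/setFullFileStatus, which also carries group ownership (and, where the underlying Hadoop supports it, extended ACLs), and downgrades failures to a warning so the move itself no longer aborts on a chmod problem. A sketch reusing the same shim calls; exact coverage depends on the Hadoop shim in use:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.shims.HadoopShims;
    import org.apache.hadoop.hive.shims.ShimLoader;

    // Sketch of the shim-based permission inheritance pattern above. The
    // design choice worth noting: failures only warn, they never fail the task.
    class InheritPermsSketch {
      private static final Log LOG = LogFactory.getLog(InheritPermsSketch.class);

      static void inheritFullStatus(HiveConf conf, FileSystem fs, Path path) {
        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS)) {
          try {
            HadoopShims shims = ShimLoader.getHadoopShims();
            HadoopShims.HdfsFileStatus status = shims.getFullFileStatus(conf, fs, path);
            shims.setFullFileStatus(conf, status, fs, path);
          } catch (Exception e) {
            LOG.warn("Error setting permissions or group of " + path, e);
          }
        }
      }
    }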
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Thu Jan 22 05:05:05 2015
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
/**
* Select operator implementation.
@@ -55,12 +54,12 @@ public class SelectOperator extends Oper
for (int i = 0; i < colList.size(); i++) {
assert (colList.get(i) != null);
eval[i] = ExprNodeEvaluatorFactory.get(colList.get(i));
- if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEEXPREVALUATIONCACHE)) {
- eval[i] = ExprNodeEvaluatorFactory.toCachedEval(eval[i]);
- }
+ }
+ if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEEXPREVALUATIONCACHE)) {
+ eval = ExprNodeEvaluatorFactory.toCachedEvals(eval);
}
output = new Object[eval.length];
- LOG.info("SELECT " + ((StructObjectInspector) inputObjInspectors[0]).getTypeName());
+ LOG.info("SELECT " + inputObjInspectors[0].getTypeName());
outputObjInspector = initEvaluatorsAndReturnStruct(eval, conf.getOutputColumnNames(),
inputObjInspectors[0]);
initializeChildren(hconf);
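Moving the cache wrapping out of the loop hands the whole evaluator array to toCachedEvals at once, presumably so that deterministic sub-expressions recurring across output columns can share one per-row cache instead of each column caching in isolation. An illustrative sketch of the case this helps; the factory internals are not shown in this diff:

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
    import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;

    // Illustrative only: in SELECT f(x), f(x) + 1 the sub-expression f(x)
    // appears under two evaluators; wrapping the array as a unit allows one
    // shared cache, so f(x) can be evaluated once per row.
    static ExprNodeEvaluator[] buildEvals(List<ExprNodeDesc> colList, Configuration hconf)
        throws Exception {
      ExprNodeEvaluator[] eval = new ExprNodeEvaluator[colList.size()];
      for (int i = 0; i < colList.size(); i++) {
        eval[i] = ExprNodeEvaluatorFactory.get(colList.get(i));
      }
      if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEEXPREVALUATIONCACHE)) {
        eval = ExprNodeEvaluatorFactory.toCachedEvals(eval); // one cache across columns
      }
      return eval;
    }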
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Thu Jan 22 05:05:05 2015
@@ -93,6 +93,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.HiveInterruptCallback;
import org.apache.hadoop.hive.common.HiveInterruptUtils;
import org.apache.hadoop.hive.common.HiveStatsUtils;
@@ -1068,7 +1069,6 @@ public final class Utilities {
removeField(kryo, Operator.class, "colExprMap");
removeField(kryo, ColumnInfo.class, "objectInspector");
removeField(kryo, MapWork.class, "opParseCtxMap");
- removeField(kryo, MapWork.class, "joinTree");
return kryo;
};
};
@@ -1803,7 +1803,7 @@ public final class Utilities {
*/
public static FileStatus[] listStatusIfExists(Path path, FileSystem fs) throws IOException {
try {
- return fs.listStatus(path);
+ return fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
} catch (FileNotFoundException e) {
// FS in hadoop 2.0 throws FNF instead of returning null
return null;
@@ -2639,7 +2639,7 @@ public final class Utilities {
FileSystem inpFs = dirPath.getFileSystem(job);
if (inpFs.exists(dirPath)) {
- FileStatus[] fStats = inpFs.listStatus(dirPath);
+ FileStatus[] fStats = inpFs.listStatus(dirPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
if (fStats.length > 0) {
return false;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Thu Jan 22 05:05:05 2015
@@ -58,8 +58,6 @@ public class HashMapWrapper extends Abst
private static final float LOADFACTOR = 0.75f;
private final HashMap<MapJoinKey, MapJoinRowContainer> mHash; // main memory HashMap
private MapJoinKey lastKey = null;
- private final boolean useLazyRows;
- private final boolean useOptimizedKeys;
private Output output = new Output(0); // Reusable output for serialization
public HashMapWrapper(Map<String, String> metaData) {
@@ -67,30 +65,24 @@ public class HashMapWrapper extends Abst
int threshold = Integer.parseInt(metaData.get(THESHOLD_NAME));
float loadFactor = Float.parseFloat(metaData.get(LOAD_NAME));
mHash = new HashMap<MapJoinKey, MapJoinRowContainer>(threshold, loadFactor);
- useLazyRows = useOptimizedKeys = false;
}
public HashMapWrapper() {
this(HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT.defaultFloatVal,
HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD.defaultIntVal,
- HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR.defaultFloatVal, false, false, -1);
+ HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR.defaultFloatVal, -1);
}
public HashMapWrapper(Configuration hconf, long keyCount) {
this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
- HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
- HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINLAZYHASHTABLE),
- HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDKEYS), keyCount);
+ HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR), keyCount);
}
- private HashMapWrapper(float keyCountAdj, int threshold, float loadFactor,
- boolean useLazyRows, boolean useOptimizedKeys, long keyCount) {
+ private HashMapWrapper(float keyCountAdj, int threshold, float loadFactor, long keyCount) {
super(createConstructorMetaData(threshold, loadFactor));
threshold = calculateTableSize(keyCountAdj, threshold, loadFactor, keyCount);
mHash = new HashMap<MapJoinKey, MapJoinRowContainer>(threshold, loadFactor);
- this.useLazyRows = useLazyRows;
- this.useOptimizedKeys = useOptimizedKeys;
}
public static int calculateTableSize(
@@ -131,21 +123,14 @@ public class HashMapWrapper extends Abst
public MapJoinKey putRow(MapJoinObjectSerDeContext keyContext, Writable currentKey,
MapJoinObjectSerDeContext valueContext, Writable currentValue)
throws SerDeException, HiveException {
- // We pass key in as reference, to find out quickly if optimized keys can be used.
- // However, we do not reuse the object since we are putting them into the hashmap.
- // Later, we don't create optimized keys in MapJoin if hash map doesn't have optimized keys.
- if (lastKey == null && !useOptimizedKeys) {
- lastKey = new MapJoinKeyObject();
- }
-
- lastKey = MapJoinKey.read(output, lastKey, keyContext, currentKey, false);
- LazyFlatRowContainer values = (LazyFlatRowContainer)get(lastKey);
+ MapJoinKey key = MapJoinKey.read(output, keyContext, currentKey);
+ FlatRowContainer values = (FlatRowContainer)get(key);
if (values == null) {
- values = new LazyFlatRowContainer();
- put(lastKey, values);
+ values = new FlatRowContainer();
+ put(key, values);
}
- values.add(valueContext, (BytesWritable)currentValue, useLazyRows);
- return lastKey;
+ values.add(valueContext, (BytesWritable)currentValue);
+ return key;
}
@Override
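With the lazy-row and optimized-key modes gone, putRow has a single shape, and the lastKey probing that used to choose between key representations goes with them. The new putRow body from the hunk above, restated with comments:

    public MapJoinKey putRow(MapJoinObjectSerDeContext keyContext, Writable currentKey,
        MapJoinObjectSerDeContext valueContext, Writable currentValue)
        throws SerDeException, HiveException {
      // Keys are always object-based now; the byte-optimized and lazy
      // variants (and the flags that selected them) were removed.
      MapJoinKey key = MapJoinKey.read(output, keyContext, currentKey);
      FlatRowContainer values = (FlatRowContainer) get(key);
      if (values == null) {
        values = new FlatRowContainer();   // first row seen for this key
        put(key, values);
      }
      // Values are appended eagerly; there is no useLazyRows flag anymore.
      values.add(valueContext, (BytesWritable) currentValue);
      return key;
    }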
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java Thu Jan 22 05:05:05 2015
@@ -57,22 +57,11 @@ public abstract class MapJoinKey {
public abstract boolean hasAnyNulls(int fieldCount, boolean[] nullsafes);
@SuppressWarnings("deprecation")
- public static MapJoinKey read(Output output, MapJoinKey key,
- MapJoinObjectSerDeContext context, Writable writable, boolean mayReuseKey)
- throws SerDeException, HiveException {
+ public static MapJoinKey read(Output output, MapJoinObjectSerDeContext context,
+ Writable writable) throws SerDeException, HiveException {
SerDe serde = context.getSerDe();
Object obj = serde.deserialize(writable);
- boolean useOptimized = useOptimizedKeyBasedOnPrev(key);
- if (useOptimized || key == null) {
- byte[] structBytes = serialize(output, obj, serde.getObjectInspector(), !useOptimized);
- if (structBytes != null) {
- return MapJoinKeyBytes.fromBytes(key, mayReuseKey, structBytes);
- } else if (useOptimized) {
- throw new SerDeException(
- "Failed to serialize " + obj + " even though optimized keys are used");
- }
- }
- MapJoinKeyObject result = mayReuseKey ? (MapJoinKeyObject)key : new MapJoinKeyObject();
+ MapJoinKeyObject result = new MapJoinKeyObject();
result.read(serde.getObjectInspector(), obj);
return result;
}
@@ -98,35 +87,6 @@ public abstract class MapJoinKey {
SUPPORTED_PRIMITIVES.add(PrimitiveCategory.CHAR);
}
- private static byte[] serialize(Output byteStream,
- Object obj, ObjectInspector oi, boolean checkTypes) throws HiveException {
- if (null == obj || !(oi instanceof StructObjectInspector)) {
- return null; // not supported
- }
- StructObjectInspector soi = (StructObjectInspector)oi;
- List<? extends StructField> fields = soi.getAllStructFieldRefs();
- int size = fields.size();
- if (size > 8) {
- return null; // not supported
- } else if (size == 0) {
- return EMPTY_BYTE_ARRAY; // shortcut for null keys
- }
- Object[] fieldData = new Object[size];
- List<ObjectInspector> fieldOis = new ArrayList<ObjectInspector>(size);
- for (int i = 0; i < size; ++i) {
- StructField field = fields.get(i);
- ObjectInspector foi = field.getFieldObjectInspector();
- if (checkTypes && !isSupportedField(foi)) {
- return null;
- }
- fieldData[i] = soi.getStructFieldData(obj, field);
- fieldOis.add(foi);
- }
-
- byteStream = serializeRow(byteStream, fieldData, fieldOis, null);
- return Arrays.copyOf(byteStream.getData(), byteStream.getLength());
- }
-
public static boolean isSupportedField(ObjectInspector foi) {
if (foi.getCategory() != Category.PRIMITIVE) return false; // not supported
PrimitiveCategory pc = ((PrimitiveObjectInspector)foi).getPrimitiveCategory();
@@ -136,19 +96,6 @@ public abstract class MapJoinKey {
public static MapJoinKey readFromVector(Output output, MapJoinKey key, Object[] keyObject,
List<ObjectInspector> keyOIs, boolean mayReuseKey) throws HiveException {
- boolean useOptimized = useOptimizedKeyBasedOnPrev(key);
- if (useOptimized || key == null) {
- if (keyObject.length <= 8) {
- output = serializeRow(output, keyObject, keyOIs, null);
- return MapJoinKeyBytes.fromBytes(key, mayReuseKey,
- Arrays.copyOf(output.getData(), output.getLength()));
- }
- if (useOptimized) {
- throw new HiveException(
- "Failed to serialize " + Arrays.toString(keyObject) +
- " even though optimized keys are used");
- }
- }
MapJoinKeyObject result = mayReuseKey ? (MapJoinKeyObject)key : new MapJoinKeyObject();
result.setKeyObjects(keyObject);
return result;
@@ -178,32 +125,11 @@ public abstract class MapJoinKey {
public static MapJoinKey readFromRow(Output output, MapJoinKey key, Object[] keyObject,
List<ObjectInspector> keyFieldsOI, boolean mayReuseKey) throws HiveException {
- boolean useOptimized = useOptimizedKeyBasedOnPrev(key);
- if (useOptimized || key == null) {
- if (keyObject.length <= 8) {
- byte[] structBytes;
- if (keyObject.length == 0) {
- structBytes = EMPTY_BYTE_ARRAY; // shortcut for null keys
- } else {
- output = serializeRow(output, keyObject, keyFieldsOI, null);
- structBytes = Arrays.copyOf(output.getData(), output.getLength());
- }
- return MapJoinKeyBytes.fromBytes(key, mayReuseKey, structBytes);
- }
- if (useOptimized) {
- throw new HiveException(
- "Failed to serialize " + Arrays.toString(keyObject) +
- " even though optimized keys are used");
- }
- }
MapJoinKeyObject result = mayReuseKey ? (MapJoinKeyObject)key : new MapJoinKeyObject();
result.readFromRow(keyObject, keyFieldsOI);
return result;
}
- private static final Log LOG = LogFactory.getLog(MapJoinKey.class);
-
-
/**
* Serializes row to output.
* @param byteStream Output to reuse. Can be null, in that case a new one would be created.
@@ -228,8 +154,4 @@ public abstract class MapJoinKey {
}
return byteStream;
}
-
- private static boolean useOptimizedKeyBasedOnPrev(MapJoinKey key) {
- return (key != null) && (key instanceof MapJoinKeyBytes);
- }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Thu Jan 22 05:05:05 2015
@@ -57,8 +57,6 @@ public class HashTableLoader implements
private ExecMapperContext context;
private Configuration hconf;
private MapJoinDesc desc;
- private MapJoinKey lastKey = null;
- private int rowCount = 0;
@Override
public void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp) {
@@ -111,8 +109,7 @@ public class HashTableLoader implements
: new HashMapWrapper(hconf, keyCount);
while (kvReader.next()) {
- rowCount++;
- lastKey = tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(),
+ tableContainer.putRow(keyCtx, (Writable)kvReader.getCurrentKey(),
valCtx, (Writable)kvReader.getCurrentValue());
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Thu Jan 22 05:05:05 2015
@@ -70,6 +70,7 @@ public class VectorGroupByOperator exten
* Key vector expressions.
*/
private VectorExpression[] keyExpressions;
+ private int outputKeyLength;
private boolean isVectorOutput;
@@ -768,9 +769,16 @@ public class VectorGroupByOperator exten
List<ExprNodeDesc> keysDesc = conf.getKeys();
try {
- keyOutputWriters = new VectorExpressionWriter[keyExpressions.length];
+ List<String> outputFieldNames = conf.getOutputColumnNames();
+
+ // grouping id should be pruned, which is the last of key columns
+ // see ColumnPrunerGroupByProc
+ outputKeyLength =
+ conf.pruneGroupingSetId() ? keyExpressions.length - 1 : keyExpressions.length;
+
+ keyOutputWriters = new VectorExpressionWriter[outputKeyLength];
- for(int i = 0; i < keyExpressions.length; ++i) {
+ for(int i = 0; i < outputKeyLength; ++i) {
keyOutputWriters[i] = VectorExpressionWriterFactory.
genVectorExpressionWritable(keysDesc.get(i));
objectInspectors.add(keyOutputWriters[i].getObjectInspector());
@@ -788,7 +796,6 @@ public class VectorGroupByOperator exten
aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
}
LOG.warn("VectorGroupByOperator is vector output " + isVectorOutput);
- List<String> outputFieldNames = conf.getOutputColumnNames();
outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
outputFieldNames, objectInspectors);
if (isVectorOutput) {
@@ -807,9 +814,9 @@ public class VectorGroupByOperator exten
initializeChildren(hconf);
- forwardCache = new Object[keyExpressions.length + aggregators.length];
+ forwardCache = new Object[outputKeyLength + aggregators.length];
- if (keyExpressions.length == 0) {
+ if (outputKeyLength == 0) {
processingMode = this.new ProcessingModeGlobalAggregate();
} else if (conf.getVectorDesc().isVectorGroupBatches()) {
// Sorted GroupBy of vector batches where an individual batch has the same group key (e.g. reduce).
@@ -872,7 +879,7 @@ public class VectorGroupByOperator exten
int fi = 0;
if (!isVectorOutput) {
// Output row.
- for (int i = 0; i < keyExpressions.length; ++i) {
+ for (int i = 0; i < outputKeyLength; ++i) {
forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (
kw, i, keyOutputWriters[i]);
}
@@ -886,7 +893,7 @@ public class VectorGroupByOperator exten
forward(forwardCache, outputObjInspector);
} else {
// Output keys and aggregates into the output batch.
- for (int i = 0; i < keyExpressions.length; ++i) {
+ for (int i = 0; i < outputKeyLength; ++i) {
vectorColumnAssign[fi++].assignObjectValue(keyWrappersBatch.getWritableKeyValue (
kw, i, keyOutputWriters[i]), outputBatch.size);
}
@@ -910,7 +917,7 @@ public class VectorGroupByOperator exten
*/
private void writeGroupRow(VectorAggregationBufferRow agg, DataOutputBuffer buffer)
throws HiveException {
- int fi = keyExpressions.length; // Start after group keys.
+ int fi = outputKeyLength; // Start after group keys.
for (int i = 0; i < aggregators.length; ++i) {
vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput(
agg.getAggregationBuffer(i)), outputBatch.size);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/index/HiveIndexResult.java Thu Jan 22 05:05:05 2015
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
@@ -98,7 +99,7 @@ public class HiveIndexResult {
FileSystem fs = indexFilePath.getFileSystem(conf);
FileStatus indexStat = fs.getFileStatus(indexFilePath);
if (indexStat.isDir()) {
- FileStatus[] fss = fs.listStatus(indexFilePath);
+ FileStatus[] fss = fs.listStatus(indexFilePath, FileUtils.HIDDEN_FILES_PATH_FILTER);
for (FileStatus f : fss) {
paths.add(f.getPath());
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java Thu Jan 22 05:05:05 2015
@@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -92,14 +91,7 @@ public class BucketizedHiveInputFormat<K
List<IOException> errors = new ArrayList<IOException>();
FileSystem fs = dir.getFileSystem(job);
- FileStatus[] matches = fs.globStatus(dir, new PathFilter() {
-
- @Override
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- });
+ FileStatus[] matches = fs.globStatus(dir, FileUtils.HIDDEN_FILES_PATH_FILTER);
if (matches == null) {
errors.add(new IOException("Input path does not exist: " + dir));
} else if (matches.length == 0) {
@@ -113,7 +105,8 @@ public class BucketizedHiveInputFormat<K
if (!errors.isEmpty()) {
throw new InvalidInputException(errors);
}
- LOG.info("Total input paths to process : " + result.size());
+ LOG.debug("Matches for " + dir + ": " + result);
+ LOG.info("Total input paths to process : " + result.size() + " from dir " + dir);
return result.toArray(new FileStatus[result.size()]);
}
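This hunk is representative of a change repeated throughout this commit: the ad-hoc PathFilter above is centralized as FileUtils.HIDDEN_FILES_PATH_FILTER and applied to every listStatus/globStatus call that feeds splits, so hidden and temporary files are skipped consistently. Judging from the inline filter it replaces, the shared constant is presumably equivalent to:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Presumed definition, inferred from the inline filter removed above:
    // skip anything whose name starts with "_" or "." (Hadoop's convention
    // for hidden and metadata files such as _SUCCESS or .staging dirs).
    public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() {
      @Override
      public boolean accept(Path p) {
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    };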
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Thu Jan 22 05:05:05 2015
@@ -30,6 +30,10 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -74,6 +79,48 @@ public class CombineHiveInputFormat<K ex
private static final String CLASS_NAME = CombineHiveInputFormat.class.getName();
public static final Log LOG = LogFactory.getLog(CLASS_NAME);
+ // max number of threads we can use to check non-combinable paths
+ private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50;
+ private static final int DEFAULT_NUM_PATH_PER_THREAD = 100;
+
+ private class CheckNonCombinablePathCallable implements Callable<Set<Integer>> {
+ private final Path[] paths;
+ private final int start;
+ private final int length;
+ private final JobConf conf;
+
+ public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobConf conf) {
+ this.paths = paths;
+ this.start = start;
+ this.length = length;
+ this.conf = conf;
+ }
+
+ @Override
+ public Set<Integer> call() throws Exception {
+ Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
+ for (int i = 0; i < length; i++) {
+ PartitionDesc part =
+ HiveFileFormatUtils.getPartitionDescFromPathRecursively(
+ pathToPartitionInfo, paths[i + start],
+ IOPrepareCache.get().allocatePartitionDescMap());
+ // Use HiveInputFormat if any of the paths is not splittable
+ Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
+ InputFormat<WritableComparable, Writable> inputFormat =
+ getInputFormatFromCache(inputFormatClass, conf);
+ if (inputFormat instanceof AvoidSplitCombination &&
+ ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The path [" + paths[i + start] +
+ "] is being parked for HiveInputFormat.getSplits");
+ }
+ nonCombinablePathIndices.add(i + start);
+ }
+ }
+ return nonCombinablePathIndices;
+ }
+ }
+
/**
* CombineHiveInputSplit encapsulates an InputSplit with its corresponding
* inputFormatClassName. A CombineHiveInputSplit comprises of multiple chunks
@@ -278,8 +325,6 @@ public class CombineHiveInputFormat<K ex
private InputSplit[] getCombineSplits(JobConf job, int numSplits,
Map<String, PartitionDesc> pathToPartitionInfo)
throws IOException {
- PerfLogger perfLogger = PerfLogger.getPerfLogger();
- perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
init(job);
Map<String, ArrayList<String>> pathToAliases = mrwork.getPathToAliases();
Map<String, Operator<? extends OperatorDesc>> aliasToWork =
@@ -290,7 +335,6 @@ public class CombineHiveInputFormat<K ex
InputSplit[] splits = null;
if (combine == null) {
splits = super.getSplits(job, numSplits);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
return splits;
}
@@ -349,13 +393,12 @@ public class CombineHiveInputFormat<K ex
} else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
//if compresssion codec is set, use HiveInputFormat.getSplits (don't combine)
splits = super.getSplits(job, numSplits);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
return splits;
}
while (dirs.peek() != null) {
Path tstPath = dirs.remove();
- FileStatus[] fStatus = inpFs.listStatus(tstPath);
+ FileStatus[] fStatus = inpFs.listStatus(tstPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
for (int idx = 0; idx < fStatus.length; idx++) {
if (fStatus[idx].isDir()) {
dirs.offer(fStatus[idx].getPath());
@@ -363,7 +406,6 @@ public class CombineHiveInputFormat<K ex
fStatus[idx].getPath()) != null) {
//if compresssion codec is set, use HiveInputFormat.getSplits (don't combine)
splits = super.getSplits(job, numSplits);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
return splits;
}
}
@@ -373,7 +415,6 @@ public class CombineHiveInputFormat<K ex
//don't combine if inputformat is a SymlinkTextInputFormat
if (inputFormat instanceof SymlinkTextInputFormat) {
splits = super.getSplits(job, numSplits);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
return splits;
}
@@ -451,7 +492,6 @@ public class CombineHiveInputFormat<K ex
}
LOG.info("number of splits " + result.size());
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
return result.toArray(new CombineHiveInputSplit[result.size()]);
}
@@ -460,6 +500,8 @@ public class CombineHiveInputFormat<K ex
*/
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
init(job);
ArrayList<InputSplit> result = new ArrayList<InputSplit>();
@@ -469,26 +511,37 @@ public class CombineHiveInputFormat<K ex
List<Path> nonCombinablePaths = new ArrayList<Path>(paths.length / 2);
List<Path> combinablePaths = new ArrayList<Path>(paths.length / 2);
- for (Path path : paths) {
-
- PartitionDesc part =
- HiveFileFormatUtils.getPartitionDescFromPathRecursively(
- pathToPartitionInfo, path,
- IOPrepareCache.get().allocatePartitionDescMap());
-
- // Use HiveInputFormat if any of the paths is not splittable
- Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
- InputFormat<WritableComparable, Writable> inputFormat = getInputFormatFromCache(inputFormatClass, job);
- if (inputFormat instanceof AvoidSplitCombination &&
- ((AvoidSplitCombination) inputFormat).shouldSkipCombine(path, job)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("The split [" + path +
- "] is being parked for HiveInputFormat.getSplits");
+ int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM,
+ (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD));
+ int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads);
+ LOG.info("Total number of paths: " + paths.length +
+ ", launching " + numThreads + " threads to check non-combinable ones.");
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ List<Future<Set<Integer>>> futureList = new ArrayList<Future<Set<Integer>>>(numThreads);
+ try {
+ for (int i = 0; i < numThreads; i++) {
+ int start = i * numPathPerThread;
+ int length = i != numThreads - 1 ? numPathPerThread : paths.length - start;
+ futureList.add(executor.submit(
+ new CheckNonCombinablePathCallable(paths, start, length, job)));
+ }
+ Set<Integer> nonCombinablePathIndices = new HashSet<Integer>();
+ for (Future<Set<Integer>> future : futureList) {
+ nonCombinablePathIndices.addAll(future.get());
+ }
+ for (int i = 0; i < paths.length; i++) {
+ if (nonCombinablePathIndices.contains(i)) {
+ nonCombinablePaths.add(paths[i]);
+ } else {
+ combinablePaths.add(paths[i]);
}
- nonCombinablePaths.add(path);
- } else {
- combinablePaths.add(path);
}
+ } catch (Exception e) {
+ LOG.error("Error checking non-combinable path", e);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
+ throw new IOException(e);
+ } finally {
+ executor.shutdownNow();
}
// Store the previous value for the path specification
@@ -528,6 +581,7 @@ public class CombineHiveInputFormat<K ex
job.set(HiveConf.ConfVars.HADOOPMAPREDINPUTDIR.varname, oldPaths);
}
LOG.info("Number of all splits " + result.size());
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
return result.toArray(new InputSplit[result.size()]);
}
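The new getSplits path fans the non-combinable check out over a thread pool. Note that each callable must report global indices (i + start), since the final loop tests membership against positions in the full paths array. The chunking arithmetic covers every path exactly once: for example, 230 paths with DEFAULT_NUM_PATH_PER_THREAD = 100 gives numThreads = min(50, ceil(230/100)) = 3 and numPathPerThread = ceil(230/3) = 77, so the callables see ranges [0,77), [77,154), [154,230). A standalone sketch of the partitioning:

    // Standalone sketch of the chunking used above; returns {start, length}
    // per thread, with the last thread taking whatever remains.
    static int[][] chunk(int pathCount, int maxThreads, int pathsPerThread) {
      int numThreads = Math.min(maxThreads,
          (int) Math.ceil((double) pathCount / pathsPerThread));
      int numPathPerThread = (int) Math.ceil((double) pathCount / numThreads);
      int[][] ranges = new int[numThreads][];
      for (int i = 0; i < numThreads; i++) {
        int start = i * numPathPerThread;
        int length = i != numThreads - 1 ? numPathPerThread : pathCount - start;
        ranges[i] = new int[] {start, length};
      }
      return ranges;
    }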
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java Thu Jan 22 05:05:05 2015
@@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -58,7 +59,7 @@ public class SymbolicInputFormat impleme
if (!fStatus.isDir()) {
symlinks = new FileStatus[] { fStatus };
} else {
- symlinks = fileSystem.listStatus(symlinkDir);
+ symlinks = fileSystem.listStatus(symlinkDir, FileUtils.HIDDEN_FILES_PATH_FILTER);
}
toRemovePaths.add(path);
ArrayList<String> aliases = pathToAliases.remove(path);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java Thu Jan 22 05:05:05 2015
@@ -23,19 +23,15 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -192,7 +188,7 @@ public class SymlinkTextInputFormat exte
List<Path> targetPaths, List<Path> symlinkPaths) throws IOException {
for (Path symlinkDir : symlinksDirs) {
FileSystem fileSystem = symlinkDir.getFileSystem(conf);
- FileStatus[] symlinks = fileSystem.listStatus(symlinkDir);
+ FileStatus[] symlinks = fileSystem.listStatus(symlinkDir, FileUtils.HIDDEN_FILES_PATH_FILTER);
// Read paths from each symlink file.
for (FileStatus symlink : symlinks) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java Thu Jan 22 05:05:05 2015
@@ -254,9 +254,13 @@ class DummyTxnManager extends HiveTxnMan
private HiveLockMode getWriteEntityLockMode (WriteEntity we) {
HiveLockMode lockMode = we.isComplete() ? HiveLockMode.EXCLUSIVE : HiveLockMode.SHARED;
- //but the writeEntity is complete in DDL operations, and we need check its writeType to
- //to determine the lockMode
- switch (we.getWriteType()) {
+ //but the writeEntity is complete in DDL operations, instead DDL sets the writeType, so
+ //we use it to determine its lockMode, and first we check if the writeType was set
+ WriteEntity.WriteType writeType = we.getWriteType();
+ if (writeType == null) {
+ return lockMode;
+ }
+ switch (writeType) {
case DDL_EXCLUSIVE:
return HiveLockMode.EXCLUSIVE;
case DDL_SHARED:
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java Thu Jan 22 05:05:05 2015
@@ -27,15 +27,10 @@ import org.apache.hadoop.hive.ql.lockmgr
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
import org.apache.hadoop.hive.ql.metadata.*;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.curator.framework.CuratorFramework;
-import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -47,14 +42,11 @@ public class ZooKeeperHiveLockManager im
public static final Log LOG = LogFactory.getLog("ZooKeeperHiveLockManager");
static final private LogHelper console = new LogHelper(LOG);
- private ZooKeeper zooKeeper;
+ private static CuratorFramework curatorFramework;
// All the locks are created under this parent
private String parent;
- private int sessionTimeout;
- private String quorumServers;
-
private long sleepTime;
private int numRetriesForLock;
private int numRetriesForUnLock;
@@ -80,8 +72,6 @@ public class ZooKeeperHiveLockManager im
public void setContext(HiveLockManagerCtx ctx) throws LockException {
this.ctx = ctx;
HiveConf conf = ctx.getConf();
- sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
- quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf);
sleepTime = conf.getTimeVar(
HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
@@ -89,20 +79,18 @@ public class ZooKeeperHiveLockManager im
numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
try {
- renewZookeeperInstance(sessionTimeout, quorumServers);
+ curatorFramework = CuratorFrameworkSingleton.getInstance(conf);
parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
-
- try {
- zooKeeper.create("/" + parent, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (KeeperException e) {
+ try{
+ curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/" + parent, new byte[0]);
+ } catch (Exception e) {
// ignore if the parent already exists
- if (e.code() != KeeperException.Code.NODEEXISTS) {
+ if (!(e instanceof KeeperException) || ((KeeperException)e).code() != KeeperException.Code.NODEEXISTS) {
LOG.warn("Unexpected ZK exception when creating parent node /" + parent, e);
}
}
-
} catch (Exception e) {
- LOG.error("Failed to create ZooKeeper object: ", e);
+ LOG.error("Failed to create curatorFramework object: ", e);
throw new LockException(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg());
}
}
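The Curator migration in this file replaces hand-rolled ZooKeeper session management (renewZookeeperInstance, DummyWatcher, explicit session timeout and quorum strings) with a shared client, obtained from the CuratorFrameworkSingleton referenced above, whose retries and reconnects are the library's job. A hedged sketch of the recurring idioms, under that assumption:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;

    // Sketch of the Curator idioms adopted here: fluent create with an
    // explicit CreateMode, idempotent parent creation by tolerating
    // NODEEXISTS, and delete/getChildren without raw version/watcher args.
    class CuratorLockSketch {
      static String createLockChild(CuratorFramework zk, String parent, CreateMode mode)
          throws Exception {
        try {
          zk.create().withMode(CreateMode.PERSISTENT).forPath("/" + parent, new byte[0]);
        } catch (KeeperException.NodeExistsException e) {
          // parent already exists; the same case the code above ignores
        }
        // Sequential children are what getSequenceNumber() parses later.
        return zk.create().withMode(mode).forPath("/" + parent + "/lock-", new byte[0]);
      }
    }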
@@ -116,15 +104,6 @@ public class ZooKeeperHiveLockManager im
numRetriesForUnLock = conf.getIntVar(HiveConf.ConfVars.HIVE_UNLOCK_NUMRETRIES);
}
- private void renewZookeeperInstance(int sessionTimeout, String quorumServers)
- throws InterruptedException, IOException {
- if (zooKeeper != null) {
- return;
- }
-
- zooKeeper = new ZooKeeper(quorumServers, sessionTimeout, new ZooKeeperHiveHelper.DummyWatcher());
- }
-
/**
* @param key object to be locked
* Get the name of the last string. For eg. if you need to lock db/T/ds=1=/hr=1,
@@ -266,8 +245,8 @@ public class ZooKeeperHiveLockManager im
* @throws InterruptedException
**/
private String createChild(String name, byte[] data, CreateMode mode)
- throws KeeperException, InterruptedException {
- return zooKeeper.create(name, data, Ids.OPEN_ACL_UNSAFE, mode);
+ throws Exception {
+ return curatorFramework.create().withMode(mode).forPath(name, data);
}
private String getLockName(String parent, HiveLockMode mode) {
@@ -347,7 +326,7 @@ public class ZooKeeperHiveLockManager im
private ZooKeeperHiveLock lockPrimitive(HiveLockObject key,
HiveLockMode mode, boolean keepAlive, boolean parentCreated,
Set<String> conflictingLocks)
- throws KeeperException, InterruptedException {
+ throws Exception {
String res;
// If the parents have already been created, create the last child only
@@ -369,8 +348,8 @@ public class ZooKeeperHiveLockManager im
for (String name : names) {
try {
res = createChild(name, new byte[0], CreateMode.PERSISTENT);
- } catch (KeeperException e) {
- if (e.code() != KeeperException.Code.NODEEXISTS) {
+ } catch (Exception e) {
+ if (!(e instanceof KeeperException) || ((KeeperException)e).code() != KeeperException.Code.NODEEXISTS) {
//if the exception is not 'NODEEXISTS', re-throw it
throw e;
}
@@ -383,11 +362,11 @@ public class ZooKeeperHiveLockManager im
int seqNo = getSequenceNumber(res, getLockName(lastName, mode));
if (seqNo == -1) {
- zooKeeper.delete(res, -1);
+ curatorFramework.delete().forPath(res);
return null;
}
- List<String> children = zooKeeper.getChildren(lastName, false);
+ List<String> children = curatorFramework.getChildren().forPath(lastName);
String exLock = getLockName(lastName, HiveLockMode.EXCLUSIVE);
String shLock = getLockName(lastName, HiveLockMode.SHARED);
@@ -407,12 +386,11 @@ public class ZooKeeperHiveLockManager im
if ((childSeq >= 0) && (childSeq < seqNo)) {
try {
- zooKeeper.delete(res, -1);
+ curatorFramework.delete().forPath(res);
} finally {
if (LOG.isDebugEnabled()) {
- Stat stat = new Stat();
try {
- String data = new String(zooKeeper.getData(child, false, stat));
+ String data = new String(curatorFramework.getData().forPath(child));
conflictingLocks.add(data);
} catch (Exception e) {
//ignored
@@ -428,11 +406,10 @@ public class ZooKeeperHiveLockManager im
/* Remove the lock specified */
public void unlock(HiveLock hiveLock) throws LockException {
- unlockWithRetry(ctx.getConf(), zooKeeper, hiveLock, parent);
+ unlockWithRetry(hiveLock, parent);
}
- private void unlockWithRetry(HiveConf conf, ZooKeeper zkpClient,
- HiveLock hiveLock, String parent) throws LockException {
+ private void unlockWithRetry(HiveLock hiveLock, String parent) throws LockException {
int tryNum = 0;
do {
@@ -440,14 +417,13 @@ public class ZooKeeperHiveLockManager im
tryNum++;
if (tryNum > 1) {
Thread.sleep(sleepTime);
- prepareRetry();
}
- unlockPrimitive(conf, zkpClient, hiveLock, parent);
+ unlockPrimitive(hiveLock, parent, curatorFramework);
break;
} catch (Exception e) {
if (tryNum >= numRetriesForUnLock) {
String name = ((ZooKeeperHiveLock)hiveLock).getPath();
- LOG.error("Node " + name + " can not be deleted after " + numRetriesForUnLock + " attempts.");
+ LOG.error("Node " + name + " can not be deleted after " + numRetriesForUnLock + " attempts.");
throw new LockException(e);
}
}
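
Stripped of specifics, the loop above follows a common bounded-backoff shape. A sketch, under the assumption that the elided loop condition also checks the retry budget (doUnlock is a hypothetical stand-in for unlockPrimitive):

    int tryNum = 0;
    do {
      try {
        tryNum++;
        if (tryNum > 1) {
          Thread.sleep(sleepTime);        // back off before every attempt after the first
        }
        doUnlock();                       // hypothetical stand-in for unlockPrimitive(...)
        break;                            // success: leave the loop
      } catch (Exception e) {
        if (tryNum >= numRetriesForUnLock) {
          throw new LockException(e);     // budget exhausted: surface the failure
        }
      }
    } while (tryNum < numRetriesForUnLock);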
@@ -458,21 +434,20 @@ public class ZooKeeperHiveLockManager im
/* Remove the lock specified */
@VisibleForTesting
- static void unlockPrimitive(HiveConf conf, ZooKeeper zkpClient,
- HiveLock hiveLock, String parent) throws LockException {
+ static void unlockPrimitive(HiveLock hiveLock, String parent, CuratorFramework curatorFramework) throws LockException {
ZooKeeperHiveLock zLock = (ZooKeeperHiveLock)hiveLock;
HiveLockObject obj = zLock.getHiveLockObject();
String name = getLastObjectName(parent, obj);
try {
- zkpClient.delete(zLock.getPath(), -1);
+ curatorFramework.delete().forPath(zLock.getPath());
// Delete the parent node if all the children have been deleted
- List<String> children = zkpClient.getChildren(name, false);
+ List<String> children = curatorFramework.getChildren().forPath(name);
if (children == null || children.isEmpty()) {
- zkpClient.delete(name, -1);
+ curatorFramework.delete().forPath(name);
}
} catch (KeeperException.NoNodeException nne) {
- //can happen in retrying deleting the zLock after exceptions like InterruptedException
+ //can happen when retrying deletion of the zLock after exceptions like InterruptedException
//or in a race condition where the parent has already been deleted by another process when it
//is about to be deleted. Neither case should raise an error
LOG.debug("Node " + zLock.getPath() + " or its parent has already been deleted.");
@@ -480,7 +455,7 @@ public class ZooKeeperHiveLockManager im
//can happen in a race condition where another process adds a zLock under this parent
//just before it is about to be deleted. It should not be a problem since this parent
//can eventually be deleted by the process which hold its last child zLock
- LOG.debug("Node " + name + " to be deleted is not empty.");
+ LOG.debug("Node " + name + " to be deleted is not empty.");
} catch (Exception e) {
//exceptions including InterruptedException and other KeeperExceptions
LOG.error("Failed to release ZooKeeper lock: ", e);
@@ -490,19 +465,14 @@ public class ZooKeeperHiveLockManager im
/* Release all locks - including PERSISTENT locks */
public static void releaseAllLocks(HiveConf conf) throws Exception {
- ZooKeeper zkpClient = null;
try {
- int sessionTimeout = conf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT);
- String quorumServers = ZooKeeperHiveHelper.getQuorumServers(conf);
- Watcher dummyWatcher = new ZooKeeperHiveHelper.DummyWatcher();
- zkpClient = new ZooKeeper(quorumServers, sessionTimeout, dummyWatcher);
String parent = conf.getVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_NAMESPACE);
- List<HiveLock> locks = getLocks(conf, zkpClient, null, parent, false, false);
+ List<HiveLock> locks = getLocks(conf, null, parent, false, false);
Exception lastExceptionGot = null;
if (locks != null) {
for (HiveLock lock : locks) {
try {
- unlockPrimitive(conf, zkpClient, lock, parent);
+ unlockPrimitive(lock, parent, curatorFramework);
} catch (Exception e) {
lastExceptionGot = e;
}
@@ -516,24 +486,19 @@ public class ZooKeeperHiveLockManager im
} catch (Exception e) {
LOG.error("Failed to release all locks: ", e);
throw new Exception(ErrorMsg.ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED.getMsg());
- } finally {
- if (zkpClient != null) {
- zkpClient.close();
- zkpClient = null;
- }
}
}
/* Get all locks */
public List<HiveLock> getLocks(boolean verifyTablePartition, boolean fetchData)
throws LockException {
- return getLocks(ctx.getConf(), zooKeeper, null, parent, verifyTablePartition, fetchData);
+ return getLocks(ctx.getConf(), null, parent, verifyTablePartition, fetchData);
}
/* Get all locks for a particular object */
public List<HiveLock> getLocks(HiveLockObject key, boolean verifyTablePartitions,
boolean fetchData) throws LockException {
- return getLocks(ctx.getConf(), zooKeeper, key, parent, verifyTablePartitions, fetchData);
+ return getLocks(ctx.getConf(), key, parent, verifyTablePartitions, fetchData);
}
/**
@@ -541,7 +506,7 @@ public class ZooKeeperHiveLockManager im
- * @param zkpClient The ZooKeeper client
* @param key The object to be compared against - if key is null, then get all locks
**/
- private static List<HiveLock> getLocks(HiveConf conf, ZooKeeper zkpClient,
+ private static List<HiveLock> getLocks(HiveConf conf,
HiveLockObject key, String parent, boolean verifyTablePartition, boolean fetchData)
throws LockException {
List<HiveLock> locks = new ArrayList<HiveLock>();
@@ -552,12 +517,12 @@ public class ZooKeeperHiveLockManager im
try {
if (key != null) {
commonParent = "/" + parent + "/" + key.getName();
- children = zkpClient.getChildren(commonParent, false);
+ children = curatorFramework.getChildren().forPath(commonParent);
recurse = false;
}
else {
commonParent = "/" + parent;
- children = zkpClient.getChildren(commonParent, false);
+ children = curatorFramework.getChildren().forPath(commonParent);
}
} catch (Exception e) {
// no locks present
@@ -579,7 +544,7 @@ public class ZooKeeperHiveLockManager im
if (recurse) {
try {
- children = zkpClient.getChildren(curChild, false);
+ children = curatorFramework.getChildren().forPath(curChild);
for (String child : children) {
childn.add(curChild + "/" + child);
}
@@ -588,7 +553,7 @@ public class ZooKeeperHiveLockManager im
}
}
- HiveLockMode mode = getLockMode(conf, curChild);
+ HiveLockMode mode = getLockMode(curChild);
if (mode == null) {
continue;
}
@@ -605,8 +570,7 @@ public class ZooKeeperHiveLockManager im
if (fetchData) {
try {
- data = new HiveLockObjectData(new String(zkpClient.getData(curChild,
- new ZooKeeperHiveHelper.DummyWatcher(), null)));
+ data = new HiveLockObjectData(new String(curatorFramework.getData().watched().forPath(curChild)));
data.setClientIp(clientIp);
} catch (Exception e) {
LOG.error("Error in getting data for " + curChild, e);
@@ -623,12 +587,7 @@ public class ZooKeeperHiveLockManager im
/** Remove all redundant nodes **/
private void removeAllRedundantNodes() {
try {
- renewZookeeperInstance(sessionTimeout, quorumServers);
checkRedundantNode("/" + parent);
- if (zooKeeper != null) {
- zooKeeper.close();
- zooKeeper = null;
- }
} catch (Exception e) {
LOG.warn("Exception while removing all redundant nodes", e);
}
@@ -637,19 +596,19 @@ public class ZooKeeperHiveLockManager im
private void checkRedundantNode(String node) {
try {
// Nothing to do if it is a lock node
- if (getLockMode(ctx.getConf(), node) != null) {
+ if (getLockMode(node) != null) {
return;
}
- List<String> children = zooKeeper.getChildren(node, false);
+ List<String> children = curatorFramework.getChildren().forPath(node);
for (String child : children) {
checkRedundantNode(node + "/" + child);
}
- children = zooKeeper.getChildren(node, false);
+ children = curatorFramework.getChildren().forPath(node);
if ((children == null) || (children.isEmpty()))
{
- zooKeeper.delete(node, -1);
+ curatorFramework.delete().forPath(node);
}
} catch (Exception e) {
LOG.warn("Error in checkRedundantNode for node " + node, e);
@@ -658,12 +617,7 @@ public class ZooKeeperHiveLockManager im
/* Release all transient locks, by simply closing the client */
public void close() throws LockException {
- try {
-
- if (zooKeeper != null) {
- zooKeeper.close();
- zooKeeper = null;
- }
+ try {
if (HiveConf.getBoolVar(ctx.getConf(), HiveConf.ConfVars.HIVE_ZOOKEEPER_CLEAN_EXTRA_NODES)) {
removeAllRedundantNodes();
@@ -750,7 +704,7 @@ public class ZooKeeperHiveLockManager im
private static Pattern exMode = Pattern.compile("^.*-(EXCLUSIVE)-([0-9]+)$");
/* Get the mode of the lock encoded in the path */
- private static HiveLockMode getLockMode(HiveConf conf, String path) {
+ private static HiveLockMode getLockMode(String path) {
Matcher shMatcher = shMode.matcher(path);
Matcher exMatcher = exMode.matcher(path);
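
To make the encoding concrete: lock znode names end in "-<MODE>-<sequence>", so a hedged example (the sample path below is invented for illustration, not taken from the patch) parses as:

    Pattern exMode = Pattern.compile("^.*-(EXCLUSIVE)-([0-9]+)$");
    Matcher m = exMode.matcher("/hive_zookeeper_namespace/default.tbl/LOCK-EXCLUSIVE-0000000003");
    if (m.matches()) {
      String mode = m.group(1);                  // "EXCLUSIVE"
      int seqNo = Integer.parseInt(m.group(2));  // 3, the ZooKeeper sequence number
    }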
@@ -768,15 +722,6 @@ public class ZooKeeperHiveLockManager im
@Override
public void prepareRetry() throws LockException {
- try {
- if (zooKeeper != null && zooKeeper.getState() == ZooKeeper.States.CLOSED) {
- // Reconnect if the connection is closed.
- zooKeeper = null;
- }
- renewZookeeperInstance(sessionTimeout, quorumServers);
- } catch (Exception e) {
- throw new LockException(e);
- }
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1653769&r1=1653768&r2=1653769&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Thu Jan 22 05:05:05 2015
@@ -29,6 +29,7 @@ import static org.apache.hadoop.hive.ser
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -36,6 +37,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -48,6 +50,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.common.ObjectPair;
@@ -1352,7 +1355,7 @@ public class Hive {
}
if (replace) {
- Hive.replaceFiles(loadPath, newPartPath, oldPartPath, getConf(),
+ Hive.replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
isSrcLocal);
} else {
FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
@@ -1411,7 +1414,7 @@ private void walkDirTree(FileStatus fSta
}
/* dfs. */
- FileStatus[] children = fSys.listStatus(fSta.getPath());
+ FileStatus[] children = fSys.listStatus(fSta.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
if (children != null) {
for (FileStatus child : children) {
walkDirTree(child, fSys, skewedColValueLocationMaps, newPartPath, skewedInfo);
@@ -2187,7 +2190,7 @@ private void constructOneLBLocationMap(F
boolean grantOption) throws HiveException {
try {
return getMSC().grant_role(roleName, userName, principalType, grantor,
- grantorType, grantOption);
+ grantorType, grantOption);
} catch (Exception e) {
throw new HiveException(e);
}
@@ -2282,13 +2285,7 @@ private void constructOneLBLocationMap(F
for (FileStatus src : srcs) {
FileStatus[] items;
if (src.isDir()) {
- items = srcFs.listStatus(src.getPath(), new PathFilter() {
- @Override
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- });
+ items = srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
Arrays.sort(items);
} else {
items = new FileStatus[] {src};
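
FileUtils.HIDDEN_FILES_PATH_FILTER lives in Hive's common FileUtils; judging from the anonymous class this hunk deletes, it is equivalent to:

    PathFilter hiddenFileFilter = new PathFilter() {
      @Override
      public boolean accept(Path p) {
        String name = p.getName();
        // skip metadata/hidden entries such as "_logs" or ".staging"
        return !name.startsWith("_") && !name.startsWith(".");
      }
    };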
@@ -2308,9 +2305,10 @@ private void constructOneLBLocationMap(F
}
if (!conf.getBoolVar(HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES) &&
+ !HiveConf.getVar(conf, HiveConf.ConfVars.STAGINGDIR).equals(itemSource.getName()) &&
item.isDir()) {
throw new HiveException("checkPaths: " + src.getPath()
- + " has nested directory" + itemSource);
+ + " has nested directory " + itemSource);
}
// Strip off the file type, if any so we don't make:
// 000000_0.gz -> 000000_0.gz_copy_1
@@ -2361,11 +2359,54 @@ private void constructOneLBLocationMap(F
return false;
}
+ private static boolean isSubDir(Path srcf, Path destf, FileSystem fs, boolean isSrcLocal){
+ if (srcf == null) {
+ LOG.debug("The source path is null for isSubDir method.");
+ return false;
+ }
+
+ String fullF1 = getQualifiedPathWithoutSchemeAndAuthority(srcf, fs);
+ String fullF2 = getQualifiedPathWithoutSchemeAndAuthority(destf, fs);
+
+ boolean isInTest = HiveConf.getBoolVar(fs.getConf(), ConfVars.HIVE_IN_TEST);
+ // In the test automation, the data warehouse is based on the local file system.
+ LOG.debug("The source path is " + fullF1 + " and the destination path is " + fullF2);
+ if (isInTest) {
+ return fullF1.startsWith(fullF2);
+ }
+
+ // if the URI schemes differ, return false
+ String schemaSrcf = srcf.toUri().getScheme();
+ String schemaDestf = destf.toUri().getScheme();
+
+ // if schemaDestf is null, the destination is not on the local file system
+ if (schemaDestf == null && isSrcLocal) {
+ LOG.debug("The source file is in the local while the dest not.");
+ return false;
+ }
+
+ // If both scheme strings are provided, they must match.
+ if (schemaSrcf != null && schemaDestf != null && !schemaSrcf.equals(schemaDestf)) {
+ LOG.debug("The source path's schema is " + schemaSrcf +
+ " and the destination path's schema is " + schemaDestf + ".");
+ return false;
+ }
+
+ LOG.debug("The source path is " + fullF1 + " and the destination path is " + fullF2);
+ return fullF1.startsWith(fullF2);
+ }
+
+ private static String getQualifiedPathWithoutSchemeAndAuthority(Path srcf, FileSystem fs) {
+ Path currentWorkingDir = fs.getWorkingDirectory();
+ Path path = srcf.makeQualified(srcf.toUri(), currentWorkingDir);
+ return Path.getPathWithoutSchemeAndAuthority(path).toString();
+ }
+
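
As a quick illustration of what the helper above normalizes (the path value is an assumed example, not from the patch):

    Path p = new Path("hdfs://namenode:8020/user/hive/warehouse/t/ds=1");
    // Path.getPathWithoutSchemeAndAuthority(p).toString()
    //   -> "/user/hive/warehouse/t/ds=1"
    // which lets isSubDir compare source and destination as plain path strings.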
//it is assumed that the parent directory of destf already exists when this
//method is called. when the replace value is true, this method works a little differently
//from the mv command: if destf is a directory, it replaces destf instead of moving under
//it. in this case, the replaced destf still preserves the original destf's permission
- public static boolean renameFile(HiveConf conf, Path srcf, Path destf,
+ public static boolean moveFile(HiveConf conf, Path srcf, Path destf,
FileSystem fs, boolean replace, boolean isSrcLocal) throws HiveException {
boolean success = false;
@@ -2374,17 +2415,26 @@ private void constructOneLBLocationMap(F
HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
HadoopShims shims = ShimLoader.getHadoopShims();
HadoopShims.HdfsFileStatus destStatus = null;
+ HadoopShims.HdfsEncryptionShim hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim();
+ // If the source path is a subdirectory of the destination path:
+ // e.g.: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT src.value WHERE src.key >= 300;
+ // where the staging directory is a subdirectory of the destination directory
+ // (1) Do not delete the dest dir before doing the move operation.
+ // (2) It is assumed that subdir and dir are in the same encryption zone.
+ // (3) Move individual files from the src dir to the dest dir.
+ boolean destIsSubDir = isSubDir(srcf, destf, fs, isSrcLocal);
try {
if (inheritPerms || replace) {
try{
- destStatus = shims.getFullFileStatus(conf, fs, destf);
+ destStatus = shims.getFullFileStatus(conf, fs, destf.getParent());
//if destf is an existing directory:
//if replace is true, delete followed by rename(mv) is equivalent to replace
//if replace is false, rename (mv) actually move the src under dest dir
//if destf is an existing file, rename is actually a replace, and do not need
// to delete the file first
- if (replace && destStatus.getFileStatus().isDir()) {
+ if (replace && !destIsSubDir) {
+ LOG.debug("The path " + destf.toString() + " is deleted");
fs.delete(destf, true);
}
} catch (FileNotFoundException ignore) {
@@ -2396,14 +2446,39 @@ private void constructOneLBLocationMap(F
}
if (!isSrcLocal) {
// For NOT local src file, rename the file
- success = fs.rename(srcf, destf);
+ if (hdfsEncryptionShim != null && (hdfsEncryptionShim.isPathEncrypted(srcf) || hdfsEncryptionShim.isPathEncrypted(destf))
+ && !hdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf))
+ {
+ LOG.info("Copying source " + srcf + " to " + destf + " because HDFS encryption zones are different.");
+ success = FileUtils.copy(srcf.getFileSystem(conf), srcf, destf.getFileSystem(conf), destf,
+ true, // delete source
+ replace, // overwrite destination
+ conf);
+ } else {
+ if (destIsSubDir) {
+ FileStatus[] srcs = fs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);
+ for (FileStatus status : srcs) {
+ success = FileUtils.copy(srcf.getFileSystem(conf), status.getPath(), destf.getFileSystem(conf), destf,
+ true, // delete source
+ replace, // overwrite destination
+ conf);
+
+ if (!success) {
+ throw new HiveException("Unable to move source " + status.getPath() + " to destination " + destf);
+ }
+ }
+ } else {
+ success = fs.rename(srcf, destf);
+ }
+ }
} else {
// For local src file, copy to hdfs
fs.copyFromLocalFile(srcf, destf);
success = true;
}
- LOG.info((replace ? "Replacing src:" : "Renaming src:") + srcf.toString()
- + ";dest: " + destf.toString() + ";Status:" + success);
+
+ LOG.info((replace ? "Replacing src:" : "Renaming src: ") + srcf.toString()
+ + ", dest: " + destf.toString() + ", Status:" + success);
} catch (IOException ioe) {
throw new HiveException("Unable to move source " + srcf + " to destination " + destf, ioe);
}
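
The copy branch exists because HDFS will not rename files across encryption-zone boundaries, so moveFile falls back to copy-plus-delete. A stripped-down sketch of that decision, reusing the shim and FileUtils calls shown above (variable names illustrative):

    boolean ok;
    if (encryptionShim != null
        && (encryptionShim.isPathEncrypted(srcf) || encryptionShim.isPathEncrypted(destf))
        && !encryptionShim.arePathsOnSameEncryptionZone(srcf, destf)) {
      ok = FileUtils.copy(srcf.getFileSystem(conf), srcf,
                          destf.getFileSystem(conf), destf,
                          true,      // delete the source after a successful copy
                          replace,   // overwrite the destination when replacing
                          conf);
    } else {
      ok = fs.rename(srcf, destf);   // same zone (or unencrypted): a plain rename suffices
    }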
@@ -2470,7 +2545,7 @@ private void constructOneLBLocationMap(F
try {
for (List<Path[]> sdpairs : result) {
for (Path[] sdpair : sdpairs) {
- if (!renameFile(conf, sdpair[0], sdpair[1], fs, false, isSrcLocal)) {
+ if (!moveFile(conf, sdpair[0], sdpair[1], fs, false, isSrcLocal)) {
throw new IOException("Cannot move " + sdpair[0] + " to "
+ sdpair[1]);
}
@@ -2563,6 +2638,7 @@ private void constructOneLBLocationMap(F
* srcf, destf, and tmppath should reside in the same DFS, but the oldPath can be in a
* different DFS.
*
+ * @param tablePath path of the table. Used to identify permission inheritance.
* @param srcf
* Source directory to be renamed to tmppath. It should be a
* leaf directory where the final data files reside. However it
@@ -2570,13 +2646,15 @@ private void constructOneLBLocationMap(F
* @param destf
* The directory where the final data needs to go
* @param oldPath
- * The directory where the old data location, need to be cleaned up.
+ * The directory of the old data location that needs to be cleaned up. Most of the time it will be
+ * the same as destf, unless the move is across FileSystem boundaries.
* @param isSrcLocal
* If the source directory is LOCAL
*/
- static protected void replaceFiles(Path srcf, Path destf, Path oldPath,
- HiveConf conf, boolean isSrcLocal) throws HiveException {
+ protected static void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
+ boolean isSrcLocal) throws HiveException {
try {
+
FileSystem destFs = destf.getFileSystem(conf);
boolean inheritPerms = HiveConf.getBoolVar(conf,
HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
@@ -2597,15 +2675,24 @@ private void constructOneLBLocationMap(F
List<List<Path[]>> result = checkPaths(conf, destFs, srcs, srcFs, destf,
true);
+ HadoopShims shims = ShimLoader.getHadoopShims();
if (oldPath != null) {
try {
FileSystem fs2 = oldPath.getFileSystem(conf);
if (fs2.exists(oldPath)) {
- FileUtils.trashFilesUnderDir(fs2, oldPath, conf);
+ // Do not delete oldPath if:
+ // - destf is a subdirectory of oldPath
+ //if ( !(fs2.equals(destf.getFileSystem(conf)) && FileUtils.isSubDir(oldPath, destf, fs2)))
+ if (FileUtils.isSubDir(oldPath, destf, fs2)) {
+ FileUtils.trashFilesUnderDir(fs2, oldPath, conf);
+ }
+ if (inheritPerms) {
+ inheritFromTable(tablePath, destf, conf, destFs);
+ }
}
} catch (Exception e) {
//swallow the exception
- LOG.warn("Directory " + oldPath.toString() + " canot be removed:" + StringUtils.stringifyException(e));
+ LOG.warn("Directory " + oldPath.toString() + " cannot be removed: " + e, e);
}
}
@@ -2619,15 +2706,30 @@ private void constructOneLBLocationMap(F
LOG.warn("Error creating directory " + destf.toString());
}
if (inheritPerms && success) {
- destFs.setPermission(destfp, destFs.getFileStatus(destfp.getParent()).getPermission());
+ inheritFromTable(tablePath, destfp, conf, destFs);
}
}
- boolean b = renameFile(conf, srcs[0].getPath(), destf, destFs, true,
- isSrcLocal);
- if (!b) {
- throw new HiveException("Unable to move results from " + srcs[0].getPath()
- + " to destination directory: " + destf);
+ // Copy/move each file under the source directory to avoid deleting the destination
+ // directory if it is the root of an HDFS encryption zone.
+ for (List<Path[]> sdpairs : result) {
+ for (Path[] sdpair : sdpairs) {
+ Path destParent = sdpair[1].getParent();
+ FileSystem destParentFs = destParent.getFileSystem(conf);
+ if (!destParentFs.isDirectory(destParent)) {
+ boolean success = destFs.mkdirs(destParent);
+ if (!success) {
+ LOG.warn("Error creating directory " + destParent);
+ }
+ if (inheritPerms && success) {
+ inheritFromTable(tablePath, destParent, conf, destFs);
+ }
+ }
+ if (!moveFile(conf, sdpair[0], sdpair[1], destFs, true, isSrcLocal)) {
+ throw new IOException("Unable to move file/directory from " + sdpair[0] +
+ " to " + sdpair[1]);
+ }
+ }
}
} else { // srcf is a file or pattern containing wildcards
if (!destFs.exists(destf)) {
@@ -2636,13 +2738,13 @@ private void constructOneLBLocationMap(F
LOG.warn("Error creating directory " + destf.toString());
}
if (inheritPerms && success) {
- destFs.setPermission(destf, destFs.getFileStatus(destf.getParent()).getPermission());
+ inheritFromTable(tablePath, destf, conf, destFs);
}
}
// srcs must be a list of files -- ensured by LoadSemanticAnalyzer
for (List<Path[]> sdpairs : result) {
for (Path[] sdpair : sdpairs) {
- if (!renameFile(conf, sdpair[0], sdpair[1], destFs, true,
+ if (!moveFile(conf, sdpair[0], sdpair[1], destFs, true,
isSrcLocal)) {
throw new IOException("Error moving: " + sdpair[0] + " into: " + sdpair[1]);
}
@@ -2654,6 +2756,38 @@ private void constructOneLBLocationMap(F
}
}
+ /**
+ * This method sets all paths from tablePath to destf (including destf) to have the same permission as tablePath.
+ * @param tablePath path of table
+ * @param destf path of table-subdir.
+ * @param conf
+ * @param fs
+ */
+ private static void inheritFromTable(Path tablePath, Path destf, HiveConf conf, FileSystem fs) {
+ if (!FileUtils.isSubDir(destf, tablePath, fs)) {
+ //the partition may not be under the table directory.
+ return;
+ }
+ HadoopShims shims = ShimLoader.getHadoopShims();
+ //Calculate all the paths from the table dir to destf.
+ //At the end of this loop, currPath is the table dir and pathsToSet contains all of those paths.
+ Path currPath = destf;
+ List<Path> pathsToSet = new LinkedList<Path>();
+ while (!currPath.equals(tablePath)) {
+ pathsToSet.add(currPath);
+ currPath = currPath.getParent();
+ }
+
+ try {
+ HadoopShims.HdfsFileStatus fullFileStatus = shims.getFullFileStatus(conf, fs, currPath);
+ for (Path pathToSet : pathsToSet) {
+ shims.setFullFileStatus(conf, fullFileStatus, fs, pathToSet);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error setting permissions or group of " + destf, e);
+ }
+ }
+
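
Concretely, for an assumed table directory /warehouse/t and a new partition directory /warehouse/t/ds=1/hr=2, the parent walk above collects the paths to restamp:

    Path tablePath = new Path("/warehouse/t");               // assumed example values
    Path destf     = new Path("/warehouse/t/ds=1/hr=2");
    List<Path> pathsToSet = new LinkedList<Path>();
    for (Path curr = destf; !curr.equals(tablePath); curr = curr.getParent()) {
      pathsToSet.add(curr);                                  // collects hr=2, then ds=1
    }
    // each collected path then receives the table directory's owner, group, and permission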
public static boolean isHadoop1() {
return ShimLoader.getMajorVersion().startsWith("0.20");
}