Posted to commits@hive.apache.org by gu...@apache.org on 2013/08/01 07:34:56 UTC
svn commit: r1509081 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/ java/org/apache/hadoop/hive/ql/exec/
java/org/apache/hadoop/hive/ql/exec/mr/
java/org/apache/hadoop/hive/ql/io/rcfile/merge/
test/org/apache/hadoop/hive/ql/io/
Author: gunther
Date: Thu Aug 1 05:34:55 2013
New Revision: 1509081
URL: http://svn.apache.org/r1509081
Log:
HIVE-4843: Refactoring MapRedTask and ExecDriver for better re-usability and readability (Vikram Dixit K via Gunther Hagleitner)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java
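For orientation: this patch pulls the input-path setup, plan serialization, reducer estimation, and tmp-directory creation out of MapRedTask/ExecDriver into static helpers on Utilities, so other tasks (e.g. BlockMergeTask) and tests can reuse them. A minimal sketch of the intended call sequence, assuming a JobConf, MapredWork, Context, and scratch dir already configured by the caller (all names as in the diffs below):

    // Sketch only: how ExecDriver.execute() wires the extracted helpers after this change.
    List<Path> inputPaths = Utilities.getInputPaths(job, work.getMapWork(),
        emptyScratchDirStr, ctx);                       // resolve aliases, substitute dummy files
    Utilities.setInputPaths(job, inputPaths);           // append to FileInputFormat's input paths
    Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI()); // serialize map/reduce plans
    Utilities.createTmpDirs(job, work.getMapWork());    // tmp dirs for map-side FileSinks
    Utilities.createTmpDirs(job, work.getReduceWork()); // no-op when there is no reduce work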
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1509081&r1=1509080&r2=1509081&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java Thu Aug 1 05:34:55 2013
@@ -563,6 +563,10 @@ public class Context {
pathToCS.put(path, cs);
}
+ public ContentSummary getCS(Path path) {
+ return getCS(path.toString());
+ }
+
public ContentSummary getCS(String path) {
return pathToCS.get(path);
}
@@ -575,7 +579,6 @@ public class Context {
return conf;
}
-
/**
* Given a mapping from paths to objects, localize any MR tmp paths
* @param map mapping from paths to objects
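The new getCS(Path) overload spares callers the toString() conversion; both overloads hit the same pathToCS cache. A hypothetical call site:

    // Before: ContentSummary cs = ctx.getCS(dirPath.toString());
    ContentSummary cs = ctx.getCS(dirPath);  // dirPath is an org.apache.hadoop.fs.Path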
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1509081&r1=1509080&r2=1509081&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Thu Aug 1 05:34:55 2013
@@ -63,6 +63,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -108,6 +109,7 @@ import org.apache.hadoop.hive.ql.io.Hive
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -124,6 +126,7 @@ import org.apache.hadoop.hive.ql.plan.Fi
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
@@ -131,12 +134,10 @@ import org.apache.hadoop.hive.ql.plan.Re
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.Adjacency;
import org.apache.hadoop.hive.ql.plan.api.Graph;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
@@ -150,9 +151,11 @@ import org.apache.hadoop.hive.shims.Shim
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
@@ -429,6 +432,7 @@ public final class Utilities {
return new Expression(dateVal, dateVal.getClass(), "new", args);
}
+ @Override
protected boolean mutatesTo(Object oldInstance, Object newInstance) {
if (oldInstance == null || newInstance == null) {
return false;
@@ -442,6 +446,7 @@ public final class Utilities {
* it is not serialization friendly.
*/
public static class TimestampPersistenceDelegate extends DatePersistenceDelegate {
+ @Override
protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
Timestamp ts = (Timestamp)oldInstance;
Object[] args = { ts.getNanos() };
@@ -451,21 +456,21 @@ public final class Utilities {
}
public static void setMapRedWork(Configuration conf, MapredWork w, String hiveScratchDir) {
- setMapWork(conf, w.getMapWork(), hiveScratchDir);
+ setMapWork(conf, w.getMapWork(), hiveScratchDir, true);
if (w.getReduceWork() != null) {
- setReduceWork(conf, w.getReduceWork(), hiveScratchDir);
+ setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true);
}
}
- public static void setMapWork(Configuration conf, MapWork w, String hiveScratchDir) {
- setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME);
+ public static Path setMapWork(Configuration conf, MapWork w, String hiveScratchDir, boolean useCache) {
+ return setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME, useCache);
}
- public static void setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir) {
- setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME);
+ public static Path setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir, boolean useCache) {
+ return setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME, useCache);
}
- private static void setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name) {
+ private static Path setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name, boolean useCache) {
try {
setPlanPath(conf, hiveScratchDir);
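Note the signature change here: the set*Work helpers now return the Path the serialized plan was written to, and take a useCache flag controlling whether the plan file is also registered with the DistributedCache. Existing callers pass true to keep the old behavior; a sketch of both styles (whether a given caller wants useCache=false is an assumption):

    // Old behavior: the plan is also registered with the DistributedCache.
    Path planPath = Utilities.setMapWork(conf, mapWork, hiveScratchDir, true);

    // Serialize the plan and get its location back without touching the
    // DistributedCache, for callers that ship or load the plan themselves.
    Path rawPlanPath = Utilities.setMapWork(conf, mapWork, hiveScratchDir, false);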
@@ -479,7 +484,7 @@ public final class Utilities {
// Serialize the plan to the default hdfs instance
// Except for hadoop local mode execution where we should be
// able to get the plan directly from the cache
- if (!ShimLoader.getHadoopShims().isLocalMode(conf)) {
+ if (useCache && !ShimLoader.getHadoopShims().isLocalMode(conf)) {
// Set up distributed cache
if (!DistributedCache.getSymlink(conf)) {
DistributedCache.createSymlink(conf);
@@ -495,6 +500,8 @@ public final class Utilities {
// Cache the plan in this process
gWorkMap.put(planPath, w);
+
+ return planPath;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
@@ -609,6 +616,7 @@ public final class Utilities {
* Deserialize an object. This helper function mainly makes sure that enums,
* counters, etc. are handled properly.
*/
+ @SuppressWarnings("unchecked")
public static <T> T deserializeObject(InputStream in) {
XMLDecoder d = null;
try {
@@ -1778,7 +1786,7 @@ public final class Utilities {
}
}
- public static Object getInputSummaryLock = new Object();
+ public static Object INPUT_SUMMARY_LOCK = new Object();
/**
* Calculate the total size of input files.
@@ -1801,7 +1809,7 @@ public final class Utilities {
// Since multiple threads could call this method concurrently, locking
// this method keeps the number of threads under control.
- synchronized (getInputSummaryLock) {
+ synchronized (INPUT_SUMMARY_LOCK) {
// For each input path, calculate the total size.
for (String path : work.getPathToAliases().keySet()) {
Path p = new Path(path);
@@ -1912,7 +1920,7 @@ public final class Utilities {
throw new IOException(e);
}
} while (!executorDone);
- }
+ }
executor.shutdown();
}
HiveInterruptUtils.checkInterrupted();
@@ -1936,7 +1944,7 @@ public final class Utilities {
}
}
- public static boolean isEmptyPath(JobConf job, String dirPath, Context ctx)
+ public static boolean isEmptyPath(JobConf job, Path dirPath, Context ctx)
throws Exception {
ContentSummary cs = ctx.getCS(dirPath);
if (cs != null) {
@@ -1946,8 +1954,7 @@ public final class Utilities {
} else {
LOG.info("Content Summary not cached for " + dirPath);
}
- Path p = new Path(dirPath);
- return isEmptyPath(job, p);
+ return isEmptyPath(job, dirPath);
}
public static boolean isEmptyPath(JobConf job, Path dirPath) throws Exception {
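isEmptyPath now accepts a Path and checks the Context's ContentSummary cache before falling back to a filesystem probe. Roughly (the emptiness test inside the cached branch is elided in the hunk above, so the exact check is a paraphrased assumption):

    ContentSummary cs = ctx.getCS(dirPath);   // the new Path overload
    if (cs != null) {
      // assumed check: no bytes and no files means empty
      return cs.getLength() == 0 && cs.getFileCount() == 0;
    }
    return isEmptyPath(job, dirPath);         // cache miss: list the path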
@@ -2532,5 +2539,415 @@ public final class Utilities {
return sb.toString();
}
+
+ /**
+ * Estimate the number of reducers needed for this job, based on job input
+ * and configuration parameters.
+ *
+ * The output of this method should only be used if the output of this
+ * MapRedTask is not being used to populate a bucketed table and the user
+ * has not specified the number of reducers to use.
+ *
+ * @return the number of reducers.
+ */
+ public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSummary,
+ MapWork work, boolean finalMapRed) throws IOException {
+ long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+ int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+
+ double samplePercentage = getHighestSamplePercentage(work);
+ long totalInputFileSize = getTotalInputFileSize(inputSummary, work, samplePercentage);
+
+ // if all inputs are sampled, we should shrink the size of reducers accordingly.
+ if (totalInputFileSize != inputSummary.getLength()) {
+ LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+ + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
+ } else {
+ LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+ + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+ }
+
+ int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
+ reducers = Math.max(1, reducers);
+ reducers = Math.min(maxReducers, reducers);
+
+ // If this map reduce job writes final data to a table and bucketing is being inferred,
+ // and the user has configured Hive to do this, make sure the number of reducers is a
+ // power of two
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
+ finalMapRed && !work.getBucketedColsByDirectory().isEmpty()) {
+
+ int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
+ int reducersPowerTwo = (int)Math.pow(2, reducersLog);
+
+ // If the original number of reducers was a power of two, use that
+ if (reducersPowerTwo / 2 == reducers) {
+ return reducers;
+ } else if (reducersPowerTwo > maxReducers) {
+ // If the next power of two greater than the original number of reducers is greater
+ // than the max number of reducers, use the preceding power of two, which is strictly
+ // less than the original number of reducers and hence the max
+ reducers = reducersPowerTwo / 2;
+ } else {
+ // Otherwise use the smallest power of two greater than the original number of reducers
+ reducers = reducersPowerTwo;
+ }
+ }
+
+ return reducers;
+ }
+
+ /**
+ * Computes the total input file size. If block sampling was used it will scale this
+ * value by the highest sample percentage (as an estimate for input).
+ *
+ * @param inputSummary
+ * @param work
+ * @param highestSamplePercentage
+ * @return estimated total input size for job
+ */
+ public static long getTotalInputFileSize (ContentSummary inputSummary, MapWork work,
+ double highestSamplePercentage) {
+ long totalInputFileSize = inputSummary.getLength();
+ if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+ // If percentage block sampling wasn't used, we don't need to do any estimation
+ return totalInputFileSize;
+ }
+
+ if (highestSamplePercentage >= 0) {
+ totalInputFileSize = Math.min((long) (totalInputFileSize * highestSamplePercentage / 100D)
+ , totalInputFileSize);
+ }
+ return totalInputFileSize;
+ }
+
+ /**
+ * Computes the total number of input files. If block sampling was used it will scale this
+ * value by the highest sample percentage (as an estimate for # input files).
+ *
+ * @param inputSummary
+ * @param work
+ * @param highestSamplePercentage
+ * @return estimated total number of input files for the job
+ */
+ public static long getTotalInputNumFiles (ContentSummary inputSummary, MapWork work,
+ double highestSamplePercentage) {
+ long totalInputNumFiles = inputSummary.getFileCount();
+ if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+ // If percentage block sampling wasn't used, we don't need to do any estimation
+ return totalInputNumFiles;
+ }
+
+ if (highestSamplePercentage >= 0) {
+ totalInputNumFiles = Math.min((long) (totalInputNumFiles * highestSamplePercentage / 100D)
+ , totalInputNumFiles);
+ }
+ return totalInputNumFiles;
+ }
+
+ /**
+ * Returns the highest sample percentage of any alias in the given MapWork
+ */
+ public static double getHighestSamplePercentage (MapWork work) {
+ double highestSamplePercentage = 0;
+ for (String alias : work.getAliasToWork().keySet()) {
+ if (work.getNameToSplitSample().containsKey(alias)) {
+ Double rate = work.getNameToSplitSample().get(alias).getPercent();
+ if (rate != null && rate > highestSamplePercentage) {
+ highestSamplePercentage = rate;
+ }
+ } else {
+ highestSamplePercentage = -1;
+ break;
+ }
+ }
+
+ return highestSamplePercentage;
+ }
+
+ /**
+ * Computes a list of all input paths needed to compute the given MapWork. All aliases
+ * are considered and a merged list of input paths is returned. If any input path points
+ * to an empty table or partition a dummy file in the scratch dir is instead created and
+ * added to the list. This is needed to avoid special casing the operator pipeline for
+ * these cases.
+ *
+ * @param job JobConf used to run the job
+ * @param work MapWork encapsulating the info about the task
+ * @param hiveScratchDir The tmp dir used to create dummy files if needed
+ * @param ctx Context object
+ * @return List of paths to process for the given MapWork
+ * @throws Exception
+ */
+ public static List<Path> getInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx)
+ throws Exception {
+ int sequenceNumber = 0;
+
+ Set<Path> pathsProcessed = new HashSet<Path>();
+ List<Path> pathsToAdd = new LinkedList<Path>();
+ // AliasToWork contains all the aliases
+ for (String alias : work.getAliasToWork().keySet()) {
+ LOG.info("Processing alias " + alias);
+
+ // The alias may not have any path
+ Path path = null;
+ for (String file : new LinkedList<String>(work.getPathToAliases().keySet())) {
+ List<String> aliases = work.getPathToAliases().get(file);
+ if (aliases.contains(alias)) {
+ path = new Path(file);
+
+ // Multiple aliases can point to the same path - it should be
+ // processed only once
+ if (pathsProcessed.contains(path)) {
+ continue;
+ }
+
+ pathsProcessed.add(path);
+
+ LOG.info("Adding input file " + path);
+ if (isEmptyPath(job, path, ctx)) {
+ path = createDummyFileForEmptyPartition(path, job, work,
+ hiveScratchDir, alias, sequenceNumber++);
+
+ }
+ pathsToAdd.add(path);
+ }
+ }
+
+ // If the query references non-existent partitions, we need to add an
+ // empty file; it is not acceptable to change the operator tree.
+ // Consider the query:
+ // select * from (select count(1) from T union all select count(1) from
+ // T2) x;
+ // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
+ // rows)
+ if (path == null) {
+ path = createDummyFileForEmptyTable(job, work, hiveScratchDir,
+ alias, sequenceNumber++);
+ pathsToAdd.add(path);
+ }
+ }
+ return pathsToAdd;
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private static Path createEmptyFile(String hiveScratchDir,
+ Class<? extends HiveOutputFormat> outFileFormat, JobConf job,
+ int sequenceNumber, Properties props, boolean dummyRow)
+ throws IOException, InstantiationException, IllegalAccessException {
+
+ // create a dummy empty file in a new directory
+ String newDir = hiveScratchDir + File.separator + sequenceNumber;
+ Path newPath = new Path(newDir);
+ FileSystem fs = newPath.getFileSystem(job);
+ fs.mkdirs(newPath);
+ // Qualify the path against the file system. The user-configured path might
+ // contain a default port which is skipped in the file status; this makes sure
+ // that all paths that go into PathToPartitionInfo match the listed-status file paths.
+ newPath = fs.makeQualified(newPath);
+ String newFile = newDir + File.separator + "emptyFile";
+ Path newFilePath = new Path(newFile);
+
+ String onefile = newPath.toString();
+ RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
+ Text.class, false, props, null);
+ if (dummyRow) {
+ // Empty files are omitted at CombineHiveInputFormat; for a metadata-only
+ // query, that effectively makes partition columns disappear.
+ // This could be fixed by other methods, but this seemed the easiest (HIVE-2955).
+ recWriter.write(new Text("empty")); // written via HiveIgnoreKeyTextOutputFormat
+ }
+ recWriter.close(false);
+
+ return newPath;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static Path createDummyFileForEmptyPartition(Path path, JobConf job, MapWork work,
+ String hiveScratchDir, String alias, int sequenceNumber)
+ throws IOException, InstantiationException, IllegalAccessException {
+
+ String strPath = path.toString();
+
+ // The input file does not exist; replace it with an empty file
+ PartitionDesc partDesc = work.getPathToPartitionInfo().get(strPath);
+ boolean nonNative = partDesc.getTableDesc().isNonNative();
+ boolean oneRow = partDesc.getInputFileFormatClass() == OneNullRowInputFormat.class;
+ Properties props = partDesc.getProperties();
+ Class<? extends HiveOutputFormat> outFileFormat = partDesc.getOutputFileFormatClass();
+
+ if (nonNative) {
+ // if this isn't a Hive table, we can't create an empty file for it.
+ return path;
+ }
+
+ Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job,
+ sequenceNumber, props, oneRow);
+
+
+ LOG.info("Changed input file to " + newPath);
+
+ // update the work
+ String strNewPath = newPath.toString();
+
+ LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+ pathToAliases.put(strNewPath, pathToAliases.get(strPath));
+ pathToAliases.remove(strPath);
+
+ work.setPathToAliases(pathToAliases);
+
+ LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
+ pathToPartitionInfo.put(strNewPath, pathToPartitionInfo.get(strPath));
+ pathToPartitionInfo.remove(strPath);
+ work.setPathToPartitionInfo(pathToPartitionInfo);
+
+ return newPath;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static Path createDummyFileForEmptyTable(JobConf job, MapWork work,
+ String hiveScratchDir, String alias, int sequenceNumber)
+ throws IOException, InstantiationException, IllegalAccessException {
+
+ TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc();
+ Properties props = tableDesc.getProperties();
+ boolean nonNative = tableDesc.isNonNative();
+ Class<? extends HiveOutputFormat> outFileFormat = tableDesc.getOutputFileFormatClass();
+
+ if (nonNative) {
+ // if this isn't a Hive table, we can't create an empty file for it.
+ return null;
+ }
+
+ Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job,
+ sequenceNumber, props, false);
+
+
+ LOG.info("Changed input file to " + newPath.toString());
+
+ // update the work
+
+ LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+ ArrayList<String> newList = new ArrayList<String>();
+ newList.add(alias);
+ pathToAliases.put(newPath.toUri().toString(), newList);
+
+ work.setPathToAliases(pathToAliases);
+
+ LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
+ PartitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone();
+ pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
+ work.setPathToPartitionInfo(pathToPartitionInfo);
+
+ return newPath;
+ }
+
+ /**
+ * setInputPaths adds all the paths in the provided list to the JobConf object
+ * as input paths for the job.
+ *
+ * @param job
+ * @param pathsToAdd
+ */
+ public static void setInputPaths(JobConf job, List<Path> pathsToAdd) {
+
+ Path[] addedPaths = FileInputFormat.getInputPaths(job);
+ if (addedPaths == null) {
+ addedPaths = new Path[0];
+ }
+
+ Path[] combined = new Path[addedPaths.length + pathsToAdd.size()];
+ System.arraycopy(addedPaths, 0, combined, 0, addedPaths.length);
+
+ int i = 0;
+ for(Path p: pathsToAdd) {
+ combined[addedPaths.length + (i++)] = p;
+ }
+ FileInputFormat.setInputPaths(job, combined);
+ }
+
+ /**
+ * Set hive input format, and input format file if necessary.
+ */
+ public static void setInputAttributes(Configuration conf, MapWork mWork) {
+ if (mWork.getInputformat() != null) {
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat());
+ }
+ if (mWork.getIndexIntermediateFile() != null) {
+ conf.set("hive.index.compact.file", mWork.getIndexIntermediateFile());
+ conf.set("hive.index.blockfilter.file", mWork.getIndexIntermediateFile());
+ }
+
+ // Intentionally overwrites anything the user may have put here
+ conf.setBoolean("hive.input.format.sorted", mWork.isInputFormatSorted());
+ }
+
+ /**
+ * Hive uses tmp directories to capture the output of each FileSinkOperator.
+ * This method creates all necessary tmp directories for FileSinks in the MapWork.
+ *
+ * @param conf Used to get the right FileSystem
+ * @param mWork Used to find FileSinkOperators
+ * @throws IOException
+ */
+ public static void createTmpDirs(Configuration conf, MapWork mWork)
+ throws IOException {
+
+ Map<String, ArrayList<String>> pa = mWork.getPathToAliases();
+ if (pa != null) {
+ List<Operator<? extends OperatorDesc>> ops =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ for (List<String> ls : pa.values()) {
+ for (String a : ls) {
+ ops.add(mWork.getAliasToWork().get(a));
+ }
+ }
+ createTmpDirs(conf, ops);
+ }
+ }
+
+ /**
+ * Hive uses tmp directories to capture the output of each FileSinkOperator.
+ * This method creates all necessary tmp directories for FileSinks in the ReduceWork.
+ *
+ * @param conf Used to get the right FileSystem
+ * @param rWork Used to find FileSinkOperators
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public static void createTmpDirs(Configuration conf, ReduceWork rWork)
+ throws IOException {
+ if (rWork == null) {
+ return;
+ }
+ List<Operator<? extends OperatorDesc>> ops
+ = new LinkedList<Operator<? extends OperatorDesc>>();
+ ops.add(rWork.getReducer());
+ createTmpDirs(conf, ops);
+ }
+
+ private static void createTmpDirs(Configuration conf,
+ List<Operator<? extends OperatorDesc>> ops) throws IOException {
+
+ while (!ops.isEmpty()) {
+ Operator<? extends OperatorDesc> op = ops.remove(0);
+
+ if (op instanceof FileSinkOperator) {
+ FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
+ String tempDir = fdesc.getDirName();
+
+ if (tempDir != null) {
+ Path tempPath = Utilities.toTempPath(new Path(tempDir));
+ FileSystem fs = tempPath.getFileSystem(conf);
+ fs.mkdirs(tempPath);
+ }
+ }
+
+ if (op.getChildOperators() != null) {
+ ops.addAll(op.getChildOperators());
+ }
+ }
+ }
}
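A worked example of estimateNumberOfReducers above, using hypothetical settings bytesPerReducer = 256 MB, maxReducers = 999, a 10 GB unsampled input, and power-of-two bucket inference enabled for a final map-reduce stage:

    long bytesPerReducer = 256L * 1024 * 1024;           // 268,435,456
    long totalInputFileSize = 10L * 1024 * 1024 * 1024;  // 10 GB, no sampling
    int maxReducers = 999;

    // Ceiling division, clamped to [1, maxReducers]:
    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer); // 40
    reducers = Math.min(maxReducers, Math.max(1, reducers));                             // 40

    // Power-of-two rounding for inferred bucketing:
    int reducersLog = (int) (Math.log(reducers) / Math.log(2)) + 1;  // 6
    int reducersPowerTwo = (int) Math.pow(2, reducersLog);           // 64
    // 64/2 != 40 and 64 <= maxReducers, so the job runs with 64 reducers.

If a 10 percent block sample covered every alias, getTotalInputFileSize would first scale the 10 GB down to 1 GB, and the same arithmetic would yield 4 reducers.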
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1509081&r1=1509080&r2=1509081&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Thu Aug 1 05:34:55 2013
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec.mr;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -29,13 +28,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -55,8 +49,6 @@ import org.apache.hadoop.hive.ql.DriverC
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -66,20 +58,16 @@ import org.apache.hadoop.hive.ql.exec.Ta
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
import org.apache.hadoop.hive.ql.io.IOPrepareCache;
-import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FetchWork;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -90,7 +78,6 @@ import org.apache.hadoop.hive.shims.Shim
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
@@ -211,49 +198,10 @@ public class ExecDriver extends Task<Map
return false;
}
- protected void createTmpDirs() throws IOException {
- // fix up outputs
- Map<String, ArrayList<String>> pa = work.getMapWork().getPathToAliases();
- if (pa != null) {
- List<Operator<? extends OperatorDesc>> opList =
- new ArrayList<Operator<? extends OperatorDesc>>();
-
- if (work.getReduceWork() != null) {
- opList.add(work.getReduceWork().getReducer());
- }
-
- for (List<String> ls : pa.values()) {
- for (String a : ls) {
- opList.add(work.getMapWork().getAliasToWork().get(a));
-
- while (!opList.isEmpty()) {
- Operator<? extends OperatorDesc> op = opList.remove(0);
-
- if (op instanceof FileSinkOperator) {
- FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
- String tempDir = fdesc.getDirName();
-
- if (tempDir != null) {
- Path tempPath = Utilities.toTempPath(new Path(tempDir));
- LOG.info("Making Temp Directory: " + tempDir);
- FileSystem fs = tempPath.getFileSystem(job);
- fs.mkdirs(tempPath);
- }
- }
-
- if (op.getChildOperators() != null) {
- opList.addAll(op.getChildOperators());
- }
- }
- }
- }
- }
- }
-
/**
* Execute a query plan using Hadoop.
*/
- @SuppressWarnings("deprecation")
+ @SuppressWarnings({"deprecation", "unchecked"})
@Override
public int execute(DriverContext driverContext) {
@@ -430,7 +378,8 @@ public class ExecDriver extends Task<Map
}
}
work.configureJobConf(job);
- addInputPaths(job, mWork, emptyScratchDirStr, ctx);
+ List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDirStr, ctx);
+ Utilities.setInputPaths(job, inputPaths);
Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
@@ -470,7 +419,8 @@ public class ExecDriver extends Task<Map
}
}
- this.createTmpDirs();
+ Utilities.createTmpDirs(job, mWork);
+ Utilities.createTmpDirs(job, rWork);
// Finally SUBMIT the JOB!
rj = jc.submitJob(job);
@@ -658,6 +608,7 @@ public class ExecDriver extends Task<Map
}
}
+ @SuppressWarnings("unchecked")
public static void main(String[] args) throws IOException, HiveException {
String planFileName = null;
@@ -834,164 +785,6 @@ public class ExecDriver extends Task<Map
return w.getReduceWork() != null;
}
- /**
- * Handle a empty/null path for a given alias.
- */
- private static int addInputPath(String path, JobConf job, MapWork work, String hiveScratchDir,
- int numEmptyPaths, boolean isEmptyPath, String alias) throws Exception {
- // either the directory does not exist or it is empty
- assert path == null || isEmptyPath;
-
- // The input file does not exist, replace it by a empty file
- Class<? extends HiveOutputFormat> outFileFormat = null;
- boolean nonNative = true;
- boolean oneRow = false;
- Properties props;
- if (isEmptyPath) {
- PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
- props = partDesc.getProperties();
- outFileFormat = partDesc.getOutputFileFormatClass();
- nonNative = partDesc.getTableDesc().isNonNative();
- oneRow = partDesc.getInputFileFormatClass() == OneNullRowInputFormat.class;
- } else {
- TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc();
- props = tableDesc.getProperties();
- outFileFormat = tableDesc.getOutputFileFormatClass();
- nonNative = tableDesc.isNonNative();
- }
-
- if (nonNative) {
- FileInputFormat.addInputPaths(job, path);
- LOG.info("Add a non-native table " + path);
- return numEmptyPaths;
- }
-
- // create a dummy empty file in a new directory
- String newDir = hiveScratchDir + File.separator + (++numEmptyPaths);
- Path newPath = new Path(newDir);
- FileSystem fs = newPath.getFileSystem(job);
- fs.mkdirs(newPath);
- //Qualify the path against the filesystem. The user configured path might contain default port which is skipped
- //in the file status. This makes sure that all paths which goes into PathToPartitionInfo are always listed status
- //filepath.
- newPath = fs.makeQualified(newPath);
- String newFile = newDir + File.separator + "emptyFile";
- Path newFilePath = new Path(newFile);
-
- LOG.info("Changed input file to " + newPath.toString());
-
- // toggle the work
-
- LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
-
- if (isEmptyPath) {
- assert path != null;
- pathToAliases.put(newPath.toUri().toString(), pathToAliases.get(path));
- pathToAliases.remove(path);
- } else {
- assert path == null;
- ArrayList<String> newList = new ArrayList<String>();
- newList.add(alias);
- pathToAliases.put(newPath.toUri().toString(), newList);
- }
-
- work.setPathToAliases(pathToAliases);
-
- LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
- if (isEmptyPath) {
- pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo.get(path));
- pathToPartitionInfo.remove(path);
- } else {
- PartitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone();
- pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
- }
- work.setPathToPartitionInfo(pathToPartitionInfo);
-
- String onefile = newPath.toString();
- RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
- Text.class, false, props, null);
- if (oneRow) {
- // empty files are ommited at CombineHiveInputFormat.
- // for metadata only query, it effectively makes partition columns disappear..
- // this could be fixed by other methods, but this seemed to be the most easy (HIVEV-2955)
- recWriter.write(new Text("empty")); // written via HiveIgnoreKeyTextOutputFormat
- }
- recWriter.close(false);
- FileInputFormat.addInputPaths(job, onefile);
- return numEmptyPaths;
- }
-
- public static void addInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx)
- throws Exception {
- int numEmptyPaths = 0;
-
- Set<String> pathsProcessed = new HashSet<String>();
- List<String> pathsToAdd = new LinkedList<String>();
- // AliasToWork contains all the aliases
- for (String oneAlias : work.getAliasToWork().keySet()) {
- LOG.info("Processing alias " + oneAlias);
- List<String> emptyPaths = new ArrayList<String>();
-
- // The alias may not have any path
- String path = null;
- for (String onefile : work.getPathToAliases().keySet()) {
- List<String> aliases = work.getPathToAliases().get(onefile);
- if (aliases.contains(oneAlias)) {
- path = onefile;
-
- // Multiple aliases can point to the same path - it should be
- // processed only once
- if (pathsProcessed.contains(path)) {
- continue;
- }
-
- pathsProcessed.add(path);
-
- LOG.info("Adding input file " + path);
- if (Utilities.isEmptyPath(job, path, ctx)) {
- emptyPaths.add(path);
- } else {
- pathsToAdd.add(path);
- }
- }
- }
-
- // Create a empty file if the directory is empty
- for (String emptyPath : emptyPaths) {
- numEmptyPaths = addInputPath(emptyPath, job, work, hiveScratchDir, numEmptyPaths, true,
- oneAlias);
- }
-
- // If the query references non-existent partitions
- // We need to add a empty file, it is not acceptable to change the
- // operator tree
- // Consider the query:
- // select * from (select count(1) from T union all select count(1) from
- // T2) x;
- // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
- // rows)
- if (path == null) {
- numEmptyPaths = addInputPath(null, job, work, hiveScratchDir, numEmptyPaths, false,
- oneAlias);
- }
- }
- setInputPaths(job, pathsToAdd);
- }
-
- private static void setInputPaths(JobConf job, List<String> pathsToAdd) {
- Path[] addedPaths = FileInputFormat.getInputPaths(job);
- List<Path> toAddPathList = new ArrayList<Path>();
- if(addedPaths != null) {
- for(Path added: addedPaths) {
- toAddPathList.add(added);
- }
- }
- for(String toAdd: pathsToAdd) {
- toAddPathList.add(new Path(toAdd));
- }
- FileInputFormat.setInputPaths(job, toAddPathList.toArray(new Path[0]));
- }
-
@Override
public StageType getType() {
return StageType.MAPRED;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1509081&r1=1509080&r2=1509081&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Thu Aug 1 05:34:55 2013
@@ -108,7 +108,9 @@ public class MapRedTask extends ExecDriv
// set the values of totalInputFileSize and totalInputNumFiles, estimating them
// if percentage block sampling is being used
- estimateInputSize();
+ double samplePercentage = Utilities.getHighestSamplePercentage(work.getMapWork());
+ totalInputFileSize = Utilities.getTotalInputFileSize(inputSummary, work.getMapWork(), samplePercentage);
+ totalInputNumFiles = Utilities.getTotalInputNumFiles(inputSummary, work.getMapWork(), samplePercentage);
// at this point the number of reducers is precisely defined in the plan
int numReducers = work.getReduceWork() == null ? 0 : work.getReduceWork().getNumReduceTasks();
@@ -403,7 +405,11 @@ public class MapRedTask extends ExecDriv
.printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+ reducers);
} else {
- int reducers = estimateNumberOfReducers();
+ if (inputSummary == null) {
+ inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
+ }
+ int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
+ work.isFinalMapRed());
rWork.setNumReduceTasks(reducers);
console
.printInfo("Number of reduce tasks not specified. Estimated from input data size: "
@@ -424,123 +430,6 @@ public class MapRedTask extends ExecDriv
}
/**
- * Estimate the number of reducers needed for this job, based on job input,
- * and configuration parameters.
- *
- * The output of this method should only be used if the output of this
- * MapRedTask is not being used to populate a bucketed table and the user
- * has not specified the number of reducers to use.
- *
- * @return the number of reducers.
- */
- private int estimateNumberOfReducers() throws IOException {
- long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
- int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
-
- if(inputSummary == null) {
- // compute the summary and stash it away
- inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
- }
-
- // if all inputs are sampled, we should shrink the size of reducers accordingly.
- estimateInputSize();
-
- if (totalInputFileSize != inputSummary.getLength()) {
- LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
- + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
- } else {
- LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
- + maxReducers + " totalInputFileSize=" + totalInputFileSize);
- }
-
- int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
- reducers = Math.max(1, reducers);
- reducers = Math.min(maxReducers, reducers);
-
- // If this map reduce job writes final data to a table and bucketing is being inferred,
- // and the user has configured Hive to do this, make sure the number of reducers is a
- // power of two
- if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
- work.isFinalMapRed() && !work.getMapWork().getBucketedColsByDirectory().isEmpty()) {
-
- int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
- int reducersPowerTwo = (int)Math.pow(2, reducersLog);
-
- // If the original number of reducers was a power of two, use that
- if (reducersPowerTwo / 2 == reducers) {
- return reducers;
- } else if (reducersPowerTwo > maxReducers) {
- // If the next power of two greater than the original number of reducers is greater
- // than the max number of reducers, use the preceding power of two, which is strictly
- // less than the original number of reducers and hence the max
- reducers = reducersPowerTwo / 2;
- } else {
- // Otherwise use the smallest power of two greater than the original number of reducers
- reducers = reducersPowerTwo;
- }
- }
-
- return reducers;
- }
-
- /**
- * Sets the values of totalInputFileSize and totalInputNumFiles. If percentage
- * block sampling is used, these values are estimates based on the highest
- * percentage being used for sampling multiplied by the value obtained from the
- * input summary. Otherwise, these values are set to the exact value obtained
- * from the input summary.
- *
- * Once the function completes, inputSizeEstimated is set so that the logic is
- * never run more than once.
- */
- private void estimateInputSize() {
- if (inputSizeEstimated) {
- // If we've already run this function, return
- return;
- }
-
- MapWork mWork = work.getMapWork();
-
- // Initialize the values to be those taken from the input summary
- totalInputFileSize = inputSummary.getLength();
- totalInputNumFiles = inputSummary.getFileCount();
-
- if (mWork.getNameToSplitSample() == null || mWork.getNameToSplitSample().isEmpty()) {
- // If percentage block sampling wasn't used, we don't need to do any estimation
- inputSizeEstimated = true;
- return;
- }
-
- // if all inputs are sampled, we should shrink the size of the input accordingly
- double highestSamplePercentage = 0;
- boolean allSample = false;
- for (String alias : mWork.getAliasToWork().keySet()) {
- if (mWork.getNameToSplitSample().containsKey(alias)) {
- allSample = true;
- Double rate = mWork.getNameToSplitSample().get(alias).getPercent();
- if (rate != null && rate > highestSamplePercentage) {
- highestSamplePercentage = rate;
- }
- } else {
- allSample = false;
- break;
- }
- }
- if (allSample) {
- // This is a little bit dangerous if inputs turns out not to be able to be sampled.
- // In that case, we significantly underestimate the input.
- // It's the same as estimateNumberOfReducers(). It's just our best
- // guess and there is no guarantee.
- totalInputFileSize = Math.min((long) (totalInputFileSize * highestSamplePercentage / 100D)
- , totalInputFileSize);
- totalInputNumFiles = Math.min((long) (totalInputNumFiles * highestSamplePercentage / 100D)
- , totalInputNumFiles);
- }
-
- inputSizeEstimated = true;
- }
-
- /**
* Find out if a job can be run in local mode based on its characteristics
*
* @param conf Hive Configuration
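One subtlety in this move: the removed allSample flag is now encoded in getHighestSamplePercentage's return value, where -1 means at least one alias is unsampled, and the (highestSamplePercentage >= 0) guards in the Utilities helpers then leave the input summary unscaled. With hypothetical aliases:

    // t1 sampled at 30%, t2 at 10%  -> returns 30.0; sizes and file counts scale by 0.3
    // t1 sampled, t2 not sampled    -> returns -1; inputs are used as-is
    double pct = Utilities.getHighestSamplePercentage(mapWork);
    long size = Utilities.getTotalInputFileSize(inputSummary, mapWork, pct);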
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1509081&r1=1509080&r2=1509081&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Thu Aug 1 05:34:55 2013
@@ -192,7 +192,7 @@ public class BlockMergeTask extends Task
try {
addInputPaths(job, work);
- Utilities.setMapWork(job, work, ctx.getMRTmpFileURI());
+ Utilities.setMapWork(job, work, ctx.getMRTmpFileURI(), true);
// remove the pwd from the conf file so that the job tracker doesn't show
// it in the logs
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java?rev=1509081&r1=1509080&r2=1509081&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java Thu Aug 1 05:34:55 2013
@@ -167,7 +167,9 @@ public class TestSymlinkTextInputFormat
QueryPlan plan = drv.getPlan();
MapRedTask selectTask = (MapRedTask)plan.getRootTasks().get(0);
- ExecDriver.addInputPaths(newJob, selectTask.getWork().getMapWork(), emptyScratchDir.toString(), ctx);
+ List<Path> inputPaths = Utilities.getInputPaths(newJob, selectTask.getWork().getMapWork(), emptyScratchDir.toString(), ctx);
+ Utilities.setInputPaths(newJob, inputPaths);
+
Utilities.setMapRedWork(newJob, selectTask.getWork(), ctx.getMRTmpFileURI());
CombineHiveInputFormat combineInputFormat = ReflectionUtils.newInstance(