Posted to commits@hive.apache.org by ha...@apache.org on 2013/08/16 03:22:02 UTC
svn commit: r1514554 [5/18] - in /hive/branches/vectorization: ./
beeline/src/java/org/apache/hive/beeline/
cli/src/java/org/apache/hadoop/hive/cli/
cli/src/test/org/apache/hadoop/hive/cli/
common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src...
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Aug 16 01:21:54 2013
@@ -63,6 +63,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -108,6 +109,7 @@ import org.apache.hadoop.hive.ql.io.Hive
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -124,6 +126,7 @@ import org.apache.hadoop.hive.ql.plan.Fi
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
@@ -131,12 +134,10 @@ import org.apache.hadoop.hive.ql.plan.Re
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.Adjacency;
import org.apache.hadoop.hive.ql.plan.api.Graph;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
@@ -150,9 +151,11 @@ import org.apache.hadoop.hive.shims.Shim
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
@@ -429,6 +432,7 @@ public final class Utilities {
return new Expression(dateVal, dateVal.getClass(), "new", args);
}
+ @Override
protected boolean mutatesTo(Object oldInstance, Object newInstance) {
if (oldInstance == null || newInstance == null) {
return false;
@@ -442,6 +446,7 @@ public final class Utilities {
* it is not serialization friendly.
*/
public static class TimestampPersistenceDelegate extends DatePersistenceDelegate {
+ @Override
protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
Timestamp ts = (Timestamp)oldInstance;
Object[] args = { ts.getNanos() };
@@ -451,21 +456,21 @@ public final class Utilities {
}
public static void setMapRedWork(Configuration conf, MapredWork w, String hiveScratchDir) {
- setMapWork(conf, w.getMapWork(), hiveScratchDir);
+ setMapWork(conf, w.getMapWork(), hiveScratchDir, true);
if (w.getReduceWork() != null) {
- setReduceWork(conf, w.getReduceWork(), hiveScratchDir);
+ setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true);
}
}
- public static void setMapWork(Configuration conf, MapWork w, String hiveScratchDir) {
- setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME);
+ public static Path setMapWork(Configuration conf, MapWork w, String hiveScratchDir, boolean useCache) {
+ return setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME, useCache);
}
- public static void setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir) {
- setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME);
+ public static Path setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir, boolean useCache) {
+ return setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME, useCache);
}
- private static void setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name) {
+ private static Path setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name, boolean useCache) {
try {
setPlanPath(conf, hiveScratchDir);
@@ -479,7 +484,7 @@ public final class Utilities {
// Serialize the plan to the default hdfs instance
// Except for hadoop local mode execution where we should be
// able to get the plan directly from the cache
- if (!ShimLoader.getHadoopShims().isLocalMode(conf)) {
+ if (useCache && !ShimLoader.getHadoopShims().isLocalMode(conf)) {
// Set up distributed cache
if (!DistributedCache.getSymlink(conf)) {
DistributedCache.createSymlink(conf);
@@ -495,6 +500,8 @@ public final class Utilities {
// Cache the plan in this process
gWorkMap.put(planPath, w);
+
+ return planPath;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
@@ -609,6 +616,7 @@ public final class Utilities {
* De-serialize an object. This helper function mainly makes sure that enums,
* counters, etc are handled properly.
*/
+ @SuppressWarnings("unchecked")
public static <T> T deserializeObject(InputStream in) {
XMLDecoder d = null;
try {
@@ -1778,7 +1786,7 @@ public final class Utilities {
}
}
- public static Object getInputSummaryLock = new Object();
+ public static Object INPUT_SUMMARY_LOCK = new Object();
/**
* Calculate the total size of input files.
@@ -1801,7 +1809,7 @@ public final class Utilities {
// Since multiple threads could call this method concurrently, locking
// this method will avoid number of threads out of control.
- synchronized (getInputSummaryLock) {
+ synchronized (INPUT_SUMMARY_LOCK) {
// For each input path, calculate the total size.
for (String path : work.getPathToAliases().keySet()) {
Path p = new Path(path);
@@ -1912,7 +1920,7 @@ public final class Utilities {
throw new IOException(e);
}
} while (!executorDone);
- }
+ }
executor.shutdown();
}
HiveInterruptUtils.checkInterrupted();
@@ -1936,7 +1944,7 @@ public final class Utilities {
}
}
- public static boolean isEmptyPath(JobConf job, String dirPath, Context ctx)
+ public static boolean isEmptyPath(JobConf job, Path dirPath, Context ctx)
throws Exception {
ContentSummary cs = ctx.getCS(dirPath);
if (cs != null) {
@@ -1946,8 +1954,7 @@ public final class Utilities {
} else {
LOG.info("Content Summary not cached for " + dirPath);
}
- Path p = new Path(dirPath);
- return isEmptyPath(job, p);
+ return isEmptyPath(job, dirPath);
}
public static boolean isEmptyPath(JobConf job, Path dirPath) throws Exception {
@@ -2532,5 +2539,415 @@ public final class Utilities {
return sb.toString();
}
+
+ /**
+ * Estimate the number of reducers needed for this job, based on job input,
+ * and configuration parameters.
+ *
+ * The output of this method should only be used if the output of this
+ * MapRedTask is not being used to populate a bucketed table and the user
+ * has not specified the number of reducers to use.
+ *
+ * @return the number of reducers.
+ */
+ public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSummary,
+ MapWork work, boolean finalMapRed) throws IOException {
+ long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+ int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+
+ double samplePercentage = getHighestSamplePercentage(work);
+ long totalInputFileSize = getTotalInputFileSize(inputSummary, work, samplePercentage);
+
+ // if all inputs are sampled, we should shrink the size of reducers accordingly.
+ if (totalInputFileSize != inputSummary.getLength()) {
+ LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+ + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
+ } else {
+ LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+ + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+ }
+
+ int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
+ reducers = Math.max(1, reducers);
+ reducers = Math.min(maxReducers, reducers);
+
+ // If this map reduce job writes final data to a table and bucketing is being inferred,
+ // and the user has configured Hive to do this, make sure the number of reducers is a
+ // power of two
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
+ finalMapRed && !work.getBucketedColsByDirectory().isEmpty()) {
+
+ int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
+ int reducersPowerTwo = (int)Math.pow(2, reducersLog);
+
+ // If the original number of reducers was a power of two, use that
+ if (reducersPowerTwo / 2 == reducers) {
+ return reducers;
+ } else if (reducersPowerTwo > maxReducers) {
+ // If the next power of two greater than the original number of reducers is greater
+ // than the max number of reducers, use the preceding power of two, which is strictly
+ // less than the original number of reducers and hence the max
+ reducers = reducersPowerTwo / 2;
+ } else {
+ // Otherwise use the smallest power of two greater than the original number of reducers
+ reducers = reducersPowerTwo;
+ }
+ }
+
+ return reducers;
+ }
+
+ /**
+ * Computes the total input file size. If block sampling was used it will scale this
+ * value by the highest sample percentage (as an estimate for input).
+ *
+ * @param inputSummary
+ * @param work
+ * @param highestSamplePercentage
+ * @return estimated total input size for job
+ */
+ public static long getTotalInputFileSize (ContentSummary inputSummary, MapWork work,
+ double highestSamplePercentage) {
+ long totalInputFileSize = inputSummary.getLength();
+ if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+ // If percentage block sampling wasn't used, we don't need to do any estimation
+ return totalInputFileSize;
+ }
+
+ if (highestSamplePercentage >= 0) {
+ totalInputFileSize = Math.min((long) (totalInputFileSize * highestSamplePercentage / 100D)
+ , totalInputFileSize);
+ }
+ return totalInputFileSize;
+ }
+
+ /**
+ * Computes the total number of input files. If block sampling was used it will scale this
+ * value by the highest sample percentage (as an estimate for # input files).
+ *
+ * @param inputSummary
+ * @param work
+ * @param highestSamplePercentage
+ * @return
+ */
+ public static long getTotalInputNumFiles (ContentSummary inputSummary, MapWork work,
+ double highestSamplePercentage) {
+ long totalInputNumFiles = inputSummary.getFileCount();
+ if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+ // If percentage block sampling wasn't used, we don't need to do any estimation
+ return totalInputNumFiles;
+ }
+
+ if (highestSamplePercentage >= 0) {
+ totalInputNumFiles = Math.min((long) (totalInputNumFiles * highestSamplePercentage / 100D)
+ , totalInputNumFiles);
+ }
+ return totalInputNumFiles;
+ }
+
+ /**
+ * Returns the highest sample percentage of any alias in the given MapWork
+ */
+ public static double getHighestSamplePercentage (MapWork work) {
+ double highestSamplePercentage = 0;
+ for (String alias : work.getAliasToWork().keySet()) {
+ if (work.getNameToSplitSample().containsKey(alias)) {
+ Double rate = work.getNameToSplitSample().get(alias).getPercent();
+ if (rate != null && rate > highestSamplePercentage) {
+ highestSamplePercentage = rate;
+ }
+ } else {
+ highestSamplePercentage = -1;
+ break;
+ }
+ }
+
+ return highestSamplePercentage;
+ }
+
+ /**
+ * Computes a list of all input paths needed to compute the given MapWork. All aliases
+ * are considered and a merged list of input paths is returned. If any input path points
+ * to an empty table or partition a dummy file in the scratch dir is instead created and
+ * added to the list. This is needed to avoid special casing the operator pipeline for
+ * these cases.
+ *
+ * @param job JobConf used to run the job
+ * @param work MapWork encapsulating the info about the task
+ * @param hiveScratchDir The tmp dir used to create dummy files if needed
+ * @param ctx Context object
+ * @return List of paths to process for the given MapWork
+ * @throws Exception
+ */
+ public static List<Path> getInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx)
+ throws Exception {
+ int sequenceNumber = 0;
+
+ Set<Path> pathsProcessed = new HashSet<Path>();
+ List<Path> pathsToAdd = new LinkedList<Path>();
+ // AliasToWork contains all the aliases
+ for (String alias : work.getAliasToWork().keySet()) {
+ LOG.info("Processing alias " + alias);
+
+ // The alias may not have any path
+ Path path = null;
+ for (String file : new LinkedList<String>(work.getPathToAliases().keySet())) {
+ List<String> aliases = work.getPathToAliases().get(file);
+ if (aliases.contains(alias)) {
+ path = new Path(file);
+
+ // Multiple aliases can point to the same path - it should be
+ // processed only once
+ if (pathsProcessed.contains(path)) {
+ continue;
+ }
+
+ pathsProcessed.add(path);
+
+ LOG.info("Adding input file " + path);
+ if (isEmptyPath(job, path, ctx)) {
+ path = createDummyFileForEmptyPartition(path, job, work,
+ hiveScratchDir, alias, sequenceNumber++);
+
+ }
+ pathsToAdd.add(path);
+ }
+ }
+
+ // If the query references non-existent partitions
+ // We need to add a empty file, it is not acceptable to change the
+ // operator tree
+ // Consider the query:
+ // select * from (select count(1) from T union all select count(1) from
+ // T2) x;
+ // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
+ // rows)
+ if (path == null) {
+ path = createDummyFileForEmptyTable(job, work, hiveScratchDir,
+ alias, sequenceNumber++);
+ pathsToAdd.add(path);
+ }
+ }
+ return pathsToAdd;
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private static Path createEmptyFile(String hiveScratchDir,
+ Class<? extends HiveOutputFormat> outFileFormat, JobConf job,
+ int sequenceNumber, Properties props, boolean dummyRow)
+ throws IOException, InstantiationException, IllegalAccessException {
+
+ // create a dummy empty file in a new directory
+ String newDir = hiveScratchDir + File.separator + sequenceNumber;
+ Path newPath = new Path(newDir);
+ FileSystem fs = newPath.getFileSystem(job);
+ fs.mkdirs(newPath);
+ //Qualify the path against the file system. The user configured path might contain default port which is skipped
+ //in the file status. This makes sure that all paths which goes into PathToPartitionInfo are always listed status
+ //file path.
+ newPath = fs.makeQualified(newPath);
+ String newFile = newDir + File.separator + "emptyFile";
+ Path newFilePath = new Path(newFile);
+
+ String onefile = newPath.toString();
+ RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
+ Text.class, false, props, null);
+ if (dummyRow) {
+ // empty files are omitted at CombineHiveInputFormat.
+ // for meta-data only query, it effectively makes partition columns disappear..
+ // this could be fixed by other methods, but this seemed to be the most easy (HIVEV-2955)
+ recWriter.write(new Text("empty")); // written via HiveIgnoreKeyTextOutputFormat
+ }
+ recWriter.close(false);
+
+ return newPath;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static Path createDummyFileForEmptyPartition(Path path, JobConf job, MapWork work,
+ String hiveScratchDir, String alias, int sequenceNumber)
+ throws IOException, InstantiationException, IllegalAccessException {
+
+ String strPath = path.toString();
+
+ // The input file does not exist, replace it by a empty file
+ PartitionDesc partDesc = work.getPathToPartitionInfo().get(strPath);
+ boolean nonNative = partDesc.getTableDesc().isNonNative();
+ boolean oneRow = partDesc.getInputFileFormatClass() == OneNullRowInputFormat.class;
+ Properties props = partDesc.getProperties();
+ Class<? extends HiveOutputFormat> outFileFormat = partDesc.getOutputFileFormatClass();
+
+ if (nonNative) {
+ // if this isn't a hive table we can't create an empty file for it.
+ return path;
+ }
+
+ Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job,
+ sequenceNumber, props, oneRow);
+
+
+ LOG.info("Changed input file to " + newPath);
+
+ // update the work
+ String strNewPath = newPath.toString();
+
+ LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+ pathToAliases.put(strNewPath, pathToAliases.get(strPath));
+ pathToAliases.remove(strPath);
+
+ work.setPathToAliases(pathToAliases);
+
+ LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
+ pathToPartitionInfo.put(strNewPath, pathToPartitionInfo.get(strPath));
+ pathToPartitionInfo.remove(strPath);
+ work.setPathToPartitionInfo(pathToPartitionInfo);
+
+ return newPath;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static Path createDummyFileForEmptyTable(JobConf job, MapWork work,
+ String hiveScratchDir, String alias, int sequenceNumber)
+ throws IOException, InstantiationException, IllegalAccessException {
+
+ TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc();
+ Properties props = tableDesc.getProperties();
+ boolean nonNative = tableDesc.isNonNative();
+ Class<? extends HiveOutputFormat> outFileFormat = tableDesc.getOutputFileFormatClass();
+
+ if (nonNative) {
+ // if this isn't a hive table we can't create an empty file for it.
+ return null;
+ }
+
+ Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job,
+ sequenceNumber, props, false);
+
+
+ LOG.info("Changed input file to " + newPath.toString());
+
+ // update the work
+
+ LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+ ArrayList<String> newList = new ArrayList<String>();
+ newList.add(alias);
+ pathToAliases.put(newPath.toUri().toString(), newList);
+
+ work.setPathToAliases(pathToAliases);
+
+ LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
+ PartitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone();
+ pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
+ work.setPathToPartitionInfo(pathToPartitionInfo);
+
+ return newPath;
+ }
+
+ /**
+ * setInputPaths add all the paths in the provided list to the Job conf object
+ * as input paths for the job.
+ *
+ * @param job
+ * @param pathsToAdd
+ */
+ public static void setInputPaths(JobConf job, List<Path> pathsToAdd) {
+
+ Path[] addedPaths = FileInputFormat.getInputPaths(job);
+ if (addedPaths == null) {
+ addedPaths = new Path[0];
+ }
+
+ Path[] combined = new Path[addedPaths.length + pathsToAdd.size()];
+ System.arraycopy(addedPaths, 0, combined, 0, addedPaths.length);
+
+ int i = 0;
+ for(Path p: pathsToAdd) {
+ combined[addedPaths.length + (i++)] = p;
+ }
+ FileInputFormat.setInputPaths(job, combined);
+ }
+
+ /**
+ * Set hive input format, and input format file if necessary.
+ */
+ public static void setInputAttributes(Configuration conf, MapWork mWork) {
+ if (mWork.getInputformat() != null) {
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat());
+ }
+ if (mWork.getIndexIntermediateFile() != null) {
+ conf.set("hive.index.compact.file", mWork.getIndexIntermediateFile());
+ conf.set("hive.index.blockfilter.file", mWork.getIndexIntermediateFile());
+ }
+
+ // Intentionally overwrites anything the user may have put here
+ conf.setBoolean("hive.input.format.sorted", mWork.isInputFormatSorted());
+ }
+
+ /**
+ * Hive uses tmp directories to capture the output of each FileSinkOperator.
+ * This method creates all necessary tmp directories for FileSinks in the Mapwork.
+ *
+ * @param conf Used to get the right FileSystem
+ * @param mWork Used to find FileSinkOperators
+ * @throws IOException
+ */
+ public static void createTmpDirs(Configuration conf, MapWork mWork)
+ throws IOException {
+
+ Map<String, ArrayList<String>> pa = mWork.getPathToAliases();
+ if (pa != null) {
+ List<Operator<? extends OperatorDesc>> ops =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ for (List<String> ls : pa.values()) {
+ for (String a : ls) {
+ ops.add(mWork.getAliasToWork().get(a));
+ }
+ }
+ createTmpDirs(conf, ops);
+ }
+ }
+
+ /**
+ * Hive uses tmp directories to capture the output of each FileSinkOperator.
+ * This method creates all necessary tmp directories for FileSinks in the ReduceWork.
+ *
+ * @param conf Used to get the right FileSystem
+ * @param rWork Used to find FileSinkOperators
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public static void createTmpDirs(Configuration conf, ReduceWork rWork)
+ throws IOException {
+ if (rWork == null) {
+ return;
+ }
+ List<Operator<? extends OperatorDesc>> ops
+ = new LinkedList<Operator<? extends OperatorDesc>>();
+ ops.add(rWork.getReducer());
+ createTmpDirs(conf, ops);
+ }
+
+ private static void createTmpDirs(Configuration conf,
+ List<Operator<? extends OperatorDesc>> ops) throws IOException {
+
+ while (!ops.isEmpty()) {
+ Operator<? extends OperatorDesc> op = ops.remove(0);
+
+ if (op instanceof FileSinkOperator) {
+ FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
+ String tempDir = fdesc.getDirName();
+
+ if (tempDir != null) {
+ Path tempPath = Utilities.toTempPath(new Path(tempDir));
+ FileSystem fs = tempPath.getFileSystem(conf);
+ fs.mkdirs(tempPath);
+ }
+ }
+
+ if (op.getChildOperators() != null) {
+ ops.addAll(op.getChildOperators());
+ }
+ }
+ }
}
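For reference, the reducer estimation that this patch moves into Utilities.estimateNumberOfReducers() reduces to a few lines of arithmetic: round the (possibly sample-scaled) input size up by bytesPerReducer, clamp the result to [1, maxReducers], and, when bucket counts are being inferred, round to a power of two. A standalone sketch of that arithmetic follows; the class name and the sizes are made up, and the method only mirrors the logic in the hunk above:

public class ReducerEstimateSketch {

  // Mirrors the arithmetic in Utilities.estimateNumberOfReducers() above.
  static int estimate(long totalInputFileSize, long bytesPerReducer,
      int maxReducers, boolean roundToPowerOfTwo) {
    // one reducer per started bytesPerReducer chunk
    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
    reducers = Math.max(1, reducers);
    reducers = Math.min(maxReducers, reducers);

    if (roundToPowerOfTwo) {
      int reducersLog = (int) (Math.log(reducers) / Math.log(2)) + 1;
      int reducersPowerTwo = (int) Math.pow(2, reducersLog);
      if (reducersPowerTwo / 2 == reducers) {
        return reducers;               // already a power of two
      } else if (reducersPowerTwo > maxReducers) {
        return reducersPowerTwo / 2;   // next power of two would exceed the cap
      } else {
        return reducersPowerTwo;       // smallest power of two above reducers
      }
    }
    return reducers;
  }

  public static void main(String[] args) {
    // 10 GB of input at 256 MB per reducer, capped at 999 -> 40 reducers
    System.out.println(estimate(10L << 30, 256L << 20, 999, false));
    // the same input with power-of-two rounding -> 64 reducers
    System.out.println(estimate(10L << 30, 256L << 20, 999, true));
  }
}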
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionDescription.java Fri Aug 16 01:21:54 2013
@@ -50,5 +50,17 @@ public @interface WindowFunctionDescript
* for all the rows.
*/
boolean pivotResult() default false;
+
+ /**
+ * Used in translations process to validate arguments
+ * @return true if ranking function
+ */
+ boolean rankingFunction() default false;
+
+ /**
+ * Using in analytical functions to specify that UDF implies an ordering
+ * @return true if the function implies order
+ */
+ boolean impliesOrder() default false;
}
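The two new attributes let the translation process recognize ranking functions and functions that imply an ordering. A hypothetical window function might opt in roughly like this; the class name is made up and any mandatory annotation attributes not visible in this hunk (for example a description) are omitted:

import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;

@WindowFunctionDescription(
    supportsWindow = false,   // ranking functions are evaluated over the whole partition
    pivotResult = true,
    rankingFunction = true,   // new: allows argument validation during translation
    impliesOrder = true)      // new: the OVER clause must supply an ORDER BY
public class GenericUDAFExampleRank {
  // resolver/evaluator body omitted from this sketch
}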
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/WindowFunctionInfo.java Fri Aug 16 01:21:54 2013
@@ -18,8 +18,6 @@
package org.apache.hadoop.hive.ql.exec;
-import org.apache.hadoop.hive.ql.exec.FunctionInfo;
-import org.apache.hadoop.hive.ql.exec.WindowFunctionDescription;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
@SuppressWarnings("deprecation")
@@ -27,6 +25,7 @@ public class WindowFunctionInfo
{
boolean supportsWindow = true;
boolean pivotResult = false;
+ boolean impliesOrder = false;
FunctionInfo fInfo;
WindowFunctionInfo(FunctionInfo fInfo)
@@ -39,6 +38,7 @@ public class WindowFunctionInfo
{
supportsWindow = def.supportsWindow();
pivotResult = def.pivotResult();
+ impliesOrder = def.impliesOrder();
}
}
@@ -52,6 +52,9 @@ public class WindowFunctionInfo
return pivotResult;
}
+ public boolean isImpliesOrder(){
+ return impliesOrder;
+ }
public FunctionInfo getfInfo()
{
return fInfo;
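WindowFunctionInfo simply surfaces the annotation value through isImpliesOrder(). A translator-side check could then look roughly like the fragment below; the registry lookup and the windowSpec accessor are assumptions for illustration, not part of this patch:

// Hypothetical use of the new flag during windowing translation.
WindowFunctionInfo wfInfo = FunctionRegistry.getWindowFunctionInfo(functionName);
if (wfInfo != null && wfInfo.isImpliesOrder() && windowSpec.getOrder() == null) {
  throw new SemanticException("Function " + functionName
      + " implies an ordering, but its OVER clause has no ORDER BY");
}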
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Fri Aug 16 01:21:54 2013
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec.mr;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -30,13 +29,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -56,8 +50,6 @@ import org.apache.hadoop.hive.ql.DriverC
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -71,20 +63,16 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
import org.apache.hadoop.hive.ql.io.IOPrepareCache;
-import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FetchWork;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -95,13 +83,11 @@ import org.apache.hadoop.hive.shims.Shim
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
import org.apache.log4j.Appender;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.FileAppender;
@@ -216,49 +202,10 @@ public class ExecDriver extends Task<Map
return false;
}
- protected void createTmpDirs() throws IOException {
- // fix up outputs
- Map<String, ArrayList<String>> pa = work.getMapWork().getPathToAliases();
- if (pa != null) {
- List<Operator<? extends OperatorDesc>> opList =
- new ArrayList<Operator<? extends OperatorDesc>>();
-
- if (work.getReduceWork() != null) {
- opList.add(work.getReduceWork().getReducer());
- }
-
- for (List<String> ls : pa.values()) {
- for (String a : ls) {
- opList.add(work.getMapWork().getAliasToWork().get(a));
-
- while (!opList.isEmpty()) {
- Operator<? extends OperatorDesc> op = opList.remove(0);
-
- if (op instanceof FileSinkOperator) {
- FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
- String tempDir = fdesc.getDirName();
-
- if (tempDir != null) {
- Path tempPath = Utilities.toTempPath(new Path(tempDir));
- LOG.info("Making Temp Directory: " + tempDir);
- FileSystem fs = tempPath.getFileSystem(job);
- fs.mkdirs(tempPath);
- }
- }
-
- if (op.getChildOperators() != null) {
- opList.addAll(op.getChildOperators());
- }
- }
- }
- }
- }
- }
-
/**
* Execute a query plan using Hadoop.
*/
- @SuppressWarnings("deprecation")
+ @SuppressWarnings({"deprecation", "unchecked"})
@Override
public int execute(DriverContext driverContext) {
@@ -451,7 +398,8 @@ public class ExecDriver extends Task<Map
}
}
work.configureJobConf(job);
- addInputPaths(job, mWork, emptyScratchDirStr, ctx);
+ List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDirStr, ctx);
+ Utilities.setInputPaths(job, inputPaths);
Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
@@ -491,7 +439,8 @@ public class ExecDriver extends Task<Map
}
}
- this.createTmpDirs();
+ Utilities.createTmpDirs(job, mWork);
+ Utilities.createTmpDirs(job, rWork);
// Finally SUBMIT the JOB!
rj = jc.submitJob(job);
@@ -629,7 +578,7 @@ public class ExecDriver extends Task<Map
String tmpPath = context.getCtx().getExternalTmpFileURI(onePath.toUri());
Path partitionFile = new Path(tmpPath, ".partitions");
- TotalOrderPartitioner.setPartitionFile(job, partitionFile);
+ ShimLoader.getHadoopShims().setTotalOrderPartitionFile(job, partitionFile);
PartitionKeySampler sampler = new PartitionKeySampler();
@@ -732,6 +681,7 @@ public class ExecDriver extends Task<Map
}
}
+ @SuppressWarnings("unchecked")
public static void main(String[] args) throws IOException, HiveException {
String planFileName = null;
@@ -739,6 +689,7 @@ public class ExecDriver extends Task<Map
boolean noLog = false;
String files = null;
boolean localtask = false;
+ String hadoopAuthToken = null;
try {
for (int i = 0; i < args.length; i++) {
if (args[i].equals("-plan")) {
@@ -751,6 +702,9 @@ public class ExecDriver extends Task<Map
files = args[++i];
} else if (args[i].equals("-localtask")) {
localtask = true;
+ } else if (args[i].equals("-hadooptoken")) {
+ //set with HS2 in secure mode with doAs
+ hadoopAuthToken = args[++i];
}
}
} catch (IndexOutOfBoundsException e) {
@@ -772,6 +726,9 @@ public class ExecDriver extends Task<Map
if (files != null) {
conf.set("tmpfiles", files);
}
+ if(hadoopAuthToken != null){
+ conf.set("mapreduce.job.credentials.binary", hadoopAuthToken);
+ }
boolean isSilent = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVESESSIONSILENT);
@@ -908,164 +865,6 @@ public class ExecDriver extends Task<Map
return w.getReduceWork() != null;
}
- /**
- * Handle a empty/null path for a given alias.
- */
- private static int addInputPath(String path, JobConf job, MapWork work, String hiveScratchDir,
- int numEmptyPaths, boolean isEmptyPath, String alias) throws Exception {
- // either the directory does not exist or it is empty
- assert path == null || isEmptyPath;
-
- // The input file does not exist, replace it by a empty file
- Class<? extends HiveOutputFormat> outFileFormat = null;
- boolean nonNative = true;
- boolean oneRow = false;
- Properties props;
- if (isEmptyPath) {
- PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
- props = partDesc.getProperties();
- outFileFormat = partDesc.getOutputFileFormatClass();
- nonNative = partDesc.getTableDesc().isNonNative();
- oneRow = partDesc.getInputFileFormatClass() == OneNullRowInputFormat.class;
- } else {
- TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc();
- props = tableDesc.getProperties();
- outFileFormat = tableDesc.getOutputFileFormatClass();
- nonNative = tableDesc.isNonNative();
- }
-
- if (nonNative) {
- FileInputFormat.addInputPaths(job, path);
- LOG.info("Add a non-native table " + path);
- return numEmptyPaths;
- }
-
- // create a dummy empty file in a new directory
- String newDir = hiveScratchDir + File.separator + (++numEmptyPaths);
- Path newPath = new Path(newDir);
- FileSystem fs = newPath.getFileSystem(job);
- fs.mkdirs(newPath);
- //Qualify the path against the filesystem. The user configured path might contain default port which is skipped
- //in the file status. This makes sure that all paths which goes into PathToPartitionInfo are always listed status
- //filepath.
- newPath = fs.makeQualified(newPath);
- String newFile = newDir + File.separator + "emptyFile";
- Path newFilePath = new Path(newFile);
-
- LOG.info("Changed input file to " + newPath.toString());
-
- // toggle the work
-
- LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
-
- if (isEmptyPath) {
- assert path != null;
- pathToAliases.put(newPath.toUri().toString(), pathToAliases.get(path));
- pathToAliases.remove(path);
- } else {
- assert path == null;
- ArrayList<String> newList = new ArrayList<String>();
- newList.add(alias);
- pathToAliases.put(newPath.toUri().toString(), newList);
- }
-
- work.setPathToAliases(pathToAliases);
-
- LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
- if (isEmptyPath) {
- pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo.get(path));
- pathToPartitionInfo.remove(path);
- } else {
- PartitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone();
- pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
- }
- work.setPathToPartitionInfo(pathToPartitionInfo);
-
- String onefile = newPath.toString();
- RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
- Text.class, false, props, null);
- if (oneRow) {
- // empty files are ommited at CombineHiveInputFormat.
- // for metadata only query, it effectively makes partition columns disappear..
- // this could be fixed by other methods, but this seemed to be the most easy (HIVEV-2955)
- recWriter.write(new Text("empty")); // written via HiveIgnoreKeyTextOutputFormat
- }
- recWriter.close(false);
- FileInputFormat.addInputPaths(job, onefile);
- return numEmptyPaths;
- }
-
- public static void addInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx)
- throws Exception {
- int numEmptyPaths = 0;
-
- Set<String> pathsProcessed = new HashSet<String>();
- List<String> pathsToAdd = new LinkedList<String>();
- // AliasToWork contains all the aliases
- for (String oneAlias : work.getAliasToWork().keySet()) {
- LOG.info("Processing alias " + oneAlias);
- List<String> emptyPaths = new ArrayList<String>();
-
- // The alias may not have any path
- String path = null;
- for (String onefile : work.getPathToAliases().keySet()) {
- List<String> aliases = work.getPathToAliases().get(onefile);
- if (aliases.contains(oneAlias)) {
- path = onefile;
-
- // Multiple aliases can point to the same path - it should be
- // processed only once
- if (pathsProcessed.contains(path)) {
- continue;
- }
-
- pathsProcessed.add(path);
-
- LOG.info("Adding input file " + path);
- if (Utilities.isEmptyPath(job, path, ctx)) {
- emptyPaths.add(path);
- } else {
- pathsToAdd.add(path);
- }
- }
- }
-
- // Create a empty file if the directory is empty
- for (String emptyPath : emptyPaths) {
- numEmptyPaths = addInputPath(emptyPath, job, work, hiveScratchDir, numEmptyPaths, true,
- oneAlias);
- }
-
- // If the query references non-existent partitions
- // We need to add a empty file, it is not acceptable to change the
- // operator tree
- // Consider the query:
- // select * from (select count(1) from T union all select count(1) from
- // T2) x;
- // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
- // rows)
- if (path == null) {
- numEmptyPaths = addInputPath(null, job, work, hiveScratchDir, numEmptyPaths, false,
- oneAlias);
- }
- }
- setInputPaths(job, pathsToAdd);
- }
-
- private static void setInputPaths(JobConf job, List<String> pathsToAdd) {
- Path[] addedPaths = FileInputFormat.getInputPaths(job);
- List<Path> toAddPathList = new ArrayList<Path>();
- if(addedPaths != null) {
- for(Path added: addedPaths) {
- toAddPathList.add(added);
- }
- }
- for(String toAdd: pathsToAdd) {
- toAddPathList.add(new Path(toAdd));
- }
- FileInputFormat.setInputPaths(job, toAddPathList.toArray(new Path[0]));
- }
-
@Override
public StageType getType() {
return StageType.MAPRED;
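The net effect for ExecDriver is that the old addInputPath/addInputPaths pair is replaced by Utilities.getInputPaths() plus Utilities.setInputPaths(), and setInputPaths() appends to whatever paths are already registered on the JobConf rather than replacing them. A minimal sketch of that append behaviour, with made-up paths:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class SetInputPathsSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    FileInputFormat.setInputPaths(job, new Path("hdfs://nn:8020/warehouse/t1"));

    List<Path> extra = Arrays.asList(
        new Path("hdfs://nn:8020/warehouse/t2"),
        new Path("hdfs://nn:8020/tmp/hive/scratch/1"));  // e.g. a dummy file for an empty partition
    Utilities.setInputPaths(job, extra);                 // appends, does not replace

    for (Path p : FileInputFormat.getInputPaths(job)) {
      System.out.println(p);                             // prints all three paths
    }
  }
}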
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Fri Aug 16 01:21:54 2013
@@ -108,7 +108,9 @@ public class MapRedTask extends ExecDriv
// set the values of totalInputFileSize and totalInputNumFiles, estimating them
// if percentage block sampling is being used
- estimateInputSize();
+ double samplePercentage = Utilities.getHighestSamplePercentage(work.getMapWork());
+ totalInputFileSize = Utilities.getTotalInputFileSize(inputSummary, work.getMapWork(), samplePercentage);
+ totalInputNumFiles = Utilities.getTotalInputNumFiles(inputSummary, work.getMapWork(), samplePercentage);
// at this point the number of reducers is precisely defined in the plan
int numReducers = work.getReduceWork() == null ? 0 : work.getReduceWork().getNumReduceTasks();
@@ -403,7 +405,11 @@ public class MapRedTask extends ExecDriv
.printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+ reducers);
} else {
- int reducers = estimateNumberOfReducers();
+ if (inputSummary == null) {
+ inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
+ }
+ int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
+ work.isFinalMapRed());
rWork.setNumReduceTasks(reducers);
console
.printInfo("Number of reduce tasks not specified. Estimated from input data size: "
@@ -424,123 +430,6 @@ public class MapRedTask extends ExecDriv
}
/**
- * Estimate the number of reducers needed for this job, based on job input,
- * and configuration parameters.
- *
- * The output of this method should only be used if the output of this
- * MapRedTask is not being used to populate a bucketed table and the user
- * has not specified the number of reducers to use.
- *
- * @return the number of reducers.
- */
- private int estimateNumberOfReducers() throws IOException {
- long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
- int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
-
- if(inputSummary == null) {
- // compute the summary and stash it away
- inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
- }
-
- // if all inputs are sampled, we should shrink the size of reducers accordingly.
- estimateInputSize();
-
- if (totalInputFileSize != inputSummary.getLength()) {
- LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
- + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
- } else {
- LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
- + maxReducers + " totalInputFileSize=" + totalInputFileSize);
- }
-
- int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
- reducers = Math.max(1, reducers);
- reducers = Math.min(maxReducers, reducers);
-
- // If this map reduce job writes final data to a table and bucketing is being inferred,
- // and the user has configured Hive to do this, make sure the number of reducers is a
- // power of two
- if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
- work.isFinalMapRed() && !work.getMapWork().getBucketedColsByDirectory().isEmpty()) {
-
- int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
- int reducersPowerTwo = (int)Math.pow(2, reducersLog);
-
- // If the original number of reducers was a power of two, use that
- if (reducersPowerTwo / 2 == reducers) {
- return reducers;
- } else if (reducersPowerTwo > maxReducers) {
- // If the next power of two greater than the original number of reducers is greater
- // than the max number of reducers, use the preceding power of two, which is strictly
- // less than the original number of reducers and hence the max
- reducers = reducersPowerTwo / 2;
- } else {
- // Otherwise use the smallest power of two greater than the original number of reducers
- reducers = reducersPowerTwo;
- }
- }
-
- return reducers;
- }
-
- /**
- * Sets the values of totalInputFileSize and totalInputNumFiles. If percentage
- * block sampling is used, these values are estimates based on the highest
- * percentage being used for sampling multiplied by the value obtained from the
- * input summary. Otherwise, these values are set to the exact value obtained
- * from the input summary.
- *
- * Once the function completes, inputSizeEstimated is set so that the logic is
- * never run more than once.
- */
- private void estimateInputSize() {
- if (inputSizeEstimated) {
- // If we've already run this function, return
- return;
- }
-
- MapWork mWork = work.getMapWork();
-
- // Initialize the values to be those taken from the input summary
- totalInputFileSize = inputSummary.getLength();
- totalInputNumFiles = inputSummary.getFileCount();
-
- if (mWork.getNameToSplitSample() == null || mWork.getNameToSplitSample().isEmpty()) {
- // If percentage block sampling wasn't used, we don't need to do any estimation
- inputSizeEstimated = true;
- return;
- }
-
- // if all inputs are sampled, we should shrink the size of the input accordingly
- double highestSamplePercentage = 0;
- boolean allSample = false;
- for (String alias : mWork.getAliasToWork().keySet()) {
- if (mWork.getNameToSplitSample().containsKey(alias)) {
- allSample = true;
- Double rate = mWork.getNameToSplitSample().get(alias).getPercent();
- if (rate != null && rate > highestSamplePercentage) {
- highestSamplePercentage = rate;
- }
- } else {
- allSample = false;
- break;
- }
- }
- if (allSample) {
- // This is a little bit dangerous if inputs turns out not to be able to be sampled.
- // In that case, we significantly underestimate the input.
- // It's the same as estimateNumberOfReducers(). It's just our best
- // guess and there is no guarantee.
- totalInputFileSize = Math.min((long) (totalInputFileSize * highestSamplePercentage / 100D)
- , totalInputFileSize);
- totalInputNumFiles = Math.min((long) (totalInputNumFiles * highestSamplePercentage / 100D)
- , totalInputNumFiles);
- }
-
- inputSizeEstimated = true;
- }
-
- /**
* Find out if a job can be run in local mode based on it's characteristics
*
* @param conf Hive Configuration
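The estimateInputSize() logic removed here survives as Utilities.getTotalInputFileSize() and getTotalInputNumFiles(): when every alias is block-sampled, both values are scaled by the highest sample percentage. A small sketch with made-up numbers:

public class SampleScalingSketch {
  public static void main(String[] args) {
    long summaryLength = 10L << 30;  // 10 GB reported by the ContentSummary
    long summaryFiles  = 200;        // 200 input files
    double samplePct   = 10.0;       // highest TABLESAMPLE(10 PERCENT) across aliases

    long scaledSize  = Math.min((long) (summaryLength * samplePct / 100D), summaryLength);
    long scaledFiles = Math.min((long) (summaryFiles  * samplePct / 100D), summaryFiles);

    System.out.println(scaledSize);   // ~1 GB, fed into the reducer estimate
    System.out.println(scaledFiles);  // 20, used when checking local-mode eligibility
  }
}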
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Fri Aug 16 01:21:54 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.exec.Bu
import org.apache.hadoop.hive.ql.exec.FetchOperator;
import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SecureCmdDoAs;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -70,13 +71,12 @@ import org.apache.hadoop.hive.shims.Shim
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
-
/**
* MapredLocalTask represents any local work (i.e.: client side work) that hive needs to
* execute. E.g.: This is used for generating Hashtables for Mapjoins on the client
* before the Join is executed on the cluster.
- *
- * MapRedLocalTask does not actually execute the work in process, but rather generates
+ *
+ * MapRedLocalTask does not actually execute the work in process, but rather generates
* a command using ExecDriver. ExecDriver is what will finally drive processing the records.
*/
public class MapredLocalTask extends Task<MapredLocalWork> implements Serializable {
@@ -174,8 +174,6 @@ public class MapredLocalTask extends Tas
}
}
- LOG.info("Executing: " + cmdLine);
-
// Inherit Java system variables
String hadoopOpts;
StringBuilder sb = new StringBuilder();
@@ -231,14 +229,29 @@ public class MapredLocalTask extends Tas
MapRedTask.configureDebugVariablesForChildJVM(variables);
}
+
+ if(ShimLoader.getHadoopShims().isSecurityEnabled() &&
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS) == true
+ ){
+ //If kerberos security is enabled, and HS2 doAs is enabled,
+ // then additional params need to be set so that the command is run as
+ // intended user
+ SecureCmdDoAs secureDoAs = new SecureCmdDoAs(conf);
+ cmdLine = secureDoAs.addArg(cmdLine);
+ secureDoAs.addEnv(variables);
+ }
+
env = new String[variables.size()];
int pos = 0;
for (Map.Entry<String, String> entry : variables.entrySet()) {
String name = entry.getKey();
String value = entry.getValue();
env[pos++] = name + "=" + value;
+ LOG.debug("Setting env: " + env[pos-1]);
}
+ LOG.info("Executing: " + cmdLine);
+
// Run ExecDriver in another JVM
executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
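Together with the new -hadooptoken handling in ExecDriver.main(), this lets the child JVM submit the job with the doAs user's credentials. A rough sketch of the hand-off; the token path is made up, and since SecureCmdDoAs itself is not shown in this diff, the comments on the parent side are assumptions:

// Parent side (MapredLocalTask), as in the hunk above:
//   cmdLine = secureDoAs.addArg(cmdLine);   // presumably appends something like "-hadooptoken <tokenfile>"
//   secureDoAs.addEnv(variables);           // presumably exports the token location to the child environment

// Child side (ExecDriver.main), reduced to the relevant lines:
String hadoopAuthToken = "/tmp/hive_token_example";  // value parsed from -hadooptoken
JobConf conf = new JobConf(ExecDriver.class);
if (hadoopAuthToken != null) {
  // points the MR submission at the delegation tokens of the intended user
  conf.set("mapreduce.job.credentials.binary", hadoopAuthToken);
}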
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Fri Aug 16 01:21:54 2013
@@ -34,9 +34,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -44,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.Jo
import org.apache.hadoop.hive.ql.exec.Stat;
import org.apache.hadoop.hive.ql.exec.TerminalOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -699,7 +698,7 @@ public class VectorFileSinkOperator exte
List<String> skewedCols = lbCtx.getSkewedColNames();
List<List<String>> allSkewedVals = lbCtx.getSkewedColValues();
List<String> skewedValsCandidate = null;
- Map<SkewedValueList, String> locationMap = lbCtx.getLbLocationMap();
+ Map<List<String>, String> locationMap = lbCtx.getLbLocationMap();
/* Convert input row to standard objects. */
ObjectInspectorUtils.copyToStandardObject(standObjs, row,
@@ -717,14 +716,14 @@ public class VectorFileSinkOperator exte
if (allSkewedVals.contains(skewedValsCandidate)) {
/* matches skewed values. */
lbDirName = FileUtils.makeListBucketingDirName(skewedCols, skewedValsCandidate);
- locationMap.put(new SkewedValueList(skewedValsCandidate), lbDirName);
+ locationMap.put(skewedValsCandidate, lbDirName);
} else {
/* create default directory. */
lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols,
lbCtx.getDefaultDirName());
List<String> defaultKey = Arrays.asList(lbCtx.getDefaultKey());
if (!locationMap.containsKey(defaultKey)) {
- locationMap.put(new SkewedValueList(defaultKey), lbDirName);
+ locationMap.put(defaultKey, lbDirName);
}
}
return lbDirName;
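Replacing SkewedValueList with a plain List<String> key works because java.util.List defines equals()/hashCode() over its elements, so equal skewed-value lists map to the same entry. A minimal illustration with made-up column values and directory names:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LbLocationMapSketch {
  public static void main(String[] args) {
    Map<List<String>, String> locationMap = new HashMap<List<String>, String>();
    locationMap.put(Arrays.asList("us", "2013"), "country=us/year=2013");

    // a later row with the same skewed values finds the existing directory
    System.out.println(locationMap.get(Arrays.asList("us", "2013")));
  }
}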
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistory.java Fri Aug 16 01:21:54 2013
@@ -18,55 +18,21 @@
package org.apache.hadoop.hive.ql.history;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
-import java.util.Random;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.conf.HiveConf;
+
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.Counters.Group;
/**
- * HiveHistory.
- *
+ * HiveHistory. Logs information such as query, query plan, runtime statistics
+ * into a file.
+ * Each session uses a new object, which creates a new file.
*/
-public class HiveHistory {
-
- PrintWriter histStream; // History File stream
-
- String histFileName; // History file name
-
- private static final Log LOG = LogFactory.getLog("hive.ql.exec.HiveHistory");
-
- private LogHelper console;
-
- private Map<String, String> idToTableMap = null;
-
- // Job Hash Map
- private final HashMap<String, QueryInfo> queryInfoMap = new HashMap<String, QueryInfo>();
-
- // Task Hash Map
- private final HashMap<String, TaskInfo> taskInfoMap = new HashMap<String, TaskInfo>();
-
- private static final String DELIMITER = " ";
+public interface HiveHistory {
/**
* RecordTypes.
@@ -105,20 +71,11 @@ public class HiveHistory {
ROWS_INSERTED
};
- private static final String KEY = "(\\w+)";
- private static final String VALUE = "[[^\"]?]+"; // anything but a " in ""
- private static final String ROW_COUNT_PATTERN = "TABLE_ID_(\\d+)_ROWCOUNT";
-
- private static final Pattern pattern = Pattern.compile(KEY + "=" + "\""
- + VALUE + "\"");
-
- private static final Pattern rowCountPattern = Pattern.compile(ROW_COUNT_PATTERN);
-
- // temp buffer for parsed dataa
- private static Map<String, String> parseBuffer = new HashMap<String, String>();
-
/**
- * Listner interface Parser will call handle function for each record type.
+ * Listener interface.
+ * Parser will call handle function for each history record row, specifying
+ * the record type and its values
+ *
*/
public static interface Listener {
@@ -126,63 +83,6 @@ public class HiveHistory {
}
/**
- * Parses history file and calls call back functions.
- *
- * @param path
- * @param l
- * @throws IOException
- */
- public static void parseHiveHistory(String path, Listener l) throws IOException {
- FileInputStream fi = new FileInputStream(path);
- BufferedReader reader = new BufferedReader(new InputStreamReader(fi));
- try {
- String line = null;
- StringBuilder buf = new StringBuilder();
- while ((line = reader.readLine()) != null) {
- buf.append(line);
- // if it does not end with " then it is line continuation
- if (!line.trim().endsWith("\"")) {
- continue;
- }
- parseLine(buf.toString(), l);
- buf = new StringBuilder();
- }
- } finally {
- try {
- reader.close();
- } catch (IOException ex) {
- }
- }
- }
-
- /**
- * Parse a single line of history.
- *
- * @param line
- * @param l
- * @throws IOException
- */
- private static void parseLine(String line, Listener l) throws IOException {
- // extract the record type
- int idx = line.indexOf(' ');
- String recType = line.substring(0, idx);
- String data = line.substring(idx + 1, line.length());
-
- Matcher matcher = pattern.matcher(data);
-
- while (matcher.find()) {
- String tuple = matcher.group(0);
- String[] parts = tuple.split("=");
-
- parseBuffer.put(parts[0], parts[1].substring(1, parts[1].length() - 1));
- }
-
- l.handle(RecordTypes.valueOf(recType), parseBuffer);
-
- parseBuffer.clear();
- }
-
- /**
* Info.
*
*/
@@ -216,122 +116,25 @@ public class HiveHistory {
};
- /**
- * Construct HiveHistory object an open history log file.
- *
- * @param ss
- */
- public HiveHistory(SessionState ss) {
-
- try {
- console = new LogHelper(LOG);
- String conf_file_loc = ss.getConf().getVar(
- HiveConf.ConfVars.HIVEHISTORYFILELOC);
- if ((conf_file_loc == null) || conf_file_loc.length() == 0) {
- console.printError("No history file location given");
- return;
- }
-
- // Create directory
- File f = new File(conf_file_loc);
- if (!f.exists()) {
- if (!f.mkdirs()) {
- console.printError("Unable to create log directory " + conf_file_loc);
- return;
- }
- }
- Random randGen = new Random();
- do {
- histFileName = conf_file_loc + "/hive_job_log_" + ss.getSessionId() + "_"
- + Math.abs(randGen.nextInt()) + ".txt";
- } while (new File(histFileName).exists());
- console.printInfo("Hive history file=" + histFileName);
- histStream = new PrintWriter(histFileName);
-
- HashMap<String, String> hm = new HashMap<String, String>();
- hm.put(Keys.SESSION_ID.name(), ss.getSessionId());
- log(RecordTypes.SessionStart, hm);
- } catch (FileNotFoundException e) {
- console.printError("FAILED: Failed to open Query Log : " + histFileName
- + " " + e.getMessage(), "\n"
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
- }
-
- }
/**
* @return historyFileName
*/
- public String getHistFileName() {
- return histFileName;
- }
-
- /**
- * Write the a history record to history file.
- *
- * @param rt
- * @param keyValMap
- */
- void log(RecordTypes rt, Map<String, String> keyValMap) {
-
- if (histStream == null) {
- return;
- }
-
- StringBuilder sb = new StringBuilder();
- sb.append(rt.name());
-
- for (Map.Entry<String, String> ent : keyValMap.entrySet()) {
-
- sb.append(DELIMITER);
- String key = ent.getKey();
- String val = ent.getValue();
- if(val != null) {
- val = val.replace("\r","").replace("\n", " ");
- }
- sb.append(key + "=\"" + val + "\"");
-
- }
- sb.append(DELIMITER);
- sb.append(Keys.TIME.name() + "=\"" + System.currentTimeMillis() + "\"");
- histStream.println(sb);
- histStream.flush();
-
- }
+ public String getHistFileName();
/**
- * Called at the start of job Driver.execute().
+ * Called at the start of query execution in Driver.execute().
*/
- public void startQuery(String cmd, String id) {
- SessionState ss = SessionState.get();
- if (ss == null) {
- return;
- }
- QueryInfo ji = new QueryInfo();
-
- ji.hm.put(Keys.QUERY_ID.name(), id);
- ji.hm.put(Keys.QUERY_STRING.name(), cmd);
-
- queryInfoMap.put(id, ji);
-
- log(RecordTypes.QueryStart, ji.hm);
-
- }
+ public void startQuery(String cmd, String id);
/**
- * Used to set job status and other attributes of a job.
+ * Used to set query status and other attributes of a query
*
* @param queryId
* @param propName
* @param propValue
*/
- public void setQueryProperty(String queryId, Keys propName, String propValue) {
- QueryInfo ji = queryInfoMap.get(queryId);
- if (ji == null) {
- return;
- }
- ji.hm.put(propName.name(), propValue);
- }
+ public void setQueryProperty(String queryId, Keys propName, String propValue);
/**
* Used to set task properties.
@@ -341,14 +144,7 @@ public class HiveHistory {
* @param propValue
*/
public void setTaskProperty(String queryId, String taskId, Keys propName,
- String propValue) {
- String id = queryId + ":" + taskId;
- TaskInfo ti = taskInfoMap.get(id);
- if (ti == null) {
- return;
- }
- ti.hm.put(propName.name(), propValue);
- }
+ String propValue);
/**
* Serialize the task counters and set as a task property.
@@ -357,190 +153,62 @@ public class HiveHistory {
* @param taskId
* @param ctrs
*/
- public void setTaskCounters(String queryId, String taskId, Counters ctrs) {
- String id = queryId + ":" + taskId;
- QueryInfo ji = queryInfoMap.get(queryId);
- StringBuilder sb1 = new StringBuilder("");
- TaskInfo ti = taskInfoMap.get(id);
- if ((ti == null) || (ctrs == null)) {
- return;
- }
- StringBuilder sb = new StringBuilder("");
- try {
-
- boolean first = true;
- for (Group group : ctrs) {
- for (Counter counter : group) {
- if (first) {
- first = false;
- } else {
- sb.append(',');
- }
- sb.append(group.getDisplayName());
- sb.append('.');
- sb.append(counter.getDisplayName());
- sb.append(':');
- sb.append(counter.getCounter());
- String tab = getRowCountTableName(counter.getDisplayName());
- if (tab != null) {
- if (sb1.length() > 0) {
- sb1.append(",");
- }
- sb1.append(tab);
- sb1.append('~');
- sb1.append(counter.getCounter());
- ji.rowCountMap.put(tab, counter.getCounter());
-
- }
- }
- }
-
- } catch (Exception e) {
- LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e));
- }
- if (sb1.length() > 0) {
- taskInfoMap.get(id).hm.put(Keys.ROWS_INSERTED.name(), sb1.toString());
- queryInfoMap.get(queryId).hm.put(Keys.ROWS_INSERTED.name(), sb1
- .toString());
- }
- if (sb.length() > 0) {
- taskInfoMap.get(id).hm.put(Keys.TASK_COUNTERS.name(), sb.toString());
- }
- }
+ public void setTaskCounters(String queryId, String taskId, Counters ctrs);
- public void printRowCount(String queryId) {
- QueryInfo ji = queryInfoMap.get(queryId);
- if (ji == null) {
- return;
- }
- for (String tab : ji.rowCountMap.keySet()) {
- console.printInfo(ji.rowCountMap.get(tab) + " Rows loaded to " + tab);
- }
- }
+ public void printRowCount(String queryId);
/**
- * Called at the end of Job. A Job is sql query.
+ * Called at the end of a query
*
* @param queryId
*/
- public void endQuery(String queryId) {
- QueryInfo ji = queryInfoMap.get(queryId);
- if (ji == null) {
- return;
- }
- log(RecordTypes.QueryEnd, ji.hm);
- queryInfoMap.remove(queryId);
- }
+ public void endQuery(String queryId);
/**
- * Called at the start of a task. Called by Driver.run() A Job can have
+ * Called at the start of a task. Called by Driver.run(). A query can have
* multiple tasks. Tasks will have multiple operator.
*
* @param task
*/
public void startTask(String queryId, Task<? extends Serializable> task,
- String taskName) {
- SessionState ss = SessionState.get();
- if (ss == null) {
- return;
- }
- TaskInfo ti = new TaskInfo();
-
- ti.hm.put(Keys.QUERY_ID.name(), ss.getQueryId());
- ti.hm.put(Keys.TASK_ID.name(), task.getId());
- ti.hm.put(Keys.TASK_NAME.name(), taskName);
-
- String id = queryId + ":" + task.getId();
- taskInfoMap.put(id, ti);
-
- log(RecordTypes.TaskStart, ti.hm);
-
- }
+ String taskName);
/**
* Called at the end of a task.
*
* @param task
*/
- public void endTask(String queryId, Task<? extends Serializable> task) {
- String id = queryId + ":" + task.getId();
- TaskInfo ti = taskInfoMap.get(id);
-
- if (ti == null) {
- return;
- }
- log(RecordTypes.TaskEnd, ti.hm);
- taskInfoMap.remove(id);
- }
+ public void endTask(String queryId, Task<? extends Serializable> task);
/**
- * Called at the end of a task.
+ * Logs progress of a task if ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS is
+ * set to true
*
* @param task
*/
- public void progressTask(String queryId, Task<? extends Serializable> task) {
- String id = queryId + ":" + task.getId();
- TaskInfo ti = taskInfoMap.get(id);
- if (ti == null) {
- return;
- }
- log(RecordTypes.TaskProgress, ti.hm);
+ public void progressTask(String queryId, Task<? extends Serializable> task);
- }
/**
- * write out counters.
+ * Logs the current plan state
+ * @param plan
+ * @throws IOException
*/
- static ThreadLocal<Map<String,String>> ctrMapFactory =
- new ThreadLocal<Map<String, String>>() {
- @Override
- protected Map<String,String> initialValue() {
- return new HashMap<String,String>();
- }
- };
-
- public void logPlanProgress(QueryPlan plan) throws IOException {
- Map<String,String> ctrmap = ctrMapFactory.get();
- ctrmap.put("plan", plan.toString());
- log(RecordTypes.Counters, ctrmap);
- }
+ public void logPlanProgress(QueryPlan plan) throws IOException;
+
/**
- * Set the table to id map.
+ * Set the id to table name map
*
* @param map
*/
- public void setIdToTableMap(Map<String, String> map) {
- idToTableMap = map;
- }
+ public void setIdToTableMap(Map<String, String> map);
/**
- * Returns table name for the counter name.
- *
- * @param name
- * @return tableName
+ * Close the log file stream
*/
- String getRowCountTableName(String name) {
- if (idToTableMap == null) {
- return null;
- }
- Matcher m = rowCountPattern.matcher(name);
-
- if (m.find()) {
- String tuple = m.group(1);
- return idToTableMap.get(tuple);
- }
- return null;
+ public void closeStream();
- }
- public void closeStream() {
- IOUtils.cleanup(LOG, histStream);
- }
- @Override
- public void finalize() throws Throwable {
- closeStream();
- super.finalize();
- }
}
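After this hunk HiveHistory reads as a pure interface: each body shown above as removed is reduced to a bare signature, so the file-backed logging moves into an implementing class elsewhere in this commit. For illustration, a minimal caller-side sketch of the lifecycle those signatures imply; the getHiveHistory() accessor on SessionState and the way the task list is obtained are assumptions of the sketch, not part of this diff.

import java.io.Serializable;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.session.SessionState;

public class HistorySketch {
  // Walks the lifecycle implied by the interface above; getHiveHistory() and
  // the caller-supplied task list are assumptions made for this sketch.
  static void recordHistory(SessionState ss, String command,
      List<Task<? extends Serializable>> tasks) {
    HiveHistory history = ss.getHiveHistory();   // assumed accessor
    String queryId = ss.getQueryId();

    history.startQuery(command, queryId);
    try {
      for (Task<? extends Serializable> task : tasks) {
        history.startTask(queryId, task, task.getClass().getName());
        // ... run the task here, calling progressTask(queryId, task) as it runs ...
        history.endTask(queryId, task);
      }
    } finally {
      history.endQuery(queryId);
      history.printRowCount(queryId);   // reports rows-loaded counters, if any
      history.closeStream();
    }
  }
}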
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryViewer.java Fri Aug 16 01:21:54 2013
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.history.HiveHistory.Listener;
import org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
@@ -35,8 +37,8 @@ import org.apache.hadoop.hive.ql.history
public class HiveHistoryViewer implements Listener {
String historyFile;
-
String sessionId;
+ private static final Log LOG = LogFactory.getLog(HiveHistoryViewer.class);
// Job Hash Map
private final HashMap<String, QueryInfo> jobInfoMap = new HashMap<String, QueryInfo>();
@@ -65,19 +67,18 @@ public class HiveHistoryViewer implement
* Parse history files.
*/
void init() {
-
try {
- HiveHistory.parseHiveHistory(historyFile, this);
+ HiveHistoryUtil.parseHiveHistory(historyFile, this);
} catch (IOException e) {
- // TODO Auto-generated catch block
+ // TODO pass on this exception
e.printStackTrace();
+ LOG.error("Error parsing hive history log file", e);
}
-
}
/**
- * Implementation Listner interface function.
- *
+ * Implementation Listener interface function.
+ *
* @see org.apache.hadoop.hive.ql.history.HiveHistory.Listener#handle(org.apache.hadoop.hive.ql.history.HiveHistory.RecordTypes,
* java.util.Map)
*/
Copied: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java (from r1513659, hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java)
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java?p2=hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java&p1=hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java&r1=1513659&r2=1514554&rev=1514554&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java Fri Aug 16 01:21:54 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
/**
* Interface for reading integers.
@@ -52,4 +53,12 @@ interface IntegerReader {
* @throws IOException
*/
long next() throws IOException;
+
+ /**
+ * Return the next available vector for values.
+ * @param previous the column vector to reuse and fill
+ * @throws IOException
+ */
+ void nextVector(LongColumnVector previous, long previousLen)
+ throws IOException;
}
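The addition gives ORC's tree readers a batch-oriented path: instead of looping over next() once per row, a column reader can hand the decoder a whole LongColumnVector to fill. A hedged sketch of the calling pattern follows; the helper class and the way a concrete reader is obtained from the stream factory are illustrative only.

package org.apache.hadoop.hive.ql.io.orc;

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

// Batch decoding via the new nextVector() call; lives in the orc package
// because IntegerReader is package-scoped.
class IntegerBatchReadSketch {
  static long[] readBatch(IntegerReader reader, int batchSize) throws IOException {
    LongColumnVector result = new LongColumnVector(batchSize);
    reader.nextVector(result, batchSize);   // fills result.vector and result.isNull
    return result.vector;
  }
}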
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Fri Aug 16 01:21:54 2013
@@ -97,7 +97,7 @@ public final class OrcFile {
CompressionKind compress,
int bufferSize,
int rowIndexStride) throws IOException {
- return new WriterImpl(fs, path, inspector, stripeSize, compress,
+ return new WriterImpl(fs, path, conf, inspector, stripeSize, compress,
bufferSize, rowIndexStride, getMemoryManager(conf));
}
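The only functional change in this hunk is that the Configuration already passed to createWriter is now forwarded into WriterImpl, so the writer can consult its own settings. For reference, a hedged sketch of calling that overload end to end; the row type, path, and sizing values are illustrative, and only the parameter order (fs, path, conf, inspector, stripe size, compression, buffer size, row index stride) follows from the surrounding code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

public class OrcWriteSketch {
  static class Row {               // tiny illustrative row type
    long x;
    Row(long x) { this.x = x; }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path path = new Path("/tmp/orc_write_sketch.orc");
    ObjectInspector inspector = ObjectInspectorFactory.getReflectionObjectInspector(
        Row.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);

    Writer writer = OrcFile.createWriter(fs, path, conf, inspector,
        64L * 1024 * 1024,         // stripe size
        CompressionKind.ZLIB,      // compression
        256 * 1024,                // buffer size
        10000);                    // row index stride
    writer.addRow(new Row(42L));
    writer.close();
  }
}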
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java?rev=1514554&r1=1514553&r2=1514554&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OutStream.java Fri Aug 16 01:21:54 2013
@@ -76,11 +76,7 @@ class OutStream extends PositionedOutput
}
public void clear() throws IOException {
- uncompressedBytes = 0;
- compressedBytes = 0;
- compressed = null;
- overflow = null;
- current = null;
+ flush();
suppress = false;
}
@@ -246,7 +242,10 @@ class OutStream extends PositionedOutput
receiver.output(compressed);
compressed = null;
}
- clear();
+ uncompressedBytes = 0;
+ compressedBytes = 0;
+ overflow = null;
+ current = null;
}
@Override