You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/29 17:50:17 UTC
svn commit: r1508111 [2/27] - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/ java/org/apache/hadoop/hive/ql/exec/
java/org/apache/hadoop/hive/ql/exec/mr/
java/org/apache/hadoop/hive/ql/index/compact/
java/org/apache/hadoop/hive/ql/io/ java/org/...
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java Mon Jul 29 15:50:12 2013
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat;
import org.apache.hadoop.hive.ql.plan.Explain;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.mapred.Mapper;
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapred.Mapper;
*
*/
@Explain(displayName = "Partial Scan Statistics")
-public class PartialScanWork extends MapredWork implements Serializable {
+public class PartialScanWork extends MapWork implements Serializable {
private static final long serialVersionUID = 1L;
@@ -52,9 +52,6 @@ public class PartialScanWork extends Map
if(this.getPathToPartitionInfo() == null) {
this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
}
- if(this.getNumReduceTasks() == null) {
- this.setNumReduceTasks(0);
- }
for(String path: this.inputPaths) {
this.getPathToPartitionInfo().put(path, partDesc);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java Mon Jul 29 15:50:12 2013
@@ -76,7 +76,7 @@ public class ColumnTruncateMapper extend
@Override
public void configure(JobConf job) {
jc = job;
- work = (ColumnTruncateWork) Utilities.getMapRedWork(job);
+ work = (ColumnTruncateWork) Utilities.getMapWork(job);
String specPath = work.getOutputDir();
Path tmpPath = Utilities.toTempPath(specPath);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java Mon Jul 29 15:50:12 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.exec.mr.Throttle;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -165,7 +166,9 @@ public class ColumnTruncateTask extends
try {
addInputPaths(job, work);
- Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+ MapredWork mrWork = new MapredWork();
+ mrWork.setMapWork(work);
+ Utilities.setMapRedWork(job, mrWork, ctx.getMRTmpFileURI());
// remove the pwd from conf file so that job tracker doesn't show this
// logs
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java Mon Jul 29 15:50:12 2013
@@ -27,12 +27,12 @@ import org.apache.hadoop.hive.ql.io.rcfi
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.mapred.Mapper;
@Explain(displayName = "Column Truncate")
-public class ColumnTruncateWork extends MapredWork implements Serializable {
+public class ColumnTruncateWork extends MapWork implements Serializable {
private static final long serialVersionUID = 1L;
@@ -64,9 +64,6 @@ public class ColumnTruncateWork extends
if(this.getPathToPartitionInfo() == null) {
this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
}
- if(this.getNumReduceTasks() == null) {
- this.setNumReduceTasks(0);
- }
this.getPathToPartitionInfo().put(inputDir, partDesc);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Mon Jul 29 15:50:12 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hive.ql.plan.Co
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -144,10 +145,10 @@ public class GenMRFileSink1 implements N
// or for a map-reduce job
MapredWork currWork = (MapredWork) currTask.getWork();
boolean mergeMapOnly =
- hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && currWork.getReducer() == null;
+ hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && currWork.getReduceWork() == null;
boolean mergeMapRed =
hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
- currWork.getReducer() != null;
+ currWork.getReduceWork() != null;
if (mergeMapOnly || mergeMapRed) {
chDir = true;
}
@@ -239,7 +240,10 @@ public class GenMRFileSink1 implements N
// mark the MapredWork and FileSinkOperator for gathering stats
nd.getConf().setGatherStats(true);
- mrWork.setGatheringStats(true);
+ mrWork.getMapWork().setGatheringStats(true);
+ if (mrWork.getReduceWork() != null) {
+ mrWork.getReduceWork().setGatheringStats(true);
+ }
nd.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
nd.getConf().setMaxStatsKeyPrefixLength(
hconf.getIntVar(ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH));
@@ -345,7 +349,8 @@ public class GenMRFileSink1 implements N
//
MoveWork dummyMv = new MoveWork(null, null, null,
new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
- MapredWork cplan;
+ MapWork cplan;
+ Serializable work;
if (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) &&
fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) {
@@ -358,6 +363,7 @@ public class GenMRFileSink1 implements N
LOG.info("RCFile format- Using block level merge");
cplan = createRCFileMergeTask(fsInputDesc, finalName,
dpCtx != null && dpCtx.getNumDPCols() > 0);
+ work = cplan;
} catch (ClassNotFoundException e) {
String msg = "Illegal input format class: " + inputFormatClass;
throw new SemanticException(msg);
@@ -365,12 +371,14 @@ public class GenMRFileSink1 implements N
} else {
cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc);
+ work = new MapredWork();
+ ((MapredWork)work).setMapWork(cplan);
// use CombineHiveInputFormat for map-only merging
}
cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
// NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
// know if merge MR2 will be triggered at execution time
- ConditionalTask cndTsk = createCondTask(conf, ctx.getCurrTask(), dummyMv, cplan,
+ ConditionalTask cndTsk = createCondTask(conf, ctx.getCurrTask(), dummyMv, work,
fsInputDesc.getFinalDirName());
// keep the dynamic partition context in conditional task resolver context
@@ -471,7 +479,7 @@ public class GenMRFileSink1 implements N
* the last FileSinkOperator in the parent MapReduce work
* @return the MapredWork
*/
- private MapredWork createMRWorkForMergingFiles (HiveConf conf,
+ private MapWork createMRWorkForMergingFiles (HiveConf conf,
Operator<? extends OperatorDesc> topOp, FileSinkDesc fsDesc) {
ArrayList<String> aliases = new ArrayList<String>();
@@ -480,10 +488,10 @@ public class GenMRFileSink1 implements N
aliases.add(inputDir); // dummy alias: just use the input path
// constructing the default MapredWork
- MapredWork cplan = GenMapRedUtils.getMapRedWorkFromConf(conf);
+ MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf);
+ MapWork cplan = cMrPlan.getMapWork();
cplan.getPathToAliases().put(inputDir, aliases);
cplan.getPathToPartitionInfo().put(inputDir, new PartitionDesc(tblDesc, null));
- cplan.setNumReduceTasks(0);
cplan.getAliasToWork().put(inputDir, topOp);
cplan.setMapperCannotSpanPartns(true);
@@ -498,7 +506,7 @@ public class GenMRFileSink1 implements N
* @return MergeWork if table is stored as RCFile,
* null otherwise
*/
- private MapredWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
+ private MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
String finalName, boolean hasDynamicPartitions) throws SemanticException {
String inputDir = fsInputDesc.getFinalDirName();
@@ -561,7 +569,7 @@ public class GenMRFileSink1 implements N
*/
private ConditionalTask createCondTask(HiveConf conf,
Task<? extends Serializable> currTask, MoveWork mvWork,
- MapredWork mergeWork, String inputPath) {
+ Serializable mergeWork, String inputPath) {
// There are 3 options for this ConditionalTask:
// 1) Merge the partitions
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink1.java Mon Jul 29 15:50:12 2013
@@ -77,7 +77,7 @@ public class GenMRRedSink1 implements No
// If the plan for this reducer does not exist, initialize the plan
if (oldTask == null) {
- if (currPlan.getReducer() == null) {
+ if (currPlan.getReduceWork() == null) {
GenMapRedUtils.initPlan(op, ctx);
} else {
GenMapRedUtils.splitPlan(op, ctx);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRRedSink3.java Mon Jul 29 15:50:12 2013
@@ -85,13 +85,13 @@ public class GenMRRedSink3 implements No
// If the plan for this reducer does not exist, initialize the plan
if (reducerTask == null) {
// When the reducer is encountered for the first time
- if (plan.getReducer() == null) {
+ if (plan.getReduceWork() == null) {
GenMapRedUtils.initUnionPlan(op, union, ctx, unionTask);
// When union is followed by a multi-table insert
} else {
GenMapRedUtils.splitPlan(op, ctx);
}
- } else if (plan.getReducer() == reducer) {
+ } else if (plan.getReduceWork() != null && plan.getReduceWork().getReducer() == reducer) {
// The union is already initialized. However, the union is walked from
// another input
// initUnionPlan is idempotent
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java Mon Jul 29 15:50:12 2013
@@ -117,7 +117,10 @@ public class GenMRTableScan1 implements
handlePartialScanCommand(op, ctx, parseCtx, currTask, parseInfo, statsWork, statsTask);
}
- currWork.setGatheringStats(true);
+ currWork.getMapWork().setGatheringStats(true);
+ if (currWork.getReduceWork() != null) {
+ currWork.getReduceWork().setGatheringStats(true);
+ }
// NOTE: here we should use the new partition predicate pushdown API to get a list of pruned list,
// and pass it to setTaskPlan as the last parameter
Set<Partition> confirmedPartns = new HashSet<Partition>();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Jul 29 15:50:12 2013
@@ -62,12 +62,15 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -82,9 +85,9 @@ public final class GenMapRedUtils {
LOG = LogFactory.getLog("org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils");
}
- private static boolean needsTagging(Operator<? extends OperatorDesc> reducer) {
- return (reducer.getClass() == JoinOperator.class ||
- reducer.getClass() == DemuxOperator.class);
+ private static boolean needsTagging(ReduceWork rWork) {
+ return rWork != null && (rWork.getReducer().getClass() == JoinOperator.class ||
+ rWork.getReducer().getClass() == DemuxOperator.class);
}
/**
* Initialize the current plan by adding it to root tasks.
@@ -107,13 +110,14 @@ public final class GenMapRedUtils {
Operator<? extends OperatorDesc> currTopOp = opProcCtx.getCurrTopOp();
opTaskMap.put(reducer, currTask);
- plan.setReducer(reducer);
+ plan.setReduceWork(new ReduceWork());
+ plan.getReduceWork().setReducer(reducer);
ReduceSinkDesc desc = op.getConf();
- plan.setNumReduceTasks(desc.getNumReducers());
+ plan.getReduceWork().setNumReduceTasks(desc.getNumReducers());
- if (needsTagging(reducer)) {
- plan.setNeedsTagging(true);
+ if (needsTagging(plan.getReduceWork())) {
+ plan.getReduceWork().setNeedsTagging(true);
}
assert currTopOp != null;
@@ -150,13 +154,16 @@ public final class GenMapRedUtils {
opProcCtx.getOpTaskMap();
opTaskMap.put(reducer, unionTask);
- plan.setReducer(reducer);
+
+ plan.setReduceWork(new ReduceWork());
+ plan.getReduceWork().setReducer(reducer);
ReduceSinkDesc desc = op.getConf();
- plan.setNumReduceTasks(desc.getNumReducers());
+ plan.getReduceWork().setNumReduceTasks(desc.getNumReducers());
- if (needsTagging(reducer)) {
- plan.setNeedsTagging(true);
+ if (needsTagging(plan.getReduceWork())) {
+ plan.getReduceWork().setNeedsTagging(true);
}
initUnionPlan(opProcCtx, currUnionOp, unionTask, false);
@@ -190,13 +197,14 @@ public final class GenMapRedUtils {
for (int pos = 0; pos < size; pos++) {
String taskTmpDir = taskTmpDirLst.get(pos);
TableDesc tt_desc = tt_descLst.get(pos);
- if (plan.getPathToAliases().get(taskTmpDir) == null) {
- plan.getPathToAliases().put(taskTmpDir,
+ MapWork mWork = plan.getMapWork();
+ if (mWork.getPathToAliases().get(taskTmpDir) == null) {
+ mWork.getPathToAliases().put(taskTmpDir,
new ArrayList<String>());
- plan.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
- plan.getPathToPartitionInfo().put(taskTmpDir,
+ mWork.getPathToAliases().get(taskTmpDir).add(taskTmpDir);
+ mWork.getPathToPartitionInfo().put(taskTmpDir,
new PartitionDesc(tt_desc, null));
- plan.getAliasToWork().put(taskTmpDir, topOperators.get(pos));
+ mWork.getAliasToWork().put(taskTmpDir, topOperators.get(pos));
}
}
}
@@ -305,7 +313,8 @@ public final class GenMapRedUtils {
}
if (oldTask instanceof MapRedTask && currTask instanceof MapRedTask) {
- ((MapRedTask)currTask).getWork().mergingInto(((MapRedTask) oldTask).getWork());
+ ((MapRedTask)currTask).getWork().getMapWork()
+ .mergingInto(((MapRedTask) oldTask).getWork().getMapWork());
}
opProcCtx.setCurrTopOp(null);
@@ -358,9 +367,11 @@ public final class GenMapRedUtils {
Operator<? extends OperatorDesc> reducer = cRS.getChildOperators().get(0);
// Add the reducer
- childPlan.setReducer(reducer);
+ ReduceWork rWork = new ReduceWork();
+ childPlan.setReduceWork(rWork);
+ rWork.setReducer(reducer);
ReduceSinkDesc desc = cRS.getConf();
- childPlan.setNumReduceTasks(new Integer(desc.getNumReducers()));
+ childPlan.getReduceWork().setNumReduceTasks(new Integer(desc.getNumReducers()));
opProcCtx.getOpTaskMap().put(reducer, childTask);
@@ -428,7 +439,7 @@ public final class GenMapRedUtils {
public static void setTaskPlan(String alias_id,
Operator<? extends OperatorDesc> topOp, Task<?> task, boolean local,
GenMRProcContext opProcCtx, PrunedPartitionList pList) throws SemanticException {
- MapredWork plan = (MapredWork) task.getWork();
+ MapWork plan = ((MapredWork) task.getWork()).getMapWork();
ParseContext parseCtx = opProcCtx.getParseCtx();
Set<ReadEntity> inputs = opProcCtx.getInputs();
@@ -711,7 +722,7 @@ public final class GenMapRedUtils {
* table descriptor
*/
public static void setTaskPlan(String path, String alias,
- Operator<? extends OperatorDesc> topOp, MapredWork plan, boolean local,
+ Operator<? extends OperatorDesc> topOp, MapWork plan, boolean local,
TableDesc tt_desc) throws SemanticException {
if (path == null || alias == null) {
@@ -750,7 +761,7 @@ public final class GenMapRedUtils {
* @param topOp
* current top operator in the path
*/
- public static void setKeyAndValueDesc(MapredWork plan,
+ public static void setKeyAndValueDesc(ReduceWork plan,
Operator<? extends OperatorDesc> topOp) {
if (topOp == null) {
return;
@@ -791,12 +802,12 @@ public final class GenMapRedUtils {
}
} else if (task instanceof ExecDriver) {
MapredWork work = (MapredWork) task.getWork();
- work.deriveExplainAttributes();
+ work.getMapWork().deriveExplainAttributes();
HashMap<String, Operator<? extends OperatorDesc>> opMap = work
- .getAliasToWork();
+ .getMapWork().getAliasToWork();
if (opMap != null && !opMap.isEmpty()) {
for (Operator<? extends OperatorDesc> op : opMap.values()) {
- setKeyAndValueDesc(work, op);
+ setKeyAndValueDesc(work.getReduceWork(), op);
}
}
}
@@ -817,7 +828,7 @@ public final class GenMapRedUtils {
*/
public static MapredWork getMapRedWork(ParseContext parseCtx) {
MapredWork work = getMapRedWorkFromConf(parseCtx.getConf());
- work.setNameToSplitSample(parseCtx.getNameToSplitSample());
+ work.getMapWork().setNameToSplitSample(parseCtx.getNameToSplitSample());
return work;
}
@@ -828,7 +839,8 @@ public final class GenMapRedUtils {
* @return the new plan
*/
public static MapredWork getMapRedWorkFromConf(HiveConf conf) {
- MapredWork work = new MapredWork();
+ MapredWork mrWork = new MapredWork();
+ MapWork work = mrWork.getMapWork();
boolean mapperCannotSpanPartns =
conf.getBoolVar(
@@ -837,11 +849,9 @@ public final class GenMapRedUtils {
work.setPathToAliases(new LinkedHashMap<String, ArrayList<String>>());
work.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
work.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
- work.setTagToValueDesc(new ArrayList<TableDesc>());
- work.setReducer(null);
work.setHadoopSupportsSplittable(
conf.getBoolVar(HiveConf.ConfVars.HIVE_COMBINE_INPUT_FORMAT_SUPPORTS_SPLITTABLE));
- return work;
+ return mrWork;
}
/**
@@ -946,22 +956,22 @@ public final class GenMapRedUtils {
Operator<? extends OperatorDesc> reducer = op.getChildOperators().get(0);
- if (needsTagging(reducer)) {
+ if (needsTagging(cplan.getReduceWork())) {
String origStreamDesc;
streamDesc = "$INTNAME";
origStreamDesc = streamDesc;
int pos = 0;
- while (cplan.getAliasToWork().get(streamDesc) != null) {
+ while (cplan.getMapWork().getAliasToWork().get(streamDesc) != null) {
streamDesc = origStreamDesc.concat(String.valueOf(++pos));
}
// TODO: Allocate work to remove the temporary files and make that
// dependent on the redTask
- cplan.setNeedsTagging(true);
+ cplan.getReduceWork().setNeedsTagging(true);
}
// Add the path to alias mapping
- setTaskPlan(taskTmpDir, streamDesc, ts_op, cplan, false, tt_desc);
+ setTaskPlan(taskTmpDir, streamDesc, ts_op, cplan.getMapWork(), false, tt_desc);
opProcCtx.setCurrTopOp(null);
opProcCtx.setCurrAliasId(null);
opProcCtx.setCurrTask(childTask);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Mon Jul 29 15:50:12 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -77,7 +78,7 @@ public final class MapJoinFactory {
*/
private static class TableScanMapJoinProcessor implements NodeProcessor {
- public static void setupBucketMapJoinInfo(MapredWork plan,
+ public static void setupBucketMapJoinInfo(MapWork plan,
AbstractMapJoinOperator<? extends MapJoinDesc> currMapJoinOp) {
if (currMapJoinOp != null) {
Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
@@ -214,7 +215,7 @@ public final class MapJoinFactory {
// the first time.
boolean local = pos != mapJoin.getConf().getPosBigTable();
if (oldTask == null) {
- assert currPlan.getReducer() == null;
+ assert currPlan.getReduceWork() == null;
initMapJoinPlan(mapJoin, currTask, ctx, local);
} else {
// The current plan can be thrown away after being merged with the
@@ -223,7 +224,7 @@ public final class MapJoinFactory {
ctx.setCurrTask(currTask = oldTask);
}
MapredWork plan = (MapredWork) currTask.getWork();
- setupBucketMapJoinInfo(plan, mapJoin);
+ setupBucketMapJoinInfo(plan.getMapWork(), mapJoin);
mapCurrCtx.put(mapJoin, new GenMapRedCtx(ctx.getCurrTask(), ctx.getCurrAliasId()));
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Mon Jul 29 15:50:12 2013
@@ -134,7 +134,7 @@ public class MapJoinProcessor implements
new LinkedHashMap<String, FetchWork>());
for (Map.Entry<String, Operator<? extends OperatorDesc>> entry :
- newWork.getAliasToWork().entrySet()) {
+ newWork.getMapWork().getAliasToWork().entrySet()) {
String alias = entry.getKey();
Operator<? extends OperatorDesc> op = entry.getValue();
@@ -162,7 +162,7 @@ public class MapJoinProcessor implements
smallTableAliasList.add(alias);
// get input path and remove this alias from pathToAlias
// because this file will be fetched by fetch operator
- LinkedHashMap<String, ArrayList<String>> pathToAliases = newWork.getPathToAliases();
+ LinkedHashMap<String, ArrayList<String>> pathToAliases = newWork.getMapWork().getPathToAliases();
// keep record all the input path for this alias
HashSet<String> pathSet = new HashSet<String>();
@@ -193,7 +193,7 @@ public class MapJoinProcessor implements
List<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
for (String tablePath : pathSet) {
- PartitionDesc partitionDesc = newWork.getPathToPartitionInfo().get(tablePath);
+ PartitionDesc partitionDesc = newWork.getMapWork().getPathToPartitionInfo().get(tablePath);
// create fetchwork for non partitioned table
if (partitionDesc.getPartSpec() == null || partitionDesc.getPartSpec().size() == 0) {
fetchWork = new FetchWork(tablePath, partitionDesc.getTableDesc());
@@ -205,7 +205,7 @@ public class MapJoinProcessor implements
}
// create fetchwork for partitioned table
if (fetchWork == null) {
- TableDesc table = newWork.getAliasToPartnInfo().get(alias).getTableDesc();
+ TableDesc table = newWork.getMapWork().getAliasToPartnInfo().get(alias).getTableDesc();
fetchWork = new FetchWork(partDir, partDesc, table);
}
// set alias to fetch work
@@ -213,13 +213,13 @@ public class MapJoinProcessor implements
}
// remove small table ailias from aliasToWork;Avoid concurrent modification
for (String alias : smallTableAliasList) {
- newWork.getAliasToWork().remove(alias);
+ newWork.getMapWork().getAliasToWork().remove(alias);
}
// set up local work
- newWork.setMapLocalWork(newLocalWork);
+ newWork.getMapWork().setMapLocalWork(newLocalWork);
// remove reducer
- newWork.setReducer(null);
+ newWork.setReduceWork(null);
// return the big table alias
if (bigTableAlias == null) {
throw new SemanticException("Big Table Alias is null");
@@ -240,8 +240,8 @@ public class MapJoinProcessor implements
public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
throws SemanticException {
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
- newWork.getOpParseCtxMap();
- QBJoinTree newJoinTree = newWork.getJoinTree();
+ newWork.getMapWork().getOpParseCtxMap();
+ QBJoinTree newJoinTree = newWork.getMapWork().getJoinTree();
// generate the map join operator; already checked the map join
MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
newJoinTree, mapJoinPos, true, false);
@@ -256,8 +256,8 @@ public class MapJoinProcessor implements
String bigTableAlias = MapJoinProcessor
.genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos);
// clean up the mapred work
- newWork.setOpParseCtxMap(null);
- newWork.setJoinTree(null);
+ newWork.getMapWork().setOpParseCtxMap(null);
+ newWork.getMapWork().setJoinTree(null);
return bigTableAlias;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/AbstractJoinTaskDispatcher.java Mon Jul 29 15:50:12 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.TaskGraphWalker.TaskGraphWalkerContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
/**
* Common iteration methods for converting joins and sort-merge joins.
@@ -119,7 +120,7 @@ public abstract class AbstractJoinTaskDi
}
}
- public long getTotalKnownInputSize(Context context, MapredWork currWork,
+ public long getTotalKnownInputSize(Context context, MapWork currWork,
Map<String, ArrayList<String>> pathToAliases,
HashMap<String, Long> aliasToSize) throws SemanticException {
try {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingInferenceOptimizer.java Mon Jul 29 15:50:12 2013
@@ -86,13 +86,13 @@ public class BucketingSortingInferenceOp
continue;
}
- Operator<? extends OperatorDesc> reducer = mapRedTask.getWork().getReducer();
- if (reducer == null) {
+ if (mapRedTask.getWork().getReduceWork() == null) {
continue;
}
+ Operator<? extends OperatorDesc> reducer = mapRedTask.getWork().getReduceWork().getReducer();
// uses sampling, which means it's not bucketed
- boolean disableBucketing = mapRedTask.getWork().getSamplingType() > 0;
+ boolean disableBucketing = mapRedTask.getWork().getMapWork().getSamplingType() > 0;
BucketingSortingCtx bCtx = new BucketingSortingCtx(disableBucketing);
// RuleRegExp rules are used to match operators anywhere in the tree
@@ -145,8 +145,8 @@ public class BucketingSortingInferenceOp
topNodes.add(reducer);
ogw.startWalking(topNodes, null);
- mapRedTask.getWork().getBucketedColsByDirectory().putAll(bCtx.getBucketedColsByDirectory());
- mapRedTask.getWork().getSortedColsByDirectory().putAll(bCtx.getSortedColsByDirectory());
+ mapRedTask.getWork().getMapWork().getBucketedColsByDirectory().putAll(bCtx.getBucketedColsByDirectory());
+ mapRedTask.getWork().getMapWork().getSortedColsByDirectory().putAll(bCtx.getSortedColsByDirectory());
}
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Mon Jul 29 15:50:12 2013
@@ -50,10 +50,12 @@ import org.apache.hadoop.hive.ql.plan.Co
import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
/*
* Convert tasks involving JOIN into MAPJOIN.
@@ -108,7 +110,7 @@ public class CommonJoinTaskDispatcher ex
}
// Get the position of the big table for this join operator and the given alias
- private int getPosition(MapredWork work, Operator<? extends OperatorDesc> joinOp,
+ private int getPosition(MapWork work, Operator<? extends OperatorDesc> joinOp,
String alias) {
Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
@@ -127,9 +129,9 @@ public class CommonJoinTaskDispatcher ex
*/
private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task, Configuration conf) {
MapRedTask childTask = (MapRedTask) task.getChildTasks().get(0);
- MapredWork work = task.getWork();
+ MapWork work = task.getWork().getMapWork();
MapredLocalWork localWork = work.getMapLocalWork();
- MapredWork childWork = childTask.getWork();
+ MapWork childWork = childTask.getWork().getMapWork();
MapredLocalWork childLocalWork = childWork.getMapLocalWork();
// Can this be merged
@@ -262,19 +264,26 @@ public class CommonJoinTaskDispatcher ex
* @param childTask
*/
private void copyReducerConf(MapRedTask task, MapRedTask childTask) {
- MapredWork childWork = childTask.getWork();
+ MapredWork mrChildWork = childTask.getWork();
+ ReduceWork childWork = childTask.getWork().getReduceWork();
+ if (childWork == null) {
+ return;
+ }
+
Operator childReducer = childWork.getReducer();
MapredWork work = task.getWork();
if (childReducer == null) {
return;
}
- work.setReducer(childReducer);
- work.setNumReduceTasks(childWork.getNumReduceTasks());
- work.setJoinTree(childWork.getJoinTree());
- work.setNeedsTagging(childWork.getNeedsTagging());
+ ReduceWork rWork = new ReduceWork();
+ work.setReduceWork(rWork);
+ rWork.setReducer(childReducer);
+ rWork.setNumReduceTasks(childWork.getNumReduceTasks());
+ work.getMapWork().setJoinTree(mrChildWork.getMapWork().getJoinTree());
+ rWork.setNeedsTagging(childWork.getNeedsTagging());
// Make sure the key configuration is correct, clear and regenerate.
- work.getTagToValueDesc().clear();
+ rWork.getTagToValueDesc().clear();
GenMapRedUtils.setKeyAndValueDescForTaskTree(task);
}
@@ -309,10 +318,9 @@ public class CommonJoinTaskDispatcher ex
return;
}
MapRedTask childTask = (MapRedTask) firstChildTask;
- MapredWork mapJoinWork = mapJoinTask.getWork();
+ MapWork mapJoinWork = mapJoinTask.getWork().getMapWork();
MapredWork childWork = childTask.getWork();
- Operator childReducer = childWork.getReducer();
- if (childReducer == null) {
+ if (childWork.getReduceWork() == null) {
// Not a MR job, nothing to merge.
return;
}
@@ -322,7 +330,7 @@ public class CommonJoinTaskDispatcher ex
if (aliasToWork.size() > 1) {
return;
}
- Map<String, ArrayList<String>> childPathToAliases = childWork.getPathToAliases();
+ Map<String, ArrayList<String>> childPathToAliases = childWork.getMapWork().getPathToAliases();
if (childPathToAliases.size() > 1) {
return;
}
@@ -353,7 +361,7 @@ public class CommonJoinTaskDispatcher ex
}
MapredLocalWork mapJoinLocalWork = mapJoinWork.getMapLocalWork();
- MapredLocalWork childLocalWork = childWork.getMapLocalWork();
+ MapredLocalWork childLocalWork = childWork.getMapWork().getMapLocalWork();
// Either of them should not be bucketed
if ((mapJoinLocalWork != null && mapJoinLocalWork.getBucketMapjoinContext() != null) ||
@@ -361,12 +369,12 @@ public class CommonJoinTaskDispatcher ex
return;
}
- if (childWork.getAliasToWork().size() > 1) {
+ if (childWork.getMapWork().getAliasToWork().size() > 1) {
return;
}
Operator<? extends Serializable> childAliasOp =
- childWork.getAliasToWork().values().iterator().next();
+ childWork.getMapWork().getAliasToWork().values().iterator().next();
if (mapJoinTaskFileSinkOperator.getParentOperators().size() > 1) {
return;
}
@@ -393,10 +401,10 @@ public class CommonJoinTaskDispatcher ex
parentOps.add(parentFOp);
childAliasOp.setParentOperators(parentOps);
- mapJoinWork.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
- for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getPathToPartitionInfo()
+ mapJoinWork.getAliasToPartnInfo().putAll(childWork.getMapWork().getAliasToPartnInfo());
+ for (Map.Entry<String, PartitionDesc> childWorkEntry : childWork.getMapWork().getPathToPartitionInfo()
.entrySet()) {
- if (childWork.getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
+ if (childWork.getMapWork().getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
mapJoinWork.getPathToPartitionInfo()
.put(childWorkEntry.getKey(), childWorkEntry.getValue());
}
@@ -450,7 +458,7 @@ public class CommonJoinTaskDispatcher ex
}
currTask.setTaskTag(Task.COMMON_JOIN);
- MapredWork currWork = currTask.getWork();
+ MapWork currWork = currTask.getWork().getMapWork();
// create conditional work list and task list
List<Serializable> listWorks = new ArrayList<Serializable>();
@@ -541,7 +549,7 @@ public class CommonJoinTaskDispatcher ex
if (convertJoinMapJoin) {
// create map join task and set big table as bigTablePosition
- MapRedTask newTask = convertTaskToMapJoinTask(currWork, bigTablePosition).getFirst();
+ MapRedTask newTask = convertTaskToMapJoinTask(currTask.getWork(), bigTablePosition).getFirst();
newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
replaceTask(currTask, newTask, physicalContext);
@@ -577,9 +585,9 @@ public class CommonJoinTaskDispatcher ex
}
// deep copy a new mapred work from xml
// Once HIVE-4396 is in, it would be faster to use a cheaper method to clone the plan
- String xml = currWork.toXML();
+ String xml = currTask.getWork().toXML();
InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
- MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
+ MapredWork newWork = Utilities.deserializeObject(in);
// create map join task and set big table as i
ObjectPair<MapRedTask, String> newTaskAlias = convertTaskToMapJoinTask(newWork, i);
@@ -659,14 +667,15 @@ public class CommonJoinTaskDispatcher ex
}
private JoinOperator getJoinOp(MapRedTask task) throws SemanticException {
- MapredWork work = task.getWork();
- if (work == null) {
+ MapWork mWork = task.getWork().getMapWork();
+ ReduceWork rWork = task.getWork().getReduceWork();
+ if (rWork == null) {
return null;
}
- Operator<? extends OperatorDesc> reducerOp = work.getReducer();
+ Operator<? extends OperatorDesc> reducerOp = rWork.getReducer();
if (reducerOp instanceof JoinOperator) {
/* Is any operator present, which prevents the conversion */
- Map<String, Operator<? extends OperatorDesc>> aliasToWork = work.getAliasToWork();
+ Map<String, Operator<? extends OperatorDesc>> aliasToWork = mWork.getAliasToWork();
for (Operator<? extends OperatorDesc> op : aliasToWork.values()) {
if (!checkOperatorOKMapJoinConversion(op)) {
return null;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Mon Jul 29 15:50:12 2013
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.optimizer.physical;
import java.io.ByteArrayInputStream;
-import java.io.File;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
@@ -50,6 +49,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -104,6 +104,7 @@ public final class GenMRSkewJoinProcesso
* https://issues.apache.org/jira/browse/HIVE-964.
*
*/
+ @SuppressWarnings("unchecked")
public static void processSkewJoin(JoinOperator joinOp,
Task<? extends Serializable> currTask, ParseContext parseCtx)
throws SemanticException {
@@ -151,7 +152,7 @@ public final class GenMRSkewJoinProcesso
List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
MapredWork currPlan = (MapredWork) currTask.getWork();
- TableDesc keyTblDesc = (TableDesc) currPlan.getKeyDesc().clone();
+ TableDesc keyTblDesc = (TableDesc) currPlan.getReduceWork().getKeyDesc().clone();
List<String> joinKeys = Utilities
.getColumnNames(keyTblDesc.getProperties());
List<String> joinKeyTypes = Utilities.getColumnTypes(keyTblDesc
@@ -232,7 +233,7 @@ public final class GenMRSkewJoinProcesso
for (int i = 0; i < numAliases - 1; i++) {
Byte src = tags[i];
- MapredWork newPlan = PlanUtils.getMapRedWork();
+ MapWork newPlan = PlanUtils.getMapRedWork().getMapWork();
// This code has been only added for testing
boolean mapperCannotSpanPartns =
@@ -246,7 +247,7 @@ public final class GenMRSkewJoinProcesso
StringBuilder sb = new StringBuilder(xmlPlan);
ByteArrayInputStream bis;
bis = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
- clonePlan = Utilities.deserializeMapRedWork(bis, parseCtx.getConf());
+ clonePlan = Utilities.deserializeObject(bis);
} catch (UnsupportedEncodingException e) {
throw new SemanticException(e);
}
@@ -276,7 +277,7 @@ public final class GenMRSkewJoinProcesso
newPlan.getPathToPartitionInfo().put(bigKeyDirPath, part);
newPlan.getAliasToPartnInfo().put(alias, part);
- Operator<? extends OperatorDesc> reducer = clonePlan.getReducer();
+ Operator<? extends OperatorDesc> reducer = clonePlan.getReduceWork().getReducer();
assert reducer instanceof JoinOperator;
JoinOperator cloneJoinOp = (JoinOperator) reducer;
@@ -328,16 +329,18 @@ public final class GenMRSkewJoinProcesso
newPlan
.setMinSplitSize(HiveConf.getLongVar(jc, HiveConf.ConfVars.HIVESKEWJOINMAPJOINMINSPLIT));
newPlan.setInputformat(HiveInputFormat.class.getName());
- Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(
- newPlan, jc);
+
+ MapredWork w = new MapredWork();
+ w.setMapWork(newPlan);
+
+ Task<? extends Serializable> skewJoinMapJoinTask = TaskFactory.get(w, jc);
bigKeysDirToTaskMap.put(bigKeyDirPath, skewJoinMapJoinTask);
listWorks.add(skewJoinMapJoinTask.getWork());
listTasks.add(skewJoinMapJoinTask);
}
ConditionalWork cndWork = new ConditionalWork(listWorks);
- ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork,
- parseCtx.getConf());
+ ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, parseCtx.getConf());
cndTsk.setListTasks(listTasks);
cndTsk.setResolver(new ConditionalResolverSkewJoin());
cndTsk
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java Mon Jul 29 15:50:12 2013
@@ -48,8 +48,7 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
-import
- org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
@@ -98,14 +97,14 @@ public class MapJoinResolver implements
ConditionalTask conditionalTask) throws SemanticException {
// get current mapred work and its local work
MapredWork mapredWork = (MapredWork) currTask.getWork();
- MapredLocalWork localwork = mapredWork.getMapLocalWork();
+ MapredLocalWork localwork = mapredWork.getMapWork().getMapLocalWork();
if (localwork != null) {
// get the context info and set up the shared tmp URI
Context ctx = physicalContext.getContext();
String tmpFileURI = Utilities.generateTmpURI(ctx.getLocalTmpFileURI(), currTask.getId());
localwork.setTmpFileURI(tmpFileURI);
String hdfsTmpURI = Utilities.generateTmpURI(ctx.getMRTmpFileURI(), currTask.getId());
- mapredWork.setTmpHDFSFileURI(hdfsTmpURI);
+ mapredWork.getMapWork().setTmpHDFSFileURI(hdfsTmpURI);
// create a task for this local work; right now, this local work is shared
// by the original MapredTask and this new generated MapredLocalTask.
MapredLocalTask localTask = (MapredLocalTask) TaskFactory.get(localwork, physicalContext
@@ -134,7 +133,7 @@ public class MapJoinResolver implements
newLocalWork.setTmpFileURI(tmpFileURI);
newLocalWork.setInputFileChangeSensitive(localwork.getInputFileChangeSensitive());
newLocalWork.setBucketMapjoinContext(localwork.copyPartSpecMappingOnly());
- mapredWork.setMapLocalWork(newLocalWork);
+ mapredWork.getMapWork().setMapLocalWork(newLocalWork);
// get all parent tasks
List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
currTask.setParentTasks(null);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MetadataOnlyOptimizer.java Mon Jul 29 15:50:12 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.lib.Rul
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -188,7 +189,7 @@ public class MetadataOnlyOptimizer imple
physicalContext = context;
}
- private String getAliasForTableScanOperator(MapredWork work,
+ private String getAliasForTableScanOperator(MapWork work,
TableScanOperator tso) {
for (Map.Entry<String, Operator<? extends OperatorDesc>> entry :
@@ -211,7 +212,7 @@ public class MetadataOnlyOptimizer imple
return desc;
}
- private List<String> getPathsForAlias(MapredWork work, String alias) {
+ private List<String> getPathsForAlias(MapWork work, String alias) {
List<String> paths = new ArrayList<String>();
for (Map.Entry<String, ArrayList<String>> entry : work.getPathToAliases().entrySet()) {
@@ -223,7 +224,7 @@ public class MetadataOnlyOptimizer imple
return paths;
}
- private void processAlias(MapredWork work, String alias) {
+ private void processAlias(MapWork work, String alias) {
// Change the alias partition desc
PartitionDesc aliasPartn = work.getAliasToPartnInfo().get(alias);
changePartitionToMetadataOnly(aliasPartn);
@@ -299,7 +300,7 @@ public class MetadataOnlyOptimizer imple
while (iterator.hasNext()) {
TableScanOperator tso = iterator.next();
- MapredWork work = (MapredWork) task.getWork();
+ MapWork work = ((MapredWork) task.getWork()).getMapWork();
String alias = getAliasForTableScanOperator(work, tso);
LOG.info("Metadata only table scan for " + alias);
processAlias(work, alias);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SamplingOptimizer.java Mon Jul 29 15:50:12 2013
@@ -27,7 +27,9 @@ import org.apache.hadoop.hive.ql.exec.Ta
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
/**
* Mark final MapredWork for ORDER BY to use sampling and set number of reduce task as -1
@@ -39,12 +41,16 @@ public class SamplingOptimizer implement
if (!(task instanceof MapRedTask) || !((MapRedTask)task).getWork().isFinalMapRed()) {
continue; // this could be replaced by bucketing on RS + bucketed fetcher for next MR
}
- MapredWork mapreWork = ((MapRedTask) task).getWork();
- if (mapreWork.getNumReduceTasks() != 1 || mapreWork.getAliasToWork().size() != 1 ||
- mapreWork.getSamplingType() > 0 || mapreWork.getReducer() == null) {
+ MapredWork mrWork = ((MapRedTask) task).getWork();
+ MapWork mapWork = mrWork.getMapWork();
+ ReduceWork reduceWork = mrWork.getReduceWork();
+
+ if (reduceWork == null || reduceWork.getNumReduceTasks() != 1
+ || mapWork.getAliasToWork().size() != 1 || mapWork.getSamplingType() > 0
+ || reduceWork.getReducer() == null) {
continue;
}
- Operator<?> operator = mapreWork.getAliasToWork().values().iterator().next();
+ Operator<?> operator = mapWork.getAliasToWork().values().iterator().next();
if (!(operator instanceof TableScanOperator)) {
continue;
}
@@ -55,8 +61,8 @@ public class SamplingOptimizer implement
continue;
}
child.getConf().setNumReducers(-1);
- mapreWork.setNumReduceTasks(-1);
- mapreWork.setSamplingType(MapredWork.SAMPLING_ON_START);
+ reduceWork.setNumReduceTasks(-1);
+ mapWork.setSamplingType(MapWork.SAMPLING_ON_START);
}
return pctx;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SkewJoinResolver.java Mon Jul 29 15:50:12 2013
@@ -74,7 +74,7 @@ public class SkewJoinResolver implements
Task<? extends Serializable> task = (Task<? extends Serializable>) nd;
if (!task.isMapRedTask() || task instanceof ConditionalTask
- || ((MapredWork) task.getWork()).getReducer() == null) {
+ || ((MapredWork) task.getWork()).getReduceWork() == null) {
return null;
}
@@ -94,7 +94,9 @@ public class SkewJoinResolver implements
// iterator the reducer operator tree
ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.add(((MapredWork) task.getWork()).getReducer());
+ if (((MapredWork)task.getWork()).getReduceWork() != null) {
+ topNodes.add(((MapredWork) task.getWork()).getReduceWork().getReducer());
+ }
ogw.startWalking(topNodes, null);
return null;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java Mon Jul 29 15:50:12 2013
@@ -52,10 +52,12 @@ import org.apache.hadoop.hive.ql.plan.Co
import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
import org.apache.hadoop.hive.ql.plan.ConditionalWork;
import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
/**
@@ -72,7 +74,7 @@ public class SortMergeJoinTaskDispatcher
// Convert the work in the SMB plan to a regular join
// Note that the operator tree is not fixed, only the path/alias mappings in the
// plan are fixed. The operator tree will still contain the SMBJoinOperator
- private void genSMBJoinWork(MapredWork currWork, SMBMapJoinOperator smbJoinOp) {
+ private void genSMBJoinWork(MapWork currWork, SMBMapJoinOperator smbJoinOp) {
// Remove the paths which are not part of aliasToPartitionInfo
Map<String, PartitionDesc> aliasToPartitionInfo = currWork.getAliasToPartnInfo();
List<String> removePaths = new ArrayList<String>();
@@ -150,7 +152,7 @@ public class SortMergeJoinTaskDispatcher
// deep copy a new mapred work
InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
- MapredWork currJoinWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
+ MapredWork currJoinWork = Utilities.deserializeObject(in);
SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);
// Add the row resolver for the new operator
@@ -158,7 +160,7 @@ public class SortMergeJoinTaskDispatcher
physicalContext.getParseContext().getOpParseCtx();
opParseContextMap.put(newSMBJoinOp, opParseContextMap.get(oldSMBJoinOp));
// change the newly created map-red plan as if it was a join operator
- genSMBJoinWork(currJoinWork, newSMBJoinOp);
+ genSMBJoinWork(currJoinWork.getMapWork(), newSMBJoinOp);
return currJoinWork;
} catch (Exception e) {
e.printStackTrace();
@@ -174,24 +176,25 @@ public class SortMergeJoinTaskDispatcher
throws UnsupportedEncodingException, SemanticException {
// deep copy a new mapred work from xml
InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
- MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
+ MapredWork newWork = Utilities.deserializeObject(in);
// create a mapred task for this work
MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext
.getParseContext().getConf());
// generate the map join operator; already checked the map join
MapJoinOperator newMapJoinOp =
getMapJoinOperator(newTask, newWork, smbJoinOp, joinTree, bigTablePosition);
+
// The reducer needs to be restored - Consider a query like:
// select count(*) FROM bucket_big a JOIN bucket_small b ON a.key = b.key;
// The reducer contains a groupby, which needs to be restored.
- Operator<? extends OperatorDesc> reducer = newWork.getReducer();
+ ReduceWork rWork = newWork.getReduceWork();
// create the local work for this plan
String bigTableAlias =
MapJoinProcessor.genLocalWorkForMapJoin(newWork, newMapJoinOp, bigTablePosition);
// restore the reducer
- newWork.setReducer(reducer);
+ newWork.setReduceWork(rWork);
return new ObjectPair<MapRedTask, String>(newTask, bigTableAlias);
}
@@ -259,10 +262,10 @@ public class SortMergeJoinTaskDispatcher
MapredWork currJoinWork = convertSMBWorkToJoinWork(currWork, originalSMBJoinOp);
SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);
- currWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
- currWork.setJoinTree(joinTree);
- currJoinWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
- currJoinWork.setJoinTree(joinTree);
+ currWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx());
+ currWork.getMapWork().setJoinTree(joinTree);
+ currJoinWork.getMapWork().setOpParseCtxMap(parseCtx.getOpParseCtx());
+ currJoinWork.getMapWork().setJoinTree(joinTree);
// create conditional work list and task list
List<Serializable> listWorks = new ArrayList<Serializable>();
@@ -272,7 +275,7 @@ public class SortMergeJoinTaskDispatcher
HashMap<String, Task<? extends Serializable>> aliasToTask =
new HashMap<String, Task<? extends Serializable>>();
// Note that pathToAlias will behave as if the original plan was a join plan
- HashMap<String, ArrayList<String>> pathToAliases = currJoinWork.getPathToAliases();
+ HashMap<String, ArrayList<String>> pathToAliases = currJoinWork.getMapWork().getPathToAliases();
// generate a map join task for the big table
SMBJoinDesc originalSMBJoinDesc = originalSMBJoinOp.getConf();
@@ -289,7 +292,7 @@ public class SortMergeJoinTaskDispatcher
HashMap<String, Long> aliasToSize = new HashMap<String, Long>();
Configuration conf = context.getConf();
try {
- long aliasTotalKnownInputSize = getTotalKnownInputSize(context, currJoinWork,
+ long aliasTotalKnownInputSize = getTotalKnownInputSize(context, currJoinWork.getMapWork(),
pathToAliases, aliasToSize);
String xml = currJoinWork.toXML();
@@ -339,8 +342,8 @@ public class SortMergeJoinTaskDispatcher
listWorks.add(currTask.getWork());
listTasks.add(currTask);
// clear JoinTree and OP Parse Context
- currWork.setOpParseCtxMap(null);
- currWork.setJoinTree(null);
+ currWork.getMapWork().setOpParseCtxMap(null);
+ currWork.getMapWork().setJoinTree(null);
// create conditional task and insert conditional task into task tree
ConditionalWork cndWork = new ConditionalWork(listWorks);
@@ -417,9 +420,9 @@ public class SortMergeJoinTaskDispatcher
}
private SMBMapJoinOperator getSMBMapJoinOp(MapredWork work) throws SemanticException {
- if (work != null) {
- Operator<? extends OperatorDesc> reducer = work.getReducer();
- for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
+ if (work != null && work.getReduceWork() != null) {
+ Operator<? extends OperatorDesc> reducer = work.getReduceWork().getReducer();
+ for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) {
SMBMapJoinOperator smbMapJoinOp = getSMBMapJoinOp(op, reducer);
if (smbMapJoinOp != null) {
return smbMapJoinOp;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereProcessor.java Mon Jul 29 15:50:12 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -162,7 +163,7 @@ public class IndexWhereProcessor impleme
HiveIndexQueryContext queryContext = queryContexts.get(chosenIndex);
// prepare the map reduce job to use indexing
- MapredWork work = currentTask.getWork();
+ MapWork work = currentTask.getWork().getMapWork();
work.setInputformat(queryContext.getIndexInputFormat());
work.addIndexIntermediateFile(queryContext.getIndexIntermediateFile());
// modify inputs based on index query
@@ -204,7 +205,7 @@ public class IndexWhereProcessor impleme
// check the size
try {
- ContentSummary inputSummary = Utilities.getInputSummary(pctx.getContext(), task.getWork(), null);
+ ContentSummary inputSummary = Utilities.getInputSummary(pctx.getContext(), task.getWork().getMapWork(), null);
long inputSize = inputSummary.getLength();
if (!indexHandler.checkQuerySize(inputSize, pctx.getConf())) {
queryContext.setQueryTasks(null);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/index/IndexWhereTaskDispatcher.java Mon Jul 29 15:50:12 2013
@@ -93,7 +93,7 @@ public class IndexWhereTaskDispatcher im
GraphWalker ogw = new DefaultGraphWalker(dispatcher);
ArrayList<Node> topNodes = new ArrayList<Node>();
if (task.getWork() instanceof MapredWork) {
- topNodes.addAll(((MapredWork)task.getWork()).getAliasToWork().values());
+ topNodes.addAll(((MapredWork)task.getWork()).getMapWork().getAliasToWork().values());
} else {
return null;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java Mon Jul 29 15:50:12 2013
@@ -84,6 +84,7 @@ import org.apache.hadoop.hive.ql.plan.DD
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -349,7 +350,7 @@ public class MapReduceCompiler {
}
}
- private void setInputFormat(MapredWork work, Operator<? extends OperatorDesc> op) {
+ private void setInputFormat(MapWork work, Operator<? extends OperatorDesc> op) {
if (op.isUseBucketizedHiveInputFormat()) {
work.setUseBucketizedHiveInputFormat(true);
return;
@@ -365,7 +366,7 @@ public class MapReduceCompiler {
// loop over all the tasks recursively
private void setInputFormat(Task<? extends Serializable> task) {
if (task instanceof ExecDriver) {
- MapredWork work = (MapredWork) task.getWork();
+ MapWork work = ((MapredWork) task.getWork()).getMapWork();
HashMap<String, Operator<? extends OperatorDesc>> opMap = work.getAliasToWork();
if (!opMap.isEmpty()) {
for (Operator<? extends OperatorDesc> op : opMap.values()) {
@@ -391,16 +392,16 @@ public class MapReduceCompiler {
private void generateCountersTask(Task<? extends Serializable> task) {
if (task instanceof ExecDriver) {
HashMap<String, Operator<? extends OperatorDesc>> opMap = ((MapredWork) task
- .getWork()).getAliasToWork();
+ .getWork()).getMapWork().getAliasToWork();
if (!opMap.isEmpty()) {
for (Operator<? extends OperatorDesc> op : opMap.values()) {
generateCountersOperator(op);
}
}
- Operator<? extends OperatorDesc> reducer = ((MapredWork) task.getWork())
- .getReducer();
- if (reducer != null) {
+ if (((MapredWork)task.getWork()).getReduceWork() != null) {
+ Operator<? extends OperatorDesc> reducer = ((MapredWork) task.getWork()).getReduceWork()
+ .getReducer();
LOG.info("Generating counters for operator " + reducer);
generateCountersOperator(reducer);
}
@@ -457,7 +458,7 @@ public class MapReduceCompiler {
if (task instanceof ExecDriver) {
HashMap<String, Operator<? extends OperatorDesc>> opMap = ((MapredWork) task
- .getWork()).getAliasToWork();
+ .getWork()).getMapWork().getAliasToWork();
if (!opMap.isEmpty()) {
for (Operator<? extends OperatorDesc> op : opMap.values()) {
breakOperatorTree(op);
@@ -560,12 +561,12 @@ public class MapReduceCompiler {
* Make a best guess at trying to find the number of reducers
*/
private static int getNumberOfReducers(MapredWork mrwork, HiveConf conf) {
- if (mrwork.getReducer() == null) {
+ if (mrwork.getReduceWork() == null) {
return 0;
}
- if (mrwork.getNumReduceTasks() >= 0) {
- return mrwork.getNumReduceTasks();
+ if (mrwork.getReduceWork().getNumReduceTasks() >= 0) {
+ return mrwork.getReduceWork().getNumReduceTasks();
}
return conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
@@ -599,7 +600,8 @@ public class MapReduceCompiler {
boolean hasNonLocalJob = false;
for (ExecDriver mrtask : mrtasks) {
try {
- ContentSummary inputSummary = Utilities.getInputSummary(ctx, mrtask.getWork(), p);
+ ContentSummary inputSummary = Utilities.getInputSummary
+ (ctx, ((MapredWork) mrtask.getWork()).getMapWork(), p);
int numReducers = getNumberOfReducers(mrtask.getWork(), conf);
long estimatedInput;
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1508111&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java Mon Jul 29 15:50:12 2013
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+
+/**
+ * BaseWork. Base class for any "work" that's being done on the cluster. Items like stats
+ * gathering that are commonly used regardless of the type of work live here.
+ */
+@SuppressWarnings({"serial", "deprecation"})
+public abstract class BaseWork extends AbstractOperatorDesc {
+
+ private boolean gatheringStats;
+
+ public void setGatheringStats(boolean gatherStats) {
+ this.gatheringStats = gatherStats;
+ }
+
+ public boolean isGatheringStats() {
+ return this.gatheringStats;
+ }
+
+ protected abstract List<Operator<?>> getAllRootOperators();
+
+ public List<Operator<?>> getAllOperators() {
+
+ List<Operator<?>> returnList = new ArrayList<Operator<?>>();
+ List<Operator<?>> opList = getAllRootOperators();
+
+ //recursively add all children
+ while (!opList.isEmpty()) {
+ Operator<?> op = opList.remove(0);
+ if (op.getChildOperators() != null) {
+ opList.addAll(op.getChildOperators());
+ }
+ returnList.add(op);
+ }
+
+ return returnList;
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java Mon Jul 29 15:50:12 2013
@@ -144,7 +144,12 @@ public class ConditionalResolverMergeFil
if (inpFs.exists(dirPath)) {
// For each dynamic partition, check if it needs to be merged.
- MapredWork work = (MapredWork) mrTask.getWork();
+ MapWork work;
+ if (mrTask.getWork() instanceof MapredWork) {
+ work = ((MapredWork) mrTask.getWork()).getMapWork();
+ } else {
+ work = (MapWork) mrTask.getWork();
+ }
int lbLevel = (ctx.getLbCtx() == null) ? 0 : ctx.getLbCtx().calculateListBucketingLevel();
@@ -222,7 +227,7 @@ public class ConditionalResolverMergeFil
private void generateActualTasks(HiveConf conf, List<Task<? extends Serializable>> resTsks,
long trgtSize, long avgConditionSize, Task<? extends Serializable> mvTask,
Task<? extends Serializable> mrTask, Task<? extends Serializable> mrAndMvTask, Path dirPath,
- FileSystem inpFs, ConditionalResolverMergeFilesCtx ctx, MapredWork work, int dpLbLevel)
+ FileSystem inpFs, ConditionalResolverMergeFilesCtx ctx, MapWork work, int dpLbLevel)
throws IOException {
DynamicPartitionCtx dpCtx = ctx.getDPCtx();
// get list of dynamic partitions
@@ -319,18 +324,11 @@ public class ConditionalResolverMergeFil
return pDesc;
}
- private void setupMapRedWork(HiveConf conf, MapredWork work, long targetSize, long totalSize) {
- if (work.getNumReduceTasks() > 0) {
- int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
- int reducers = (int) ((totalSize + targetSize - 1) / targetSize);
- reducers = Math.max(1, reducers);
- reducers = Math.min(maxReducers, reducers);
- work.setNumReduceTasks(reducers);
- }
- work.setMaxSplitSize(targetSize);
- work.setMinSplitSize(targetSize);
- work.setMinSplitSizePerNode(targetSize);
- work.setMinSplitSizePerRack(targetSize);
+ private void setupMapRedWork(HiveConf conf, MapWork mWork, long targetSize, long totalSize) {
+ mWork.setMaxSplitSize(targetSize);
+ mWork.setMinSplitSize(targetSize);
+ mWork.setMinSplitSizePerNode(targetSize);
+ mWork.setMinSplitSizePerRack(targetSize);
}
private static class AverageSize {
@@ -352,7 +350,6 @@ public class ConditionalResolverMergeFil
}
private AverageSize getAverageSize(FileSystem inpFs, Path dirPath) {
- AverageSize dummy = new AverageSize(0, 0);
AverageSize error = new AverageSize(-1, -1);
try {
FileStatus[] fStats = inpFs.listStatus(dirPath);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/Explain.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/Explain.java?rev=1508111&r1=1508110&r2=1508111&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/Explain.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/Explain.java Mon Jul 29 15:50:12 2013
@@ -32,4 +32,6 @@ public @interface Explain {
boolean normalExplain() default true;
boolean displayOnlyOnTrue() default false;
+
+ boolean skipHeader() default false;
}