Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/21 04:45:05 UTC
svn commit: r1633268 [1/7] - in /hive/branches/spark:
itests/src/test/resources/ ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/
ql/src/java/org/apache/hadoop/hive/ql/parse/spark/
ql/src/java/org/apache/had...
Author: xuefu
Date: Tue Oct 21 02:45:04 2014
New Revision: 1633268
URL: http://svn.apache.org/r1633268
Log:
HIVE-8436: Modify SparkWork to split works with multiple child works [Spark Branch] (Chao via Xuefu)
Added:
hive/branches/spark/ql/src/test/queries/clientpositive/multi_insert_mixed.q
hive/branches/spark/ql/src/test/results/clientpositive/multi_insert_mixed.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/multi_insert_mixed.q.out
Removed:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMergeTaskProcessor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java
Modified:
hive/branches/spark/itests/src/test/resources/testconfiguration.properties
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby_cube1.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby_multi_single_reducer.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby_position.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby_rollup1.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby_sort_1_23.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby_sort_skew_1_23.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/input12.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/input13.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/input1_limit.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/input_part2.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/insert1.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/insert_into3.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/load_dyn_part1.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/load_dyn_part8.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/multi_insert.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/multi_insert_gby3.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/multi_insert_lateral_view.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/multi_insert_move_tasks_share_dependencies.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/multigroupby_singlemr.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/ppd_multi_insert.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/ppd_transform.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/subquery_multiinsert.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/union18.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/union19.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/union_remove_6.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/vectorized_ptf.q.out
Modified: hive/branches/spark/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/src/test/resources/testconfiguration.properties?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/spark/itests/src/test/resources/testconfiguration.properties Tue Oct 21 02:45:04 2014
@@ -505,6 +505,7 @@ spark.query.files=add_part_multiple.q \
multi_insert_gby2.q \
multi_insert_gby3.q \
multi_insert_lateral_view.q \
+ multi_insert_mixed.q \
multi_insert_move_tasks_share_dependencies.q \
multigroupby_singlemr.q \
optimize_nullscan.q \
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Oct 21 02:45:04 2014
@@ -972,6 +972,23 @@ public final class Utilities {
}
/**
+ * Clones using the powers of XML. Do not use unless necessary.
+ * @param plan The plan.
+ * @return The clone.
+ */
+ public static BaseWork cloneBaseWork(BaseWork plan) {
+ PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+ Configuration conf = new HiveConf();
+ serializePlan(plan, baos, conf, true);
+ BaseWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+ plan.getClass(), conf, true);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN);
+ return newPlan;
+ }
+
+ /**
* Serialize the object. This helper function mainly makes sure that enums,
* counters, etc are handled properly.
*/
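cloneBaseWork() above deep-copies a work by serializing it to a byte stream and deserializing it back. A minimal standalone sketch of the same clone-by-serialization pattern, using plain java.io serialization rather than Hive's plan serializers (DeepCopySketch is a hypothetical helper, not part of this commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public final class DeepCopySketch {
      // Deep-copies an object graph by round-tripping it through a byte stream,
      // analogous to the serializePlan()/deserializePlan() calls above.
      @SuppressWarnings("unchecked")
      public static <T extends Serializable> T deepCopy(T original)
          throws IOException, ClassNotFoundException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(original);
        oos.close();
        ObjectInputStream ois =
            new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
        T copy = (T) ois.readObject();
        ois.close();
        return copy;
      }
    }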
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java Tue Oct 21 02:45:04 2014
@@ -63,7 +63,7 @@ public abstract class HiveBaseFunctionRe
@Override
public void collect(HiveKey key, BytesWritable value) throws IOException {
- lastRecordOutput.add(copyHiveKey(key), copyBytesWritable(value));
+ lastRecordOutput.add(copyHiveKey(key), SparkUtilities.copyBytesWritable(value));
}
private static HiveKey copyHiveKey(HiveKey key) {
@@ -74,12 +74,6 @@ public abstract class HiveBaseFunctionRe
return copy;
}
- private static BytesWritable copyBytesWritable(BytesWritable bw) {
- BytesWritable copy = new BytesWritable();
- copy.set(bw);
- return copy;
- }
-
/** Process the given record. */
protected abstract void processNextRecord(T inputRecord) throws IOException;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Tue Oct 21 02:45:04 2014
@@ -20,13 +20,9 @@ package org.apache.hadoop.hive.ql.exec.s
import java.util.Iterator;
-import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.spark.TaskContext;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java Tue Oct 21 02:45:04 2014
@@ -19,10 +19,13 @@
package org.apache.hadoop.hive.ql.exec.spark;
import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
import com.google.common.base.Preconditions;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
public class MapInput implements SparkTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> {
private JavaPairRDD<HiveKey, BytesWritable> hadoopRDD;
@@ -46,7 +49,24 @@ public class MapInput implements SparkTr
JavaPairRDD<BytesWritable, BytesWritable> input) {
Preconditions.checkArgument(input == null,
"AssertionError: MapInput doesn't take any input");
- return toCache ? hadoopRDD.cache() : hadoopRDD;
+ JavaPairRDD result = hadoopRDD;
+ if (toCache) {
+ result = result.mapToPair(new CopyFunction());
+ return result.cache();
+ } else {
+ return result;
+ }
+ }
+
+ private static class CopyFunction implements PairFunction<Tuple2<BytesWritable, BytesWritable>,
+ BytesWritable, BytesWritable> {
+
+ @Override
+ public Tuple2<BytesWritable, BytesWritable> call(Tuple2<BytesWritable, BytesWritable> tup) throws Exception {
+ // no need to copy the key since it never gets used in HiveMapFunction
+ BytesWritable value = SparkUtilities.copyBytesWritable(tup._2());
+ return new Tuple2<BytesWritable, BytesWritable>(tup._1(), value);
+ }
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Tue Oct 21 02:45:04 2014
@@ -18,15 +18,23 @@
package org.apache.hadoop.hive.ql.exec.spark;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
@@ -46,7 +54,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.UnionWork;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -64,6 +71,9 @@ public class SparkPlanGenerator {
private final JobConf jobConf;
private Context context;
private Path scratchDir;
+ private final Map<BaseWork, BaseWork> cloneToWork;
+ private final Map<BaseWork, SparkTran> workToTranMap;
+ private final Map<BaseWork, SparkTran> workToParentWorkTranMap;
public SparkPlanGenerator(JavaSparkContext sc, Context context,
JobConf jobConf, Path scratchDir) {
@@ -71,31 +81,31 @@ public class SparkPlanGenerator {
this.context = context;
this.jobConf = jobConf;
this.scratchDir = scratchDir;
+ this.cloneToWork = new HashMap<BaseWork, BaseWork>();
+ this.workToTranMap = new HashMap<BaseWork, SparkTran>();
+ this.workToParentWorkTranMap = new HashMap<BaseWork, SparkTran>();
}
public SparkPlan generate(SparkWork sparkWork) throws Exception {
SparkPlan sparkPlan = new SparkPlan();
- Map<BaseWork, SparkTran> workToTranMap = new HashMap<BaseWork, SparkTran>();
+ cloneToWork.clear();
+ workToTranMap.clear();
+ workToParentWorkTranMap.clear();
+
+ splitSparkWork(sparkWork);
for (BaseWork work : sparkWork.getAllWork()) {
SparkTran tran;
if (work instanceof MapWork) {
- MapInput mapInput = generateMapInput((MapWork)work);
- sparkPlan.addTran(mapInput);
+ SparkTran mapInput = generateParentTran(sparkPlan, sparkWork, work);
tran = generate((MapWork)work);
sparkPlan.addTran(tran);
sparkPlan.connect(mapInput, tran);
} else if (work instanceof ReduceWork) {
- List<BaseWork> parentWorks = sparkWork.getParents(work);
+ SparkTran shuffleTran = generateParentTran(sparkPlan, sparkWork, work);
tran = generate((ReduceWork)work);
sparkPlan.addTran(tran);
- ShuffleTran shuffleTran = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work));
- sparkPlan.addTran(shuffleTran);
sparkPlan.connect(shuffleTran, tran);
- for (BaseWork parentWork : parentWorks) {
- SparkTran parentTran = workToTranMap.get(parentWork);
- sparkPlan.connect(parentTran, shuffleTran);
- }
} else {
List<BaseWork> parentWorks = sparkWork.getParents(work);
tran = new IdentityTran();
@@ -105,12 +115,144 @@ public class SparkPlanGenerator {
sparkPlan.connect(parentTran, tran);
}
}
+
workToTranMap.put(work, tran);
}
return sparkPlan;
}
+ // Generate (possibly get from a cached result) parent SparkTran
+ private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, BaseWork work) throws Exception {
+ if (cloneToWork.containsKey(work)) {
+ BaseWork originalWork = cloneToWork.get(work);
+ if (workToParentWorkTranMap.containsKey(originalWork)) {
+ return workToParentWorkTranMap.get(originalWork);
+ }
+ }
+
+ SparkTran result;
+ if (work instanceof MapWork) {
+ result = generateMapInput((MapWork)work);
+ sparkPlan.addTran(result);
+ } else if (work instanceof ReduceWork) {
+ List<BaseWork> parentWorks = sparkWork.getParents(work);
+ result = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work), cloneToWork.containsKey(work));
+ sparkPlan.addTran(result);
+ for (BaseWork parentWork : parentWorks) {
+ sparkPlan.connect(workToTranMap.get(parentWork), result);
+ }
+ } else {
+ throw new IllegalStateException("AssertionError: generateParentTran() only expect MapWork or ReduceWork," +
+ " but found " + work.getClass().getName());
+ }
+
+ if (cloneToWork.containsKey(work)) {
+ workToParentWorkTranMap.put(cloneToWork.get(work), result);
+ }
+
+ return result;
+ }
+
+
+ private void splitSparkWork(SparkWork sparkWork) {
+ // Do a BFS on the sparkWork graph and look for any work that has more than one child.
+ // If we find such a work, we split it into multiple ones, one for each of its children.
+ Queue<BaseWork> queue = new LinkedList<BaseWork>();
+ Set<BaseWork> visited = new HashSet<BaseWork>();
+ queue.addAll(sparkWork.getRoots());
+ while (!queue.isEmpty()) {
+ BaseWork work = queue.poll();
+ if (!visited.add(work)) {
+ continue;
+ }
+
+ List<BaseWork> childWorks = sparkWork.getChildren(work);
+ // First, add all children of this work into queue, to be processed later.
+ for (BaseWork w : childWorks) {
+ queue.add(w);
+ }
+
+ // Second, check if this work has multiple ReduceSinks. If so, split it.
+ splitBaseWork(sparkWork, work, childWorks);
+ }
+ }
+
+ private Set<Operator<?>> getAllReduceSinks(BaseWork work) {
+ Set<Operator<?>> resultSet = work.getAllLeafOperators();
+ Iterator<Operator<?>> it = resultSet.iterator();
+ while (it.hasNext()) {
+ if (!(it.next() instanceof ReduceSinkOperator)) {
+ it.remove();
+ }
+ }
+ return resultSet;
+ }
+
+ // Split work into multiple branches, one for each childWork in childWorks.
+ // It also sets up the connections between each cloned parent work and its child work.
+ private void splitBaseWork(SparkWork sparkWork, BaseWork parentWork, List<BaseWork> childWorks) {
+ if (getAllReduceSinks(parentWork).size() <= 1) {
+ return;
+ }
+
+ // Grand-parent works - we need to set these to be the parents of the cloned works.
+ List<BaseWork> grandParentWorks = sparkWork.getParents(parentWork);
+ boolean isFirst = true;
+
+ for (BaseWork childWork : childWorks) {
+ BaseWork clonedParentWork = Utilities.cloneBaseWork(parentWork);
+ String childReducerName = childWork.getName();
+ SparkEdgeProperty clonedEdgeProperty = sparkWork.getEdgeProperty(parentWork, childWork);
+
+ // We need to remove those branches that
+ // 1. end with a ReduceSinkOperator, and
+ // 2. whose ReduceSinkOperator's name is not the same as childReducerName.
+ // Also, if the cloned work is not the first, we remove ALL leaf operators except
+ // the corresponding ReduceSinkOperator.
+ for (Operator<?> op : clonedParentWork.getAllLeafOperators()) {
+ if (op instanceof ReduceSinkOperator) {
+ if (!((ReduceSinkOperator)op).getConf().getOutputName().equals(childReducerName)) {
+ removeOpRecursive(op);
+ }
+ } else if (!isFirst) {
+ removeOpRecursive(op);
+ }
+ }
+
+ isFirst = false;
+
+ // Then, we need to set up the graph connections. Specifically:
+ // 1. we need to connect this cloned parent work with all the grand-parent works.
+ // 2. we need to connect this cloned parent work with the corresponding child work.
+ sparkWork.add(clonedParentWork);
+ for (BaseWork gpw : grandParentWorks) {
+ sparkWork.connect(gpw, clonedParentWork, sparkWork.getEdgeProperty(gpw, parentWork));
+ }
+ sparkWork.connect(clonedParentWork, childWork, clonedEdgeProperty);
+ cloneToWork.put(clonedParentWork, parentWork);
+ }
+
+ sparkWork.remove(parentWork);
+ }
+
+ // Remove op from all its parents' child lists.
+ // Recursively remove any parent that has only this op as a child.
+ private void removeOpRecursive(Operator<?> operator) {
+ List<Operator<?>> parentOperators = new ArrayList<Operator<?>>();
+ for (Operator<?> op : operator.getParentOperators()) {
+ parentOperators.add(op);
+ }
+ for (Operator<?> parentOperator : parentOperators) {
+ Preconditions.checkArgument(parentOperator.getChildOperators().contains(operator),
+ "AssertionError: parent of " + operator.getName() + " doesn't have it as child.");
+ parentOperator.removeChild(operator);
+ if (parentOperator.getNumChild() == 0) {
+ removeOpRecursive(parentOperator);
+ }
+ }
+ }
+
private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
// MergeFileWork is sub-class of MapWork, we don't need to distinguish here
if (mWork.getInputformat() != null) {
@@ -147,10 +289,10 @@ public class SparkPlanGenerator {
JavaPairRDD<HiveKey, BytesWritable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
WritableComparable.class, Writable.class);
- return new MapInput(hadoopRDD);
+ return new MapInput(hadoopRDD, false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
}
- private ShuffleTran generate(SparkEdgeProperty edge) {
+ private ShuffleTran generate(SparkEdgeProperty edge, boolean needCache) {
Preconditions.checkArgument(!edge.isShuffleNone(),
"AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
SparkShuffler shuffler;
@@ -161,7 +303,7 @@ public class SparkPlanGenerator {
} else {
shuffler = new GroupByShuffler();
}
- return new ShuffleTran(shuffler, edge.getNumPartitions());
+ return new ShuffleTran(shuffler, edge.getNumPartitions(), needCache);
}
private MapTran generate(MapWork mw) throws Exception {
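At a high level, splitSparkWork() above is a breadth-first walk that replaces any work with several children by one clone per child. A standalone sketch of that shape over a plain string-keyed adjacency map (SplitSketch and its graph representation are hypothetical, not Hive classes; it omits the grandparent re-wiring, edge properties, and operator pruning that splitBaseWork() performs):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.Queue;
    import java.util.Set;

    public final class SplitSketch {
      // graph: adjacency list keyed by work name; a node with N > 1 children is
      // replaced by N clones, each keeping exactly one of the original children.
      public static void splitMultiChildNodes(Map<String, List<String>> graph,
                                              Set<String> roots) {
        Queue<String> queue = new LinkedList<String>(roots);
        Set<String> visited = new HashSet<String>();
        while (!queue.isEmpty()) {
          String work = queue.poll();
          if (!visited.add(work)) {
            continue;
          }
          List<String> children = graph.containsKey(work)
              ? graph.get(work) : new ArrayList<String>();
          queue.addAll(children);
          if (children.size() > 1) {
            for (int i = 0; i < children.size(); i++) {
              List<String> single = new ArrayList<String>();
              single.add(children.get(i));
              graph.put(work + "-clone-" + i, single);   // one clone per child
            }
            graph.remove(work);                          // the original work goes away
          }
        }
      }
    }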
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Tue Oct 21 02:45:04 2014
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.exec.spark;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.TaskContext;
@@ -33,4 +34,11 @@ public class SparkUtilities {
jobConf.set("mapred.task.id",
String.format("%06d_%02d", taskContext.getPartitionId(), taskContext.getAttemptId()));
}
+
+ public static BytesWritable copyBytesWritable(BytesWritable bw) {
+ BytesWritable copy = new BytesWritable();
+ copy.set(bw);
+ return copy;
+ }
+
}
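copyBytesWritable() exists because Hadoop record readers typically reuse a single Writable instance across records, so caching references without a defensive copy would leave every cached element pointing at the last record's bytes. A small standalone illustration of that aliasing problem (CopyDemo is a hypothetical example class, not part of this commit):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.io.BytesWritable;

    public class CopyDemo {
      public static void main(String[] args) {
        BytesWritable reused = new BytesWritable();       // stands in for a reader's reused buffer
        List<BytesWritable> byReference = new ArrayList<BytesWritable>();
        List<BytesWritable> byCopy = new ArrayList<BytesWritable>();
        for (byte b = 1; b <= 3; b++) {
          reused.set(new byte[] { b }, 0, 1);             // the "reader" overwrites the same object
          byReference.add(reused);                        // all three elements alias one object
          BytesWritable copy = new BytesWritable();
          copy.set(reused);                               // what copyBytesWritable() does
          byCopy.add(copy);
        }
        System.out.println(byReference);                  // prints the last value three times
        System.out.println(byCopy);                       // prints three distinct values
      }
    }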
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java Tue Oct 21 02:45:04 2014
@@ -77,10 +77,6 @@ public class GenSparkProcContext impleme
// walk.
public Operator<? extends OperatorDesc> parentOfRoot;
- // Default task is the task we use for those operators that are not connected
- // to the newly generated TS
- public SparkTask defaultTask;
-
// Spark task we're currently processing
public SparkTask currentTask;
@@ -88,20 +84,6 @@ public class GenSparkProcContext impleme
// one.
public BaseWork preceedingWork;
- // All operators that we should unlink with their parents, for multi-table insertion
- // It's a mapping from operator to its ONLY parent.
- public Map<Operator<?>, Operator<?>> opToParentMap;
-
- // A mapping from operators to their corresponding tasks.
- // The key for this map could only be:
- // 1. TableScanOperators (so we know which task for the tree rooted at this TS)
- // 2. FileSinkOperators (need this info in GenSparkUtils::processFileSinks)
- // 3. UnionOperator/JoinOperator (need for merging tasks)
- public final Map<Operator<?>, SparkTask> opToTaskMap;
-
- // temporary TS generated for multi-table insertion
- public final Set<TableScanOperator> tempTS;
-
// map that keeps track of the last operator of a task to the work
// that follows it. This is used for connecting them later.
public final Map<Operator<?>, BaseWork> leafOperatorToFollowingWork;
@@ -157,10 +139,9 @@ public class GenSparkProcContext impleme
this.rootTasks = rootTasks;
this.inputs = inputs;
this.outputs = outputs;
- this.defaultTask = (SparkTask) TaskFactory.get(
+ this.currentTask = (SparkTask) TaskFactory.get(
new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
- this.rootTasks.add(defaultTask);
- this.currentTask = null;
+ this.rootTasks.add(currentTask);
this.leafOperatorToFollowingWork = new LinkedHashMap<Operator<?>, BaseWork>();
this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, SparkEdgeProperty>>();
this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>();
@@ -178,8 +159,5 @@ public class GenSparkProcContext impleme
this.clonedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
- this.opToParentMap = new LinkedHashMap<Operator<?>, Operator<?>>();
- this.opToTaskMap = new LinkedHashMap<Operator<?>, SparkTask>();
- this.tempTS = new LinkedHashSet<TableScanOperator>();
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Tue Oct 21 02:45:04 2014
@@ -170,16 +170,6 @@ public class GenSparkUtils {
return mapWork;
}
- // Create a MapWork for a temporary TableScanOperator
- // Basically a thin wrapper on GenMapRedUtils.setTaskPlan.
- public MapWork createMapWork(TableScanOperator root,
- SparkWork sparkWork, String path, TableDesc tt_desc) throws SemanticException {
- MapWork mapWork = new MapWork("Map " + (++sequenceNumber));
- GenMapRedUtils.setTaskPlan(path, path, root, mapWork, false, tt_desc);
- sparkWork.add(mapWork);
- return mapWork;
- }
-
// this method's main use is to help unit testing this class
protected void setupMapWork(MapWork mapWork, GenSparkProcContext context,
PrunedPartitionList partitions, Operator<? extends OperatorDesc> root,
@@ -274,19 +264,15 @@ public class GenSparkUtils {
throws SemanticException {
ParseContext parseContext = context.parseContext;
- Preconditions.checkArgument(context.opToTaskMap.containsKey(fileSink),
- "AssertionError: the fileSink " + fileSink.getName() + " should be in the context");
-
- SparkTask currentTask = context.opToTaskMap.get(fileSink);
boolean isInsertTable = // is INSERT OVERWRITE TABLE
GenMapRedUtils.isInsertInto(parseContext, fileSink);
HiveConf hconf = parseContext.getConf();
boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
- hconf, fileSink, currentTask, isInsertTable);
+ hconf, fileSink, context.currentTask, isInsertTable);
- Path finalName = GenMapRedUtils.createMoveTask(currentTask,
+ Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
if (chDir) {
@@ -295,13 +281,13 @@ public class GenSparkUtils {
logger.info("using CombineHiveInputformat for the merge job");
GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
context.dependencyTask, context.moveTask,
- hconf, currentTask);
+ hconf, context.currentTask);
}
FetchTask fetchTask = parseContext.getFetchTask();
- if (fetchTask != null && currentTask.getNumChild() == 0) {
+ if (fetchTask != null && context.currentTask.getNumChild() == 0) {
if (fetchTask.isFetchFrom(fileSink.getConf())) {
- currentTask.setFetchSource(true);
+ context.currentTask.setFetchSource(true);
}
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Tue Oct 21 02:45:04 2014
@@ -90,10 +90,6 @@ public class GenSparkWork implements Nod
return null;
}
- if (operator instanceof FileSinkOperator) {
- context.opToTaskMap.put(operator, context.currentTask);
- }
-
SparkWork sparkWork = context.currentTask.getWork();
// Right now the work graph is pretty simple. If there is no
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Tue Oct 21 02:45:04 2014
@@ -34,7 +34,6 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -139,39 +138,9 @@ public class SparkCompiler extends TaskC
GenSparkProcContext procCtx = new GenSparkProcContext(
conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
- // -------------------- First Pass ---------------------
-
- Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp("TS", TableScanOperator.getOperatorName() + "%"),
- new SparkTableScanProcessor());
-
- Dispatcher disp = new DefaultRuleDispatcher(new SparkMultiInsertionProcessor(), opRules, procCtx);
- ArrayList<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(pCtx.getTopOps().values());
- GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
- ogw.startWalking(topNodes, null);
-
- // ------------------- Second Pass ----------------------
-
- // Merge tasks upon Join/Union if possible
- opRules.clear();
- opRules.put(new RuleRegExp("Join", JoinOperator.getOperatorName() + "%"),
- new SparkMergeTaskProcessor());
- opRules.put(new RuleRegExp("Union", UnionOperator.getOperatorName() + "%"),
- new SparkMergeTaskProcessor());
- disp = new DefaultRuleDispatcher(null, opRules, procCtx);
- topNodes = new ArrayList<Node>();
- topNodes.addAll(procCtx.tempTS); // First process temp TS
- topNodes.addAll(pCtx.getTopOps().values());
- ogw = new GenSparkWorkWalker(disp, procCtx);
- ogw.startWalking(topNodes, null);
-
-
- // ------------------- Third Pass -----------------------
-
// create a walker which walks the tree in a DFS manner while maintaining
// the operator stack. The dispatcher generates the plan from the operator tree
- opRules.clear();
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
opRules.put(new RuleRegExp("Split Work - ReduceSink",
ReduceSinkOperator.getOperatorName() + "%"), genSparkWork);
@@ -184,17 +153,7 @@ public class SparkCompiler extends TaskC
opRules.put(new RuleRegExp("Handle Analyze Command",
TableScanOperator.getOperatorName() + "%"),
- new CompositeProcessor(
- new NodeProcessor() {
- @Override
- public Object process(Node nd, Stack<Node> s,
- NodeProcessorCtx procCtx, Object... no) throws SemanticException {
- GenSparkProcContext context = (GenSparkProcContext) procCtx;
- context.currentTask = context.opToTaskMap.get(nd);
- return null;
- }
- },
- new SparkProcessAnalyzeTable(GenSparkUtils.getUtils())));
+ new SparkProcessAnalyzeTable(GenSparkUtils.getUtils()));
opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"),
new NodeProcessor() {
@@ -213,11 +172,10 @@ public class SparkCompiler extends TaskC
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
- disp = new DefaultRuleDispatcher(null, opRules, procCtx);
- topNodes = new ArrayList<Node>();
+ Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+ List<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(pCtx.getTopOps().values());
- topNodes.addAll(procCtx.tempTS);
- ogw = new GenSparkWorkWalker(disp, procCtx);
+ GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
ogw.startWalking(topNodes, null);
// we need to clone some operator plans and remove union operators still
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java Tue Oct 21 02:45:04 2014
@@ -79,10 +79,6 @@ public class SparkProcessAnalyzeTable im
GenSparkProcContext context = (GenSparkProcContext) procContext;
TableScanOperator tableScan = (TableScanOperator) nd;
- // If this tableScan is a generated one for multi-insertion, ignore it
- if (context.tempTS.contains(tableScan)) {
- return null;
- }
ParseContext parseContext = context.parseContext;
Class<? extends InputFormat> inputFormat = parseContext.getTopToTable().get(tableScan)
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java Tue Oct 21 02:45:04 2014
@@ -113,6 +113,31 @@ public abstract class BaseWork extends A
return returnSet;
}
+ /**
+ * Returns a set containing all leaf operators from the operator tree in this work.
+ * @return a set containing all leaf operators in this operator tree.
+ */
+ public Set<Operator<?>> getAllLeafOperators() {
+ Set<Operator<?>> returnSet = new LinkedHashSet<Operator<?>>();
+ Set<Operator<?>> opSet = getAllRootOperators();
+ Stack<Operator<?>> opStack = new Stack<Operator<?>>();
+
+ // add all children
+ opStack.addAll(opSet);
+
+ while(!opStack.empty()) {
+ Operator<?> op = opStack.pop();
+ if (op.getNumChild() == 0) {
+ returnSet.add(op);
+ }
+ if (op.getChildOperators() != null) {
+ opStack.addAll(op.getChildOperators());
+ }
+ }
+
+ return returnSet;
+ }
+
public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
return scratchColumnVectorTypes;
}
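getAllLeafOperators() above walks the operator graph from its roots with an explicit stack and keeps the operators that have no children. A standalone sketch of the same traversal over a hypothetical Node type (not Hive's Operator):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.Stack;

    public final class LeafSketch {
      public static final class Node {
        final String name;
        final List<Node> children = new ArrayList<Node>();
        public Node(String name) { this.name = name; }
      }

      public static Set<Node> leaves(Collection<Node> roots) {
        Set<Node> result = new LinkedHashSet<Node>();
        Set<Node> seen = new HashSet<Node>();
        Stack<Node> stack = new Stack<Node>();
        stack.addAll(roots);
        while (!stack.empty()) {
          Node node = stack.pop();
          if (!seen.add(node)) {
            continue;                      // a DAG may reach the same node twice
          }
          if (node.children.isEmpty()) {
            result.add(node);              // no children: this is a leaf
          } else {
            stack.addAll(node.children);
          }
        }
        return result;
      }
    }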
Added: hive/branches/spark/ql/src/test/queries/clientpositive/multi_insert_mixed.q
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/queries/clientpositive/multi_insert_mixed.q?rev=1633268&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/queries/clientpositive/multi_insert_mixed.q (added)
+++ hive/branches/spark/ql/src/test/queries/clientpositive/multi_insert_mixed.q Tue Oct 21 02:45:04 2014
@@ -0,0 +1,21 @@
+create table src_multi1 like src;
+create table src_multi2 like src;
+create table src_multi3 like src;
+
+-- Testing the case where a map work contains both shuffling (ReduceSinkOperator)
+-- and inserting to output table (FileSinkOperator).
+
+explain
+from src
+insert overwrite table src_multi1 select key, count(1) group by key order by key
+insert overwrite table src_multi2 select value, count(1) group by value order by value
+insert overwrite table src_multi3 select * where key < 10;
+
+from src
+insert overwrite table src_multi1 select key, count(1) group by key order by key
+insert overwrite table src_multi2 select value, count(1) group by value order by value
+insert overwrite table src_multi3 select * where key < 10;
+
+select * from src_multi1;
+select * from src_multi2;
+select * from src_multi3;
Added: hive/branches/spark/ql/src/test/results/clientpositive/multi_insert_mixed.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/multi_insert_mixed.q.out?rev=1633268&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/multi_insert_mixed.q.out (added)
+++ hive/branches/spark/ql/src/test/results/clientpositive/multi_insert_mixed.q.out Tue Oct 21 02:45:04 2014
@@ -0,0 +1,915 @@
+PREHOOK: query: create table src_multi1 like src
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@src_multi1
+POSTHOOK: query: create table src_multi1 like src
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@src_multi1
+PREHOOK: query: create table src_multi2 like src
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@src_multi2
+POSTHOOK: query: create table src_multi2 like src
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@src_multi2
+PREHOOK: query: create table src_multi3 like src
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@src_multi3
+POSTHOOK: query: create table src_multi3 like src
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@src_multi3
+PREHOOK: query: -- Testing the case where a map work contains both shuffling (ReduceSinkOperator)
+-- and inserting to output table (FileSinkOperator).
+
+explain
+from src
+insert overwrite table src_multi1 select key, count(1) group by key order by key
+insert overwrite table src_multi2 select value, count(1) group by value order by value
+insert overwrite table src_multi3 select * where key < 10
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Testing the case where a map work contains both shuffling (ReduceSinkOperator)
+-- and inserting to output table (FileSinkOperator).
+
+explain
+from src
+insert overwrite table src_multi1 select key, count(1) group by key order by key
+insert overwrite table src_multi2 select value, count(1) group by value order by value
+insert overwrite table src_multi3 select * where key < 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-3 is a root stage
+ Stage-4 depends on stages: Stage-3
+ Stage-0 depends on stages: Stage-4
+ Stage-5 depends on stages: Stage-0
+ Stage-6 depends on stages: Stage-3
+ Stage-7 depends on stages: Stage-6
+ Stage-1 depends on stages: Stage-7
+ Stage-8 depends on stages: Stage-1
+ Stage-2 depends on stages: Stage-7
+ Stage-9 depends on stages: Stage-2
+
+STAGE PLANS:
+ Stage: Stage-3
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: key
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count(1)
+ keys: key (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: bigint)
+ Select Operator
+ expressions: value (type: string)
+ outputColumnNames: value
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count(1)
+ keys: value (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Filter Operator
+ predicate: (key < 10) (type: boolean)
+ Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.src_multi3
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ keys: KEY._col0 (type: string)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string), _col1 (type: bigint)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-4
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: bigint)
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: bigint)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.src_multi1
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ replace: true
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.src_multi1
+
+ Stage: Stage-5
+ Stats-Aggr Operator
+
+ Stage: Stage-6
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: bigint)
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ keys: KEY._col0 (type: string)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string), _col1 (type: bigint)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-7
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: bigint)
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: bigint)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.src_multi2
+
+ Stage: Stage-1
+ Move Operator
+ tables:
+ replace: true
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.src_multi2
+
+ Stage: Stage-8
+ Stats-Aggr Operator
+
+ Stage: Stage-2
+ Move Operator
+ tables:
+ replace: true
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.src_multi3
+
+ Stage: Stage-9
+ Stats-Aggr Operator
+
+PREHOOK: query: from src
+insert overwrite table src_multi1 select key, count(1) group by key order by key
+insert overwrite table src_multi2 select value, count(1) group by value order by value
+insert overwrite table src_multi3 select * where key < 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@src_multi1
+PREHOOK: Output: default@src_multi2
+PREHOOK: Output: default@src_multi3
+POSTHOOK: query: from src
+insert overwrite table src_multi1 select key, count(1) group by key order by key
+insert overwrite table src_multi2 select value, count(1) group by value order by value
+insert overwrite table src_multi3 select * where key < 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@src_multi1
+POSTHOOK: Output: default@src_multi2
+POSTHOOK: Output: default@src_multi3
+POSTHOOK: Lineage: src_multi1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: src_multi1.value EXPRESSION [(src)src.null, ]
+POSTHOOK: Lineage: src_multi2.key SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: src_multi2.value EXPRESSION [(src)src.null, ]
+POSTHOOK: Lineage: src_multi3.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: src_multi3.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select * from src_multi1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src_multi1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from src_multi1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src_multi1
+#### A masked pattern was here ####
+0 3
+10 1
+100 2
+103 2
+104 2
+105 1
+11 1
+111 1
+113 2
+114 1
+116 1
+118 2
+119 3
+12 2
+120 2
+125 2
+126 1
+128 3
+129 2
+131 1
+133 1
+134 2
+136 1
+137 2
+138 4
+143 1
+145 1
+146 2
+149 2
+15 2
+150 1
+152 2
+153 1
+155 1
+156 1
+157 1
+158 1
+160 1
+162 1
+163 1
+164 2
+165 2
+166 1
+167 3
+168 1
+169 4
+17 1
+170 1
+172 2
+174 2
+175 2
+176 2
+177 1
+178 1
+179 2
+18 2
+180 1
+181 1
+183 1
+186 1
+187 3
+189 1
+19 1
+190 1
+191 2
+192 1
+193 3
+194 1
+195 2
+196 1
+197 2
+199 3
+2 1
+20 1
+200 2
+201 1
+202 1
+203 2
+205 2
+207 2
+208 3
+209 2
+213 2
+214 1
+216 2
+217 2
+218 1
+219 2
+221 2
+222 1
+223 2
+224 2
+226 1
+228 1
+229 2
+230 5
+233 2
+235 1
+237 2
+238 2
+239 2
+24 2
+241 1
+242 2
+244 1
+247 1
+248 1
+249 1
+252 1
+255 2
+256 2
+257 1
+258 1
+26 2
+260 1
+262 1
+263 1
+265 2
+266 1
+27 1
+272 2
+273 3
+274 1
+275 1
+277 4
+278 2
+28 1
+280 2
+281 2
+282 2
+283 1
+284 1
+285 1
+286 1
+287 1
+288 2
+289 1
+291 1
+292 1
+296 1
+298 3
+30 1
+302 1
+305 1
+306 1
+307 2
+308 1
+309 2
+310 1
+311 3
+315 1
+316 3
+317 2
+318 3
+321 2
+322 2
+323 1
+325 2
+327 3
+33 1
+331 2
+332 1
+333 2
+335 1
+336 1
+338 1
+339 1
+34 1
+341 1
+342 2
+344 2
+345 1
+348 5
+35 3
+351 1
+353 2
+356 1
+360 1
+362 1
+364 1
+365 1
+366 1
+367 2
+368 1
+369 3
+37 2
+373 1
+374 1
+375 1
+377 1
+378 1
+379 1
+382 2
+384 3
+386 1
+389 1
+392 1
+393 1
+394 1
+395 2
+396 3
+397 2
+399 2
+4 1
+400 1
+401 5
+402 1
+403 3
+404 2
+406 4
+407 1
+409 3
+41 1
+411 1
+413 2
+414 2
+417 3
+418 1
+419 1
+42 2
+421 1
+424 2
+427 1
+429 2
+43 1
+430 3
+431 3
+432 1
+435 1
+436 1
+437 1
+438 3
+439 2
+44 1
+443 1
+444 1
+446 1
+448 1
+449 1
+452 1
+453 1
+454 3
+455 1
+457 1
+458 2
+459 2
+460 1
+462 2
+463 2
+466 3
+467 1
+468 4
+469 5
+47 1
+470 1
+472 1
+475 1
+477 1
+478 2
+479 1
+480 3
+481 1
+482 1
+483 1
+484 1
+485 1
+487 1
+489 4
+490 1
+491 1
+492 2
+493 1
+494 1
+495 1
+496 1
+497 1
+498 3
+5 3
+51 2
+53 1
+54 1
+57 1
+58 2
+64 1
+65 1
+66 1
+67 2
+69 1
+70 3
+72 2
+74 1
+76 2
+77 1
+78 1
+8 1
+80 1
+82 1
+83 2
+84 2
+85 1
+86 1
+87 1
+9 1
+90 3
+92 1
+95 2
+96 1
+97 2
+98 2
+PREHOOK: query: select * from src_multi2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src_multi2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from src_multi2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src_multi2
+#### A masked pattern was here ####
+val_0 3
+val_10 1
+val_100 2
+val_103 2
+val_104 2
+val_105 1
+val_11 1
+val_111 1
+val_113 2
+val_114 1
+val_116 1
+val_118 2
+val_119 3
+val_12 2
+val_120 2
+val_125 2
+val_126 1
+val_128 3
+val_129 2
+val_131 1
+val_133 1
+val_134 2
+val_136 1
+val_137 2
+val_138 4
+val_143 1
+val_145 1
+val_146 2
+val_149 2
+val_15 2
+val_150 1
+val_152 2
+val_153 1
+val_155 1
+val_156 1
+val_157 1
+val_158 1
+val_160 1
+val_162 1
+val_163 1
+val_164 2
+val_165 2
+val_166 1
+val_167 3
+val_168 1
+val_169 4
+val_17 1
+val_170 1
+val_172 2
+val_174 2
+val_175 2
+val_176 2
+val_177 1
+val_178 1
+val_179 2
+val_18 2
+val_180 1
+val_181 1
+val_183 1
+val_186 1
+val_187 3
+val_189 1
+val_19 1
+val_190 1
+val_191 2
+val_192 1
+val_193 3
+val_194 1
+val_195 2
+val_196 1
+val_197 2
+val_199 3
+val_2 1
+val_20 1
+val_200 2
+val_201 1
+val_202 1
+val_203 2
+val_205 2
+val_207 2
+val_208 3
+val_209 2
+val_213 2
+val_214 1
+val_216 2
+val_217 2
+val_218 1
+val_219 2
+val_221 2
+val_222 1
+val_223 2
+val_224 2
+val_226 1
+val_228 1
+val_229 2
+val_230 5
+val_233 2
+val_235 1
+val_237 2
+val_238 2
+val_239 2
+val_24 2
+val_241 1
+val_242 2
+val_244 1
+val_247 1
+val_248 1
+val_249 1
+val_252 1
+val_255 2
+val_256 2
+val_257 1
+val_258 1
+val_26 2
+val_260 1
+val_262 1
+val_263 1
+val_265 2
+val_266 1
+val_27 1
+val_272 2
+val_273 3
+val_274 1
+val_275 1
+val_277 4
+val_278 2
+val_28 1
+val_280 2
+val_281 2
+val_282 2
+val_283 1
+val_284 1
+val_285 1
+val_286 1
+val_287 1
+val_288 2
+val_289 1
+val_291 1
+val_292 1
+val_296 1
+val_298 3
+val_30 1
+val_302 1
+val_305 1
+val_306 1
+val_307 2
+val_308 1
+val_309 2
+val_310 1
+val_311 3
+val_315 1
+val_316 3
+val_317 2
+val_318 3
+val_321 2
+val_322 2
+val_323 1
+val_325 2
+val_327 3
+val_33 1
+val_331 2
+val_332 1
+val_333 2
+val_335 1
+val_336 1
+val_338 1
+val_339 1
+val_34 1
+val_341 1
+val_342 2
+val_344 2
+val_345 1
+val_348 5
+val_35 3
+val_351 1
+val_353 2
+val_356 1
+val_360 1
+val_362 1
+val_364 1
+val_365 1
+val_366 1
+val_367 2
+val_368 1
+val_369 3
+val_37 2
+val_373 1
+val_374 1
+val_375 1
+val_377 1
+val_378 1
+val_379 1
+val_382 2
+val_384 3
+val_386 1
+val_389 1
+val_392 1
+val_393 1
+val_394 1
+val_395 2
+val_396 3
+val_397 2
+val_399 2
+val_4 1
+val_400 1
+val_401 5
+val_402 1
+val_403 3
+val_404 2
+val_406 4
+val_407 1
+val_409 3
+val_41 1
+val_411 1
+val_413 2
+val_414 2
+val_417 3
+val_418 1
+val_419 1
+val_42 2
+val_421 1
+val_424 2
+val_427 1
+val_429 2
+val_43 1
+val_430 3
+val_431 3
+val_432 1
+val_435 1
+val_436 1
+val_437 1
+val_438 3
+val_439 2
+val_44 1
+val_443 1
+val_444 1
+val_446 1
+val_448 1
+val_449 1
+val_452 1
+val_453 1
+val_454 3
+val_455 1
+val_457 1
+val_458 2
+val_459 2
+val_460 1
+val_462 2
+val_463 2
+val_466 3
+val_467 1
+val_468 4
+val_469 5
+val_47 1
+val_470 1
+val_472 1
+val_475 1
+val_477 1
+val_478 2
+val_479 1
+val_480 3
+val_481 1
+val_482 1
+val_483 1
+val_484 1
+val_485 1
+val_487 1
+val_489 4
+val_490 1
+val_491 1
+val_492 2
+val_493 1
+val_494 1
+val_495 1
+val_496 1
+val_497 1
+val_498 3
+val_5 3
+val_51 2
+val_53 1
+val_54 1
+val_57 1
+val_58 2
+val_64 1
+val_65 1
+val_66 1
+val_67 2
+val_69 1
+val_70 3
+val_72 2
+val_74 1
+val_76 2
+val_77 1
+val_78 1
+val_8 1
+val_80 1
+val_82 1
+val_83 2
+val_84 2
+val_85 1
+val_86 1
+val_87 1
+val_9 1
+val_90 3
+val_92 1
+val_95 2
+val_96 1
+val_97 2
+val_98 2
+PREHOOK: query: select * from src_multi3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src_multi3
+#### A masked pattern was here ####
+POSTHOOK: query: select * from src_multi3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src_multi3
+#### A masked pattern was here ####
+0 val_0
+4 val_4
+8 val_8
+0 val_0
+0 val_0
+5 val_5
+5 val_5
+2 val_2
+5 val_5
+9 val_9
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map.q.out?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map.q.out Tue Oct 21 02:45:04 2014
@@ -30,40 +30,25 @@ INSERT OVERWRITE TABLE DEST2 SELECT SRC.
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-2 is a root stage
- Stage-4 depends on stages: Stage-2
- Stage-3 depends on stages: Stage-4, Stage-5
+ Stage-3 depends on stages: Stage-2
Stage-0 depends on stages: Stage-3
- Stage-6 depends on stages: Stage-0
+ Stage-4 depends on stages: Stage-0
Stage-1 depends on stages: Stage-3
- Stage-7 depends on stages: Stage-1
- Stage-5 depends on stages: Stage-2
+ Stage-5 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-2
Spark
-#### A masked pattern was here ####
- Vertices:
- Map 3
- Map Operator Tree:
- TableScan
- alias: src
- Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: true
- table:
- input format: org.apache.hadoop.mapred.SequenceFileInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
-
- Stage: Stage-4
- Spark
Edges:
- Reducer 4 <- Map 1 (GROUP, 31)
+ Reducer 2 <- Map 1 (GROUP, 31)
+ Reducer 3 <- Map 1 (GROUP, 31)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: key, value
@@ -80,7 +65,23 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: double)
- Reducer 4
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: key, value
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: sum(substr(value, 5))
+ keys: key (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: double)
+ Reducer 2
Reduce Operator Tree:
Group By Operator
aggregations: sum(VALUE._col0)
@@ -100,6 +101,26 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest1
+ Reducer 3
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: sum(VALUE._col0)
+ keys: KEY._col0 (type: string)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: UDFToInteger(_col0) (type: int), _col1 (type: double)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: true
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.dest2
Stage: Stage-3
Dependency Collection
@@ -114,7 +135,7 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest1
- Stage: Stage-6
+ Stage: Stage-4
Stats-Aggr Operator
Stage: Stage-1
@@ -127,54 +148,8 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest2
- Stage: Stage-7
- Stats-Aggr Operator
-
Stage: Stage-5
- Spark
- Edges:
- Reducer 5 <- Map 2 (GROUP, 31)
-#### A masked pattern was here ####
- Vertices:
- Map 2
- Map Operator Tree:
- TableScan
- Select Operator
- expressions: key (type: string), value (type: string)
- outputColumnNames: key, value
- Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
- Group By Operator
- aggregations: sum(substr(value, 5))
- keys: key (type: string)
- mode: hash
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
- Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
- value expressions: _col1 (type: double)
- Reducer 5
- Reduce Operator Tree:
- Group By Operator
- aggregations: sum(VALUE._col0)
- keys: KEY._col0 (type: string)
- mode: mergepartial
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
- Select Operator
- expressions: UDFToInteger(_col0) (type: int), _col1 (type: double)
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: true
- Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest2
+ Stats-Aggr Operator
PREHOOK: query: FROM SRC
INSERT OVERWRITE TABLE DEST1 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out Tue Oct 21 02:45:04 2014
@@ -30,34 +30,18 @@ INSERT OVERWRITE TABLE DEST2 SELECT SRC.
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-2 is a root stage
- Stage-4 depends on stages: Stage-2
- Stage-3 depends on stages: Stage-4, Stage-5
+ Stage-3 depends on stages: Stage-2
Stage-0 depends on stages: Stage-3
- Stage-6 depends on stages: Stage-0
+ Stage-4 depends on stages: Stage-0
Stage-1 depends on stages: Stage-3
- Stage-7 depends on stages: Stage-1
- Stage-5 depends on stages: Stage-2
+ Stage-5 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-2
Spark
-#### A masked pattern was here ####
- Vertices:
- Map 3
- Map Operator Tree:
- TableScan
- alias: src
- Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: true
- table:
- input format: org.apache.hadoop.mapred.SequenceFileInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
-
- Stage: Stage-4
- Spark
Edges:
+ Reducer 2 <- Map 1 (GROUP SORT, 31)
+ Reducer 3 <- Reducer 2 (GROUP, 31)
Reducer 4 <- Map 1 (GROUP SORT, 31)
Reducer 5 <- Reducer 4 (GROUP, 31)
#### A masked pattern was here ####
@@ -65,6 +49,8 @@ STAGE PLANS:
Map 1
Map Operator Tree:
TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: key, value
@@ -81,7 +67,23 @@ STAGE PLANS:
Map-reduce partition columns: rand() (type: double)
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: double)
- Reducer 4
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: key, value
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: sum(substr(value, 5))
+ keys: key (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: rand() (type: double)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: double)
+ Reducer 2
Reduce Operator Tree:
Group By Operator
aggregations: sum(VALUE._col0)
@@ -95,7 +97,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: double)
- Reducer 5
+ Reducer 3
Reduce Operator Tree:
Group By Operator
aggregations: sum(VALUE._col0)
@@ -115,63 +117,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest1
-
- Stage: Stage-3
- Dependency Collection
-
- Stage: Stage-0
- Move Operator
- tables:
- replace: true
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest1
-
- Stage: Stage-6
- Stats-Aggr Operator
-
- Stage: Stage-1
- Move Operator
- tables:
- replace: true
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest2
-
- Stage: Stage-7
- Stats-Aggr Operator
-
- Stage: Stage-5
- Spark
- Edges:
- Reducer 6 <- Map 2 (GROUP SORT, 31)
- Reducer 7 <- Reducer 6 (GROUP, 31)
-#### A masked pattern was here ####
- Vertices:
- Map 2
- Map Operator Tree:
- TableScan
- Select Operator
- expressions: key (type: string), value (type: string)
- outputColumnNames: key, value
- Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
- Group By Operator
- aggregations: sum(substr(value, 5))
- keys: key (type: string)
- mode: hash
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Map-reduce partition columns: rand() (type: double)
- Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
- value expressions: _col1 (type: double)
- Reducer 6
+ Reducer 4
Reduce Operator Tree:
Group By Operator
aggregations: sum(VALUE._col0)
@@ -185,7 +131,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: double)
- Reducer 7
+ Reducer 5
Reduce Operator Tree:
Group By Operator
aggregations: sum(VALUE._col0)
@@ -206,6 +152,35 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest2
+ Stage: Stage-3
+ Dependency Collection
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ replace: true
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.dest1
+
+ Stage: Stage-4
+ Stats-Aggr Operator
+
+ Stage: Stage-1
+ Move Operator
+ tables:
+ replace: true
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.dest2
+
+ Stage: Stage-5
+ Stats-Aggr Operator
+
PREHOOK: query: FROM SRC
INSERT OVERWRITE TABLE DEST1 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key
INSERT OVERWRITE TABLE DEST2 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out Tue Oct 21 02:45:04 2014
@@ -30,40 +30,25 @@ INSERT OVERWRITE TABLE DEST2 SELECT SRC.
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-2 is a root stage
- Stage-4 depends on stages: Stage-2
- Stage-3 depends on stages: Stage-4, Stage-5
+ Stage-3 depends on stages: Stage-2
Stage-0 depends on stages: Stage-3
- Stage-6 depends on stages: Stage-0
+ Stage-4 depends on stages: Stage-0
Stage-1 depends on stages: Stage-3
- Stage-7 depends on stages: Stage-1
- Stage-5 depends on stages: Stage-2
+ Stage-5 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-2
Spark
-#### A masked pattern was here ####
- Vertices:
- Map 3
- Map Operator Tree:
- TableScan
- alias: src
- Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: true
- table:
- input format: org.apache.hadoop.mapred.SequenceFileInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
-
- Stage: Stage-4
- Spark
Edges:
- Reducer 4 <- Map 1 (GROUP, 31)
+ Reducer 2 <- Map 1 (GROUP, 31)
+ Reducer 3 <- Map 1 (GROUP, 31)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: key, value
@@ -74,7 +59,17 @@ STAGE PLANS:
Map-reduce partition columns: key (type: string)
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
value expressions: substr(value, 5) (type: string)
- Reducer 4
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: key, value
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: key (type: string)
+ sort order: +
+ Map-reduce partition columns: key (type: string)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ value expressions: substr(value, 5) (type: string)
+ Reducer 2
Reduce Operator Tree:
Group By Operator
aggregations: sum(VALUE._col0)
@@ -94,6 +89,26 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest1
+ Reducer 3
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: sum(VALUE._col0)
+ keys: KEY._col0 (type: string)
+ mode: complete
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: UDFToInteger(_col0) (type: int), _col1 (type: double)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: true
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.dest2
Stage: Stage-3
Dependency Collection
@@ -108,7 +123,7 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest1
- Stage: Stage-6
+ Stage: Stage-4
Stats-Aggr Operator
Stage: Stage-1
@@ -121,48 +136,8 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest2
- Stage: Stage-7
- Stats-Aggr Operator
-
Stage: Stage-5
- Spark
- Edges:
- Reducer 5 <- Map 2 (GROUP, 31)
-#### A masked pattern was here ####
- Vertices:
- Map 2
- Map Operator Tree:
- TableScan
- Select Operator
- expressions: key (type: string), value (type: string)
- outputColumnNames: key, value
- Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: key (type: string)
- sort order: +
- Map-reduce partition columns: key (type: string)
- Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
- value expressions: substr(value, 5) (type: string)
- Reducer 5
- Reduce Operator Tree:
- Group By Operator
- aggregations: sum(VALUE._col0)
- keys: KEY._col0 (type: string)
- mode: complete
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
- Select Operator
- expressions: UDFToInteger(_col0) (type: int), _col1 (type: double)
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: true
- Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest2
+ Stats-Aggr Operator
PREHOOK: query: FROM SRC
INSERT OVERWRITE TABLE DEST1 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key