You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/10 20:43:09 UTC
svn commit: r1630954 - in /hive/branches/spark: itests/src/test/resources/
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/
ql/src/test/results/clientpositive/spark/
Author: xuefu
Date: Fri Oct 10 18:43:08 2014
New Revision: 1630954
URL: http://svn.apache.org/r1630954
Log:
HIVE-7776: enable sample10.q.[Spark Branch] (Chengxiang via Xuefu)
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
hive/branches/spark/ql/src/test/results/clientpositive/spark/sample10.q.out
Modified:
hive/branches/spark/itests/src/test/resources/testconfiguration.properties
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
Modified: hive/branches/spark/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/src/test/resources/testconfiguration.properties?rev=1630954&r1=1630953&r2=1630954&view=diff
==============================================================================
--- hive/branches/spark/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/spark/itests/src/test/resources/testconfiguration.properties Fri Oct 10 18:43:08 2014
@@ -531,6 +531,7 @@ spark.query.files=add_part_multiple.q \
sample7.q \
sample8.q \
sample9.q \
+ sample10.q \
script_env_var1.q \
script_env_var2.q \
script_pipe.q \
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1630954&r1=1630953&r2=1630954&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Fri Oct 10 18:43:08 2014
@@ -31,25 +31,19 @@ import org.apache.spark.api.java.functio
import scala.Tuple2;
-public class HiveMapFunction implements PairFlatMapFunction<Iterator<Tuple2<BytesWritable, BytesWritable>>,
- HiveKey, BytesWritable> {
- private static final long serialVersionUID = 1L;
-
- private transient JobConf jobConf;
+public class HiveMapFunction extends HivePairFlatMapFunction<
+ Iterator<Tuple2<BytesWritable, BytesWritable>>, HiveKey, BytesWritable> {
- private byte[] buffer;
+ private static final long serialVersionUID = 1L;
public HiveMapFunction(byte[] buffer) {
- this.buffer = buffer;
+ super(buffer);
}
@Override
public Iterable<Tuple2<HiveKey, BytesWritable>>
call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
- if (jobConf == null) {
- jobConf = KryoSerializer.deserializeJobConf(this.buffer);
- SparkUtilities.setTaskInfoInJobConf(jobConf, TaskContext.get());
- }
+ initJobConf();
SparkRecordHandler mapRecordHandler;
@@ -67,4 +61,9 @@ public class HiveMapFunction implements
return result;
}
+ @Override
+ protected boolean isMap() {
+ return true;
+ }
+
}
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java?rev=1630954&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java Fri Oct 10 18:43:08 2014
@@ -0,0 +1,64 @@
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.text.NumberFormat;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+
+public abstract class HivePairFlatMapFunction<T, K, V> implements PairFlatMapFunction<T, K, V> {
+
+ protected transient JobConf jobConf;
+
+ private byte[] buffer;
+
+ protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
+ protected static final NumberFormat stageIdFormat = NumberFormat.getInstance();
+
+ static {
+ taskIdFormat.setGroupingUsed(false);
+ taskIdFormat.setMinimumIntegerDigits(6);
+ stageIdFormat.setGroupingUsed(false);
+ stageIdFormat.setMinimumIntegerDigits(4);
+ }
+
+ public HivePairFlatMapFunction(byte[] buffer) {
+ this.buffer = buffer;
+ }
+
+ protected void initJobConf() {
+ if (jobConf == null) {
+ jobConf = KryoSerializer.deserializeJobConf(this.buffer);
+ setupMRLegacyConfigs();
+ }
+ }
+
+ protected abstract boolean isMap();
+
+ // Some Hive features depends on several MR configuration legacy, build and add
+ // these configuration to JobConf here.
+ private void setupMRLegacyConfigs() {
+ StringBuilder taskAttemptIdBuilder = new StringBuilder("attempt_");
+ taskAttemptIdBuilder.append(System.currentTimeMillis())
+ .append("_")
+ .append(stageIdFormat.format(TaskContext.get().getStageId()))
+ .append("_");
+
+ if (isMap()) {
+ taskAttemptIdBuilder.append("m_");
+ } else {
+ taskAttemptIdBuilder.append("r_");
+ }
+
+ // Spark task attempt id is increased by Spark context instead of task, which may introduce
+ // unstable qtest output, since non Hive features depends on this, we always set it to 0 here.
+ taskAttemptIdBuilder.append(taskIdFormat.format(TaskContext.get().getPartitionId()))
+ .append("_0");
+
+ String taskAttemptIdStr = taskAttemptIdBuilder.toString();
+ jobConf.set("mapred.task.id", taskAttemptIdStr);
+ jobConf.set("mapreduce.task.attempt.id", taskAttemptIdStr);
+ jobConf.setInt("mapred.task.partition", TaskContext.get().getPartitionId());
+ }
+}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1630954&r1=1630953&r2=1630954&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Fri Oct 10 18:43:08 2014
@@ -30,25 +30,19 @@ import org.apache.spark.api.java.functio
import scala.Tuple2;
-public class HiveReduceFunction implements PairFlatMapFunction<
- Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>, HiveKey, BytesWritable> {
- private static final long serialVersionUID = 1L;
-
- private transient JobConf jobConf;
+public class HiveReduceFunction extends HivePairFlatMapFunction<
+ Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>, HiveKey, BytesWritable> {
- private byte[] buffer;
+ private static final long serialVersionUID = 1L;
public HiveReduceFunction(byte[] buffer) {
- this.buffer = buffer;
+ super(buffer);
}
@Override
public Iterable<Tuple2<HiveKey, BytesWritable>>
call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception {
- if (jobConf == null) {
- jobConf = KryoSerializer.deserializeJobConf(this.buffer);
- SparkUtilities.setTaskInfoInJobConf(jobConf, TaskContext.get());
- }
+ initJobConf();
SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();
HiveReduceFunctionResultList result =
@@ -57,4 +51,9 @@ public class HiveReduceFunction implemen
return result;
}
+
+ @Override
+ protected boolean isMap() {
+ return false;
+ }
}
Added: hive/branches/spark/ql/src/test/results/clientpositive/spark/sample10.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/sample10.q.out?rev=1630954&view=auto
==============================================================================
Files hive/branches/spark/ql/src/test/results/clientpositive/spark/sample10.q.out (added) and hive/branches/spark/ql/src/test/results/clientpositive/spark/sample10.q.out Fri Oct 10 18:43:08 2014 differ