You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/21 19:18:53 UTC
svn commit: r1633418 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark:
HivePairFlatMapFunction.java SparkClient.java SparkUtilities.java
Author: xuefu
Date: Tue Oct 21 17:18:53 2014
New Revision: 1633418
URL: http://svn.apache.org/r1633418
Log:
HIVE-8537: Update to use the stable TaskContext API [Spark Branch] (Chengxiang via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java?rev=1633418&r1=1633417&r2=1633418&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java Tue Oct 21 17:18:53 2014
@@ -8,9 +8,8 @@ import org.apache.spark.api.java.functio
public abstract class HivePairFlatMapFunction<T, K, V> implements PairFlatMapFunction<T, K, V> {
-
- protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
- protected static final NumberFormat stageIdFormat = NumberFormat.getInstance();
+ private static final NumberFormat taskIdFormat = NumberFormat.getInstance();
+ private static final NumberFormat stageIdFormat = NumberFormat.getInstance();
static {
taskIdFormat.setGroupingUsed(false);
taskIdFormat.setMinimumIntegerDigits(6);
@@ -43,7 +42,7 @@ public abstract class HivePairFlatMapFun
StringBuilder taskAttemptIdBuilder = new StringBuilder("attempt_");
taskAttemptIdBuilder.append(System.currentTimeMillis())
.append("_")
- .append(stageIdFormat.format(TaskContext.get().getStageId()))
+ .append(stageIdFormat.format(TaskContext.get().stageId()))
.append("_");
if (isMap()) {
@@ -54,12 +53,12 @@ public abstract class HivePairFlatMapFun
// Spark task attempt id is increased by Spark context instead of task, which may introduce
// unstable qtest output. Since no Hive feature depends on this, we always set it to 0 here.
- taskAttemptIdBuilder.append(taskIdFormat.format(TaskContext.get().getPartitionId()))
+ taskAttemptIdBuilder.append(taskIdFormat.format(TaskContext.get().partitionId()))
.append("_0");
String taskAttemptIdStr = taskAttemptIdBuilder.toString();
jobConf.set("mapred.task.id", taskAttemptIdStr);
jobConf.set("mapreduce.task.attempt.id", taskAttemptIdStr);
- jobConf.setInt("mapred.task.partition", TaskContext.get().getPartitionId());
+ jobConf.setInt("mapred.task.partition", TaskContext.get().partitionId());
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1633418&r1=1633417&r2=1633418&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Tue Oct 21 17:18:53 2014
@@ -41,9 +41,8 @@ import org.apache.hadoop.hive.ql.plan.Sp
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.FutureAction;
-import org.apache.spark.SimpleFutureAction;
import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ui.jobs.JobProgressListener;
@@ -187,11 +186,14 @@ public class SparkClient implements Seri
try {
JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
// We use Spark RDD async action to submit job as it's the only way to get jobId now.
- FutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
+ JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
// An action may trigger multiple jobs in Spark; we only monitor the latest job here
// until we find that Hive does trigger multiple jobs.
+ List<Integer> jobIds = future.jobIds();
+ // jobIds size is always greater than or equal to 1.
+ int jobId = jobIds.get(jobIds.size() - 1);
SimpleSparkJobStatus sparkJobStatus = new SimpleSparkJobStatus(
- (Integer) future.jobIds().last(), jobStateListener, jobProgressListener);
+ jobId, jobStateListener, jobProgressListener);
SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
monitor.startMonitor();
} catch (Exception e) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1633418&r1=1633417&r2=1633418&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Tue Oct 21 17:18:53 2014
@@ -25,15 +25,6 @@ import org.apache.spark.TaskContext;
* Contains utilities methods used as part of Spark tasks
*/
public class SparkUtilities {
- public static void setTaskInfoInJobConf(JobConf jobConf, TaskContext taskContext) {
- // Set mapred.task.partition in executor side.
- jobConf.setInt("mapred.task.partition", taskContext.getPartitionId());
-
- // Set mapred.task.id as taskId_attemptId. The taskId is 6 digits in length (prefixed with 0 if
- // necessary). Similarly attemptId is two digits in length.
- jobConf.set("mapred.task.id",
- String.format("%06d_%02d", taskContext.getPartitionId(), taskContext.getAttemptId()));
- }
public static BytesWritable copyBytesWritable(BytesWritable bw) {
BytesWritable copy = new BytesWritable();