You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/21 19:18:53 UTC

svn commit: r1633418 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: HivePairFlatMapFunction.java SparkClient.java SparkUtilities.java

Author: xuefu
Date: Tue Oct 21 17:18:53 2014
New Revision: 1633418

URL: http://svn.apache.org/r1633418
Log:
HIVE-8537: Update to use the stable TaskContext API [Spark Branch] (Chengxiang via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java?rev=1633418&r1=1633417&r2=1633418&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java Tue Oct 21 17:18:53 2014
@@ -8,9 +8,8 @@ import org.apache.spark.api.java.functio
 
 
 public abstract class HivePairFlatMapFunction<T, K, V> implements PairFlatMapFunction<T, K, V> {
-
-  protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
-  protected static final NumberFormat stageIdFormat = NumberFormat.getInstance();
+  private static final NumberFormat taskIdFormat = NumberFormat.getInstance();
+  private static final NumberFormat stageIdFormat = NumberFormat.getInstance();
   static {
     taskIdFormat.setGroupingUsed(false);
     taskIdFormat.setMinimumIntegerDigits(6);
@@ -43,7 +42,7 @@ public abstract class HivePairFlatMapFun
     StringBuilder taskAttemptIdBuilder = new StringBuilder("attempt_");
     taskAttemptIdBuilder.append(System.currentTimeMillis())
       .append("_")
-      .append(stageIdFormat.format(TaskContext.get().getStageId()))
+      .append(stageIdFormat.format(TaskContext.get().stageId()))
       .append("_");
 
     if (isMap()) {
@@ -54,12 +53,12 @@ public abstract class HivePairFlatMapFun
 
     // Spark task attempt id is increased by Spark context instead of task, which may introduce
     // unstable qtest output, since non Hive features depends on this, we always set it to 0 here.
-    taskAttemptIdBuilder.append(taskIdFormat.format(TaskContext.get().getPartitionId()))
+    taskAttemptIdBuilder.append(taskIdFormat.format(TaskContext.get().partitionId()))
       .append("_0");
 
     String taskAttemptIdStr = taskAttemptIdBuilder.toString();
     jobConf.set("mapred.task.id", taskAttemptIdStr);
     jobConf.set("mapreduce.task.attempt.id", taskAttemptIdStr);
-    jobConf.setInt("mapred.task.partition", TaskContext.get().getPartitionId());
+    jobConf.setInt("mapred.task.partition", TaskContext.get().partitionId());
   }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1633418&r1=1633417&r2=1633418&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Tue Oct 21 17:18:53 2014
@@ -41,9 +41,8 @@ import org.apache.hadoop.hive.ql.plan.Sp
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.FutureAction;
-import org.apache.spark.SimpleFutureAction;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.ui.jobs.JobProgressListener;
@@ -187,11 +186,14 @@ public class SparkClient implements Seri
     try {
       JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
       // We use Spark RDD async action to submit job as it's the only way to get jobId now.
-      FutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
+      JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
       // An action may trigger multi jobs in Spark, we only monitor the latest job here
       // until we found that Hive does trigger multi jobs.
+      List<Integer> jobIds = future.jobIds();
+      // jobIds size is always bigger than or equal with 1.
+      int jobId = jobIds.get(jobIds.size() - 1);
       SimpleSparkJobStatus sparkJobStatus = new SimpleSparkJobStatus(
-        (Integer) future.jobIds().last(), jobStateListener, jobProgressListener);
+       jobId, jobStateListener, jobProgressListener);
       SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
       monitor.startMonitor();
     } catch (Exception e) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1633418&r1=1633417&r2=1633418&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Tue Oct 21 17:18:53 2014
@@ -25,15 +25,6 @@ import org.apache.spark.TaskContext;
  * Contains utilities methods used as part of Spark tasks
  */
 public class SparkUtilities {
-  public static void setTaskInfoInJobConf(JobConf jobConf, TaskContext taskContext) {
-    // Set mapred.task.partition in executor side.
-    jobConf.setInt("mapred.task.partition", taskContext.getPartitionId());
-
-    // Set mapred.task.id as taskId_attemptId. The taskId is 6 digits in length (prefixed with 0 if
-    // necessary). Similarly attemptId is two digits in length.
-    jobConf.set("mapred.task.id",
-        String.format("%06d_%02d", taskContext.getPartitionId(), taskContext.getAttemptId()));
-  }
 
   public static BytesWritable copyBytesWritable(BytesWritable bw) {
     BytesWritable copy = new BytesWritable();