Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/10 20:43:09 UTC

svn commit: r1630954 - in /hive/branches/spark: itests/src/test/resources/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ ql/src/test/results/clientpositive/spark/

Author: xuefu
Date: Fri Oct 10 18:43:08 2014
New Revision: 1630954

URL: http://svn.apache.org/r1630954
Log:
HIVE-7776: enable sample10.q. [Spark Branch] (Chengxiang via Xuefu)

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
    hive/branches/spark/ql/src/test/results/clientpositive/spark/sample10.q.out
Modified:
    hive/branches/spark/itests/src/test/resources/testconfiguration.properties
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java

Modified: hive/branches/spark/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/src/test/resources/testconfiguration.properties?rev=1630954&r1=1630953&r2=1630954&view=diff
==============================================================================
--- hive/branches/spark/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/spark/itests/src/test/resources/testconfiguration.properties Fri Oct 10 18:43:08 2014
@@ -531,6 +531,7 @@ spark.query.files=add_part_multiple.q \
   sample7.q \
   sample8.q \
   sample9.q \
+  sample10.q \
   script_env_var1.q \
   script_env_var2.q \
   script_pipe.q \

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1630954&r1=1630953&r2=1630954&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Fri Oct 10 18:43:08 2014
@@ -31,25 +31,19 @@ import org.apache.spark.api.java.functio
 
 import scala.Tuple2;
 
-public class HiveMapFunction implements PairFlatMapFunction<Iterator<Tuple2<BytesWritable, BytesWritable>>,
-    HiveKey, BytesWritable> {
-  private static final long serialVersionUID = 1L;
-
-  private transient JobConf jobConf;
+public class HiveMapFunction extends HivePairFlatMapFunction<
+  Iterator<Tuple2<BytesWritable, BytesWritable>>, HiveKey, BytesWritable> {
 
-  private byte[] buffer;
+  private static final long serialVersionUID = 1L;
 
   public HiveMapFunction(byte[] buffer) {
-    this.buffer = buffer;
+    super(buffer);
   }
 
   @Override
   public Iterable<Tuple2<HiveKey, BytesWritable>>
   call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
-    if (jobConf == null) {
-      jobConf = KryoSerializer.deserializeJobConf(this.buffer);
-      SparkUtilities.setTaskInfoInJobConf(jobConf, TaskContext.get());
-    }
+    initJobConf();
 
     SparkRecordHandler mapRecordHandler;
 
@@ -67,4 +61,9 @@ public class HiveMapFunction implements 
     return result;
   }
 
+  @Override
+  protected boolean isMap() {
+    return true;
+  }
+
 }
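
For context, the wiring of these functions into a Spark job is outside this diff. A rough sketch of how a generated plan might apply HiveMapFunction over input partitions (hypothetical class and variable names; Spark 1.x Java API):

    import org.apache.hadoop.hive.ql.exec.spark.HiveMapFunction;
    import org.apache.hadoop.hive.ql.io.HiveKey;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.spark.api.java.JavaPairRDD;

    // Hypothetical sketch, not part of this commit.
    public class MapTranSketch {
      static JavaPairRDD<HiveKey, BytesWritable> applyMap(
          JavaPairRDD<BytesWritable, BytesWritable> input, byte[] confBytes) {
        // One HiveMapFunction instance consumes each partition's iterator;
        // initJobConf() lazily deserializes the JobConf on the executor.
        return input.mapPartitionsToPair(new HiveMapFunction(confBytes));
      }
    }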

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java?rev=1630954&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java Fri Oct 10 18:43:08 2014
@@ -0,0 +1,64 @@
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.text.NumberFormat;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+
+public abstract class HivePairFlatMapFunction<T, K, V> implements PairFlatMapFunction<T, K, V> {
+
+  protected transient JobConf jobConf;
+
+  private byte[] buffer;
+
+  protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
+  protected static final NumberFormat stageIdFormat = NumberFormat.getInstance();
+
+  static {
+    taskIdFormat.setGroupingUsed(false);
+    taskIdFormat.setMinimumIntegerDigits(6);
+    stageIdFormat.setGroupingUsed(false);
+    stageIdFormat.setMinimumIntegerDigits(4);
+  }
+
+  public HivePairFlatMapFunction(byte[] buffer) {
+    this.buffer = buffer;
+  }
+
+  protected void initJobConf() {
+    if (jobConf == null) {
+      jobConf = KryoSerializer.deserializeJobConf(this.buffer);
+      setupMRLegacyConfigs();
+    }
+  }
+
+  protected abstract boolean isMap();
+
+  // Some Hive features depend on legacy MR configuration, so build those
+  // configurations and add them to the JobConf here.
+  private void setupMRLegacyConfigs() {
+    StringBuilder taskAttemptIdBuilder = new StringBuilder("attempt_");
+    taskAttemptIdBuilder.append(System.currentTimeMillis())
+      .append("_")
+      .append(stageIdFormat.format(TaskContext.get().getStageId()))
+      .append("_");
+
+    if (isMap()) {
+      taskAttemptIdBuilder.append("m_");
+    } else {
+      taskAttemptIdBuilder.append("r_");
+    }
+
+    // The Spark task attempt id is incremented by the Spark context rather than per task, which
+    // may make qtest output unstable; since no Hive feature depends on its value, set it to 0.
+    taskAttemptIdBuilder.append(taskIdFormat.format(TaskContext.get().getPartitionId()))
+      .append("_0");
+
+    String taskAttemptIdStr = taskAttemptIdBuilder.toString();
+    jobConf.set("mapred.task.id", taskAttemptIdStr);
+    jobConf.set("mapreduce.task.attempt.id", taskAttemptIdStr);
+    jobConf.setInt("mapred.task.partition", TaskContext.get().getPartitionId());
+  }
+}
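
To make the id format above concrete: for a map task with stage id 3 and partition id 12, setupMRLegacyConfigs() would produce an id like attempt_1412966588000_0003_m_000012_0 (the timestamp varies per run). This matters because parts of Hive read mapred.task.id from the conf (e.g., for naming output files), so a stable, zero-padded id keeps qtest output such as sample10.q's deterministic. A standalone illustration of just the formatting, with the Spark TaskContext values stubbed out:

    import java.text.NumberFormat;

    // Illustration only: mirrors the NumberFormat setup and string layout above.
    public class TaskAttemptIdDemo {
      public static void main(String[] args) {
        NumberFormat taskIdFormat = NumberFormat.getInstance();
        NumberFormat stageIdFormat = NumberFormat.getInstance();
        taskIdFormat.setGroupingUsed(false);
        taskIdFormat.setMinimumIntegerDigits(6);
        stageIdFormat.setGroupingUsed(false);
        stageIdFormat.setMinimumIntegerDigits(4);

        long ts = 1412966588000L;  // stands in for System.currentTimeMillis()
        int stageId = 3;           // stands in for TaskContext.get().getStageId()
        int partitionId = 12;      // stands in for TaskContext.get().getPartitionId()

        String id = "attempt_" + ts + "_" + stageIdFormat.format(stageId)
            + "_m_" + taskIdFormat.format(partitionId) + "_0";
        System.out.println(id);    // prints attempt_1412966588000_0003_m_000012_0
      }
    }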

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1630954&r1=1630953&r2=1630954&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Fri Oct 10 18:43:08 2014
@@ -30,25 +30,19 @@ import org.apache.spark.api.java.functio
 
 import scala.Tuple2;
 
-public class HiveReduceFunction implements PairFlatMapFunction<
-    Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>, HiveKey, BytesWritable> {
-  private static final long serialVersionUID = 1L;
-
-  private transient JobConf jobConf;
+public class HiveReduceFunction extends HivePairFlatMapFunction<
+  Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>, HiveKey, BytesWritable> {
 
-  private byte[] buffer;
+  private static final long serialVersionUID = 1L;
 
   public HiveReduceFunction(byte[] buffer) {
-    this.buffer = buffer;
+    super(buffer);
   }
 
   @Override
   public Iterable<Tuple2<HiveKey, BytesWritable>>
   call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception {
-    if (jobConf == null) {
-      jobConf = KryoSerializer.deserializeJobConf(this.buffer);
-      SparkUtilities.setTaskInfoInJobConf(jobConf, TaskContext.get());
-    }
+    initJobConf();
 
     SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();
     HiveReduceFunctionResultList result =
@@ -57,4 +51,9 @@ public class HiveReduceFunction implemen
 
     return result;
   }
+
+  @Override
+  protected boolean isMap() {
+    return false;
+  }
 }
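
Continuing the earlier sketch on the reduce side: HiveReduceFunction expects grouped input, so the map output is shuffled first (again hypothetical names; groupByKey() stands in for however the generated plan actually shuffles):

    import org.apache.hadoop.hive.ql.exec.spark.HiveReduceFunction;
    import org.apache.hadoop.hive.ql.io.HiveKey;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.spark.api.java.JavaPairRDD;

    // Hypothetical sketch, not part of this commit.
    public class ReduceTranSketch {
      static JavaPairRDD<HiveKey, BytesWritable> applyReduce(
          JavaPairRDD<HiveKey, BytesWritable> mapOut, byte[] confBytes) {
        // Group values by key so each partition yields
        // Tuple2<HiveKey, Iterable<BytesWritable>>, matching
        // HiveReduceFunction's input type.
        JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffled = mapOut.groupByKey();
        return shuffled.mapPartitionsToPair(new HiveReduceFunction(confBytes));
      }
    }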

Added: hive/branches/spark/ql/src/test/results/clientpositive/spark/sample10.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/sample10.q.out?rev=1630954&view=auto
==============================================================================
Files hive/branches/spark/ql/src/test/results/clientpositive/spark/sample10.q.out (added) and hive/branches/spark/ql/src/test/results/clientpositive/spark/sample10.q.out Fri Oct 10 18:43:08 2014 differ