You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2014/08/13 20:25:07 UTC

svn commit: r1617790 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/ parse/spark/

Author: szehon
Date: Wed Aug 13 18:25:06 2014
New Revision: 1617790

URL: http://svn.apache.org/r1617790
Log:
HIVE-7541 : Support union all on Spark (Na Yang via Szehon) [Spark Branch]

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
Removed:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java?rev=1617790&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java Wed Aug 13 18:25:06 2014
@@ -0,0 +1,126 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class GraphTran {
+  // Directed graph of SparkTran nodes, built via addTran*/connect and run by execute().
+  private Set<SparkTran> rootTrans = new HashSet<SparkTran>(); // nodes that are not (yet) the target of a connect()
+  private Set<SparkTran> leafTrans = new HashSet<SparkTran>(); // nodes that are not (yet) the source of a connect(); never read in this class
+  private Map<SparkTran, List<SparkTran>> transGraph = new HashMap<SparkTran, List<SparkTran>>();
+  private Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>();
+  private Map<SparkTran, List<JavaPairRDD<BytesWritable, BytesWritable>>> unionInputs = new HashMap<SparkTran, List<JavaPairRDD<BytesWritable, BytesWritable>>>();
+  private Map<SparkTran, JavaPairRDD<BytesWritable, BytesWritable>> mapInputs = new HashMap<SparkTran, JavaPairRDD<BytesWritable, BytesWritable>>();
+  /** Adds {@code tran} as a node without an input RDD; same as addTranWithInput(tran, null). */
+  public void addTran(SparkTran tran) {
+    addTranWithInput(tran, null);
+  }
+  /** Adds {@code tran} as a new root and leaf (if not currently a root) and records its input when non-null. */
+  public void addTranWithInput(SparkTran tran,
+    JavaPairRDD<BytesWritable, BytesWritable> input) {
+    if (!rootTrans.contains(tran)) { // NOTE(review): keyed on rootTrans, so re-adding a connected child resets its edges
+      rootTrans.add(tran);
+      leafTrans.add(tran);
+      transGraph.put(tran, new LinkedList<SparkTran>());
+      invertedTransGraph.put(tran, new LinkedList<SparkTran>());
+    }
+    if (input != null) {
+      mapInputs.put(tran, input);
+    }
+  }
+  /** Walks each root's first-child chain, merging at UnionTran nodes, then calls foreach on the last RDD built. */
+  public void execute() throws Exception {
+    JavaPairRDD<BytesWritable, BytesWritable> resultRDD = null;
+    for (SparkTran tran : rootTrans) {
+      // every root must be a MapTran with an input registered through addTranWithInput
+      if (!(tran instanceof MapTran)) {
+        throw new Exception("root transformations must be MapTran!");
+      }
+      JavaPairRDD<BytesWritable, BytesWritable> input = mapInputs.get(tran);
+      if (input == null) {
+        throw new Exception("input is missing for transformation!");
+      }
+      JavaPairRDD<BytesWritable, BytesWritable> rdd = tran.transform(input);
+      // Follow only the first child of each node (get(0)) until a leaf or an incomplete union is reached.
+      while (getChildren(tran).size() > 0) {
+        SparkTran childTran = getChildren(tran).get(0);
+        if (childTran instanceof UnionTran) {
+          List<JavaPairRDD<BytesWritable, BytesWritable>> unionInputList = unionInputs
+              .get(childTran);
+          if (unionInputList == null) {
+            // first branch to reach this union: cache its RDD in unionInputs
+            unionInputList = new LinkedList<JavaPairRDD<BytesWritable, BytesWritable>>();
+            unionInputList.add(rdd);
+            unionInputs.put(childTran, unionInputList);
+            break; // stop this chain; the branch that completes the union continues past it
+          } else if (unionInputList.size() < this.getParents(childTran).size() - 1) {
+            // other parents are still outstanding: cache this RDD as well and stop this chain
+            unionInputList.add(rdd);
+            break;
+          } else if (unionInputList.size() == this.getParents(childTran).size() - 1) { // all other parents are cached
+            // last parent arrived: fold every cached RDD into rdd through successive unions
+            for (JavaPairRDD<BytesWritable, BytesWritable> inputRDD : unionInputList) {
+              ((UnionTran) childTran).setOtherInput(inputRDD);
+              rdd = childTran.transform(rdd);
+            }
+          } // NOTE(review): if more RDDs are cached than parents - 1, no branch runs and the union is skipped
+        } else {
+          rdd = childTran.transform(rdd);
+        }
+        tran = childTran;
+      }
+      resultRDD = rdd; // overwritten per root; NOTE(review): rootTrans is a HashSet, so which root is last is unordered
+    }
+    if (resultRDD != null) {
+      resultRDD.foreach(HiveVoidFunction.getInstance());
+    }
+  }
+  /** Adds edge a -> b; b stops being a root and a stops being a leaf. Both must already be in the graph maps. */
+  public void connect(SparkTran a, SparkTran b) {
+    transGraph.get(a).add(b);
+    invertedTransGraph.get(b).add(a);
+    rootTrans.remove(b);
+    leafTrans.remove(a);
+  }
+  /** Returns a copy of the parents of {@code tran}; throws if the graph has no entry for {@code tran}. */
+  public List<SparkTran> getParents(SparkTran tran) throws Exception {
+    if (!invertedTransGraph.containsKey(tran)
+        || invertedTransGraph.get(tran) == null) {
+      throw new Exception("Cannot get parent transformations for " + tran);
+    }
+    return new LinkedList<SparkTran>(invertedTransGraph.get(tran));
+  }
+  /** Returns a copy of the children of {@code tran}; throws if the graph has no entry for {@code tran}. */
+  public List<SparkTran> getChildren(SparkTran tran) throws Exception {
+    if (!transGraph.containsKey(tran) || transGraph.get(tran) == null) {
+      throw new Exception("Cannot get children transformations for " + tran);
+    }
+    return new LinkedList<SparkTran>(transGraph.get(tran));
+  }
+
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1617790&r1=1617789&r2=1617790&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Wed Aug 13 18:25:06 2014
@@ -42,7 +42,8 @@ import java.util.*;
 
 public class SparkClient implements Serializable {
   private static final long serialVersionUID = 1L;
-  protected static transient final Log LOG = LogFactory.getLog(SparkClient.class);
+  protected static transient final Log LOG = LogFactory
+      .getLog(SparkClient.class);
 
   private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
   private static final String SPARK_DEFAULT_MASTER = "local";
@@ -73,12 +74,14 @@ public class SparkClient implements Seri
     // set default spark configurations.
     sparkConf.set("spark.master", SPARK_DEFAULT_MASTER);
     sparkConf.set("spark.app.name", SAPRK_DEFAULT_APP_NAME);
-    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    sparkConf.set("spark.default.parallelism",  "1");
+    sparkConf.set("spark.serializer",
+        "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.set("spark.default.parallelism", "1");
     // load properties from spark-defaults.conf.
     InputStream inputStream = null;
     try {
-      inputStream = this.getClass().getClassLoader().getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
+      inputStream = this.getClass().getClassLoader()
+          .getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
       if (inputStream != null) {
         LOG.info("loading spark properties from:" + SPARK_DEFAULT_CONF_FILE);
         Properties properties = new Properties();
@@ -87,13 +90,15 @@ public class SparkClient implements Seri
           if (propertyName.startsWith("spark")) {
             String value = properties.getProperty(propertyName);
             sparkConf.set(propertyName, properties.getProperty(propertyName));
-            LOG.info(String.format("load spark configuration from %s (%s -> %s).",
+            LOG.info(String.format(
+                "load spark configuration from %s (%s -> %s).",
                 SPARK_DEFAULT_CONF_FILE, propertyName, value));
           }
         }
       }
     } catch (IOException e) {
-      LOG.info("Failed to open spark configuration file:" + SPARK_DEFAULT_CONF_FILE, e);
+      LOG.info("Failed to open spark configuration file:"
+          + SPARK_DEFAULT_CONF_FILE, e);
     } finally {
       if (inputStream != null) {
         try {
@@ -112,7 +117,8 @@ public class SparkClient implements Seri
       if (propertyName.startsWith("spark")) {
         String value = entry.getValue();
         sparkConf.set(propertyName, value);
-        LOG.info(String.format("load spark configuration from hive configuration (%s -> %s).",
+        LOG.info(String.format(
+            "load spark configuration from hive configuration (%s -> %s).",
             propertyName, value));
       }
     }
@@ -122,7 +128,7 @@ public class SparkClient implements Seri
 
   public int execute(DriverContext driverContext, SparkWork sparkWork) {
     Context ctx = driverContext.getCtx();
-    HiveConf hiveConf = (HiveConf)ctx.getConf();
+    HiveConf hiveConf = (HiveConf) ctx.getConf();
     refreshLocalResources(sparkWork, hiveConf);
     JobConf jobConf = new JobConf(hiveConf);
 
@@ -138,7 +144,8 @@ public class SparkClient implements Seri
     }
 
     // Generate Spark plan
-    SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir);
+    SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf,
+        emptyScratchDir);
     SparkPlan plan;
     try {
       plan = gen.generate(sparkWork);
@@ -148,8 +155,12 @@ public class SparkClient implements Seri
     }
 
     // Execute generated plan.
-    // TODO: we should catch any exception and return more meaningful error code.
-    plan.execute();
+    try {
+      plan.execute();
+    } catch (Exception e) {
+      LOG.error("Error executing Spark Plan", e);
+      return 1;
+    }
     return 0;
   }
 
@@ -167,7 +178,8 @@ public class SparkClient implements Seri
     }
 
     // add added jars
-    String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+    String addedJars = Utilities.getResourceFiles(conf,
+        SessionState.ResourceType.JAR);
     if (StringUtils.isNotEmpty(addedJars) && StringUtils.isNotBlank(addedJars)) {
       HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
       addJars(addedJars);
@@ -194,16 +206,20 @@ public class SparkClient implements Seri
       }
     }
 
-    //add added files
-    String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
-    if (StringUtils.isNotEmpty(addedFiles) && StringUtils.isNotBlank(addedFiles)) {
+    // add added files
+    String addedFiles = Utilities.getResourceFiles(conf,
+        SessionState.ResourceType.FILE);
+    if (StringUtils.isNotEmpty(addedFiles)
+        && StringUtils.isNotBlank(addedFiles)) {
       HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
       addResources(addedFiles);
     }
 
     // add added archives
-    String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
-    if (StringUtils.isNotEmpty(addedArchives) && StringUtils.isNotBlank(addedArchives)) {
+    String addedArchives = Utilities.getResourceFiles(conf,
+        SessionState.ResourceType.ARCHIVE);
+    if (StringUtils.isNotEmpty(addedArchives)
+        && StringUtils.isNotBlank(addedArchives)) {
       HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
       addResources(addedArchives);
     }
@@ -211,7 +227,8 @@ public class SparkClient implements Seri
 
   private void addResources(String addedFiles) {
     for (String addedFile : addedFiles.split(",")) {
-      if (StringUtils.isNotEmpty(addedFile) && StringUtils.isNotBlank(addedFile)
+      if (StringUtils.isNotEmpty(addedFile)
+          && StringUtils.isNotBlank(addedFile)
           && !localFiles.contains(addedFile)) {
         localFiles.add(addedFile);
         sc.addFile(addedFile);
@@ -222,7 +239,7 @@ public class SparkClient implements Seri
   private void addJars(String addedJars) {
     for (String addedJar : addedJars.split(",")) {
       if (StringUtils.isNotEmpty(addedJar) && StringUtils.isNotBlank(addedJar)
-        && !localJars.contains(addedJar)) {
+          && !localJars.contains(addedJar)) {
         localJars.add(addedJar);
         sc.addJar(addedJar);
       }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1617790&r1=1617789&r2=1617790&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Wed Aug 13 18:25:06 2014
@@ -18,31 +18,19 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.spark.api.java.JavaPairRDD;
-
 public class SparkPlan {
-  private JavaPairRDD<BytesWritable, BytesWritable> input;
-  private SparkTran tran;
 
-  public void execute() {
-    JavaPairRDD<BytesWritable, BytesWritable> rdd = tran.transform(input);
-    rdd.foreach(HiveVoidFunction.getInstance());
-  }
+  private GraphTran tran;
 
-  public SparkTran getTran() {
-    return tran;
+  public void execute() throws Exception {
+    tran.execute();
   }
 
-  public void setTran(SparkTran tran) {
+  public void setTran(GraphTran tran) {
     this.tran = tran;
   }
 
-  public JavaPairRDD<BytesWritable, BytesWritable> getInput() {
-    return input;
-  }
-
-  public void setInput(JavaPairRDD<BytesWritable, BytesWritable> input) {
-    this.input = input;
+  public GraphTran getTran() {
+    return tran;
   }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1617790&r1=1617789&r2=1617790&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Wed Aug 13 18:25:06 2014
@@ -19,8 +19,9 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -57,8 +59,10 @@ public class SparkPlanGenerator {
   private final JobConf jobConf;
   private Context context;
   private Path scratchDir;
+  private Map<BaseWork, SparkTran> unionWorkTrans = new HashMap<BaseWork, SparkTran>();
 
-  public SparkPlanGenerator(JavaSparkContext sc, Context context, JobConf jobConf, Path scratchDir) {
+  public SparkPlanGenerator(JavaSparkContext sc, Context context,
+      JobConf jobConf, Path scratchDir) {
     this.sc = sc;
     this.context = context;
     this.jobConf = jobConf;
@@ -67,45 +71,72 @@ public class SparkPlanGenerator {
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
     SparkPlan plan = new SparkPlan();
-    List<SparkTran> trans = new ArrayList<SparkTran>();
+    GraphTran trans = new GraphTran();
     Set<BaseWork> roots = sparkWork.getRoots();
-    assert(roots != null && roots.size() == 1);
-    BaseWork w = roots.iterator().next();
-    MapWork mapWork = (MapWork) w;
-    trans.add(generate(w));
-    while (sparkWork.getChildren(w).size() > 0) {
-      ReduceWork child = (ReduceWork) sparkWork.getChildren(w).get(0);
-      SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
-      SparkShuffler st = generate(edge);
-      ReduceTran rt = generate(child);
-      rt.setShuffler(st);
-      rt.setNumPartitions(edge.getNumPartitions());
-      trans.add(rt);
-      w = child;
-    }
-    ChainedTran chainedTran = new ChainedTran(trans);
-    plan.setTran(chainedTran);
-    JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(mapWork);
-    plan.setInput(input);
+    for (BaseWork w : roots) {
+      if (!(w instanceof MapWork)) {
+        throw new Exception(
+            "The roots in the SparkWork must be MapWork instances!");
+      }
+      MapWork mapWork = (MapWork) w;
+      SparkTran tran = generate(w);
+      JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(mapWork);
+      trans.addTranWithInput(tran, input);
+
+      while (sparkWork.getChildren(w).size() > 0) {
+        BaseWork child = sparkWork.getChildren(w).get(0);
+        if (child instanceof ReduceWork) {
+          SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
+          SparkShuffler st = generate(edge);
+          ReduceTran rt = generate((ReduceWork) child);
+          rt.setShuffler(st);
+          rt.setNumPartitions(edge.getNumPartitions());
+          trans.addTran(rt);
+          trans.connect(tran, rt);
+          w = child;
+          tran = rt;
+        } else if (child instanceof UnionWork) {
+          if (unionWorkTrans.get(child) != null) {
+            trans.connect(tran, unionWorkTrans.get(child));
+            break;
+          } else {
+            SparkTran ut = generate((UnionWork) child);
+            unionWorkTrans.put(child, ut);
+            trans.addTran(ut);
+            trans.connect(tran, ut);
+            w = child;
+            tran = ut;
+          }
+        }
+      }
+    }
+    unionWorkTrans.clear();
+    plan.setTran(trans);
     return plan;
   }
 
-  private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork) throws Exception {
-    List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, context, false);
-    Utilities.setInputPaths(jobConf, inputPaths);
-    Utilities.setMapWork(jobConf, mapWork, scratchDir, true);
+  private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork)
+      throws Exception {
+    JobConf newJobConf = new JobConf(jobConf);
+    List<Path> inputPaths = Utilities.getInputPaths(newJobConf, mapWork,
+        scratchDir, context, false);
+    Utilities.setInputPaths(newJobConf, inputPaths);
+    Utilities.setMapWork(newJobConf, mapWork, scratchDir, true);
     Class ifClass = getInputFormat(mapWork);
 
     // The mapper class is expected by the HiveInputFormat.
-    jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
-    return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
+    newJobConf.set("mapred.mapper.class", ExecMapper.class.getName());
+    return sc.hadoopRDD(newJobConf, ifClass, WritableComparable.class,
+        Writable.class);
   }
 
   private Class getInputFormat(MapWork mWork) throws HiveException {
     if (mWork.getInputformat() != null) {
-      HiveConf.setVar(jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat());
+      HiveConf.setVar(jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT,
+          mWork.getInputformat());
     }
-    String inpFormat = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT);
+    String inpFormat = HiveConf.getVar(jobConf,
+        HiveConf.ConfVars.HIVEINPUTFORMAT);
     if ((inpFormat == null) || (StringUtils.isBlank(inpFormat))) {
       inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
     }
@@ -118,7 +149,8 @@ public class SparkPlanGenerator {
     try {
       inputFormatClass = Class.forName(inpFormat);
     } catch (ClassNotFoundException e) {
-      String message = "Failed to load specified input format class:" + inpFormat;
+      String message = "Failed to load specified input format class:"
+          + inpFormat;
       LOG.error(message, e);
       throw new HiveException(message, e);
     }
@@ -135,33 +167,36 @@ public class SparkPlanGenerator {
         statsPublisher = factory.getStatsPublisher();
         if (!statsPublisher.init(jobConf)) { // creating stats table if not exists
           if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
-            throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+            throw new HiveException(
+                ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
           }
         }
       }
     }
     if (bw instanceof MapWork) {
-      return generate((MapWork)bw);
+      return generate((MapWork) bw);
     } else if (bw instanceof ReduceWork) {
-      return generate((ReduceWork)bw);
+      return generate((ReduceWork) bw);
     } else {
-      throw new IllegalArgumentException("Only MapWork and ReduceWork are expected");
+      throw new IllegalArgumentException(
+          "Only MapWork and ReduceWork are expected");
     }
   }
 
   private MapTran generate(MapWork mw) throws IOException {
+    JobConf newJobConf = new JobConf(jobConf);
     MapTran result = new MapTran();
-    Utilities.setMapWork(jobConf, mw, scratchDir, false);
-    Utilities.createTmpDirs(jobConf, mw);
-    jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
-    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+    Utilities.setMapWork(newJobConf, mw, scratchDir, true);
+    Utilities.createTmpDirs(newJobConf, mw);
+    newJobConf.set("mapred.mapper.class", ExecMapper.class.getName());
+    byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
     HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
     result.setMapFunction(mapFunc);
     return result;
   }
 
   private SparkShuffler generate(SparkEdgeProperty edge) {
-    if (edge.isShuffleSort()){
+    if (edge.isShuffleSort()) {
       return new SortByShuffler();
     }
     return new GroupByShuffler();
@@ -181,4 +216,9 @@ public class SparkPlanGenerator {
     return result;
   }
 
+  private UnionTran generate(UnionWork uw) {
+    UnionTran result = new UnionTran();
+    return result;
+  }
+
 }

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java?rev=1617790&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java Wed Aug 13 18:25:06 2014
@@ -0,0 +1,40 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class UnionTran implements SparkTran {
+  JavaPairRDD<BytesWritable, BytesWritable> otherInput; // RDD unioned with transform()'s argument; null until setOtherInput is called
+  /** Returns {@code input.union(otherInput)}; call setOtherInput first, since otherInput starts out null. */
+  @Override
+  public JavaPairRDD<BytesWritable, BytesWritable> transform(
+      JavaPairRDD<BytesWritable, BytesWritable> input) {
+    return input.union(otherInput);
+  }
+  /** Sets the RDD that later transform() calls will union with their argument. */
+  public void setOtherInput(JavaPairRDD<BytesWritable, BytesWritable> otherInput) {
+    this.otherInput = otherInput;
+  }
+  /** Returns the RDD most recently passed to setOtherInput, or null if none was set. */
+  public JavaPairRDD<BytesWritable, BytesWritable> getOtherInput() {
+    return this.otherInput;
+  }
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1617790&r1=1617789&r2=1617790&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Wed Aug 13 18:25:06 2014
@@ -230,9 +230,6 @@ public class GenSparkUtils {
         }
         linked = context.linkedFileSinks.get(path);
         linked.add(desc);
-
-        desc.setDirName(new Path(path, ""+linked.size()));
-        desc.setLinkedFileSinkDesc(linked);
       }
 
       if (current instanceof UnionOperator) {