Posted to commits@hive.apache.org by sz...@apache.org on 2015/01/08 03:00:12 UTC

svn commit: r1650201 [2/3] - in /hive/branches/spark: itests/hive-unit/src/test/java/org/apache/hive/jdbc/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/se...

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java Thu Jan  8 02:00:11 2015
@@ -58,6 +58,7 @@ public class SparkCrossProductCheck impl
   @Override
   public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
       throws SemanticException {
+    @SuppressWarnings("unchecked")
     Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
     if (currTask instanceof SparkTask) {
       SparkWork sparkWork = ((SparkTask) currTask).getWork();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java Thu Jan  8 02:00:11 2015
@@ -240,8 +240,6 @@ public class SparkMapJoinResolver implem
             }
           }
         }
-
-        // TODO: enable non-staged mapjoin
       }
     }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java Thu Jan  8 02:00:11 2015
@@ -96,8 +96,8 @@ public class SetSparkReducerParallelism
         long numberOfBytes = 0;
 
         // we need to add up all the estimates from the siblings of this reduce sink
-        for (Operator<? extends OperatorDesc> sibling:
-          sink.getChildOperators().get(0).getParentOperators()) {
+        for (Operator<? extends OperatorDesc> sibling
+          : sink.getChildOperators().get(0).getParentOperators()) {
           if (sibling.getStatistics() != null) {
             numberOfBytes += sibling.getStatistics().getDataSize();
             if (LOG.isDebugEnabled()) {
@@ -139,8 +139,8 @@ public class SetSparkReducerParallelism
         if (numReducers < cores) {
           numReducers = cores;
         }
-        LOG.info("Set parallelism parameters: cores = " + cores + ", numReducers = " + numReducers +
-          ", bytesPerReducer = " + bytesPerReducer + ", numberOfBytes = " + numberOfBytes);
+        LOG.info("Set parallelism parameters: cores = " + cores + ", numReducers = " + numReducers
+          + ", bytesPerReducer = " + bytesPerReducer + ", numberOfBytes = " + numberOfBytes);
         LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers + " (calculated)");
         desc.setNumReducers(numReducers);
       }

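The hunk above only rewraps the for-each loop and the log statements, but the surrounding method estimates reducer parallelism from the summed statistics of the reduce sink's sibling operators. A minimal standalone sketch of that calculation, with illustrative names (this is not the Hive implementation, which delegates the estimate to its own utilities):

public final class ReducerParallelismSketch {
  // Hypothetical estimator: illustrates the idea from the hunk above,
  // not the actual Hive code.
  static int estimateReducers(long numberOfBytes, long bytesPerReducer,
      int cores, int maxReducers) {
    // one reducer per bytesPerReducer of estimated input, rounded up
    int numReducers = (int) Math.max(1L, (numberOfBytes + bytesPerReducer - 1) / bytesPerReducer);
    // respect the configured upper bound
    numReducers = Math.min(numReducers, maxReducers);
    // keep every available core busy, as in SetSparkReducerParallelism
    if (numReducers < cores) {
      numReducers = cores;
    }
    return numReducers;
  }

  public static void main(String[] args) {
    // 10 GB of summed sibling statistics, 256 MB per reducer, 16 cores, cap 999
    System.out.println(estimateReducers(10L << 30, 256L << 20, 16, 999)); // 40
  }
}
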
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinHintOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinHintOptimizer.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinHintOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinHintOptimizer.java Thu Jan  8 02:00:11 2015
@@ -18,8 +18,9 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
+import java.util.Stack;
+
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -30,15 +31,13 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
 
-import java.util.Stack;
-
 /**
  * This processes joins in which user specified a hint to identify the small-table.
- * Currently it takes a mapjoin already converted from hints, and converts it further to BucketMapJoin or SMBMapJoin
- * using same small-table identification.
+ * Currently it takes a mapjoin already converted from hints, and converts it further
+ * to BucketMapJoin or SMBMapJoin using same small-table identification.
  *
- * The idea is eventually to process even hinted Mapjoin hints here, but due to code complexity in refactoring, that is still
- * in Optimizer.
+ * The idea is eventually to process even hinted Mapjoin hints here,
+ * but due to code complexity in refactoring, that is still in Optimizer.
  */
 public class SparkJoinHintOptimizer implements NodeProcessor {
 
@@ -51,13 +50,14 @@ public class SparkJoinHintOptimizer impl
   }
 
   @Override
-  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
-    MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+    Object... nodeOutputs) throws SemanticException {
     OptimizeSparkProcContext context = (OptimizeSparkProcContext) procCtx;
     HiveConf hiveConf = context.getParseContext().getConf();
 
     // Convert from mapjoin to bucket map join if enabled.
-    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN) || hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)
+      || hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
       BucketJoinProcCtx bjProcCtx = new BucketJoinProcCtx(hiveConf);
       bucketMapJoinOptimizer.process(nd, stack, bjProcCtx, nodeOutputs);
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinOptimizer.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinOptimizer.java Thu Jan  8 02:00:11 2015
@@ -29,7 +29,8 @@ import org.apache.hadoop.hive.ql.parse.s
 import java.util.Stack;
 
 /**
- * Converts a join to a more optimized join for the Spark path.  Delegates to a more specialized join processor.
+ * Converts a join to a more optimized join for the Spark path.
+ * Delegates to a more specialized join processor.
  */
 public class SparkJoinOptimizer implements NodeProcessor {
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java Thu Jan  8 02:00:11 2015
@@ -60,8 +60,8 @@ public class SparkMapJoinOptimizer imple
   private static final Log LOG = LogFactory.getLog(SparkMapJoinOptimizer.class.getName());
 
   @Override
-  /*
-   * (non-Javadoc) we should ideally not modify the tree we traverse. However,
+  /**
+   * We should ideally not modify the tree we traverse. However,
    * since we need to walk the tree at any time when we modify the operator, we
    * might as well do it here.
    */
@@ -74,66 +74,14 @@ public class SparkMapJoinOptimizer imple
     JoinOperator joinOp = (JoinOperator) nd;
 
     if (!conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
-      // && !(conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) {
-      // we are just converting to a common merge join operator. The shuffle
-      // join in map-reduce case.
-      // int pos = 0; // it doesn't matter which position we use in this case.
-      // convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
       return null;
     }
 
     LOG.info("Check if it can be converted to map join");
     long[] mapJoinInfo = getMapJoinConversionInfo(joinOp, context);
-    int mapJoinConversionPos = (int)mapJoinInfo[0];
+    int mapJoinConversionPos = (int) mapJoinInfo[0];
 
     if (mapJoinConversionPos < 0) {
-      /* TODO: handle this later
-      // we cannot convert to bucket map join, we cannot convert to
-      // map join either based on the size. Check if we can convert to SMB join.
-      if (conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
-        convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
-        return null;
-      }
-      Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
-      try {
-        bigTableMatcherClass =
-            (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar(
-                parseContext.getConf(),
-                HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR)));
-      } catch (ClassNotFoundException e) {
-        throw new SemanticException(e.getMessage());
-      }
-
-      BigTableSelectorForAutoSMJ bigTableMatcher =
-          ReflectionUtils.newInstance(bigTableMatcherClass, null);
-      JoinDesc joinDesc = joinOp.getConf();
-      JoinCondDesc[] joinCondns = joinDesc.getConds();
-      Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns);
-      if (joinCandidates.isEmpty()) {
-        // This is a full outer join. This can never be a map-join
-        // of any type. So return false.
-        return false;
-      }
-      mapJoinConversionPos =
-          bigTableMatcher.getBigTablePosition(parseContext, joinOp, joinCandidates);
-      if (mapJoinConversionPos < 0) {
-        // contains aliases from sub-query
-        // we are just converting to a common merge join operator. The shuffle
-        // join in map-reduce case.
-        int pos = 0; // it doesn't matter which position we use in this case.
-        convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
-        return null;
-      }
-
-      if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
-        convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
-            tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true);
-      } else {
-        // we are just converting to a common merge join operator. The shuffle
-        // join in map-reduce case.
-        int pos = 0; // it doesn't matter which position we use in this case.
-        convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
-      }  */
       return null;
     }
 
@@ -166,107 +114,9 @@ public class SparkMapJoinOptimizer imple
     return mapJoinOp;
   }
 
-  // replaces the join operator with a new CommonJoinOperator, removes the
-  // parent reduce sinks
-  /*
-  private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeSparkProcContext context,
-      int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren)
-      throws SemanticException {
-    ParseContext parseContext = context.parseContext;
-    MapJoinDesc mapJoinDesc = null;
-    if (adjustParentsChildren) {
-        mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
-            joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true);
-    } else {
-      JoinDesc joinDesc = joinOp.getConf();
-      // retain the original join desc in the map join.
-      mapJoinDesc =
-          new MapJoinDesc(null, null, joinDesc.getExprs(), null, null,
-              joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
-              joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
-    }
-
-    @SuppressWarnings("unchecked")
-    CommonMergeJoinOperator mergeJoinOp =
-        (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
-            isSubQuery, mapJoinConversionPos, mapJoinDesc));
-    OpTraits opTraits =
-        new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits()
-            .getSortCols());
-    mergeJoinOp.setOpTraits(opTraits);
-    mergeJoinOp.setStatistics(joinOp.getStatistics());
-
-    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
-      int pos = parentOp.getChildOperators().indexOf(joinOp);
-      parentOp.getChildOperators().remove(pos);
-      parentOp.getChildOperators().add(pos, mergeJoinOp);
-    }
-
-    for (Operator<? extends OperatorDesc> childOp : joinOp.getChildOperators()) {
-      int pos = childOp.getParentOperators().indexOf(joinOp);
-      childOp.getParentOperators().remove(pos);
-      childOp.getParentOperators().add(pos, mergeJoinOp);
-    }
-
-    List<Operator<? extends OperatorDesc>> childOperators = mergeJoinOp.getChildOperators();
-    if (childOperators == null) {
-      childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
-      mergeJoinOp.setChildOperators(childOperators);
-    }
-
-    List<Operator<? extends OperatorDesc>> parentOperators = mergeJoinOp.getParentOperators();
-    if (parentOperators == null) {
-      parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
-      mergeJoinOp.setParentOperators(parentOperators);
-    }
-
-    childOperators.clear();
-    parentOperators.clear();
-    childOperators.addAll(joinOp.getChildOperators());
-    parentOperators.addAll(joinOp.getParentOperators());
-    mergeJoinOp.getConf().setGenJoinKeys(false);
-
-    if (adjustParentsChildren) {
-      mergeJoinOp.getConf().setGenJoinKeys(true);
-      List<Operator<? extends OperatorDesc>> newParentOpList =
-          new ArrayList<Operator<? extends OperatorDesc>>();
-      for (Operator<? extends OperatorDesc> parentOp : mergeJoinOp.getParentOperators()) {
-        for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) {
-          grandParentOp.getChildOperators().remove(parentOp);
-          grandParentOp.getChildOperators().add(mergeJoinOp);
-          newParentOpList.add(grandParentOp);
-        }
-      }
-      mergeJoinOp.getParentOperators().clear();
-      mergeJoinOp.getParentOperators().addAll(newParentOpList);
-      List<Operator<? extends OperatorDesc>> parentOps =
-          new ArrayList<Operator<? extends OperatorDesc>>(mergeJoinOp.getParentOperators());
-      for (Operator<? extends OperatorDesc> parentOp : parentOps) {
-        int parentIndex = mergeJoinOp.getParentOperators().indexOf(parentOp);
-        if (parentIndex == mapJoinConversionPos) {
-          continue;
-        }
-
-        // insert the dummy store operator here
-        DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator();
-        dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
-        dummyStoreOp.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
-        dummyStoreOp.getChildOperators().add(mergeJoinOp);
-        int index = parentOp.getChildOperators().indexOf(mergeJoinOp);
-        parentOp.getChildOperators().remove(index);
-        parentOp.getChildOperators().add(index, dummyStoreOp);
-        dummyStoreOp.getParentOperators().add(parentOp);
-        mergeJoinOp.getParentOperators().remove(parentIndex);
-        mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp);
-      }
-    }
-    mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators());
-  }
-  */
-
   private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> currentOp) {
     int numBuckets = currentOp.getOpTraits().getNumBuckets();
-    for (Operator<? extends OperatorDesc>op : currentOp.getChildOperators()) {
+    for (Operator<? extends OperatorDesc> op : currentOp.getChildOperators()) {
       if (!(op instanceof ReduceSinkOperator) && !(op instanceof GroupByOperator)) {
         op.getOpTraits().setNumBuckets(numBuckets);
         if (numBuckets < 0) {
@@ -298,8 +148,8 @@ public class SparkMapJoinOptimizer imple
     BucketMapjoinProc.checkAndConvertBucketMapJoin(
       parseContext, mapJoinOp, joinTree, baseBigAlias, joinAliases);
     MapJoinDesc joinDesc = mapJoinOp.getConf();
-    return joinDesc.isBucketMapJoin() ?
-      joinDesc.getBigTableBucketNumMapping().size() : -1;
+    return joinDesc.isBucketMapJoin()
+      ? joinDesc.getBigTableBucketNumMapping().size() : -1;
   }
 
   /**
@@ -337,7 +187,7 @@ public class SparkMapJoinOptimizer imple
 
       Statistics currInputStat = parentOp.getStatistics();
       if (currInputStat == null) {
-        LOG.warn("Couldn't get statistics from: "+parentOp);
+        LOG.warn("Couldn't get statistics from: " + parentOp);
         return new long[]{-1, 0, 0};
       }
 
@@ -359,15 +209,14 @@ public class SparkMapJoinOptimizer imple
       // Otherwise, we could try to break the op tree at the UNION, and create two MapWorks
       // for the branches above. Then, MJ will be in the following ReduceWork.
       // But, this is tricky to implement, and we'll leave it as a future work for now.
-      // TODO: handle this as a MJ case
       if (containUnionWithoutRS(parentOp.getParentOperators().get(0))) {
         return new long[]{-1, 0, 0};
       }
 
       long inputSize = currInputStat.getDataSize();
-      if ((bigInputStat == null) ||
-          ((bigInputStat != null) &&
-          (inputSize > bigInputStat.getDataSize()))) {
+      if ((bigInputStat == null)
+          || ((bigInputStat != null)
+          && (inputSize > bigInputStat.getDataSize()))) {
 
         if (bigTableFound) {
           // cannot convert to map join; we've already chosen a big table
@@ -416,9 +265,10 @@ public class SparkMapJoinOptimizer imple
       return new long[]{-1, 0, 0};
     }
 
-    //Final check, find size of already-calculated Mapjoin Operators in same work (spark-stage).  We need to factor
-    //this in to prevent overwhelming Spark executor-memory.
-    long connectedMapJoinSize = getConnectedMapJoinSize(joinOp.getParentOperators().get(bigTablePosition), joinOp, context);
+    //Final check, find size of already-calculated Mapjoin Operators in same work (spark-stage).
+    //We need to factor this in to prevent overwhelming Spark executor-memory.
+    long connectedMapJoinSize = getConnectedMapJoinSize(joinOp.getParentOperators().
+      get(bigTablePosition), joinOp, context);
     if ((connectedMapJoinSize + totalSize) > maxSize) {
       return new long[]{-1, 0, 0};
     }
@@ -434,7 +284,8 @@ public class SparkMapJoinOptimizer imple
    * @return total size of parent mapjoins in same work as this operator.
    */
   @SuppressWarnings({"rawtypes", "unchecked"})
-  private long getConnectedMapJoinSize(Operator<? extends OperatorDesc> parentOp, Operator joinOp, OptimizeSparkProcContext ctx) {
+  private long getConnectedMapJoinSize(Operator<? extends OperatorDesc> parentOp, Operator joinOp,
+    OptimizeSparkProcContext ctx) {
     long result = 0;
     for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) {
       result += getConnectedParentMapJoinSize(grandParentOp, ctx);
@@ -482,7 +333,8 @@ public class SparkMapJoinOptimizer imple
     }
 
     if (op instanceof MapJoinOperator) {
-      //found child mapjoin operator.  Its size should already reflect any mapjoins connected to it, so stop processing.
+      //Found child mapjoin operator.
+      //Its size should already reflect any mapjoins connected to it, so stop processing.
       long mjSize = ctx.getMjOpSizes().get(op);
       return mjSize;
     }

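The getMapJoinConversionInfo changes above are mostly line wrapping, but the method's job is to pick the big-table position by statistics and to refuse the conversion when the small-table sides (plus any map joins already planned in the same Spark stage) would not fit under the configured memory threshold. A simplified, self-contained sketch of that selection, assuming per-input sizes are already known and ignoring the connected-mapjoin bookkeeping; class and method names are illustrative only:

import java.util.Arrays;
import java.util.List;

public final class MapJoinSelectionSketch {
  // Returns the candidate big-table position, or -1 when the remaining
  // (small) inputs would exceed the in-memory threshold. This mirrors the
  // idea in the hunk above, not its exact code.
  static int pickBigTablePosition(List<Long> inputSizes, long maxSmallTableBytes) {
    int bigTablePos = -1;
    long bigTableSize = -1;
    long totalSmallSize = 0;
    for (int pos = 0; pos < inputSizes.size(); pos++) {
      long size = inputSizes.get(pos);
      if (size < 0) {
        return -1;                        // missing statistics: cannot decide safely
      }
      if (size > bigTableSize) {
        if (bigTableSize >= 0) {
          totalSmallSize += bigTableSize; // previous candidate becomes a small table
        }
        bigTableSize = size;
        bigTablePos = pos;
      } else {
        totalSmallSize += size;
      }
      if (totalSmallSize > maxSmallTableBytes) {
        return -1;                        // small tables would not fit in executor memory
      }
    }
    return bigTablePos;
  }

  public static void main(String[] args) {
    // inputs of 8 GB, 100 MB and 200 MB with a 300 MB threshold -> position 0
    System.out.println(pickBigTablePosition(
        Arrays.asList(8L << 30, 100L << 20, 200L << 20), 300L << 20));
  }
}
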
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java Thu Jan  8 02:00:11 2015
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -61,6 +62,8 @@ import java.util.Stack;
 
 public class SparkReduceSinkMapJoinProc implements NodeProcessor {
 
+  public static final Log LOG = LogFactory.getLog(SparkReduceSinkMapJoinProc.class.getName());
+
   public static class SparkMapJoinFollowedByGroupByProcessor implements NodeProcessor {
     private boolean hasGroupBy = false;
 
@@ -81,8 +84,6 @@ public class SparkReduceSinkMapJoinProc
     }
   }
 
-  protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
-
   private boolean hasGroupBy(Operator<? extends OperatorDesc> mapjoinOp,
                              GenSparkProcContext context) throws SemanticException {
     List<Operator<? extends OperatorDesc>> childOps = mapjoinOp.getChildOperators();
@@ -106,6 +107,7 @@ public class SparkReduceSinkMapJoinProc
    * on the basis of the big table side because it may be a mapwork (no need for shuffle)
    * or reduce work.
    */
+  @SuppressWarnings("unchecked")
   @Override
   public Object process(Node nd, Stack<Node> stack,
                         NodeProcessorCtx procContext, Object... nodeOutputs)
@@ -132,14 +134,14 @@ public class SparkReduceSinkMapJoinProc
     parentRS.setSkipTag(true);
     // remember the original parent list before we start modifying it.
     if (!context.mapJoinParentMap.containsKey(mapJoinOp)) {
-      List<Operator<?>> parents = new ArrayList(mapJoinOp.getParentOperators());
+      List<Operator<?>> parents = new ArrayList<Operator<?>>(mapJoinOp.getParentOperators());
       context.mapJoinParentMap.put(mapJoinOp, parents);
     }
 
     List<BaseWork> mapJoinWork;
 
     /*
-     *  if there was a pre-existing work generated for the big-table mapjoin side,
+     *  If there was a pre-existing work generated for the big-table mapjoin side,
      *  we need to hook the work generated for the RS (associated with the RS-MJ pattern)
      *  with the pre-existing work.
      *
@@ -161,20 +163,6 @@ public class SparkReduceSinkMapJoinProc
     LOG.debug("Mapjoin "+mapJoinOp+", pos: "+pos+" --> "+parentWork.getName());
     mapJoinOp.getConf().getParentToInput().put(pos, parentWork.getName());
 
-/*  int numBuckets = -1;
-    EdgeType edgeType = EdgeType.BROADCAST_EDGE;
-    if (mapJoinOp.getConf().isBucketMapJoin()) {
-
-      // disable auto parallelism for bucket map joins
-      parentRS.getConf().setAutoParallel(false);
-
-      numBuckets = (Integer) mapJoinOp.getConf().getBigTableBucketNumMapping().values().toArray()[0];
-      if (mapJoinOp.getConf().getCustomBucketMapJoin()) {
-        edgeType = EdgeType.CUSTOM_EDGE;
-      } else {
-        edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
-      }
-    }*/
     SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE);
 
     if (mapJoinWork != null) {
@@ -209,7 +197,6 @@ public class SparkReduceSinkMapJoinProc
 
     // create an new operator: HashTableDummyOperator, which share the table desc
     HashTableDummyDesc desc = new HashTableDummyDesc();
-    @SuppressWarnings("unchecked")
     HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(desc);
     TableDesc tbl;
 
@@ -221,7 +208,7 @@ public class SparkReduceSinkMapJoinProc
     Map<Byte, List<ExprNodeDesc>> keyExprMap = mapJoinOp.getConf().getKeys();
     List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0));
     StringBuffer keyOrder = new StringBuffer();
-    for (ExprNodeDesc k: keyCols) {
+    for (int i = 0; i < keyCols.size(); i++) {
       keyOrder.append("+");
     }
     TableDesc keyTableDesc = PlanUtils.getReduceKeyTableDesc(PlanUtils
@@ -291,11 +278,11 @@ public class SparkReduceSinkMapJoinProc
     }
 
     //get all parents of reduce sink
-    List<Operator<? extends OperatorDesc>> RSparentOps = parentRS.getParentOperators();
-    for (Operator<? extends OperatorDesc> parent : RSparentOps) {
+    List<Operator<? extends OperatorDesc>> rsParentOps = parentRS.getParentOperators();
+    for (Operator<? extends OperatorDesc> parent : rsParentOps) {
       parent.replaceChild(parentRS, hashTableSinkOp);
     }
-    hashTableSinkOp.setParentOperators(RSparentOps);
+    hashTableSinkOp.setParentOperators(rsParentOps);
     hashTableSinkOp.setTag(tag);
     return true;
   }

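For the small-table side, the processor above takes the ReduceSinkOperator out of the operator DAG and puts a HashTableSinkOperator in its place, re-pointing the former parents at the new node (the rsParentOps loop at the end of the hunk). A toy sketch of that rewiring, with a hypothetical minimal Op class standing in for Hive's Operator hierarchy:

import java.util.ArrayList;
import java.util.List;

final class Op {
  final String name;
  final List<Op> parents = new ArrayList<Op>();
  final List<Op> children = new ArrayList<Op>();
  Op(String name) { this.name = name; }

  void replaceChild(Op oldChild, Op newChild) {
    children.set(children.indexOf(oldChild), newChild);
  }
}

public final class SmallTableRewireSketch {
  // Re-point the small-table reduce sink's parents at the hash table sink,
  // mirroring the rsParentOps loop in the hunk above.
  static void swapInHashTableSink(Op reduceSink, Op hashTableSink) {
    List<Op> rsParents = reduceSink.parents;
    for (Op parent : rsParents) {
      parent.replaceChild(reduceSink, hashTableSink);
    }
    hashTableSink.parents.addAll(rsParents);
  }
}
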
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java Thu Jan  8 02:00:11 2015
@@ -1,12 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
-import com.clearspring.analytics.util.Preconditions;
+import java.util.List;
+import java.util.Stack;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -18,8 +37,7 @@ import org.apache.hadoop.hive.ql.parse.S
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 
-import java.util.List;
-import java.util.Stack;
+import com.clearspring.analytics.util.Preconditions;
 
 /**
  * Converts from a bucket-mapjoin created from hints to SMB mapjoin.
@@ -44,8 +62,8 @@ public class SparkSMBJoinHintOptimizer e
 
     // Throw an error if the user asked for sort merge bucketed mapjoin to be enforced
     // and sort merge bucketed mapjoin cannot be performed
-    if (!convert &&
-      pGraphContext.getConf().getBoolVar(
+    if (!convert
+      && pGraphContext.getConf().getBoolVar(
         HiveConf.ConfVars.HIVEENFORCESORTMERGEBUCKETMAPJOIN)) {
       throw new SemanticException(ErrorMsg.SORTMERGE_MAPJOIN_FAILED.getMsg());
     }
@@ -62,6 +80,7 @@ public class SparkSMBJoinHintOptimizer e
    * In SMB join these are not expected for any parents, either from small or big tables.
    * @param mapJoinOp
    */
+  @SuppressWarnings("unchecked")
   private void removeSmallTableReduceSink(MapJoinOperator mapJoinOp) {
     SMBJoinDesc smbJoinDesc = new SMBJoinDesc(mapJoinOp.getConf());
     List<Operator<? extends OperatorDesc>> parentOperators = mapJoinOp.getParentOperators();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java Thu Jan  8 02:00:11 2015
@@ -56,7 +56,7 @@ import java.util.Set;
 import java.util.Stack;
 
 /**
- * Spark-version of SkewJoinProcFactory
+ * Spark-version of SkewJoinProcFactory.
  */
 public class SparkSkewJoinProcFactory {
   private SparkSkewJoinProcFactory() {
@@ -82,10 +82,10 @@ public class SparkSkewJoinProcFactory {
       JoinOperator op = (JoinOperator) nd;
       ReduceWork reduceWork = context.getReducerToReduceWork().get(op);
       ParseContext parseContext = context.getParseCtx();
-      if (!op.getConf().isFixedAsSorted() && currentTsk instanceof SparkTask &&
-          reduceWork != null && ((SparkTask) currentTsk).getWork().contains(reduceWork) &&
-          GenSparkSkewJoinProcessor.supportRuntimeSkewJoin(
-              op, currentTsk, parseContext.getConf())) {
+      if (!op.getConf().isFixedAsSorted() && currentTsk instanceof SparkTask
+        && reduceWork != null && ((SparkTask) currentTsk).getWork().contains(reduceWork)
+        && GenSparkSkewJoinProcessor.supportRuntimeSkewJoin(
+          op, currentTsk, parseContext.getConf())) {
         // first we try to split the task
         splitTask((SparkTask) currentTsk, reduceWork, parseContext);
         GenSparkSkewJoinProcessor.processSkewJoin(op, currentTsk, reduceWork, parseContext);
@@ -102,8 +102,8 @@ public class SparkSkewJoinProcFactory {
     SparkWork currentWork = currentTask.getWork();
     Set<Operator<? extends OperatorDesc>> reduceSinkSet =
         SparkMapJoinResolver.getOp(reduceWork, ReduceSinkOperator.class);
-    if (currentWork.getChildren(reduceWork).size() == 1 && canSplit(currentWork) &&
-        reduceSinkSet.size() == 1) {
+    if (currentWork.getChildren(reduceWork).size() == 1 && canSplit(currentWork)
+      && reduceSinkSet.size() == 1) {
       ReduceSinkOperator reduceSink = (ReduceSinkOperator) reduceSinkSet.iterator().next();
       BaseWork childWork = currentWork.getChildren(reduceWork).get(0);
       SparkEdgeProperty originEdge = currentWork.getEdgeProperty(reduceWork, childWork);
@@ -118,7 +118,6 @@ public class SparkSkewJoinProcFactory {
       // remove them from current spark work
       for (BaseWork baseWork : newWork.getAllWorkUnsorted()) {
         currentWork.remove(baseWork);
-        // TODO: take care of cloneToWork?
         currentWork.getCloneToWork().remove(baseWork);
       }
       // create TS to read intermediate data
@@ -152,7 +151,6 @@ public class SparkSkewJoinProcFactory {
         } else {
           streamDesc = "$INTNAME";
         }
-        // TODO: remove this?
         String origStreamDesc = streamDesc;
         int pos = 0;
         while (mapWork.getAliasToWork().get(streamDesc) != null) {
@@ -162,6 +160,7 @@ public class SparkSkewJoinProcFactory {
       GenMapRedUtils.setTaskPlan(taskTmpDir.toUri().toString(), streamDesc,
           tableScanOp, mapWork, false, tableDesc);
       // insert the new task between current task and its child
+      @SuppressWarnings("unchecked")
       Task<? extends Serializable> newTask = TaskFactory.get(newWork, parseContext.getConf());
       List<Task<? extends Serializable>> childTasks = currentTask.getChildTasks();
       // must have at most one child
@@ -190,7 +189,7 @@ public class SparkSkewJoinProcFactory {
   }
 
   /**
-   * Copy a sub-graph from originWork to newWork
+   * Copy a sub-graph from originWork to newWork.
    */
   private static void copyWorkGraph(SparkWork originWork, SparkWork newWork,
       BaseWork baseWork, boolean upWards) {

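The splitTask path touched above cuts the SparkWork graph at a ReduceWork (when it has a single child and a single reduce sink), moves the downstream works into a new SparkWork, and then chains the new task between the current task and its former children. A schematic sketch of that last step only, using a hypothetical minimal task type rather than Hive's Task hierarchy:

import java.util.ArrayList;
import java.util.List;

public final class SplitTaskSketch {
  static final class SimpleTask {
    final String name;
    final List<SimpleTask> childTasks = new ArrayList<SimpleTask>();
    SimpleTask(String name) { this.name = name; }
  }

  // Chain newTask between currentTask and its former children; the real
  // splitTask expects at most one child task at this point.
  static void chainBetween(SimpleTask currentTask, SimpleTask newTask) {
    List<SimpleTask> formerChildren = new ArrayList<SimpleTask>(currentTask.childTasks);
    currentTask.childTasks.clear();
    currentTask.childTasks.add(newTask);
    newTask.childTasks.addAll(formerChildren);
  }
}
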
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java Thu Jan  8 02:00:11 2015
@@ -18,8 +18,16 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
 import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
@@ -39,17 +47,8 @@ import org.apache.hadoop.hive.ql.parse.S
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-
 /**
- * Spark version of SkewJoinResolver
+ * Spark version of SkewJoinResolver.
  */
 public class SparkSkewJoinResolver implements PhysicalPlanResolver {
   @Override
@@ -62,7 +61,7 @@ public class SparkSkewJoinResolver imple
     return pctx;
   }
 
-  class SparkSkewJoinTaskDispatcher implements Dispatcher{
+  class SparkSkewJoinTaskDispatcher implements Dispatcher {
     private PhysicalContext physicalContext;
 
     public SparkSkewJoinTaskDispatcher(PhysicalContext context) {
@@ -74,6 +73,7 @@ public class SparkSkewJoinResolver imple
     public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
         throws SemanticException {
 
+      @SuppressWarnings("unchecked")
       Task<? extends Serializable> task = (Task<? extends Serializable>) nd;
       if (task instanceof SparkTask) {
         SparkWork sparkWork = ((SparkTask) task).getWork();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java Thu Jan  8 02:00:11 2015
@@ -17,10 +17,13 @@
 */
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -32,10 +35,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-
 /**
 * Operator factory for Spark SMBJoin processing.
 */
@@ -53,6 +52,7 @@ public final class SparkSortMergeJoinFac
       Stack<Node> stack) {
     int size = stack.size();
     assert size >= 2 && stack.get(size - 1) == op;
+    @SuppressWarnings("unchecked")
     Operator<? extends OperatorDesc> parent =
         (Operator<? extends OperatorDesc>) stack.get(size - 2);
     List<Operator<? extends OperatorDesc>> parOp = op.getParentOperators();
@@ -154,8 +154,6 @@ public final class SparkSortMergeJoinFac
       SMBMapJoinOperator mapJoin = (SMBMapJoinOperator) nd;
       GenSparkProcContext ctx = (GenSparkProcContext) procCtx;
 
-      SparkTask currTask = ctx.currentTask;
-
       // find the branch on which this processor was invoked
       int pos = getPositionParent(mapJoin, stack);
       boolean local = pos != mapJoin.getConf().getPosBigTable();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java Thu Jan  8 02:00:11 2015
@@ -70,26 +70,29 @@ public class SparkSortMergeJoinOptimizer
     return null;
   }
 
-  protected boolean canConvertJoinToSMBJoin(JoinOperator joinOperator, SortBucketJoinProcCtx smbJoinContext,
-    ParseContext pGraphContext, Stack<Node> stack) throws SemanticException {
+  protected boolean canConvertJoinToSMBJoin(JoinOperator joinOperator,
+    SortBucketJoinProcCtx smbJoinContext, ParseContext pGraphContext,
+    Stack<Node> stack) throws SemanticException {
     if (!supportBucketMapJoin(stack)) {
       return false;
     }
     return canConvertJoinToSMBJoin(joinOperator, smbJoinContext, pGraphContext);
   }
 
-  //Preliminary checks.  In the MR version of the code, these used to be done via another walk, refactoring to be inline.
+  //Preliminary checks.  In the MR version of the code, these used to be done via another walk,
+  //here it is done inline.
   private boolean supportBucketMapJoin(Stack<Node> stack) {
     int size = stack.size();
-    if (!(stack.get(size-1) instanceof JoinOperator) ||
-            !(stack.get(size-2) instanceof ReduceSinkOperator)) {
+    if (!(stack.get(size - 1) instanceof JoinOperator)
+      || !(stack.get(size - 2) instanceof ReduceSinkOperator)) {
       return false;
     }
 
     // If any operator in the stack does not support a auto-conversion, this join should
     // not be converted.
-    for (int pos = size -3; pos >= 0; pos--) {
-      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>)stack.get(pos);
+    for (int pos = size - 3; pos >= 0; pos--) {
+      @SuppressWarnings("unchecked")
+      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) stack.get(pos);
       if (!op.supportAutomaticSortMergeJoin()) {
         return false;
       }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java Thu Jan  8 02:00:11 2015
@@ -18,7 +18,16 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
-import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -35,8 +44,8 @@ import org.apache.hadoop.hive.ql.plan.Re
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
-import java.io.Serializable;
-import java.util.*;
+import com.google.common.base.Preconditions;
+
 
 /**
  * Do a BFS on the sparkWork graph, and look for any work that has more than one child.
@@ -154,13 +163,12 @@ public class SplitSparkWorkResolver impl
   }
 
   // we lost statistics & opTraits through cloning, try to get them back
-  // TODO: make sure this method is sufficient to solve the problem
   private void setStatistics(BaseWork origin, BaseWork clone) {
     if (origin instanceof MapWork && clone instanceof MapWork) {
       MapWork originMW = (MapWork) origin;
       MapWork cloneMW = (MapWork) clone;
-      for (Map.Entry<String, Operator<? extends OperatorDesc>> entry :
-          originMW.getAliasToWork().entrySet()) {
+      for (Map.Entry<String, Operator<? extends OperatorDesc>> entry
+        : originMW.getAliasToWork().entrySet()) {
         String alias = entry.getKey();
         Operator<? extends OperatorDesc> cloneOP = cloneMW.getAliasToWork().get(alias);
         if (cloneOP != null) {

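Per its class comment, SplitSparkWorkResolver does a BFS on the SparkWork graph and looks for any work that has more than one child. A generic sketch of that traversal over an adjacency map, with illustrative names and the splitting itself left out:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

public final class MultiChildScanSketch {
  // BFS from the root works, collecting every work with more than one child.
  static <T> List<T> findMultiChildWorks(List<T> roots, Map<T, List<T>> children) {
    List<T> result = new ArrayList<T>();
    Set<T> seen = new HashSet<T>(roots);
    Queue<T> queue = new ArrayDeque<T>(roots);
    while (!queue.isEmpty()) {
      T work = queue.poll();
      List<T> kids = children.get(work);
      if (kids == null) {
        kids = Collections.<T>emptyList();
      }
      if (kids.size() > 1) {
        result.add(work);          // candidate for splitting
      }
      for (T kid : kids) {
        if (seen.add(kid)) {       // enqueue each work only once
          queue.add(kid);
        }
      }
    }
    return result;
  }
}
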
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java Thu Jan  8 02:00:11 2015
@@ -58,7 +58,7 @@ import java.util.Set;
  * Cloned from GenTezProcContext.
  *
  */
-public class GenSparkProcContext implements NodeProcessorCtx{
+public class GenSparkProcContext implements NodeProcessorCtx {
   public final ParseContext parseContext;
   public final HiveConf conf;
   public final List<Task<MoveWork>> moveTask;
@@ -89,11 +89,12 @@ public class GenSparkProcContext impleme
 
   // map that keeps track of the last operator of a task to the following work
   // of this operator. This is used for connecting them later.
-  public final Map<ReduceSinkOperator, ObjectPair<SparkEdgeProperty, ReduceWork>> leafOpToFollowingWorkInfo;
+  public final Map<ReduceSinkOperator, ObjectPair<SparkEdgeProperty, ReduceWork>>
+    leafOpToFollowingWorkInfo;
 
   // a map that keeps track of work that need to be linked while
   // traversing an operator tree
-  public final Map<Operator<?>, Map<BaseWork,SparkEdgeProperty>> linkOpWithWorkMap;
+  public final Map<Operator<?>, Map<BaseWork, SparkEdgeProperty>> linkOpWithWorkMap;
 
   // a map to keep track of what reduce sinks have to be hooked up to
   // map join work
@@ -138,9 +139,13 @@ public class GenSparkProcContext impleme
   public final Map<String, Operator<? extends OperatorDesc>> topOps;
 
   @SuppressWarnings("unchecked")
-  public GenSparkProcContext(HiveConf conf, ParseContext parseContext,
-      List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
-      Set<ReadEntity> inputs, Set<WriteEntity> outputs, Map<String, Operator<? extends OperatorDesc>> topOps) {
+  public GenSparkProcContext(HiveConf conf,
+      ParseContext parseContext,
+      List<Task<MoveWork>> moveTask,
+      List<Task<? extends Serializable>> rootTasks,
+      Set<ReadEntity> inputs,
+      Set<WriteEntity> outputs,
+      Map<String, Operator<? extends OperatorDesc>> topOps) {
     this.conf = conf;
     this.parseContext = parseContext;
     this.moveTask = moveTask;
@@ -163,9 +168,9 @@ public class GenSparkProcContext impleme
     this.currentMapJoinOperators = new LinkedHashSet<MapJoinOperator>();
     this.linkChildOpWithDummyOp = new LinkedHashMap<Operator<?>, List<Operator<?>>>();
     this.dependencyTask = conf.getBoolVar(
-        HiveConf.ConfVars.HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES) ?
-        (DependencyCollectionTask) TaskFactory.get(new DependencyCollectionWork(), conf) :
-        null;
+        HiveConf.ConfVars.HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES)
+        ? (DependencyCollectionTask) TaskFactory.get(new DependencyCollectionWork(), conf)
+        : null;
     this.unionWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
     this.currentUnionOperators = new LinkedList<UnionOperator>();
     this.workWithUnionOperators = new LinkedHashSet<BaseWork>();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Thu Jan  8 02:00:11 2015
@@ -18,14 +18,21 @@
 
 package org.apache.hadoop.hive.ql.parse.spark;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
@@ -48,26 +55,16 @@ import org.apache.hadoop.hive.ql.plan.Op
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.stats.StatsFactory;
 
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 
 /**
  * GenSparkUtils is a collection of shared helper methods to produce SparkWork
  * Cloned from GenTezUtils.
- * TODO: need to make it fit to Spark
  */
 public class GenSparkUtils {
-  private static final Log logger = LogFactory.getLog(GenSparkUtils.class.getName());
+  private static final Log LOG = LogFactory.getLog(GenSparkUtils.class.getName());
 
   // sequence number is used to name vertices (e.g.: Map 1, Reduce 14, ...)
   private int sequenceNumber = 0;
@@ -89,12 +86,13 @@ public class GenSparkUtils {
     sequenceNumber = 0;
   }
 
-  public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root, SparkWork sparkWork) throws SemanticException {
+  public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root,
+    SparkWork sparkWork) throws SemanticException {
     Preconditions.checkArgument(!root.getParentOperators().isEmpty(),
         "AssertionError: expected root.getParentOperators() to be non-empty");
 
-    ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
-    logger.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
+    ReduceWork reduceWork = new ReduceWork("Reducer " + (++sequenceNumber));
+    LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
     reduceWork.setReducer(root);
     reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
 
@@ -103,8 +101,8 @@ public class GenSparkUtils {
     // all be -1. In sort/order case where it matters there will be only
     // one parent.
     Preconditions.checkArgument(context.parentOfRoot instanceof ReduceSinkOperator,
-        "AssertionError: expected context.parentOfRoot to be an instance of ReduceSinkOperator, but was " +
-            context.parentOfRoot.getClass().getName());
+      "AssertionError: expected context.parentOfRoot to be an instance of ReduceSinkOperator, but was "
+      + context.parentOfRoot.getClass().getName());
     ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
 
     reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
@@ -123,7 +121,7 @@ public class GenSparkUtils {
   protected void setupReduceSink(GenSparkProcContext context, ReduceWork reduceWork,
       ReduceSinkOperator reduceSink) {
 
-    logger.debug("Setting up reduce sink: " + reduceSink
+    LOG.debug("Setting up reduce sink: " + reduceSink
         + " with following reduce work: " + reduceWork.getName());
 
     // need to fill in information about the key and value in the reducer
@@ -146,14 +144,14 @@ public class GenSparkUtils {
       SparkWork sparkWork, PrunedPartitionList partitions, boolean deferSetup) throws SemanticException {
     Preconditions.checkArgument(root.getParentOperators().isEmpty(),
         "AssertionError: expected root.getParentOperators() to be empty");
-    MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
-    logger.debug("Adding map work (" + mapWork.getName() + ") for " + root);
+    MapWork mapWork = new MapWork("Map " + (++sequenceNumber));
+    LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
 
     // map work starts with table scan operators
     Preconditions.checkArgument(root instanceof TableScanOperator,
-        "AssertionError: expected root to be an instance of TableScanOperator, but was " +
-            root.getClass().getName());
-    String alias = ((TableScanOperator)root).getConf().getAlias();
+      "AssertionError: expected root to be an instance of TableScanOperator, but was "
+      + root.getClass().getName());
+    String alias = ((TableScanOperator) root).getConf().getAlias();
 
     if (!deferSetup) {
       setupMapWork(mapWork, context, partitions, root, alias);
@@ -174,11 +172,11 @@ public class GenSparkUtils {
         context.inputs, partitions, root, alias, context.conf, false);
   }
 
-  private void collectOperators (Operator<?> op, List<Operator<?>> opList) {
+  private void collectOperators(Operator<?> op, List<Operator<?>> opList) {
     opList.add(op);
     for (Object child : op.getChildOperators()) {
       if (child != null) {
-        collectOperators((Operator<?>)child, opList);
+        collectOperators((Operator<?>) child, opList);
       }
     }
   }
@@ -199,23 +197,23 @@ public class GenSparkUtils {
 
     // Build a map to map the original FileSinkOperator and the cloned FileSinkOperators
     // This map is used for set the stats flag for the cloned FileSinkOperators in later process
-    Iterator<Operator<?>> newRoots_it = newRoots.iterator();
+    Iterator<Operator<?>> newRootsIt = newRoots.iterator();
     for (Operator<?> root : roots) {
-      Operator<?> newRoot = newRoots_it.next();
+      Operator<?> newRoot = newRootsIt.next();
       List<Operator<?>> newOpQueue = new LinkedList<Operator<?>>();
-      collectOperators (newRoot, newOpQueue);
+      collectOperators(newRoot, newOpQueue);
       List<Operator<?>> opQueue = new LinkedList<Operator<?>>();
-      collectOperators (root, opQueue);
-      Iterator<Operator<?>> newOpQueue_it = newOpQueue.iterator();
+      collectOperators(root, opQueue);
+      Iterator<Operator<?>> newOpQueueIt = newOpQueue.iterator();
       for (Operator<?> op : opQueue) {
-        Operator<?> newOp = newOpQueue_it.next();
+        Operator<?> newOp = newOpQueueIt.next();
         if (op instanceof FileSinkOperator) {
           List<FileSinkOperator> fileSinkList = context.fileSinkMap.get(op);
           if (fileSinkList == null) {
             fileSinkList = new LinkedList<FileSinkOperator>();
           }
-          fileSinkList.add((FileSinkOperator)newOp);
-          context.fileSinkMap.put((FileSinkOperator)op, fileSinkList);
+          fileSinkList.add((FileSinkOperator) newOp);
+          context.fileSinkMap.put((FileSinkOperator) op, fileSinkList);
         }
       }
     }
@@ -234,10 +232,10 @@ public class GenSparkUtils {
     for (Operator<?> orig: roots) {
       Operator<?> newRoot = it.next();
       if (newRoot instanceof HashTableDummyOperator) {
-        dummyOps.add((HashTableDummyOperator)newRoot);
+        dummyOps.add((HashTableDummyOperator) newRoot);
         it.remove();
       } else {
-        replacementMap.put(orig,newRoot);
+        replacementMap.put(orig, newRoot);
       }
     }
 
@@ -249,7 +247,7 @@ public class GenSparkUtils {
 
     Set<Operator<?>> seen = new HashSet<Operator<?>>();
 
-    while(!operators.isEmpty()) {
+    while (!operators.isEmpty()) {
       Operator<?> current = operators.pop();
       seen.add(current);
 
@@ -314,7 +312,7 @@ public class GenSparkUtils {
     if (chDir) {
       // Merge the files in the destination table/partitions by creating Map-only merge job
       // If underlying data is RCFile a RCFileBlockMerge task would be created.
-      logger.info("using CombineHiveInputformat for the merge job");
+      LOG.info("using CombineHiveInputformat for the merge job");
       GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
           context.dependencyTask, context.moveTask,
           hconf, context.currentTask);
@@ -335,8 +333,8 @@ public class GenSparkUtils {
     String sortOrder = Strings.nullToEmpty(reduceSink.getConf().getOrder()).trim();
 
     // test if we need group-by shuffle
-    if (reduceSink.getChildOperators().size() == 1 &&
-        reduceSink.getChildOperators().get(0) instanceof GroupByOperator) {
+    if (reduceSink.getChildOperators().size() == 1
+      && reduceSink.getChildOperators().get(0) instanceof GroupByOperator) {
       edgeProperty.setShuffleGroup();
       // test if the group by needs partition level sort, if so, use the MR style shuffle
       // SHUFFLE_SORT shouldn't be used for this purpose, see HIVE-8542
@@ -364,9 +362,9 @@ public class GenSparkUtils {
     // test if we need total order, if so, we can either use MR shuffle + set #reducer to 1,
     // or we can use SHUFFLE_SORT
     if (edgeProperty.isShuffleNone() && !sortOrder.isEmpty()) {
-      if (reduceSink.getConf().getPartitionCols() == null ||
-          reduceSink.getConf().getPartitionCols().isEmpty() ||
-          isSame(reduceSink.getConf().getPartitionCols(),
+      if (reduceSink.getConf().getPartitionCols() == null
+        || reduceSink.getConf().getPartitionCols().isEmpty()
+        || isSame(reduceSink.getConf().getPartitionCols(),
               reduceSink.getConf().getKeyCols())) {
         edgeProperty.setShuffleSort();
       } else {
@@ -397,12 +395,12 @@ public class GenSparkUtils {
       return true;
     }
     List<Operator<? extends OperatorDesc>> children = reduceSinkOperator.getChildOperators();
-    if (children != null && children.size() == 1 &&
-        children.get(0) instanceof GroupByOperator) {
+    if (children != null && children.size() == 1
+      && children.get(0) instanceof GroupByOperator) {
       GroupByOperator child = (GroupByOperator) children.get(0);
       if (isSame(reduceSinkOperator.getConf().getKeyCols(),
-          reduceSinkOperator.getConf().getPartitionCols()) &&
-          reduceSinkOperator.getConf().getKeyCols().size() == child.getConf().getKeys().size()) {
+          reduceSinkOperator.getConf().getPartitionCols())
+          && reduceSinkOperator.getConf().getKeyCols().size() == child.getConf().getKeys().size()) {
         return false;
       }
     }
@@ -410,7 +408,7 @@ public class GenSparkUtils {
   }
 
   /**
-   * Test if two lists of ExprNodeDesc are semantically same
+   * Test if two lists of ExprNodeDesc are semantically same.
    */
   private static boolean isSame(List<ExprNodeDesc> list1, List<ExprNodeDesc> list2) {
     if (list1 != list2) {
@@ -430,6 +428,7 @@ public class GenSparkUtils {
     return true;
   }
 
+  @SuppressWarnings("unchecked")
   public static <T> T getChildOperator(Operator<?> op, Class<T> klazz) throws SemanticException {
     if (klazz.isInstance(op)) {
       return (T) op;
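
The reformatted conditions above implement the shuffle-type selection for a reduce sink: a group shuffle when the sink feeds a single GroupByOperator (falling back to the MR-style shuffle when that group by needs partition-level sort, per HIVE-8542), and a range-partition sort shuffle for total order only when the partition columns are absent or identical to the key columns. A stand-alone sketch of that decision, with simplified boolean inputs standing in for the operator checks, and assuming the branch not shown in these hunks falls back to the MR-style shuffle:

    class ShuffleChoiceSketch {
      static String chooseShuffle(boolean feedsSingleGroupBy,
                                  boolean groupByNeedsPartitionLevelSort,
                                  boolean needsTotalOrder,
                                  boolean partitionColsEmptyOrSameAsKeys) {
        if (feedsSingleGroupBy) {
          // group-by shuffle, unless partition-level sort is required (HIVE-8542)
          return groupByNeedsPartitionLevelSort ? "MR_SHUFFLE_SORT" : "SHUFFLE_GROUP";
        }
        if (needsTotalOrder) {
          // range-partition sort shuffle only when the partition columns are
          // empty or the same as the key columns; otherwise assume MR-style shuffle
          return partitionColsEmptyOrSameAsKeys ? "SHUFFLE_SORT" : "MR_SHUFFLE_SORT";
        }
        return "SHUFFLE_NONE";
      }
    }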

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Thu Jan  8 02:00:11 2015
@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.hive.ql.parse.spark;
 
-import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Stack;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.ObjectPair;
-import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -42,12 +47,7 @@ import org.apache.hadoop.hive.ql.plan.Re
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Stack;
+import com.google.common.base.Preconditions;
 
 /**
  * GenSparkWork separates the operator tree into spark tasks.
@@ -55,8 +55,6 @@ import java.util.Stack;
  * and break the operators into work and tasks along the way.
  *
  * Cloned from GenTezWork.
- *
- * TODO: need to go thru this to make it fit completely to Spark.
  */
 public class GenSparkWork implements NodeProcessor {
   static final private Log LOG = LogFactory.getLog(GenSparkWork.class.getName());
@@ -84,6 +82,7 @@ public class GenSparkWork implements Nod
         "AssertionError: expected context.currentRootOperator to be not null");
 
     // Operator is a file sink or reduce sink. Something that forces a new vertex.
+    @SuppressWarnings("unchecked")
     Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>) nd;
 
     // root is the start of the operator pipeline we're currently
@@ -122,7 +121,8 @@ public class GenSparkWork implements Nod
         if (smbOp != null) {
           // This logic is for SortMergeBucket MapJoin case.
           // This MapWork (of big-table, see above..) is later initialized by SparkMapJoinFactory
-          // processor, so don't initialize it here. Just keep track of it in the context, for later processing.
+          // processor, so don't initialize it here. Just keep track of it in the context,
+          // for later processing.
           work = utils.createMapWork(context, root, sparkWork, null, true);
           if (context.smbJoinWorkMap.get(smbOp) != null) {
             throw new SemanticException("Each SMBMapJoin should be associated only with one Mapwork");
@@ -182,9 +182,9 @@ public class GenSparkWork implements Nod
                 work.addDummyOp((HashTableDummyOperator) dummy);
               }
             }
-            for (Entry<BaseWork,SparkEdgeProperty> parentWorkMap : linkWorkMap.entrySet()) {
+            for (Entry<BaseWork, SparkEdgeProperty> parentWorkMap : linkWorkMap.entrySet()) {
               BaseWork parentWork = parentWorkMap.getKey();
-              LOG.debug("connecting "+parentWork.getName()+" with "+work.getName());
+              LOG.debug("connecting " + parentWork.getName() + " with " + work.getName());
               SparkEdgeProperty edgeProp = parentWorkMap.getValue();
               sparkWork.connect(parentWork, work, edgeProp);
 
@@ -218,7 +218,8 @@ public class GenSparkWork implements Nod
       ReduceWork reduceWork = (ReduceWork) work;
       for (Operator<?> parent : new ArrayList<Operator<?>>(root.getParentOperators())) {
         Preconditions.checkArgument(parent instanceof ReduceSinkOperator,
-            "AssertionError: expected operator to be a ReduceSinkOperator, but was " + parent.getClass().getName());
+          "AssertionError: expected operator to be a ReduceSinkOperator, but was "
+          + parent.getClass().getName());
         ReduceSinkOperator rsOp = (ReduceSinkOperator) parent;
         SparkEdgeProperty edgeProp = GenSparkUtils.getEdgeProperty(rsOp, reduceWork);
 
@@ -251,7 +252,8 @@ public class GenSparkWork implements Nod
     // Also note: the concept of leaf and root is reversed in hive for historical
     // reasons. Roots are data sources, leaves are data sinks. I know.
     if (context.leafOpToFollowingWorkInfo.containsKey(operator)) {
-      ObjectPair<SparkEdgeProperty, ReduceWork> childWorkInfo = context.leafOpToFollowingWorkInfo.get(operator);
+      ObjectPair<SparkEdgeProperty, ReduceWork> childWorkInfo = context.
+        leafOpToFollowingWorkInfo.get(operator);
       SparkEdgeProperty edgeProp = childWorkInfo.getFirst();
       ReduceWork childWork = childWorkInfo.getSecond();
 
@@ -286,8 +288,8 @@ public class GenSparkWork implements Nod
     // the next item will be a new root.
     if (!operator.getChildOperators().isEmpty()) {
       Preconditions.checkArgument(operator.getChildOperators().size() == 1,
-          "AssertionError: expected operator.getChildOperators().size() to be 1, but was " +
-              operator.getChildOperators().size());
+        "AssertionError: expected operator.getChildOperators().size() to be 1, but was "
+        + operator.getChildOperators().size());
       context.parentOfRoot = operator;
       context.currentRootOperator = operator.getChildOperators().get(0);
       context.preceedingWork = work;
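
The comment above notes that a file sink or reduce sink forces a new vertex, and the processor breaks the operator pipeline into work units at those points. A conceptual sketch of just that splitting step, using plain strings in place of operators; the real code, as shown, also wires edge properties, dummy operators, and the reduce-work bookkeeping:

    import java.util.ArrayList;
    import java.util.List;

    class PipelineSplitSketch {
      // Split a linear pipeline of operator names into work units,
      // closing a unit whenever a boundary operator is reached.
      static List<List<String>> split(List<String> pipeline) {
        List<List<String>> works = new ArrayList<List<String>>();
        List<String> current = new ArrayList<String>();
        for (String op : pipeline) {
          current.add(op);
          if (op.startsWith("RS") || op.startsWith("FS")) {
            works.add(current);
            current = new ArrayList<String>();
          }
        }
        if (!current.isEmpty()) {
          works.add(current);
        }
        return works;
      }
    }

For example, split(Arrays.asList("TS", "FIL", "RS_1", "GBY", "FS_1")) yields [[TS, FIL, RS_1], [GBY, FS_1]].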

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java Thu Jan  8 02:00:11 2015
@@ -50,6 +50,7 @@ public class GenSparkWorkWalker extends
     this.ctx = ctx;
   }
 
+  @SuppressWarnings("unchecked")
   private void setRoot(Node nd) {
     ctx.currentRootOperator = (Operator<? extends OperatorDesc>) nd;
     ctx.preceedingWork = null;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Thu Jan  8 02:00:11 2015
@@ -87,14 +87,12 @@ import org.apache.hadoop.hive.ql.session
 /**
  * SparkCompiler translates the operator plan into SparkTasks.
  *
- * Pretty much cloned from TezCompiler.
- *
- * TODO: need to complete and make it fit to Spark.
+ * Cloned from TezCompiler.
  */
 public class SparkCompiler extends TaskCompiler {
   private static final String CLASS_NAME = SparkCompiler.class.getName();
-  private static final PerfLogger perfLogger = PerfLogger.getPerfLogger();
-  private static final Log logger = LogFactory.getLog(SparkCompiler.class);
+  private static final PerfLogger PERF_LOGGER = PerfLogger.getPerfLogger();
+  private static final Log LOGGER = LogFactory.getLog(SparkCompiler.class);
 
   public SparkCompiler() {
   }
@@ -102,16 +100,12 @@ public class SparkCompiler extends TaskC
   @Override
   public void init(HiveConf conf, LogHelper console, Hive db) {
     super.init(conf, console, db);
-
-//    TODO: Need to check if we require the use of recursive input dirs for union processing
-//    conf.setBoolean("mapred.input.dir.recursive", true);
-//    HiveConf.setBoolVar(conf, ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
   }
 
   @Override
   protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs) throws SemanticException {
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
+    PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
     // Sequence of TableScan operators to be walked
     Deque<Operator<? extends OperatorDesc>> deque = new LinkedList<Operator<? extends OperatorDesc>>();
     deque.addAll(pCtx.getTopOps().values());
@@ -137,7 +131,7 @@ public class SparkCompiler extends TaskC
     ArrayList<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pCtx.getTopOps().values());
     ogw.startWalking(topNodes, null);
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
+    PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
   }
 
   /**
@@ -147,7 +141,7 @@ public class SparkCompiler extends TaskC
   protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
       List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
       throws SemanticException {
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
+    PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
     GenSparkUtils.getUtils().resetSequenceNumber();
 
     ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
@@ -244,7 +238,7 @@ public class SparkCompiler extends TaskC
       GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
     }
 
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
+    PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
   }
 
   @Override
@@ -301,7 +295,7 @@ public class SparkCompiler extends TaskC
   @Override
   protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
       Context ctx) throws SemanticException {
-    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_TASK_TREE);
+    PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_TASK_TREE);
     PhysicalContext physicalCtx = new PhysicalContext(conf, pCtx, pCtx.getContext(), rootTasks,
        pCtx.getFetchTask());
 
@@ -345,7 +339,7 @@ public class SparkCompiler extends TaskC
       LOG.debug("Skipping stage id rearranger");
     }
 
-    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_TASK_TREE);
+    PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_TASK_TREE);
     return;
   }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java Thu Jan  8 02:00:11 2015
@@ -29,11 +29,11 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 /**
- * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks
- * Cloned from tez's FileSinkProcessor
+ * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks.
+ * Cloned from tez's FileSinkProcessor.
  */
 public class SparkFileSinkProcessor implements NodeProcessor {
-  private static final Log logger = LogFactory.getLog(SparkFileSinkProcessor.class.getName());
+  private static final Log LOGGER = LogFactory.getLog(SparkFileSinkProcessor.class.getName());
 
   /*
    * (non-Javadoc)

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java Thu Jan  8 02:00:11 2015
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.parse.spark;
 
-import java.lang.StringBuffer;
 import java.util.List;
 import java.util.Set;
 import java.util.Stack;
@@ -57,16 +56,16 @@ import com.google.common.base.Preconditi
  * (normal, no scan, partial scan.) The plan at this point will be a single
  * table scan operator.
  *
- * TODO: cloned from tez ProcessAnalyzeTable. Need to make sure it fits to Spark.
+ * Cloned from Tez ProcessAnalyzeTable.
  */
 public class SparkProcessAnalyzeTable implements NodeProcessor {
-  private static final Log logger = LogFactory.getLog(SparkProcessAnalyzeTable.class.getName());
+  private static final Log LOGGER = LogFactory.getLog(SparkProcessAnalyzeTable.class.getName());
 
   // shared plan utils for spark
   private GenSparkUtils utils = null;
 
   /**
-   * Injecting the utils in the constructor facilitates testing
+   * Injecting the utils in the constructor facilitates testing.
    */
   public SparkProcessAnalyzeTable(GenSparkUtils utils) {
     this.utils = utils;
@@ -81,16 +80,18 @@ public class SparkProcessAnalyzeTable im
     TableScanOperator tableScan = (TableScanOperator) nd;
 
     ParseContext parseContext = context.parseContext;
+
+    @SuppressWarnings("rawtypes")
     Class<? extends InputFormat> inputFormat = parseContext.getTopToTable().get(tableScan)
         .getInputFormatClass();
     QB queryBlock = parseContext.getQB();
     QBParseInfo parseInfo = parseContext.getQB().getParseInfo();
 
     if (parseInfo.isAnalyzeCommand()) {
-      Preconditions.checkArgument(tableScan.getChildOperators() == null ||
-        tableScan.getChildOperators().size() == 0,
-          "AssertionError: expected tableScan.getChildOperators() to be null, " +
-            "or tableScan.getChildOperators().size() to be 0");
+      Preconditions.checkArgument(tableScan.getChildOperators() == null
+        || tableScan.getChildOperators().size() == 0,
+        "AssertionError: expected tableScan.getChildOperators() to be null, "
+        + "or tableScan.getChildOperators().size() to be 0");
 
       String alias = null;
       for (String a: parseContext.getTopOps().keySet()) {
@@ -185,6 +186,8 @@ public class SparkProcessAnalyzeTable im
 
     // partial scan task
     DriverContext driverCxt = new DriverContext();
+
+    @SuppressWarnings("unchecked")
     Task<PartialScanWork> partialScanTask = TaskFactory.get(scanWork, parseContext.getConf());
     partialScanTask.initialize(parseContext.getConf(), null, driverCxt);
     partialScanTask.setWork(scanWork);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java Thu Jan  8 02:00:11 2015
@@ -21,10 +21,10 @@ package org.apache.hadoop.hive.ql.plan;
 
 @Explain(displayName = "Edge Property")
 public class SparkEdgeProperty {
-  public static long SHUFFLE_NONE = 0; // No shuffle is needed. For union only.
-  public static long SHUFFLE_GROUP = 1; // HashPartition shuffle, keys are not sorted in any way.
-  public static long SHUFFLE_SORT = 2;  // RangePartition shuffle, keys are total sorted.
-  public static long MR_SHUFFLE_SORT = 4; // HashPartition shuffle, keys are sorted by partition.
+  public static final long SHUFFLE_NONE = 0; // No shuffle is needed. For union only.
+  public static final long SHUFFLE_GROUP = 1; // HashPartition shuffle, keys are not sorted in any way.
+  public static final long SHUFFLE_SORT = 2;  // RangePartition shuffle, keys are total sorted.
+  public static final long MR_SHUFFLE_SORT = 4; // HashPartition shuffle, keys are sorted by partition.
 
   private long edgeType;
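
The power-of-two values made final above suggest edgeType is treated as a bit mask. A minimal illustration of how such flags are typically combined and queried; this is a sketch, not the class's actual accessors:

    class EdgeFlagsSketch {
      static final long SHUFFLE_NONE = 0;
      static final long SHUFFLE_GROUP = 1;
      static final long SHUFFLE_SORT = 2;
      static final long MR_SHUFFLE_SORT = 4;

      private long edgeType = SHUFFLE_NONE;

      void add(long flag) {
        edgeType |= flag;                  // combine another shuffle flag
      }

      boolean has(long flag) {
        return (edgeType & flag) != 0;     // test for a specific flag
      }

      boolean isShuffleNone() {
        return edgeType == SHUFFLE_NONE;   // no flags set at all
      }
    }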
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java Thu Jan  8 02:00:11 2015
@@ -73,7 +73,7 @@ public class SparkWork extends AbstractO
   }
 
   /**
-   * getWorkMap returns a map of "vertex name" to BaseWork
+   * @return a map of "vertex name" to BaseWork
    */
   @Explain(displayName = "Vertices")
   public Map<String, BaseWork> getWorkMap() {
@@ -85,7 +85,7 @@ public class SparkWork extends AbstractO
   }
 
   /**
-   * getAllWork returns a topologically sorted list of BaseWork
+   * @return a topologically sorted list of BaseWork
    */
   public List<BaseWork> getAllWork() {
 
@@ -122,7 +122,7 @@ public class SparkWork extends AbstractO
   }
 
   /**
-   * add all nodes in the collection without any connections
+   * Add all nodes in the collection without any connections.
    */
   public void addAll(Collection<BaseWork> c) {
     for (BaseWork w: c) {
@@ -131,7 +131,7 @@ public class SparkWork extends AbstractO
   }
 
   /**
-   * add all nodes in the collection without any connections
+   * Add all nodes in the collection without any connections.
    */
   public void addAll(BaseWork[] bws) {
     for (BaseWork w: bws) {
@@ -260,7 +260,7 @@ public class SparkWork extends AbstractO
    * returns the edge type connecting work a and b
    */
   public SparkEdgeProperty getEdgeProperty(BaseWork a, BaseWork b) {
-    return edgeProperties.get(new ImmutablePair<BaseWork, BaseWork>(a,b));
+    return edgeProperties.get(new ImmutablePair<BaseWork, BaseWork>(a, b));
   }
 
   /**
@@ -330,7 +330,9 @@ public class SparkWork extends AbstractO
      */
     boolean dependsOn(String n1, String n2) {
       for (String p = dependencies.get(n1); p != null; p = dependencies.get(p)) {
-        if (p.equals(n2)) return true;
+        if (p.equals(n2)) {
+          return true;
+        }
       }
       return false;
     }
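
The braces added to dependsOn above wrap a simple parent-chain walk: starting from n1, follow the recorded dependency of each vertex until n2 is found or the chain ends. The same loop in self-contained form, with the dependency map passed in; the vertex names below are only examples:

    import java.util.HashMap;
    import java.util.Map;

    class DependsOnSketch {
      // dependencies maps a vertex name to the parent recorded for it
      static boolean dependsOn(Map<String, String> dependencies, String n1, String n2) {
        for (String p = dependencies.get(n1); p != null; p = dependencies.get(p)) {
          if (p.equals(n2)) {
            return true;
          }
        }
        return false;
      }

      public static void main(String[] args) {
        Map<String, String> deps = new HashMap<String, String>();
        deps.put("Reducer 3", "Reducer 2");
        deps.put("Reducer 2", "Map 1");
        System.out.println(dependsOn(deps, "Reducer 3", "Map 1"));  // true
        System.out.println(dependsOn(deps, "Map 1", "Reducer 3"));  // false
      }
    }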

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java Thu Jan  8 02:00:11 2015
@@ -31,9 +31,10 @@ public class CounterStatsAggregatorSpark
 
   private SparkCounters sparkCounters;
 
+  @SuppressWarnings("rawtypes")
   @Override
   public boolean connect(Configuration hconf, Task sourceTask) {
-    SparkTask task = (SparkTask)sourceTask;
+    SparkTask task = (SparkTask) sourceTask;
     sparkCounters = task.getSparkCounters();
     if (sparkCounters == null) {
       return false;

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java Thu Jan  8 02:00:11 2015
@@ -19,12 +19,12 @@ package org.apache.hive.spark.client;
 
 import java.io.Serializable;
 
-import com.google.common.base.Throwables;
-
 import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.rpc.RpcDispatcher;
 import org.apache.hive.spark.counter.SparkCounters;
 
+import com.google.common.base.Throwables;
+
 abstract class BaseProtocol extends RpcDispatcher {
 
   protected static class CancelJob implements Serializable {
@@ -118,7 +118,7 @@ abstract class BaseProtocol extends RpcD
   }
 
   /**
-   * Inform the client that a new spark job has been submitted for the client job
+   * Inform the client that a new spark job has been submitted for the client job.
    */
   protected static class JobSubmitted implements Serializable {
     final String clientJobId;

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java Thu Jan  8 02:00:11 2015
@@ -17,10 +17,10 @@
 
 package org.apache.hive.spark.client;
 
-import java.io.Serializable;
-
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 
+import java.io.Serializable;
+
 /**
  * Interface for a Spark remote job.
  */

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java Thu Jan  8 02:00:11 2015
@@ -23,6 +23,7 @@ import java.util.Set;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hive.spark.counter.SparkCounters;
+
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -48,7 +49,7 @@ public interface JobContext {
     JavaFutureAction<T> job, SparkCounters sparkCounters, Set<Integer> cachedRDDIds);
 
   /**
-   * Return a map from client job Id to corresponding JavaFutureActions
+   * Return a map from client job Id to corresponding JavaFutureActions.
    */
   Map<String, List<JavaFutureAction<?>>> getMonitoredJobs();
 

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java Thu Jan  8 02:00:11 2015
@@ -23,6 +23,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hive.spark.counter.SparkCounters;
+
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;
 

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java Thu Jan  8 02:00:11 2015
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+
 import org.apache.hive.spark.counter.SparkCounters;
 
 /**
@@ -45,12 +46,12 @@ public interface JobHandle<T extends Ser
   MetricsCollection getMetrics();
 
   /**
-   * Get corresponding spark job IDs for this job
+   * Get corresponding spark job IDs for this job.
    */
   List<Integer> getSparkJobIds();
 
   /**
-   * Get the SparkCounters for this job
+   * Get the SparkCounters for this job.
    */
   SparkCounters getSparkCounters();
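
For context, the accessors documented above are read off a handle after a job has been submitted through the Spark client. A hedged usage sketch; how the handle is obtained is outside this diff, so it is simply passed in here:

    import java.util.List;

    import org.apache.hive.spark.client.JobHandle;
    import org.apache.hive.spark.counter.SparkCounters;

    class JobHandleUsageSketch {
      static void report(JobHandle<?> handle) {
        // per the interface above: spark job ids and counters for this client job
        List<Integer> sparkJobIds = handle.getSparkJobIds();
        SparkCounters counters = handle.getSparkCounters();
        System.out.println("spark job ids: " + sparkJobIds + ", counters: " + counters);
      }
    }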
 

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java Thu Jan  8 02:00:11 2015
@@ -17,16 +17,16 @@
 
 package org.apache.hive.spark.client;
 
+import io.netty.util.concurrent.Promise;
+
 import java.io.Serializable;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import io.netty.util.concurrent.Promise;
-
 import org.apache.hive.spark.counter.SparkCounters;
 
 /**

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java Thu Jan  8 02:00:11 2015
@@ -24,13 +24,6 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hive.spark.client.metrics.DataReadMethod;
 import org.apache.hive.spark.client.metrics.InputMetrics;
@@ -38,6 +31,13 @@ import org.apache.hive.spark.client.metr
 import org.apache.hive.spark.client.metrics.ShuffleReadMetrics;
 import org.apache.hive.spark.client.metrics.ShuffleWriteMetrics;
 
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 /**
  * Provides metrics collected for a submitted job.
  *
@@ -162,7 +162,6 @@ public class MetricsCollection {
       long remoteBytesRead = 0L;
 
       // Shuffle write metrics.
-      boolean hasShuffleWriteMetrics = false;
       long shuffleBytesWritten = 0L;
       long shuffleWriteTime = 0L;
 
@@ -195,7 +194,6 @@ public class MetricsCollection {
         }
 
         if (m.shuffleWriteMetrics != null) {
-          hasShuffleWriteMetrics = true;
           shuffleBytesWritten += m.shuffleWriteMetrics.shuffleBytesWritten;
           shuffleWriteTime += m.shuffleWriteMetrics.shuffleWriteTime;
         }
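
The hasShuffleWriteMetrics flag removed above was set but, per this change, not needed: the aggregation only has to sum the optional per-task shuffle-write metrics. A simplified sketch of that pattern with stand-in types:

    import java.util.Arrays;
    import java.util.List;

    class ShuffleWriteAggregationSketch {
      // stand-in for the per-task shuffle write metrics
      static class ShuffleWrite {
        final long bytesWritten;
        final long writeTime;
        ShuffleWrite(long bytesWritten, long writeTime) {
          this.bytesWritten = bytesWritten;
          this.writeTime = writeTime;
        }
      }

      // Sum bytes and time across tasks, skipping tasks that reported none.
      static long[] aggregate(List<ShuffleWrite> perTask) {
        long bytes = 0L;
        long time = 0L;
        for (ShuffleWrite m : perTask) {
          if (m != null) {
            bytes += m.bytesWritten;
            time += m.writeTime;
          }
        }
        return new long[] {bytes, time};
      }

      public static void main(String[] args) {
        List<ShuffleWrite> metrics =
            Arrays.asList(new ShuffleWrite(100L, 5L), null, new ShuffleWrite(50L, 2L));
        System.out.println(Arrays.toString(aggregate(metrics)));  // [150, 7]
      }
    }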

Modified: hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java (original)
+++ hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java Thu Jan  8 02:00:11 2015
@@ -20,6 +20,7 @@ package org.apache.hive.spark.client;
 import java.util.Set;
 
 import org.apache.hive.spark.counter.SparkCounters;
+
 import org.apache.spark.api.java.JavaFutureAction;
 
 interface MonitorCallback {