You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2015/01/08 03:00:12 UTC

svn commit: r1650201 [1/3] - in /hive/branches/spark: itests/hive-unit/src/test/java/org/apache/hive/jdbc/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/se...

Author: szehon
Date: Thu Jan  8 02:00:11 2015
New Revision: 1650201

URL: http://svn.apache.org/r1650201
Log:
HIVE-9281 : Code cleanup [Spark Branch] (Szehon, reviewed by Xuefu)

Modified:
    hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinHintOptimizer.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinOptimizer.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
    hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounters.java
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestKryoMessageCodec.java
    hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java

Modified: hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java (original)
+++ hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java Thu Jan  8 02:00:11 2015
@@ -124,7 +124,7 @@ public class TestJdbcWithLocalClusterSpa
   }
 
   /**
-   * Verify that the connection to HS2 with MiniMr is successful
+   * Verify that the connection to HS2 with MiniMr is successful.
    * @throws Exception
    */
   @Test
@@ -134,7 +134,7 @@ public class TestJdbcWithLocalClusterSpa
   }
 
   /**
-   * Run nonMr query
+   * Run nonMr query.
    * @throws Exception
    */
   @Test
@@ -147,15 +147,15 @@ public class TestJdbcWithLocalClusterSpa
   }
 
   /**
-   * Run nonMr query
+   * Run nonMr query.
    * @throws Exception
    */
   @Test
   public void testSparkQuery() throws Exception {
     String tableName = "testTab2";
     String resultVal = "val_238";
-    String queryStr = "SELECT * FROM " + tableName +
-        " where value = '" + resultVal + "'";
+    String queryStr = "SELECT * FROM " + tableName
+        + " where value = '" + resultVal + "'";
 
     testKvQuery(tableName, queryStr, resultVal);
   }
@@ -233,8 +233,8 @@ public class TestJdbcWithLocalClusterSpa
         + dataFilePath.toString() + "' into table " + tempTableName);
 
     String resultVal = "val_238";
-    String queryStr = "SELECT * FROM " + tempTableName +
-        " where value = '" + resultVal + "'";
+    String queryStr = "SELECT * FROM " + tempTableName
+        + " where value = '" + resultVal + "'";
     verifyResult(queryStr, resultVal, 2);
 
     // A second connection should not be able to see the table
@@ -244,8 +244,7 @@ public class TestJdbcWithLocalClusterSpa
     stmt2.execute("USE " + dbName);
     boolean gotException = false;
     try {
-      ResultSet res;
-      res = stmt2.executeQuery(queryStr);
+      stmt2.executeQuery(queryStr);
     } catch (SQLException err) {
       // This is expected to fail.
       assertTrue("Expecting table not found error, instead got: " + err,
@@ -266,7 +265,7 @@ public class TestJdbcWithLocalClusterSpa
   }
 
   /**
-   * Verify if the given property contains the expected value
+   * Verify if the given property contains the expected value.
    * @param propertyName
    * @param expectedValue
    * @throws Exception
@@ -275,7 +274,7 @@ public class TestJdbcWithLocalClusterSpa
     Statement stmt = hs2Conn .createStatement();
     ResultSet res = stmt.executeQuery("set " + propertyName);
     assertTrue(res.next());
-    String results[] = res.getString(1).split("=");
+    String[] results = res.getString(1).split("=");
     assertEquals("Property should be set", results.length, 2);
     assertEquals("Property should be set", expectedValue, results[1]);
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java Thu Jan  8 02:00:11 2015
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.serde2.obj
 
 public class SparkHashTableSinkOperator
     extends TerminalOperator<SparkHashTableSinkDesc> implements Serializable {
+  private static final int MIN_REPLICATION = 10;
   private static final long serialVersionUID = 1L;
   private final String CLASS_NAME = this.getClass().getName();
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
@@ -122,7 +123,6 @@ public class SparkHashTableSinkOperator
         + "-" + Math.abs(Utilities.randGen.nextInt()));
       try {
         // This will guarantee file name uniqueness.
-        // TODO: can we use the task id, which should be unique
         if (fs.createNewFile(path)) {
           break;
         }
@@ -131,10 +131,10 @@ public class SparkHashTableSinkOperator
       }
       // TODO find out numOfPartitions for the big table
       int numOfPartitions = replication;
-      replication = (short)Math.min(10, numOfPartitions);
+      replication = (short) Math.min(MIN_REPLICATION, numOfPartitions);
     }
-    htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag +
-      " with group count: " + tableContainer.size() + " into file: " + path);
+    htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag
+      + " with group count: " + tableContainer.size() + " into file: " + path);
     // get the hashtable file and path
     // get the hashtable file and path
     OutputStream os = null;
@@ -153,8 +153,8 @@ public class SparkHashTableSinkOperator
     }
     tableContainer.clear();
     FileStatus status = fs.getFileStatus(path);
-    htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path +
-      " (" + status.getLen() + " bytes)");
+    htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path
+      + " (" + status.getLen() + " bytes)");
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java Thu Jan  8 02:00:11 2015
@@ -101,7 +101,7 @@ public class HashTableLoader implements
             bigInputPath = null;
           } else {
             Set<String> aliases =
-              ((SparkBucketMapJoinContext)mapJoinCtx).getPosToAliasMap().get(pos);
+              ((SparkBucketMapJoinContext) mapJoinCtx).getPosToAliasMap().get(pos);
             String alias = aliases.iterator().next();
             // Any one small table input path
             String smallInputPath =
@@ -110,7 +110,7 @@ public class HashTableLoader implements
           }
         }
         String fileName = localWork.getBucketFileName(bigInputPath);
-        Path path = Utilities.generatePath(baseDir, desc.getDumpFilePrefix(), (byte)pos, fileName);
+        Path path = Utilities.generatePath(baseDir, desc.getDumpFilePrefix(), (byte) pos, fileName);
         LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path);
         mapJoinTables[pos] = mapJoinTableSerdes[pos].load(fs, path);
       }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java Thu Jan  8 02:00:11 2015
@@ -70,13 +70,15 @@ public abstract class HiveBaseFunctionRe
   /** Process the given record. */
   protected abstract void processNextRecord(T inputRecord) throws IOException;
 
-  /** Is the current state of the record processor done? */
+  /**
+   * @return true if current state of the record processor is done.
+   */
   protected abstract boolean processingDone();
 
-  /** Close the record processor */
+  /** Close the record processor. */
   protected abstract void closeRecordProcessor();
 
-  /** Implement Iterator interface */
+  /** Implement Iterator interface. */
   public class ResultIterator implements Iterator {
     @Override
     public boolean hasNext(){
@@ -98,8 +100,7 @@ public abstract class HiveBaseFunctionRe
             return true;
           }
         } catch (IOException ex) {
-          // TODO: better handling of exception.
-          throw new RuntimeException("Error while processing input.", ex);
+          throw new IllegalStateException("Error while processing input.", ex);
         }
       }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java Thu Jan  8 02:00:11 2015
@@ -79,7 +79,7 @@ public class HiveKVResultCache {
 
       container.setSerDe(serDe, oi);
       container.setTableDesc(tableDesc);
-    } catch(Exception ex) {
+    } catch (Exception ex) {
       throw new RuntimeException("Failed to create RowContainer", ex);
     }
     return container;
@@ -114,7 +114,7 @@ public class HiveKVResultCache {
     }
     try {
       container.clearRows();
-    } catch(HiveException ex) {
+    } catch (HiveException ex) {
       throw new RuntimeException("Failed to clear rows in RowContainer", ex);
     }
     cursor = 0;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Thu Jan  8 02:00:11 2015
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.io.BytesWritable;
+
 import scala.Tuple2;
 
 import java.util.Iterator;
@@ -35,6 +36,7 @@ public class HiveMapFunction extends Hiv
     super(jobConfBuffer, sparkReporter);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public Iterable<Tuple2<HiveKey, BytesWritable>>
   call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
@@ -50,7 +52,6 @@ public class HiveMapFunction extends Hiv
     }
 
     HiveMapFunctionResultList result = new HiveMapFunctionResultList(jobConf, it, mapRecordHandler);
-    //TODO we need to implement a Spark specified Reporter to collect stats, refer to HIVE-7709.
     mapRecordHandler.init(jobConf, result, sparkReporter);
 
     return result;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java Thu Jan  8 02:00:11 2015
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.exec.s
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.Reporter;
 import scala.Tuple2;
 
 import java.io.IOException;
@@ -32,6 +31,7 @@ public class HiveMapFunctionResultList e
   /**
    * Instantiate result set Iterable for Map function output.
    *
+   * @param conf Hive configuration.
    * @param inputIterator Input record iterator.
    * @param handler Initialized {@link SparkMapRecordHandler} instance.
    */

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java Thu Jan  8 02:00:11 2015
@@ -26,13 +26,13 @@ import org.apache.spark.api.java.functio
 
 
 public abstract class HivePairFlatMapFunction<T, K, V> implements PairFlatMapFunction<T, K, V> {
-  private static final NumberFormat taskIdFormat = NumberFormat.getInstance();
-  private static final NumberFormat stageIdFormat = NumberFormat.getInstance();
+  private static final NumberFormat TASK_ID_FORMAT = NumberFormat.getInstance();
+  private static final NumberFormat STAGE_ID_FORMAT = NumberFormat.getInstance();
   static {
-    taskIdFormat.setGroupingUsed(false);
-    taskIdFormat.setMinimumIntegerDigits(6);
-    stageIdFormat.setGroupingUsed(false);
-    stageIdFormat.setMinimumIntegerDigits(4);
+    TASK_ID_FORMAT.setGroupingUsed(false);
+    TASK_ID_FORMAT.setMinimumIntegerDigits(6);
+    STAGE_ID_FORMAT.setGroupingUsed(false);
+    STAGE_ID_FORMAT.setMinimumIntegerDigits(4);
   }
 
   protected transient JobConf jobConf;
@@ -60,7 +60,7 @@ public abstract class HivePairFlatMapFun
     StringBuilder taskAttemptIdBuilder = new StringBuilder("attempt_");
     taskAttemptIdBuilder.append(System.currentTimeMillis())
       .append("_")
-      .append(stageIdFormat.format(TaskContext.get().stageId()))
+      .append(STAGE_ID_FORMAT.format(TaskContext.get().stageId()))
       .append("_");
 
     if (isMap()) {
@@ -71,7 +71,7 @@ public abstract class HivePairFlatMapFun
 
     // Spark task attempt id is increased by Spark context instead of task, which may introduce
     // unstable qtest output, since non Hive features depends on this, we always set it to 0 here.
-    taskAttemptIdBuilder.append(taskIdFormat.format(TaskContext.get().partitionId()))
+    taskAttemptIdBuilder.append(TASK_ID_FORMAT.format(TaskContext.get().partitionId()))
       .append("_0");
 
     String taskAttemptIdStr = taskAttemptIdBuilder.toString();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Thu Jan  8 02:00:11 2015
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.s
 
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
+
 import scala.Tuple2;
 
 import java.util.Iterator;
@@ -33,6 +34,7 @@ public class HiveReduceFunction extends
     super(buffer, sparkReporter);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public Iterable<Tuple2<HiveKey, BytesWritable>>
   call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java Thu Jan  8 02:00:11 2015
@@ -32,6 +32,7 @@ public class HiveReduceFunctionResultLis
   /**
    * Instantiate result set Iterable for Reduce function output.
    *
+   * @param conf Hive configuration.
    * @param inputIterator Input record iterator.
    * @param reducer Initialized {@link org.apache.hadoop.hive.ql.exec.mr.ExecReducer} instance.
    */

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java Thu Jan  8 02:00:11 2015
@@ -34,12 +34,15 @@ public interface HiveSparkClient extends
    * @return SparkJobRef could be used to track spark job progress and metrics.
    * @throws Exception
    */
-  public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception;
+  SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception;
 
-  public SparkConf getSparkConf();
+  /**
+   * @return spark configuration
+   */
+  SparkConf getSparkConf();
 
   /**
-   * Get the count of executors
+   * @return the number of executors
    */
-  public int getExecutorCount() throws Exception;
+  int getExecutorCount() throws Exception;
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Thu Jan  8 02:00:11 2015
@@ -33,7 +33,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 
 public class HiveSparkClientFactory {
-  protected static transient final Log LOG = LogFactory.getLog(HiveSparkClientFactory.class);
+  protected static final transient Log LOG = LogFactory.getLog(HiveSparkClientFactory.class);
 
   private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
   private static final String SPARK_DEFAULT_MASTER = "local";

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java Thu Jan  8 02:00:11 2015
@@ -27,10 +27,8 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.mapred.JobConf;
 
-import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java Thu Jan  8 02:00:11 2015
@@ -56,7 +56,7 @@ public class LocalHiveSparkClient implem
   private static final long serialVersionUID = 1L;
 
   private static final String MR_JAR_PROPERTY = "tmpjars";
-  protected static transient final Log LOG = LogFactory
+  protected static final transient Log LOG = LogFactory
       .getLog(LocalHiveSparkClient.class);
 
   private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
@@ -138,7 +138,7 @@ public class LocalHiveSparkClient implem
    * At this point single SparkContext is used by more than one thread, so make this
    * method synchronized.
    *
-   * TODO: This method can't remove a jar/resource from SparkContext. Looks like this is an
+   * This method can't remove a jar/resource from SparkContext. Looks like this is an
    * issue we have to live with until multiple SparkContexts are supported in a single JVM.
    */
   private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Thu Jan  8 02:00:11 2015
@@ -64,10 +64,10 @@ public class RemoteHiveSparkClient imple
   private static final long serialVersionUID = 1L;
 
   private static final String MR_JAR_PROPERTY = "tmpjars";
-  protected static transient final Log LOG = LogFactory
+  protected static final transient Log LOG = LogFactory
     .getLog(RemoteHiveSparkClient.class);
 
-  private static transient final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
+  private static final transient Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
 
   private transient SparkClient remoteClient;
   private transient SparkConf sparkConf;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Thu Jan  8 02:00:11 2015
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -58,16 +59,16 @@ import java.util.List;
 public class SparkMapRecordHandler extends SparkRecordHandler {
   private static final String PLAN_KEY = "__MAP_PLAN__";
   private MapOperator mo;
-  public static final Log l4j = LogFactory.getLog(SparkMapRecordHandler.class);
+  public static final Log LOG = LogFactory.getLog(SparkMapRecordHandler.class);
   private MapredLocalWork localWork = null;
   private boolean isLogInfoEnabled = false;
   private ExecMapperContext execContext;
 
-  public void init(JobConf job, OutputCollector output, Reporter reporter) {
+  public <K, V> void init(JobConf job, OutputCollector<K, V> output, Reporter reporter) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
     super.init(job, output, reporter);
 
-    isLogInfoEnabled = l4j.isInfoEnabled();
+    isLogInfoEnabled = LOG.isInfoEnabled();
     ObjectCache cache = ObjectCacheFactory.getCache(job);
 
     try {
@@ -90,7 +91,7 @@ public class SparkMapRecordHandler exten
 
       // initialize map operator
       mo.setChildren(job);
-      l4j.info(mo.dump(0));
+      LOG.info(mo.dump(0));
       // initialize map local work
       localWork = mrwork.getMapRedLocalWork();
       execContext.setLocalWork(localWork);
@@ -111,11 +112,11 @@ public class SparkMapRecordHandler exten
 
       //The following code is for mapjoin
       //initialize all the dummy ops
-      l4j.info("Initializing dummy operator");
+      LOG.info("Initializing dummy operator");
       List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
-      for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+      for (Operator<? extends OperatorDesc> dummyOp : dummyOps) {
         dummyOp.setExecContext(execContext);
-        dummyOp.initialize(jc,null);
+        dummyOp.initialize(jc, null);
       }
     } catch (Throwable e) {
       abort = true;
@@ -148,14 +149,14 @@ public class SparkMapRecordHandler exten
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else {
-        l4j.fatal(StringUtils.stringifyException(e));
+        LOG.fatal(StringUtils.stringifyException(e));
         throw new RuntimeException(e);
       }
     }
   }
 
   @Override
-  public void processRow(Object key, Iterator values) throws IOException {
+  public <E> void processRow(Object key, Iterator<E> values) throws IOException {
     throw new UnsupportedOperationException("Do not support this method in SparkMapRecordHandler.");
   }
 
@@ -163,7 +164,7 @@ public class SparkMapRecordHandler exten
   public void close() {
     // No row was processed
     if (oc == null) {
-      l4j.trace("Close called. no row processed by map.");
+      LOG.trace("Close called. no row processed by map.");
     }
 
     // check if there are IOExceptions
@@ -177,10 +178,10 @@ public class SparkMapRecordHandler exten
       mo.close(abort);
 
       //for close the local work
-      if(localWork != null){
+      if (localWork != null) {
         List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
 
-        for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+        for (Operator<? extends OperatorDesc> dummyOp : dummyOps) {
           dummyOp.close(abort);
         }
       }
@@ -195,8 +196,8 @@ public class SparkMapRecordHandler exten
     } catch (Exception e) {
       if (!abort) {
         // signal new failure to map-reduce
-        l4j.error("Hit error while closing operators - failing tree");
-        throw new RuntimeException("Hive Runtime Error while closing operators", e);
+        LOG.error("Hit error while closing operators - failing tree");
+        throw new IllegalStateException("Error while closing operators", e);
       }
     } finally {
       MapredContext.close();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java Thu Jan  8 02:00:11 2015
@@ -18,24 +18,29 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.IOException;
+import java.util.Iterator;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.*;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
-import java.io.IOException;
-import java.util.Iterator;
+import com.google.common.base.Preconditions;
 
 /**
- * Copied from MergeFileMapper
+ * Copied from MergeFileMapper.
  *
  * As MergeFileMapper is very similar to ExecMapper, this class is
  * very similar to SparkMapRecordHandler
@@ -43,13 +48,14 @@ import java.util.Iterator;
 public class SparkMergeFileRecordHandler extends SparkRecordHandler {
 
   private static final String PLAN_KEY = "__MAP_PLAN__";
-  private static final Log l4j = LogFactory.getLog(SparkMergeFileRecordHandler.class);
+  private static final Log LOG = LogFactory.getLog(SparkMergeFileRecordHandler.class);
   private Operator<? extends OperatorDesc> op;
-  private AbstractFileMergeOperator mergeOp;
+  private AbstractFileMergeOperator<? extends FileMergeDesc> mergeOp;
   private Object[] row;
 
+  @SuppressWarnings("unchecked")
   @Override
-  public void init(JobConf job, OutputCollector output, Reporter reporter) {
+  public <K, V> void init(JobConf job, OutputCollector<K, V> output, Reporter reporter) throws Exception {
     super.init(job, output, reporter);
 
     ObjectCache cache = ObjectCacheFactory.getCache(job);
@@ -70,22 +76,22 @@ public class SparkMergeFileRecordHandler
         String alias = mergeFileWork.getAliasToWork().keySet().iterator().next();
         op = mergeFileWork.getAliasToWork().get(alias);
         if (op instanceof AbstractFileMergeOperator) {
-          mergeOp = (AbstractFileMergeOperator) op;
+          mergeOp = (AbstractFileMergeOperator<? extends FileMergeDesc>) op;
           mergeOp.initializeOp(jc);
           row = new Object[2];
           abort = false;
         } else {
           abort = true;
-          throw new RuntimeException(
-              "Merge file work's top operator should be an" +
-                  " instance of AbstractFileMergeOperator");
+          throw new IllegalStateException(
+              "Merge file work's top operator should be an"
+                + " instance of AbstractFileMergeOperator");
         }
       } else {
         abort = true;
-        throw new RuntimeException("Map work should be a merge file work.");
+        throw new IllegalStateException("Map work should be a merge file work.");
       }
 
-      l4j.info(mergeOp.dump(0));
+      LOG.info(mergeOp.dump(0));
     } catch (HiveException e) {
       abort = true;
       throw new RuntimeException(e);
@@ -105,14 +111,14 @@ public class SparkMergeFileRecordHandler
   }
 
   @Override
-  public void processRow(Object key, Iterator values) throws IOException {
+  public <E> void processRow(Object key, Iterator<E> values) throws IOException {
     throw new UnsupportedOperationException("Do not support this method in "
         + this.getClass().getSimpleName());
   }
 
   @Override
   public void close() {
-    l4j.info("Closing Merge Operator " + mergeOp.getName());
+    LOG.info("Closing Merge Operator " + mergeOp.getName());
     try {
       mergeOp.closeOp(abort);
     } catch (HiveException e) {
@@ -121,7 +127,7 @@ public class SparkMergeFileRecordHandler
   }
 
   @Override
-  public  boolean getDone() {
+  public boolean getDone() {
     return mergeOp.getDone();
   }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Thu Jan  8 02:00:11 2015
@@ -33,8 +33,9 @@ import org.apache.spark.api.java.JavaPai
 
 import com.google.common.base.Preconditions;
 
+@SuppressWarnings("rawtypes")
 public class SparkPlan {
-  private final String CLASS_NAME = SparkPlan.class.getName();
+  private static final String CLASS_NAME = SparkPlan.class.getName();
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
 
   private final Set<SparkTran> rootTrans = new HashSet<SparkTran>();
@@ -43,7 +44,8 @@ public class SparkPlan {
   private final Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>();
   private final Set<Integer> cachedRDDIds = new HashSet<Integer>();
 
-  public JavaPairRDD<HiveKey, BytesWritable> generateGraph() throws IllegalStateException {
+  @SuppressWarnings("unchecked")
+  public JavaPairRDD<HiveKey, BytesWritable> generateGraph() {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
     Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> tranToOutputRDDMap
         = new HashMap<SparkTran, JavaPairRDD<HiveKey, BytesWritable>>();
@@ -98,7 +100,7 @@ public class SparkPlan {
   }
 
   /**
-   * This method returns a topologically sorted list of SparkTran
+   * This method returns a topologically sorted list of SparkTran.
    */
   private List<SparkTran> getAllTrans() {
     List<SparkTran> result = new LinkedList<SparkTran>();
@@ -135,7 +137,7 @@ public class SparkPlan {
    * @param parent
    * @param child
    */
-  public void connect(SparkTran parent, SparkTran child) throws IllegalStateException {
+  public void connect(SparkTran parent, SparkTran child) {
     if (getChildren(parent).contains(child)) {
       throw new IllegalStateException("Connection already exists");
     }
@@ -151,7 +153,7 @@ public class SparkPlan {
     invertedTransGraph.get(child).add(parent);
   }
 
-  public List<SparkTran> getParents(SparkTran tran) throws IllegalStateException {
+  public List<SparkTran> getParents(SparkTran tran) {
     if (!invertedTransGraph.containsKey(tran)) {
       return new ArrayList<SparkTran>();
     }
@@ -159,7 +161,7 @@ public class SparkPlan {
     return invertedTransGraph.get(tran);
   }
 
-  public List<SparkTran> getChildren(SparkTran tran) throws IllegalStateException {
+  public List<SparkTran> getChildren(SparkTran tran) {
     if (!transGraph.containsKey(tran)) {
       return new ArrayList<SparkTran>();
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Thu Jan  8 02:00:11 2015
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -53,9 +54,9 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
-
+@SuppressWarnings("rawtypes")
 public class SparkPlanGenerator {
-  private final String CLASS_NAME = SparkPlanGenerator.class.getName();
+  private static final String CLASS_NAME = SparkPlanGenerator.class.getName();
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   private static final Log LOG = LogFactory.getLog(SparkPlanGenerator.class);
 
@@ -131,8 +132,8 @@ public class SparkPlanGenerator {
         sparkPlan.connect(workToTranMap.get(parentWork), result);
       }
     } else {
-      throw new IllegalStateException("AssertionError: expected either MapWork or ReduceWork, " +
-          "but found " + work.getClass().getName());
+      throw new IllegalStateException("AssertionError: expected either MapWork or ReduceWork, "
+        + "but found " + work.getClass().getName());
     }
 
     if (cloneToWork.containsKey(work)) {
@@ -142,7 +143,7 @@ public class SparkPlanGenerator {
     return result;
   }
 
-  private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
+  private Class<?> getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
     // MergeFileWork is sub-class of MapWork, we don't need to distinguish here
     if (mWork.getInputformat() != null) {
       HiveConf.setVar(jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT,
@@ -168,6 +169,7 @@ public class SparkPlanGenerator {
     return inputFormatClass;
   }
 
+  @SuppressWarnings("unchecked")
   private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork)
       throws Exception {
     JobConf jobConf = cloneJobConf(mapWork);
@@ -209,11 +211,12 @@ public class SparkPlanGenerator {
       reduceTran.setReduceFunction(reduceFunc);
       return reduceTran;
     } else {
-      throw new IllegalStateException("AssertionError: expected either MapWork or ReduceWork, " +
-          "but found " + work.getClass().getName());
+      throw new IllegalStateException("AssertionError: expected either MapWork or ReduceWork, "
+        + "but found " + work.getClass().getName());
     }
   }
 
+  @SuppressWarnings({ "unchecked" })
   private JobConf cloneJobConf(BaseWork work) throws Exception {
     if (workToJobConf.containsKey(work)) {
       return workToJobConf.get(work);
@@ -225,9 +228,8 @@ public class SparkPlanGenerator {
       cloned.setPartitionerClass((Class<? extends Partitioner>)
           (Class.forName(HiveConf.getVar(cloned, HiveConf.ConfVars.HIVEPARTITIONER))));
     } catch (ClassNotFoundException e) {
-      String msg = "Could not find partitioner class: " + e.getMessage() +
-          " which is specified by: " +
-        HiveConf.ConfVars.HIVEPARTITIONER.varname;
+      String msg = "Could not find partitioner class: " + e.getMessage()
+        + " which is specified by: " + HiveConf.ConfVars.HIVEPARTITIONER.varname;
       throw new IllegalArgumentException(msg, e);
     }
     if (work instanceof MapWork) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java Thu Jan  8 02:00:11 2015
@@ -34,9 +34,9 @@ import java.util.Arrays;
 import java.util.Iterator;
 
 public abstract class SparkRecordHandler {
-  protected final String CLASS_NAME = this.getClass().getName();
+  protected static final String CLASS_NAME = SparkRecordHandler.class.getName();
   protected final PerfLogger perfLogger = PerfLogger.getPerfLogger();
-  private final Log LOG = LogFactory.getLog(this.getClass());
+  private static final Log LOG = LogFactory.getLog(SparkRecordHandler.class);
 
   // used to log memory usage periodically
   protected final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
@@ -48,14 +48,13 @@ public abstract class SparkRecordHandler
   private long rowNumber = 0;
   private long nextLogThreshold = 1;
 
-  public void init(JobConf job, OutputCollector output, Reporter reporter) {
+  public <K, V> void init(JobConf job, OutputCollector<K, V> output, Reporter reporter) throws Exception {
     jc = job;
     MapredContext.init(false, new JobConf(jc));
     MapredContext.get().setReporter(reporter);
 
     oc = output;
     rp = reporter;
-//    MapredContext.get().setReporter(reporter);
 
     LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
 
@@ -78,7 +77,7 @@ public abstract class SparkRecordHandler
   /**
    * Process row with key and value collection.
    */
-  public abstract void processRow(Object key, Iterator values) throws IOException;
+  public abstract <E> void processRow(Object key, Iterator<E> values) throws IOException;
 
   /**
    * Log processed row number and used memory info.
@@ -86,9 +85,9 @@ public abstract class SparkRecordHandler
   protected void logMemoryInfo() {
     rowNumber++;
     if (rowNumber == nextLogThreshold) {
-      long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+      long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
       LOG.info("processing " + rowNumber
-        + " rows: used memory = " + used_memory);
+        + " rows: used memory = " + usedMemory);
       nextLogThreshold = getNextLogThreshold(rowNumber);
     }
   }
@@ -97,12 +96,12 @@ public abstract class SparkRecordHandler
   public abstract boolean getDone();
 
   /**
-   * Log information to be logged at the end
+   * Log information to be logged at the end.
    */
   protected void logCloseInfo() {
-    long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+    long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
     LOG.info("processed " + rowNumber + " rows: used memory = "
-      + used_memory);
+      + usedMemory);
   }
 
   private long getNextLogThreshold(long currentThreshold) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java Thu Jan  8 02:00:11 2015
@@ -69,7 +69,7 @@ import org.apache.hadoop.util.StringUtil
  * - Catch and handle errors during execution of the operators.
  *
  */
-public class SparkReduceRecordHandler extends SparkRecordHandler{
+public class SparkReduceRecordHandler extends SparkRecordHandler {
 
   private static final Log LOG = LogFactory.getLog(SparkReduceRecordHandler.class);
   private static final String PLAN_KEY = "__REDUCE_PLAN__";
@@ -98,14 +98,15 @@ public class SparkReduceRecordHandler ex
   private VectorizedRowBatch[] batches;
   // number of columns pertaining to keys in a vectorized row batch
   private int keysColumnOffset;
-  private final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
+  private static final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
   private StructObjectInspector keyStructInspector;
   private StructObjectInspector[] valueStructInspectors;
   /* this is only used in the error code path */
   private List<VectorExpressionWriter>[] valueStringWriters;
   private MapredLocalWork localWork = null;
 
-  public void init(JobConf job, OutputCollector output, Reporter reporter) {
+  @SuppressWarnings("unchecked")
+  public void init(JobConf job, OutputCollector output, Reporter reporter) throws Exception {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
     super.init(job, output, reporter);
 
@@ -240,7 +241,7 @@ public class SparkReduceRecordHandler ex
   }
 
   @Override
-  public void processRow(Object key, Iterator values) throws IOException {
+  public <E> void processRow(Object key, Iterator<E> values) throws IOException {
     if (reducer.getDone()) {
       return;
     }
@@ -305,7 +306,7 @@ public class SparkReduceRecordHandler ex
    * @param values
    * @return true if it is not done and can take more inputs
    */
-  private boolean processKeyValues(Iterator values, byte tag) throws HiveException {
+  private <E> boolean processKeyValues(Iterator<E> values, byte tag) throws HiveException {
     while (values.hasNext()) {
       BytesWritable valueWritable = (BytesWritable) values.next();
       try {
@@ -332,10 +333,10 @@ public class SparkReduceRecordHandler ex
         try {
           rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
         } catch (Exception e2) {
-          rowString = "[Error getting row data with exception " +
-            StringUtils.stringifyException(e2) + " ]";
+          rowString = "[Error getting row data with exception "
+            + StringUtils.stringifyException(e2) + " ]";
         }
-        throw new HiveException("Hive Runtime Error while processing row (tag="
+        throw new HiveException("Error while processing row (tag="
           + tag + ") " + rowString, e);
       }
     }
@@ -346,7 +347,7 @@ public class SparkReduceRecordHandler ex
    * @param values
    * @return true if it is not done and can take more inputs
    */
-  private boolean processVectors(Iterator values, byte tag) throws HiveException {
+  private <E> boolean processVectors(Iterator<E> values, byte tag) throws HiveException {
     VectorizedRowBatch batch = batches[tag];
     batch.reset();
 
@@ -392,7 +393,7 @@ public class SparkReduceRecordHandler ex
         rowString = "[Error getting row data with exception " + StringUtils.stringifyException(e2)
           + " ]";
       }
-      throw new HiveException("Hive Runtime Error while processing vector batch (tag=" + tag + ") "
+      throw new HiveException("Error while processing vector batch (tag=" + tag + ") "
         + rowString, e);
     }
     return true; // give me more
@@ -402,7 +403,7 @@ public class SparkReduceRecordHandler ex
     try {
       return inputValueDeserializer[tag].deserialize(valueWritable);
     } catch (SerDeException e) {
-      throw new HiveException("Hive Runtime Error: Unable to deserialize reduce input value (tag="
+      throw new HiveException("Error: Unable to deserialize reduce input value (tag="
         + tag + ") from "
         + Utilities.formatBinaryString(valueWritable.getBytes(), 0, valueWritable.getLength())
         + " with properties " + valueTableDesc[tag].getProperties(), e);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java Thu Jan  8 02:00:11 2015
@@ -67,7 +67,7 @@ public class SparkReporter implements Re
   }
 
   @Override
-  public InputSplit getInputSplit() throws UnsupportedOperationException {
+  public InputSplit getInputSplit() {
     throw new UnsupportedOperationException("do not support this method now.");
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Thu Jan  8 02:00:11 2015
@@ -28,7 +28,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -47,14 +46,13 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -70,23 +68,20 @@ import org.apache.hadoop.hive.ql.plan.St
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.spark.counter.SparkCounters;
 
 import com.google.common.collect.Lists;
 
 public class SparkTask extends Task<SparkWork> {
-  private final String CLASS_NAME = SparkTask.class.getName();
+  private static final String CLASS_NAME = SparkTask.class.getName();
   private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
   private static final long serialVersionUID = 1L;
-  private transient JobConf job;
-  private transient ContentSummary inputSummary;
   private SparkCounters sparkCounters;
 
   @Override
   public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
     super.initialize(conf, queryPlan, driverContext);
-    job = new JobConf(conf, SparkTask.class);
   }
 
   @Override
@@ -133,7 +128,7 @@ public class SparkTask extends Task<Spar
         rc = close(rc);
         try {
           sparkSessionManager.returnSession(sparkSession);
-        } catch(HiveException ex) {
+        } catch (HiveException ex) {
           LOG.error("Failed to return the session to SessionManager", ex);
         }
       }
@@ -155,7 +150,7 @@ public class SparkTask extends Task<Spar
   }
 
   /**
-   * close will move the temp files into the right place for the fetch
+   * Close will move the temp files into the right place for the fetch
    * task. If the job has failed it will clean up the files.
    */
   private int close(int rc) {
@@ -211,7 +206,7 @@ public class SparkTask extends Task<Spar
           }
         }
         if (candidate) {
-          result.add((MapWork)w);
+          result.add((MapWork) w);
         }
       }
     }
@@ -316,11 +311,11 @@ public class SparkTask extends Task<Spar
     }
     for (Task<? extends Serializable> task : childTasks) {
       if (task instanceof StatsTask) {
-        return (StatsTask)task;
+        return (StatsTask) task;
       } else {
         Task<? extends Serializable> childTask = getStatsTaskInChildTasks(task);
         if (childTask instanceof StatsTask) {
-          return (StatsTask)childTask;
+          return (StatsTask) childTask;
         } else {
           continue;
         }
@@ -383,7 +378,7 @@ public class SparkTask extends Task<Spar
     }
     SparkWork sparkWork = this.getWork();
     for (BaseWork work : sparkWork.getAllWork()) {
-      for (Operator operator : work.getAllOperators()) {
+      for (Operator<? extends OperatorDesc> operator : work.getAllOperators()) {
         if (operator instanceof FileSinkOperator) {
           for (FileSinkOperator.Counter counter : FileSinkOperator.Counter.values()) {
             hiveCounters.add(counter.toString());
@@ -392,11 +387,11 @@ public class SparkTask extends Task<Spar
           for (ReduceSinkOperator.Counter counter : ReduceSinkOperator.Counter.values()) {
             hiveCounters.add(counter.toString());
           }
-        }else if (operator instanceof ScriptOperator) {
+        } else if (operator instanceof ScriptOperator) {
           for (ScriptOperator.Counter counter : ScriptOperator.Counter.values()) {
             hiveCounters.add(counter.toString());
           }
-        }else if (operator instanceof JoinOperator) {
+        } else if (operator instanceof JoinOperator) {
           for (JoinOperator.SkewkeyTableCounter counter : JoinOperator.SkewkeyTableCounter.values()) {
             hiveCounters.add(counter.toString());
           }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java Thu Jan  8 02:00:11 2015
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.s
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.spark.api.java.JavaPairRDD;
 
+@SuppressWarnings("rawtypes")
 public interface SparkTran<KI extends WritableComparable, VI, KO extends WritableComparable, VO> {
   JavaPairRDD<KO, VO> transform(
       JavaPairRDD<KI, VI> input);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Thu Jan  8 02:00:11 2015
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.ql.session
 import org.apache.hadoop.io.BytesWritable;
 
 /**
- * Contains utilities methods used as part of Spark tasks
+ * Contains utilities methods used as part of Spark tasks.
  */
 public class SparkUtilities {
 
@@ -76,7 +76,7 @@ public class SparkUtilities {
     SparkSession sparkSession = SessionState.get().getSparkSession();
 
     // Spark configurations are updated close the existing session
-    if(conf.getSparkConfigUpdated()){
+    if (conf.getSparkConfigUpdated()) {
       sparkSessionManager.closeSession(sparkSession);
       sparkSession =  null;
       conf.setSparkConfigUpdated(false);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java Thu Jan  8 02:00:11 2015
@@ -28,15 +28,17 @@ import scala.Tuple2;
 public interface SparkSession {
   /**
    * Initializes a Spark session for DAG execution.
+   * @param conf Hive configuration.
    */
-  public void open(HiveConf conf) throws HiveException;
+  void open(HiveConf conf) throws HiveException;
 
   /**
-   * Submit given <i>sparkWork</i> to SparkClient
+   * Submit given <i>sparkWork</i> to SparkClient.
    * @param driverContext
    * @param sparkWork
+   * @return SparkJobRef
    */
-  public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception;
+  SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception;
 
   /**
    * Get Spark shuffle memory per task, and total number of cores. This
@@ -45,25 +47,25 @@ public interface SparkSession {
    * @return a tuple, the first element is the shuffle memory per task in bytes,
    *  the second element is the number of total cores usable by the client
    */
-  public Tuple2<Long, Integer> getMemoryAndCores() throws Exception;
+  Tuple2<Long, Integer> getMemoryAndCores() throws Exception;
 
   /**
-   * Is the session open and ready to submit jobs?
+   * @return true if the session is open and ready to submit jobs.
    */
-  public boolean isOpen();
+  boolean isOpen();
 
   /**
-   * Return configuration.
+   * @return configuration.
    */
-  public HiveConf getConf();
+  HiveConf getConf();
 
   /**
-   * Return session id.
+   * @return session id.
    */
-  public String getSessionId();
+  String getSessionId();
 
   /**
-   * Close session and release resources
+   * Close session and release resources.
    */
-  public void close();
+  void close();
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java Thu Jan  8 02:00:11 2015
@@ -30,7 +30,7 @@ public interface SparkSessionManager {
    *
    * @param hiveConf
    */
-  public void setup(HiveConf hiveConf) throws HiveException;
+  void setup(HiveConf hiveConf) throws HiveException;
 
   /**
    * Get a valid SparkSession. First try to check if existing session is reusable
@@ -40,9 +40,9 @@ public interface SparkSessionManager {
    * @param existingSession Existing session (can be null)
    * @param conf
    * @param doOpen Should the session be opened before returning?
-   * @return
+   * @return SparkSession
    */
-  public SparkSession getSession(SparkSession existingSession, HiveConf conf,
+  SparkSession getSession(SparkSession existingSession, HiveConf conf,
       boolean doOpen) throws HiveException;
 
   /**
@@ -50,16 +50,16 @@ public interface SparkSessionManager {
    * still holds references to session and may want to reuse it in future.
    * When client wants to reuse the session, it should pass the it <i>getSession</i> method.
    */
-  public void returnSession(SparkSession sparkSession) throws HiveException;
+  void returnSession(SparkSession sparkSession) throws HiveException;
 
   /**
    * Close the given session and return it to pool. This is used when the client
    * no longer needs a SparkSession.
    */
-  public void closeSession(SparkSession sparkSession) throws HiveException;
+  void closeSession(SparkSession sparkSession) throws HiveException;
 
   /**
    * Shutdown the session manager. Also closing up SparkSessions in pool.
    */
-  public void shutdown();
+  void shutdown();
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java Thu Jan  8 02:00:11 2015
@@ -64,7 +64,7 @@ public class SparkSessionManagerImpl imp
     });
   }
 
-  public synchronized static SparkSessionManagerImpl getInstance()
+  public static synchronized SparkSessionManagerImpl getInstance()
       throws HiveException {
     if (instance == null) {
       instance = new SparkSessionManagerImpl();
@@ -142,12 +142,12 @@ public class SparkSessionManagerImpl imp
       UserGroupInformation newUgi = Utils.getUGI();
       String newUserName = newUgi.getShortUserName();
 
-      // TODOD this we need to store the session username somewhere else as getUGIForConf never used the conf
+      // TODO this we need to store the session username somewhere else as getUGIForConf never used the conf
       UserGroupInformation ugiInSession = Utils.getUGI();
       String userNameInSession = ugiInSession.getShortUserName();
 
       return newUserName.equals(userNameInSession);
-    } catch(Exception ex) {
+    } catch (Exception ex) {
       throw new HiveException("Failed to get user info from HiveConf.", ex);
     }
   }
@@ -175,7 +175,7 @@ public class SparkSessionManagerImpl imp
   public void shutdown() {
     LOG.info("Closing the session manager.");
     if (createdSessions != null) {
-      synchronized(createdSessions) {
+      synchronized (createdSessions) {
         Iterator<SparkSession> it = createdSessions.iterator();
         while (it.hasNext()) {
           SparkSession session = it.next();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Thu Jan  8 02:00:11 2015
@@ -74,8 +74,8 @@ public class SparkJobMonitor {
         if (LOG.isDebugEnabled()) {
           console.printInfo("state = " + state);
         }
-        if (state != null && state != JobExecutionStatus.UNKNOWN &&
-            (state != lastState || state == JobExecutionStatus.RUNNING)) {
+        if (state != null && state != JobExecutionStatus.UNKNOWN
+          && (state != lastState || state == JobExecutionStatus.RUNNING)) {
           lastState = state;
           Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
 
@@ -84,19 +84,19 @@ public class SparkJobMonitor {
             if (!running) {
               perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
               // print job stages.
-              console.printInfo("\nQuery Hive on Spark job[" +
-                sparkJobStatus.getJobId() + "] stages:");
+              console.printInfo("\nQuery Hive on Spark job["
+                + sparkJobStatus.getJobId() + "] stages:");
               for (int stageId : sparkJobStatus.getStageIds()) {
                 console.printInfo(Integer.toString(stageId));
               }
 
-              console.printInfo("\nStatus: Running (Hive on Spark job[" +
-                sparkJobStatus.getJobId() + "])");
+              console.printInfo("\nStatus: Running (Hive on Spark job["
+                + sparkJobStatus.getJobId() + "])");
               startTime = System.currentTimeMillis();
               running = true;
 
-              console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: " +
-                "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
+              console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: "
+                + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
             }
 
 
@@ -110,8 +110,8 @@ public class SparkJobMonitor {
               console.printInfo("Status: Finished successfully within a check interval.");
             } else {
               double duration = (System.currentTimeMillis() - startTime) / 1000.0;
-              console.printInfo("Status: Finished successfully in " +
-                  String.format("%.2f seconds", duration));
+              console.printInfo("Status: Finished successfully in "
+                + String.format("%.2f seconds", duration));
             }
             running = false;
             done = true;
@@ -122,6 +122,12 @@ public class SparkJobMonitor {
             done = true;
             rc = 2;
             break;
+          case UNKNOWN:
+            console.printError("Status: Unknown");
+            running = false;
+            done = true;
+            rc = 2;
+            break;
           }
         }
         if (!done) {
@@ -231,11 +237,7 @@ public class SparkJobMonitor {
     }
 
     if (progressMap.isEmpty()) {
-      if (lastProgressMap.isEmpty()) {
-        return true;
-      } else {
-        return false;
-      }
+      return lastProgressMap.isEmpty();
     } else {
       if (lastProgressMap.isEmpty()) {
         return false;
@@ -244,8 +246,8 @@ public class SparkJobMonitor {
           return false;
         }
         for (String key : progressMap.keySet()) {
-          if (!lastProgressMap.containsKey(key) ||
-            !progressMap.get(key).equals(lastProgressMap.get(key))) {
+          if (!lastProgressMap.containsKey(key)
+            || !progressMap.get(key).equals(lastProgressMap.get(key))) {
             return false;
           }
         }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java Thu Jan  8 02:00:11 2015
@@ -23,7 +23,7 @@ public class SparkJobRef {
 
   private SparkJobStatus sparkJobStatus;
 
-  public SparkJobRef() {}
+  public SparkJobRef() { }
 
   public SparkJobRef(String jobId) {
     this.jobId = jobId;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java Thu Jan  8 02:00:11 2015
@@ -28,17 +28,17 @@ import java.util.Map;
  */
 public interface SparkJobStatus {
 
-  public int getJobId();
+  int getJobId();
 
-  public JobExecutionStatus getState();
+  JobExecutionStatus getState();
 
-  public int[] getStageIds();
+  int[] getStageIds();
 
-  public Map<String, SparkStageProgress> getSparkStageProgress();
+  Map<String, SparkStageProgress> getSparkStageProgress();
 
-  public SparkCounters getCounter();
+  SparkCounters getCounter();
 
-  public SparkStatistics getSparkStatistics();
+  SparkStatistics getSparkStatistics();
 
-  public void cleanup();
+  void cleanup();
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java Thu Jan  8 02:00:11 2015
@@ -23,10 +23,6 @@ public class SparkStageProgress {
   private int succeededTaskCount;
   private int runningTaskCount;
   private int failedTaskCount;
-  // TODO: remove the following two metrics as they're not available in current spark API,
-  // we can add them back once spark provides it
-//  private int killedTaskCount;
-//  private long cumulativeTime;
 
   public SparkStageProgress(
     int totalTaskCount,

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java Thu Jan  8 02:00:11 2015
@@ -21,13 +21,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
 import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.scheduler.JobSucceeded;
 import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.scheduler.SparkListenerApplicationEnd;
 import org.apache.spark.scheduler.SparkListenerApplicationStart;
@@ -44,9 +40,12 @@ import org.apache.spark.scheduler.SparkL
 import org.apache.spark.scheduler.SparkListenerTaskStart;
 import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 public class JobMetricsListener implements SparkListener {
 
-  private final static Log LOG = LogFactory.getLog(JobMetricsListener.class);
+  private static final Log LOG = LogFactory.getLog(JobMetricsListener.class);
 
   private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
   private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
@@ -100,7 +99,7 @@ public class JobMetricsListener implemen
     int jobId = jobStart.jobId();
     int size = jobStart.stageIds().size();
     int[] intStageIds = new int[size];
-    for(int i=0; i< size; i++) {
+    for (int i = 0; i < size; i++) {
       Integer stageId = (Integer) jobStart.stageIds().apply(i);
       intStageIds[i] = stageId;
       stageIdToJobId.put(stageId, jobId);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java Thu Jan  8 02:00:11 2015
@@ -99,8 +99,8 @@ public class LocalSparkJobStatus impleme
         int totalTaskCount = sparkStageInfo.numTasks();
         SparkStageProgress sparkStageProgress = new SparkStageProgress(
             totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
-        stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" +
-            sparkStageInfo.currentAttemptId(), sparkStageProgress);
+        stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_"
+          + sparkStageInfo.currentAttemptId(), sparkStageProgress);
       }
     }
     return stageProgresses;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Thu Jan  8 02:00:11 2015
@@ -94,8 +94,8 @@ public class RemoteSparkJobStatus implem
         int totalTaskCount = sparkStageInfo.numTasks();
         SparkStageProgress sparkStageProgress = new SparkStageProgress(
             totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
-        stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" +
-            sparkStageInfo.currentAttemptId(), sparkStageProgress);
+        stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_"
+          + sparkStageInfo.currentAttemptId(), sparkStageProgress);
       }
     }
     return stageProgresses;
@@ -132,8 +132,8 @@ public class RemoteSparkJobStatus implem
   }
 
   private SparkJobInfo getSparkJobInfo() {
-    Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ?
-        jobHandle.getSparkJobIds().get(0) : null;
+    Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1
+      ? jobHandle.getSparkJobIds().get(0) : null;
     if (sparkJobId == null) {
       long duration = TimeUnit.MILLISECONDS.convert(
           System.nanoTime() - startTime, TimeUnit.NANOSECONDS);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java Thu Jan  8 02:00:11 2015
@@ -18,17 +18,16 @@
 
 package org.apache.hadoop.hive.ql.lib;
 
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-
 import java.util.Stack;
-import java.util.regex.Matcher;
+
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 /**
  * Rule that matches a particular type of node.
  */
 public class TypeRule implements Rule {
 
-  private Class nodeClass;
+  private Class<?> nodeClass;
 
   public TypeRule(Class<?> nodeClass) {
     this.nodeClass = nodeClass;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java Thu Jan  8 02:00:11 2015
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -79,6 +80,7 @@ public class GenSparkSkewJoinProcessor {
     // prevent instantiation
   }
 
+  @SuppressWarnings("unchecked")
   public static void processSkewJoin(JoinOperator joinOp, Task<? extends Serializable> currTask,
       ReduceWork reduceWork, ParseContext parseCtx) throws SemanticException {
 
@@ -138,7 +140,7 @@ public class GenSparkSkewJoinProcessor {
     // used for create mapJoinDesc, should be in order
     List<TableDesc> newJoinValueTblDesc = new ArrayList<TableDesc>();
 
-    for (Byte tag : tags) {
+    for (int i = 0; i < tags.length; i++) {
       newJoinValueTblDesc.add(null);
     }
 
@@ -231,14 +233,14 @@ public class GenSparkSkewJoinProcessor {
       for (int k = 0; k < tags.length; k++) {
         Operator<? extends OperatorDesc> ts =
             GenMapRedUtils.createTemporaryTableScanOperator(rowSchemaList.get((byte) k));
-        ((TableScanOperator)ts).setTableDesc(tableDescList.get((byte)k));
+        ((TableScanOperator) ts).setTableDesc(tableDescList.get((byte) k));
         parentOps[k] = ts;
       }
 
       // create the MapJoinOperator
-      String dumpFilePrefix = "mapfile"+ PlanUtils.getCountForMapJoinDumpFilePrefix();
+      String dumpFilePrefix = "mapfile" + PlanUtils.getCountForMapJoinDumpFilePrefix();
       MapJoinDesc mapJoinDescriptor = new MapJoinDesc(newJoinKeys, keyTblDesc,
-          newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc,joinDescriptor
+          newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc, joinDescriptor
           .getOutputColumnNames(), i, joinDescriptor.getConds(),
           joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix);
       mapJoinDescriptor.setTagOrder(tags);
@@ -307,7 +309,7 @@ public class GenSparkSkewJoinProcessor {
       for (BaseWork work : sparkWork.getRoots()) {
         Preconditions.checkArgument(work instanceof MapWork,
             "All root work should be MapWork, but got " + work.getClass().getSimpleName());
-        if(work != bigMapWork) {
+        if (work != bigMapWork) {
           sparkWork.connect(work, bigMapWork,
               new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE));
         }
@@ -351,8 +353,9 @@ public class GenSparkSkewJoinProcessor {
   }
 
   /**
-   * Insert SparkHashTableSink and HashTableDummy between small dir TS and MJ
+   * Insert SparkHashTableSink and HashTableDummy between small dir TS and MJ.
    */
+  @SuppressWarnings("unchecked")
   private static void insertSHTS(byte tag, TableScanOperator tableScan, MapWork bigMapWork) {
     Preconditions.checkArgument(tableScan.getChildOperators().size() == 1
         && tableScan.getChildOperators().get(0) instanceof MapJoinOperator);
@@ -419,7 +422,7 @@ public class GenSparkSkewJoinProcessor {
   public static boolean supportRuntimeSkewJoin(JoinOperator joinOp,
       Task<? extends Serializable> currTask, HiveConf hiveConf) {
     List<Task<? extends Serializable>> children = currTask.getChildTasks();
-    return GenMRSkewJoinProcessor.skewJoinEnabled(hiveConf, joinOp) &&
-        (children == null || children.size() <= 1);
+    return GenMRSkewJoinProcessor.skewJoinEnabled(hiveConf, joinOp)
+      && (children == null || children.size() <= 1);
   }
 }