Posted to commits@hive.apache.org by sz...@apache.org on 2015/01/26 19:38:34 UTC

svn commit: r1654861 [4/4] - in /hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ hbase-handler/ itests/src/test/resources/ itests/util/src/main/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apac...

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_15.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_15.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_15.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_15.q.out Mon Jan 26 18:38:31 2015
@@ -74,8 +74,8 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
-        Reducer 4 <- Map 3 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 4 <- Map 3 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_16.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_16.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_16.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_16.q.out Mon Jan 26 18:38:31 2015
@@ -77,8 +77,8 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
-        Reducer 4 <- Map 3 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 4 <- Map 3 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_18.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_18.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_18.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_18.q.out Mon Jan 26 18:38:31 2015
@@ -72,8 +72,8 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
-        Reducer 4 <- Map 3 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 4 <- Map 3 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_19.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_19.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_19.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_19.q.out Mon Jan 26 18:38:31 2015
@@ -72,8 +72,8 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
-        Reducer 4 <- Map 3 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 4 <- Map 3 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -199,7 +199,7 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE       	 
 Table Parameters:	 	 
 	COLUMN_STATS_ACCURATE	false               
-	numFiles            	2                   
+	numFiles            	4                   
 	numRows             	-1                  
 	rawDataSize         	-1                  
 	totalSize           	40                  

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_2.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_2.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_2.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_2.q.out Mon Jan 26 18:38:31 2015
@@ -74,7 +74,7 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -197,7 +197,7 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE       	 
 Table Parameters:	 	 
 	COLUMN_STATS_ACCURATE	false               
-	numFiles            	3                   
+	numFiles            	4                   
 	numRows             	-1                  
 	rawDataSize         	-1                  
 	totalSize           	68                  

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_20.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_20.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_20.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_20.q.out Mon Jan 26 18:38:31 2015
@@ -70,8 +70,8 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
-        Reducer 4 <- Map 3 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 4 <- Map 3 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -203,7 +203,7 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE       	 
 Table Parameters:	 	 
 	COLUMN_STATS_ACCURATE	false               
-	numFiles            	2                   
+	numFiles            	4                   
 	numRows             	-1                  
 	rawDataSize         	-1                  
 	totalSize           	40                  

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_21.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_21.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_21.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_21.q.out Mon Jan 26 18:38:31 2015
@@ -70,8 +70,8 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
-        Reducer 4 <- Map 3 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 4 <- Map 3 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -207,7 +207,7 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE       	 
 Table Parameters:	 	 
 	COLUMN_STATS_ACCURATE	false               
-	numFiles            	2                   
+	numFiles            	4                   
 	numRows             	-1                  
 	rawDataSize         	-1                  
 	totalSize           	20                  

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_24.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_24.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_24.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_24.q.out Mon Jan 26 18:38:31 2015
@@ -66,8 +66,8 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
-        Reducer 4 <- Map 3 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 4 <- Map 3 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -199,7 +199,7 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE       	 
 Table Parameters:	 	 
 	COLUMN_STATS_ACCURATE	false               
-	numFiles            	2                   
+	numFiles            	4                   
 	numRows             	-1                  
 	rawDataSize         	-1                  
 	totalSize           	60                  

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_25.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_25.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_25.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_25.q.out Mon Jan 26 18:38:31 2015
@@ -84,8 +84,8 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
-        Reducer 4 <- Map 3 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 4 <- Map 3 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -218,7 +218,7 @@ Protect Mode:       	None
 #### A masked pattern was here ####
 Partition Parameters:	 	 
 	COLUMN_STATS_ACCURATE	false               
-	numFiles            	2                   
+	numFiles            	4                   
 	numRows             	-1                  
 	rawDataSize         	-1                  
 	totalSize           	40                  

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_4.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_4.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_4.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_4.q.out Mon Jan 26 18:38:31 2015
@@ -73,8 +73,8 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
-        Reducer 4 <- Map 3 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 4 <- Map 3 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -245,7 +245,7 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE       	 
 Table Parameters:	 	 
 	COLUMN_STATS_ACCURATE	false               
-	numFiles            	2                   
+	numFiles            	4                   
 	numRows             	-1                  
 	rawDataSize         	-1                  
 	totalSize           	40                  

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_5.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_5.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_5.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_5.q.out Mon Jan 26 18:38:31 2015
@@ -81,7 +81,7 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -249,7 +249,7 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE       	 
 Table Parameters:	 	 
 	COLUMN_STATS_ACCURATE	false               
-	numFiles            	3                   
+	numFiles            	4                   
 	numRows             	-1                  
 	rawDataSize         	-1                  
 	totalSize           	68                  

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_6.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_6.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_6.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_6.q.out Mon Jan 26 18:38:31 2015
@@ -71,8 +71,8 @@ STAGE PLANS:
   Stage: Stage-2
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
-        Reducer 4 <- Map 3 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 4 <- Map 3 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_7.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_7.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_7.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_7.q.out Mon Jan 26 18:38:31 2015
@@ -72,8 +72,8 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
-        Reducer 4 <- Map 3 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 4 <- Map 3 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -199,10 +199,10 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE       	 
 Table Parameters:	 	 
 	COLUMN_STATS_ACCURATE	false               
-	numFiles            	2                   
+	numFiles            	4                   
 	numRows             	-1                  
 	rawDataSize         	-1                  
-	totalSize           	178                 
+	totalSize           	336                 
 #### A masked pattern was here ####
 	 	 
 # Storage Information	 	 

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_8.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_8.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_8.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_8.q.out Mon Jan 26 18:38:31 2015
@@ -78,7 +78,7 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -201,10 +201,10 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE       	 
 Table Parameters:	 	 
 	COLUMN_STATS_ACCURATE	false               
-	numFiles            	3                   
+	numFiles            	4                   
 	numRows             	-1                  
 	rawDataSize         	-1                  
-	totalSize           	271                 
+	totalSize           	350                 
 #### A masked pattern was here ####
 	 	 
 # Storage Information	 	 

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_9.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_9.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_9.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_9.q.out Mon Jan 26 18:38:31 2015
@@ -85,7 +85,7 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -255,10 +255,10 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE       	 
 Table Parameters:	 	 
 	COLUMN_STATS_ACCURATE	false               
-	numFiles            	3                   
+	numFiles            	4                   
 	numRows             	-1                  
 	rawDataSize         	-1                  
-	totalSize           	271                 
+	totalSize           	350                 
 #### A masked pattern was here ####
 	 	 
 # Storage Information	 	 

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out Mon Jan 26 18:38:31 2015
@@ -73,7 +73,7 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_13.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_13.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_13.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_13.q.out Mon Jan 26 18:38:31 2015
@@ -72,7 +72,7 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
         Reducer 3 <- Reducer 2 (SORT, 1)
 #### A masked pattern was here ####
       Vertices:

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_14.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_14.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_14.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_14.q.out Mon Jan 26 18:38:31 2015
@@ -72,7 +72,7 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
         Reducer 3 <- Reducer 2 (SORT, 1)
 #### A masked pattern was here ####
       Vertices:

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_15.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_15.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_15.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_15.q.out Mon Jan 26 18:38:31 2015
@@ -68,7 +68,7 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
         Reducer 3 <- Reducer 2 (SORT, 1)
 #### A masked pattern was here ####
       Vertices:

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_16.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_16.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_16.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_16.q.out Mon Jan 26 18:38:31 2015
@@ -50,7 +50,7 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_9.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_9.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_9.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_9.q.out Mon Jan 26 18:38:31 2015
@@ -46,7 +46,7 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP, 1)
+        Reducer 2 <- Map 1 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 

Modified: hive/trunk/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out Mon Jan 26 18:38:31 2015
@@ -14,7 +14,7 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 1), Map 4 (PARTITION-LEVEL SORT, 1)
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
         Reducer 3 <- Reducer 2 (GROUP, 1)
 #### A masked pattern was here ####
       Vertices:

Modified: hive/trunk/spark-client/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/pom.xml?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/pom.xml (original)
+++ hive/trunk/spark-client/pom.xml Mon Jan 26 18:38:31 2015
@@ -65,6 +65,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java Mon Jan 26 18:38:31 2015
@@ -121,6 +121,20 @@ abstract class BaseProtocol extends RpcD
 
   }
 
+  protected static class JobStarted implements Serializable {
+
+    final String id;
+
+    JobStarted(String id) {
+      this.id = id;
+    }
+
+    JobStarted() {
+      this(null);
+    }
+
+  }
+
   /**
    * Inform the client that a new spark job has been submitted for the client job.
    */
@@ -138,4 +152,18 @@ abstract class BaseProtocol extends RpcD
     }
   }
 
+  protected static class SyncJobRequest<T extends Serializable> implements Serializable {
+
+    final Job<T> job;
+
+    SyncJobRequest(Job<T> job) {
+      this.job = job;
+    }
+
+    SyncJobRequest() {
+      this(null);
+    }
+
+  }
+
 }
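
For reference, a rough sketch of the message flow that the two new message types (JobStarted and SyncJobRequest) enable, inferred from the RemoteDriver and SparkClientImpl changes further down; the ordering shown is illustrative, not normative:

    // client -> driver : JobRequest(id, job)           async submit; the job is queued
    // client -> driver : SyncJobRequest(job)           new: runs in the RPC thread itself
    // driver -> client : JobStarted(id)                new: moves the handle to STARTED
    // driver -> client : JobSubmitted(id, sparkJobId)  reports each monitored Spark job
    // driver -> client : JobResult(id, result, error, counters)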

Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java Mon Jan 26 18:38:31 2015
@@ -55,4 +55,53 @@ public interface JobHandle<T extends Ser
    */
   SparkCounters getSparkCounters();
 
+  /**
+   * Return the current state of the job.
+   */
+  State getState();
+
+  /**
+   * Add a listener to the job handle. If the job's state is not SENT, a callback for the
+   * corresponding state will be invoked immediately.
+   *
+   * @param l The listener to add.
+   */
+  void addListener(Listener<T> l);
+
+  /**
+   * The current state of the submitted job.
+   */
+  static enum State {
+    SENT,
+    QUEUED,
+    STARTED,
+    CANCELLED,
+    FAILED,
+    SUCCEEDED;
+  }
+
+  /**
+   * A listener for monitoring the state of the job in the remote context. Callbacks are called
+   * when the corresponding state change occurs.
+   */
+  static interface Listener<T extends Serializable> {
+
+    void onJobQueued(JobHandle<T> job);
+
+    void onJobStarted(JobHandle<T> job);
+
+    void onJobCancelled(JobHandle<T> job);
+
+    void onJobFailed(JobHandle<T> job, Throwable cause);
+
+    void onJobSucceeded(JobHandle<T> job, T result);
+
+    /**
+     * Called when a monitored Spark job is started on the remote context. This callback
+     * does not indicate a state change in the client job's status.
+     */
+    void onSparkJobStarted(JobHandle<T> job, int sparkJobId);
+
+  }
+
 }
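
For reference, a minimal sketch of consuming the new listener API; "client" and "job" are assumed to exist elsewhere, and the println calls are placeholders:

    JobHandle<String> handle = client.submit(job);
    handle.addListener(new JobHandle.Listener<String>() {
      @Override public void onJobQueued(JobHandle<String> h) { System.out.println("queued"); }
      @Override public void onJobStarted(JobHandle<String> h) { System.out.println("started"); }
      @Override public void onJobCancelled(JobHandle<String> h) { System.out.println("cancelled"); }
      @Override public void onJobFailed(JobHandle<String> h, Throwable cause) { cause.printStackTrace(); }
      @Override public void onJobSucceeded(JobHandle<String> h, String result) { System.out.println(result); }
      // Fired once per monitored Spark job; not a state change of the client job itself.
      @Override public void onSparkJobStarted(JobHandle<String> h, int sparkJobId) { System.out.println(sparkJobId); }
    });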

Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java Mon Jan 26 18:38:31 2015
@@ -17,15 +17,16 @@
 
 package org.apache.hive.spark.client;
 
-import io.netty.util.concurrent.Promise;
-
 import java.io.Serializable;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import io.netty.util.concurrent.Promise;
 
 import org.apache.hive.spark.counter.SparkCounters;
 
@@ -34,28 +35,30 @@ import org.apache.hive.spark.counter.Spa
  */
 class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
 
-  private final AtomicBoolean cancelled;
   private final SparkClientImpl client;
   private final String jobId;
   private final MetricsCollection metrics;
   private final Promise<T> promise;
   private final List<Integer> sparkJobIds;
+  private final List<Listener> listeners;
+  private volatile State state;
   private volatile SparkCounters sparkCounters;
 
   JobHandleImpl(SparkClientImpl client, Promise<T> promise, String jobId) {
-    this.cancelled = new AtomicBoolean();
     this.client = client;
     this.jobId = jobId;
     this.promise = promise;
+    this.listeners = Lists.newLinkedList();
     this.metrics = new MetricsCollection();
     this.sparkJobIds = new CopyOnWriteArrayList<Integer>();
+    this.state = State.SENT;
     this.sparkCounters = null;
   }
 
   /** Requests a running job to be cancelled. */
   @Override
   public boolean cancel(boolean mayInterrupt) {
-    if (cancelled.compareAndSet(false, true)) {
+    if (changeState(State.CANCELLED)) {
       client.cancel(jobId);
       promise.cancel(mayInterrupt);
       return true;
@@ -114,20 +117,116 @@ class JobHandleImpl<T extends Serializab
     return sparkCounters;
   }
 
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  @Override
+  public void addListener(Listener l) {
+    synchronized (listeners) {
+      listeners.add(l);
+      // If current state is a final state, notify of Spark job IDs before notifying about the
+      // state transition.
+      if (state.ordinal() >= State.CANCELLED.ordinal()) {
+        for (Integer i : sparkJobIds) {
+          l.onSparkJobStarted(this, i);
+        }
+      }
+
+      fireStateChange(state, l);
+
+      // Otherwise, notify about Spark jobs after the state notification.
+      if (state.ordinal() < State.CANCELLED.ordinal()) {
+        for (Integer i : sparkJobIds) {
+          l.onSparkJobStarted(this, i);
+        }
+      }
+    }
+  }
+
   public void setSparkCounters(SparkCounters sparkCounters) {
     this.sparkCounters = sparkCounters;
   }
 
   @SuppressWarnings("unchecked")
   void setSuccess(Object result) {
-    promise.setSuccess((T) result);
+    // The synchronization here is not necessary, but tests depend on it.
+    synchronized (listeners) {
+      promise.setSuccess((T) result);
+      changeState(State.SUCCEEDED);
+    }
   }
 
   void setFailure(Throwable error) {
-    promise.setFailure(error);
+    // The synchronization here is not necessary, but tests depend on it.
+    synchronized (listeners) {
+      promise.setFailure(error);
+      changeState(State.FAILED);
+    }
+  }
+
+  /**
+   * Changes the state of this job handle, making sure that illegal state transitions are ignored.
+   * Fires events appropriately.
+   *
+   * As a rule, a transition only occurs if the new state is "higher" than the current
+   * state (i.e., has a higher ordinal number) and the current state is not a "final" state.
+   * "Final" states are CANCELLED, FAILED and SUCCEEDED, defined here in the code as having
+   * an ordinal number equal to or higher than that of the CANCELLED enum constant.
+   */
+  boolean changeState(State newState) {
+    synchronized (listeners) {
+      if (newState.ordinal() > state.ordinal() && state.ordinal() < State.CANCELLED.ordinal()) {
+        state = newState;
+        for (Listener l : listeners) {
+          fireStateChange(newState, l);
+        }
+        return true;
+      }
+      return false;
+    }
+  }
+
+  void addSparkJobId(int sparkJobId) {
+    synchronized (listeners) {
+      sparkJobIds.add(sparkJobId);
+      for (Listener l : listeners) {
+        l.onSparkJobStarted(this, sparkJobId);
+      }
+    }
+  }
+
+  private void fireStateChange(State s, Listener l) {
+    switch (s) {
+    case SENT:
+      break;
+    case QUEUED:
+      l.onJobQueued(this);
+      break;
+    case STARTED:
+      l.onJobStarted(this);
+      break;
+    case CANCELLED:
+      l.onJobCancelled(this);
+      break;
+    case FAILED:
+      l.onJobFailed(this, promise.cause());
+      break;
+    case SUCCEEDED:
+      try {
+        l.onJobSucceeded(this, promise.get());
+      } catch (Exception e) {
+        // Shouldn't really happen.
+        throw new IllegalStateException(e);
+      }
+      break;
+    default:
+      throw new IllegalStateException();
+    }
   }
 
-  /** Last attempt resort at preventing stray jobs from accumulating in SparkClientImpl. */
+  /** Last attempt at preventing stray jobs from accumulating in SparkClientImpl. */
   @Override
   protected void finalize() {
     if (!isDone()) {
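
The transition rule in changeState() can be traced concretely; a hedged sketch assuming package-private access to a freshly created JobHandleImpl, in the same spirit as the tests further down:

    handle.changeState(JobHandle.State.QUEUED);    // true:  SENT -> QUEUED, forward and non-final
    handle.changeState(JobHandle.State.SENT);      // false: backwards transitions are ignored
    handle.changeState(JobHandle.State.SUCCEEDED); // true:  QUEUED -> SUCCEEDED enters a final state
    handle.changeState(JobHandle.State.STARTED);   // false: no transition ever leaves a final state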

Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Mon Jan 26 18:38:31 2015
@@ -77,9 +77,9 @@ public class RemoteDriver {
   private static final Logger LOG = LoggerFactory.getLogger(RemoteDriver.class);
 
   private final Map<String, JobWrapper<?>> activeJobs;
+  private final Object jcLock;
   private final Object shutdownLock;
   private final ExecutorService executor;
-  private final JobContextImpl jc;
   private final NioEventLoopGroup egroup;
   private final Rpc clientRpc;
   private final DriverProtocol protocol;
@@ -87,10 +87,14 @@ public class RemoteDriver {
   // Used to queue up requests while the SparkContext is being created.
   private final List<JobWrapper<?>> jobQueue = Lists.newLinkedList();
 
-  private boolean running;
+  // jc is effectively final, but it has to be volatile since it's accessed by different
+  // threads while the constructor is running.
+  private volatile JobContextImpl jc;
+  private volatile boolean running;
 
   private RemoteDriver(String[] args) throws Exception {
     this.activeJobs = Maps.newConcurrentMap();
+    this.jcLock = new Object();
     this.shutdownLock = new Object();
 
     SparkConf conf = new SparkConf();
@@ -120,6 +124,7 @@ public class RemoteDriver {
     Map<String, String> mapConf = Maps.newHashMap();
     for (Tuple2<String, String> e : conf.getAll()) {
       mapConf.put(e._1(), e._2());
+      LOG.debug("Remote Driver configured with: " + e._1() + "=" + e._2());
     }
 
     String secret = mapConf.get(SparkClientFactory.CONF_KEY_SECRET);
@@ -150,14 +155,20 @@ public class RemoteDriver {
     try {
       JavaSparkContext sc = new JavaSparkContext(conf);
       sc.sc().addSparkListener(new ClientListener());
-      jc = new JobContextImpl(sc);
+      synchronized (jcLock) {
+        jc = new JobContextImpl(sc);
+        jcLock.notifyAll();
+      }
     } catch (Exception e) {
       LOG.error("Failed to start SparkContext.", e);
       shutdown(e);
+      synchronized (jcLock) {
+        jcLock.notifyAll();
+      }
       throw e;
     }
 
-    synchronized (jobQueue) {
+    synchronized (jcLock) {
       for (Iterator<JobWrapper<?>> it = jobQueue.iterator(); it.hasNext();) {
         it.next().submit();
       }
@@ -174,7 +185,7 @@ public class RemoteDriver {
   }
 
   private void submit(JobWrapper<?> job) {
-    synchronized (jobQueue) {
+    synchronized (jcLock) {
       if (jc != null) {
         job.submit();
       } else {
@@ -235,6 +246,10 @@ public class RemoteDriver {
       clientRpc.call(new JobResult(jobId, result, error, counters));
     }
 
+    void jobStarted(String jobId) {
+      clientRpc.call(new JobStarted(jobId));
+    }
+
     void jobSubmitted(String jobId, int sparkJobId) {
       LOG.debug("Send job({}/{}) submitted to Client.", jobId, sparkJobId);
       clientRpc.call(new JobSubmitted(jobId, sparkJobId));
@@ -264,6 +279,35 @@ public class RemoteDriver {
       submit(wrapper);
     }
 
+    private Object handle(ChannelHandlerContext ctx, SyncJobRequest msg) throws Exception {
+      // In case the job context is not up yet, let's wait, since this is supposed to be a
+      // "synchronous" RPC.
+      if (jc == null) {
+        synchronized (jcLock) {
+          while (jc == null) {
+            jcLock.wait();
+            if (!running) {
+              throw new IllegalStateException("Remote context is shutting down.");
+            }
+          }
+        }
+      }
+
+      jc.setMonitorCb(new MonitorCallback() {
+        @Override
+        public void call(JavaFutureAction<?> future,
+            SparkCounters sparkCounters, Set<Integer> cachedRDDIds) {
+          throw new IllegalStateException(
+            "JobContext.monitor() is not available for synchronous jobs.");
+        }
+      });
+      try {
+        return msg.job.call(jc);
+      } finally {
+        jc.setMonitorCb(null);
+      }
+    }
+
   }
 
   private class JobWrapper<T extends Serializable> implements Callable<Void> {
@@ -286,6 +330,8 @@ public class RemoteDriver {
 
     @Override
     public Void call() throws Exception {
+      protocol.jobStarted(req.id);
+
       try {
         jc.setMonitorCb(new MonitorCallback() {
           @Override

Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java Mon Jan 26 18:38:31 2015
@@ -38,6 +38,23 @@ public interface SparkClient extends Ser
   <T extends Serializable> JobHandle<T> submit(Job<T> job);
 
   /**
+   * Asks the remote context to run a job immediately.
+   * <p/>
+   * Normally, the remote context will queue jobs and execute them based on how many worker
+   * threads have been configured. This method will run the submitted job in the same thread
+   * processing the RPC message, so that queueing does not apply.
+   * <p/>
+   * It's recommended that this method only be used to run code that finishes quickly. This
+   * avoids interfering with the normal operation of the context.
+   * <p/>
+   * Note: the {@link JobContext#monitor()} functionality is not available when using this method.
+   *
+   * @param job The job to execute.
+   * @return A future to monitor the result of the job.
+   */
+  <T extends Serializable> Future<T> run(Job<T> job);
+
+  /**
    * Stops the remote context.
    *
    * Any pending jobs will be cancelled, and the remote context will be torn down.
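
For reference, a minimal sketch of the new synchronous path, mirroring the testSyncRpc case added to TestSparkClient below; "client" is assumed to be an already-connected SparkClient and SyncRpc a trivial Job<String>:

    // Runs in the driver's RPC thread: no queueing, and JobContext.monitor() is unavailable.
    Future<String> result = client.run(new SyncRpc());
    assertEquals("Hello", result.get(20, TimeUnit.SECONDS));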

Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java Mon Jan 26 18:38:31 2015
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Map;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.spark.client.rpc.RpcServer;
 import org.apache.spark.SparkException;
 
@@ -67,12 +68,13 @@ public final class SparkClientFactory {
   /**
    * Instantiates a new Spark client.
    *
-   * @param conf Configuration for the remote Spark application.
+   * @param sparkConf Configuration for the remote Spark application, contains spark.* properties.
+   * @param hiveConf Configuration for Hive, contains hive.* properties.
    */
-  public static synchronized SparkClient createClient(Map<String, String> conf)
+  public static synchronized SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf)
       throws IOException, SparkException {
     Preconditions.checkState(server != null, "initialize() not called.");
-    return new SparkClientImpl(server, conf);
+    return new SparkClientImpl(server, sparkConf, hiveConf);
   }
 
 }
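
A sketch of the updated factory call under the new two-argument signature, following the pattern TestSparkClient.runTest uses below; the contents of the spark.* map are assumptions:

    Map<String, String> sparkConf = new HashMap<String, String>();
    sparkConf.put("spark.master", "local");   // illustrative value only
    HiveConf hiveConf = new HiveConf();       // supplies the hive.spark.* RPC settings
    SparkClientFactory.initialize(sparkConf);
    SparkClient client = SparkClientFactory.createClient(sparkConf, hiveConf);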

Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Mon Jan 26 18:38:31 2015
@@ -40,7 +40,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.spark.client.rpc.Rpc;
+import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.hive.spark.client.rpc.RpcServer;
 import org.apache.spark.SparkContext;
 import org.apache.spark.SparkException;
@@ -67,6 +69,7 @@ class SparkClientImpl implements SparkCl
   private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
 
   private final Map<String, String> conf;
+  private final HiveConf hiveConf;
   private final AtomicInteger childIdGenerator;
   private final Thread driverThread;
   private final Map<String, JobHandleImpl<?>> jobs;
@@ -74,8 +77,9 @@ class SparkClientImpl implements SparkCl
   private final ClientProtocol protocol;
   private volatile boolean isAlive;
 
-  SparkClientImpl(RpcServer rpcServer, Map<String, String> conf) throws IOException, SparkException {
+  SparkClientImpl(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf) throws IOException, SparkException {
     this.conf = conf;
+    this.hiveConf = hiveConf;
     this.childIdGenerator = new AtomicInteger();
     this.jobs = Maps.newConcurrentMap();
 
@@ -116,13 +120,16 @@ class SparkClientImpl implements SparkCl
   }
 
   @Override
+  public <T extends Serializable> Future<T> run(Job<T> job) {
+    return protocol.run(job);
+  }
+
+  @Override
   public void stop() {
     if (isAlive) {
       isAlive = false;
       try {
-        protocol.endSession().get(10, TimeUnit.SECONDS);
-      } catch (TimeoutException te) {
-        LOG.warn("Timed out waiting for driver to respond to stop request.");
+        protocol.endSession();
       } catch (Exception e) {
         LOG.warn("Exception while waiting for end session reply.", e);
       } finally {
@@ -137,29 +144,29 @@ class SparkClientImpl implements SparkCl
       LOG.debug("Interrupted before driver thread was finished.");
     }
     if (endTime - System.currentTimeMillis() <= 0) {
-      LOG.debug("Shut down time out.");
+      LOG.warn("Timed out shutting down remote driver, interrupting...");
       driverThread.interrupt();
     }
   }
 
   @Override
   public Future<?> addJar(URL url) {
-    return submit(new AddJarJob(url.toString()));
+    return run(new AddJarJob(url.toString()));
   }
 
   @Override
   public Future<?> addFile(URL url) {
-    return submit(new AddFileJob(url.toString()));
+    return run(new AddFileJob(url.toString()));
   }
 
   @Override
   public Future<Integer> getExecutorCount() {
-    return submit(new GetExecutorCountJob());
+    return run(new GetExecutorCountJob());
   }
 
   @Override
   public Future<Integer> getDefaultParallelism() {
-    return submit(new GetDefaultParallelismJob());
+    return run(new GetDefaultParallelismJob());
   }
 
   void cancel(String jobId) {
@@ -296,6 +303,25 @@ class SparkClientImpl implements SparkCl
         argv.add("org.apache.spark.deploy.SparkSubmit");
       }
 
+      if (master.equals("yarn-cluster")) {
+        String executorCores = conf.get("spark.executor.cores");
+        if (executorCores != null) {
+          argv.add("--executor-cores");
+          argv.add(executorCores);
+        }
+
+        String executorMemory = conf.get("spark.executor.memory");
+        if (executorMemory != null) {
+          argv.add("--executor-memory");
+          argv.add(executorMemory);
+        }
+
+        String numOfExecutors = conf.get("spark.executor.instances");
+        if (numOfExecutors != null) {
+          argv.add("--num-executors");
+          argv.add(numOfExecutors);
+        }
+      }
 
       argv.add("--properties-file");
       argv.add(properties.getAbsolutePath());
@@ -313,6 +339,14 @@ class SparkClientImpl implements SparkCl
       argv.add("--remote-port");
       argv.add(serverPort);
 
+      // hive.spark.* keys are passed down to the RemoteDriver via --conf,
+      // as --properties-file contains the spark.* keys that are meant for the SparkConf object.
+      for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
+        String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
+        argv.add("--conf");
+        argv.add(String.format("%s=%s", hiveSparkConfKey, value));
+      }
+
       LOG.debug("Running client driver with argv: {}", Joiner.on(" ").join(argv));
 
       ProcessBuilder pb = new ProcessBuilder(argv.toArray(new String[argv.size()]));
@@ -360,7 +394,7 @@ class SparkClientImpl implements SparkCl
     <T extends Serializable> JobHandleImpl<T> submit(Job<T> job) {
       final String jobId = UUID.randomUUID().toString();
       final Promise<T> promise = driverRpc.createPromise();
-      JobHandleImpl<T> handle = new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId);
+      final JobHandleImpl<T> handle = new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId);
       jobs.put(jobId, handle);
 
       final io.netty.util.concurrent.Future<Void> rpc = driverRpc.call(new JobRequest(jobId, job));
@@ -371,7 +405,9 @@ class SparkClientImpl implements SparkCl
       rpc.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
         @Override
         public void operationComplete(io.netty.util.concurrent.Future<Void> f) {
-          if (!f.isSuccess() && !promise.isDone()) {
+          if (f.isSuccess()) {
+            handle.changeState(JobHandle.State.QUEUED);
+          } else if (!promise.isDone()) {
             promise.setFailure(f.cause());
           }
         }
@@ -379,16 +415,24 @@ class SparkClientImpl implements SparkCl
       promise.addListener(new GenericFutureListener<Promise<T>>() {
         @Override
         public void operationComplete(Promise<T> p) {
-          jobs.remove(jobId);
+          if (jobId != null) {
+            jobs.remove(jobId);
+          }
           if (p.isCancelled() && !rpc.isDone()) {
             rpc.cancel(true);
           }
         }
       });
-
       return handle;
     }
 
+    <T extends Serializable> Future<T> run(Job<T> job) {
+      @SuppressWarnings("unchecked")
+      final io.netty.util.concurrent.Future<T> rpc = (io.netty.util.concurrent.Future<T>)
+        driverRpc.call(new SyncJobRequest(job), Serializable.class);
+      return rpc;
+    }
+
     void cancel(String jobId) {
       driverRpc.call(new CancelJob(jobId));
     }
@@ -426,11 +470,20 @@ class SparkClientImpl implements SparkCl
       }
     }
 
+    private void handle(ChannelHandlerContext ctx, JobStarted msg) {
+      JobHandleImpl<?> handle = jobs.get(msg.id);
+      if (handle != null) {
+        handle.changeState(JobHandle.State.STARTED);
+      } else {
+        LOG.warn("Received event for unknown job {}", msg.id);
+      }
+    }
+
     private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
       JobHandleImpl<?> handle = jobs.get(msg.clientJobId);
       if (handle != null) {
         LOG.info("Received spark job ID: {} for {}", msg.sparkJobId, msg.clientJobId);
-        handle.getSparkJobIds().add(msg.sparkJobId);
+        handle.addSparkJobId(msg.sparkJobId);
       } else {
         LOG.warn("Received spark job ID: {} for unknown job {}", msg.sparkJobId, msg.clientJobId);
       }

Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java Mon Jan 26 18:38:31 2015
@@ -84,7 +84,7 @@ public class Rpc implements Closeable {
       final String secret,
       final RpcDispatcher dispatcher) throws Exception {
     final RpcConfiguration rpcConf = new RpcConfiguration(config);
-    int connectTimeoutMs = rpcConf.getConnectTimeoutMs();
+    int connectTimeoutMs = (int) rpcConf.getConnectTimeoutMs();
 
     final ChannelFuture cf = new Bootstrap()
         .group(eloop)

Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java Mon Jan 26 18:38:31 2015
@@ -21,9 +21,14 @@ import java.io.IOException;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
+import java.util.Arrays;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,63 +42,49 @@ public final class RpcConfiguration {
 
   private static final Logger LOG = LoggerFactory.getLogger(RpcConfiguration.class);
 
-  /** Connection timeout for RPC clients. */
-  public static final String CONNECT_TIMEOUT_MS_KEY = "hive.spark.client.connect.timeout.ms";
-  private static final int CONNECT_TIMEOUT_MS_DEFAULT = 1000;
+  public static final ImmutableSet<String> HIVE_SPARK_RSC_CONFIGS = ImmutableSet.of(
+    HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
+    HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname,
+    HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
+    HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname,
+    HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname,
+    HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname
+  );
+  public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of(
+    HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
+    HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname
+  );
 
-  /**
-   * How long the server should wait for clients to connect back after they're
-   * registered. Also used to time out the client waiting for the server to
-   * reply to its "hello" message.
-   */
-  public static final String SERVER_CONNECT_TIMEOUT_MS_KEY = "hive.spark.client.server.connect.timeout.ms";
-  private static final long SERVER_CONNECT_TIMEOUT_MS_DEFAULT = 10000L;
-
-  /**
-   * Number of bits of randomness in the generated client secrets. Rounded down
-   * to the nearest multiple of 8.
-   */
-  public static final String SECRET_RANDOM_BITS_KEY = "hive.spark.client.secret.bits";
-  private static final int SECRET_RANDOM_BITS_DEFAULT = 256;
-
-  /** Hostname or IP address to advertise for the server. */
   public static final String SERVER_LISTEN_ADDRESS_KEY = "hive.spark.client.server.address";
 
-  /** Maximum number of threads to use for the RPC event loop. */
-  public static final String RPC_MAX_THREADS_KEY = "hive.spark.client.rpc.threads";
-  public static final int RPC_MAX_THREADS_DEFAULT = 8;
-
-  /** Maximum message size. Default = 10MB. */
-  public static final String RPC_MAX_MESSAGE_SIZE_KEY = "hive.spark.client.rpc.max.size";
-  public static final int RPC_MAX_MESSAGE_SIZE_DEFAULT = 50 * 1024 * 1024;
-
-  /** Channel logging level. */
-  public static final String RPC_CHANNEL_LOG_LEVEL_KEY = "hive.spark.client.channel.log.level";
-
   private final Map<String, String> config;
 
+  private static final HiveConf DEFAULT_CONF = new HiveConf();
+
   public RpcConfiguration(Map<String, String> config) {
     this.config = config;
   }
 
-  int getConnectTimeoutMs() {
-    String value = config.get(CONNECT_TIMEOUT_MS_KEY);
-    return value != null ? Integer.parseInt(value) : CONNECT_TIMEOUT_MS_DEFAULT;
+  long getConnectTimeoutMs() {
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname);
+    return value != null ? Integer.parseInt(value) : DEFAULT_CONF.getTimeVar(
+      HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS);
   }
 
   int getMaxMessageSize() {
-    String value = config.get(RPC_MAX_MESSAGE_SIZE_KEY);
-    return value != null ? Integer.parseInt(value) : RPC_MAX_MESSAGE_SIZE_DEFAULT;
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname);
+    return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.defaultIntVal;
   }
 
   long getServerConnectTimeoutMs() {
-    String value = config.get(SERVER_CONNECT_TIMEOUT_MS_KEY);
-    return value != null ? Long.parseLong(value) : SERVER_CONNECT_TIMEOUT_MS_DEFAULT;
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname);
+    return value != null ? Long.parseLong(value) : DEFAULT_CONF.getTimeVar(
+      HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT, TimeUnit.MILLISECONDS);
   }
 
   int getSecretBits() {
-    String value = config.get(SECRET_RANDOM_BITS_KEY);
-    return value != null ? Integer.parseInt(value) : SECRET_RANDOM_BITS_DEFAULT;
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname);
+    return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.defaultIntVal;
   }
 
   String getServerAddress() throws IOException {
@@ -133,12 +124,28 @@ public final class RpcConfiguration {
   }
 
   String getRpcChannelLogLevel() {
-    return config.get(RPC_CHANNEL_LOG_LEVEL_KEY);
+    return config.get(HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname);
   }
 
   public int getRpcThreadCount() {
-    String value = config.get(RPC_MAX_THREADS_KEY);
-    return value != null ? Integer.parseInt(value) : RPC_MAX_THREADS_DEFAULT;
+    String value = config.get(HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname);
+    return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.defaultIntVal;
   }
 
+
+  /**
+   * Utility method that, for a given RPC configuration key, converts the value to
+   * milliseconds if it is a time value, and returns it as a string in either case.
+   * @param conf hive configuration
+   * @param key the RPC configuration key to look up (hive.spark.*)
+   * @return string form of the value
+   */
+  public static String getValue(HiveConf conf, String key) {
+    if (HIVE_SPARK_TIME_CONFIGS.contains(key)) {
+      HiveConf.ConfVars confVar = HiveConf.getConfVars(key);
+      return String.valueOf(conf.getTimeVar(confVar, TimeUnit.MILLISECONDS));
+    } else {
+      return conf.get(key);
+    }
+  }
 }
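
A short sketch of the new getValue() helper, using two keys from the sets defined above; time-valued keys come back normalized to milliseconds, everything else is returned as stored:

    HiveConf conf = new HiveConf();
    // A time config: returned as its millisecond value (e.g. "1000").
    String timeoutMs = RpcConfiguration.getValue(conf,
        HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname);
    // A non-time config: returned verbatim.
    String logLevel = RpcConfiguration.getValue(conf,
        HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname);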

Modified: hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java (original)
+++ hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java Mon Jan 26 18:38:31 2015
@@ -17,40 +17,41 @@
 
 package org.apache.hive.spark.client;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.InputStream;
+import java.io.Serializable;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+import com.google.common.io.ByteStreams;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.spark.counter.SparkCounters;
+import org.apache.spark.SparkException;
 import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.VoidFunction;
 import org.junit.Test;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Strings;
-import com.google.common.io.ByteStreams;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 public class TestSparkClient {
 
   // Timeouts are bad... mmmkay.
   private static final long TIMEOUT = 20;
+  private static final HiveConf HIVECONF = new HiveConf();
 
   private Map<String, String> createConf(boolean local) {
     Map<String, String> conf = new HashMap<String, String>();
@@ -78,8 +79,19 @@ public class TestSparkClient {
     runTest(true, new TestFunction() {
       @Override
       public void call(SparkClient client) throws Exception {
+        JobHandle.Listener<String> listener = newListener();
         JobHandle<String> handle = client.submit(new SimpleJob());
+        handle.addListener(listener);
         assertEquals("hello", handle.get(TIMEOUT, TimeUnit.SECONDS));
+
+        // Try an invalid state transition on the handle. This ensures that the actual state
+        // change we're interested in actually happened, since internally the handle serializes
+        // state changes.
+        assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT));
+
+        verify(listener).onJobQueued(handle);
+        verify(listener).onJobStarted(handle);
+        verify(listener).onJobSucceeded(same(handle), eq(handle.get()));
       }
     });
   }
@@ -100,12 +112,36 @@ public class TestSparkClient {
     runTest(true, new TestFunction() {
       @Override
       public void call(SparkClient client) throws Exception {
-      JobHandle<String> handle = client.submit(new SimpleJob());
+        JobHandle.Listener<String> listener = newListener();
+        JobHandle<String> handle = client.submit(new ErrorJob());
+        handle.addListener(listener);
         try {
           handle.get(TIMEOUT, TimeUnit.SECONDS);
+          fail("Should have thrown an exception.");
         } catch (ExecutionException ee) {
-          assertTrue(ee.getCause() instanceof IllegalStateException);
+          assertTrue(ee.getCause() instanceof SparkException);
+          assertTrue(ee.getCause().getMessage().contains("IllegalStateException: Hello"));
         }
+
+        // Try an invalid state transition on the handle. This ensures that the actual state
+        // change we're interested in actually happened, since internally the handle serializes
+        // state changes.
+        assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT));
+
+        verify(listener).onJobQueued(handle);
+        verify(listener).onJobStarted(handle);
+        verify(listener).onJobFailed(same(handle), any(Throwable.class));
+      }
+    });
+  }
+
+  @Test
+  public void testSyncRpc() throws Exception {
+    runTest(true, new TestFunction() {
+      @Override
+      public void call(SparkClient client) throws Exception {
+        Future<String> result = client.run(new SyncRpc());
+        assertEquals("Hello", result.get(TIMEOUT, TimeUnit.SECONDS));
       }
     });
   }
@@ -126,18 +162,26 @@ public class TestSparkClient {
     runTest(true, new TestFunction() {
       @Override
       public void call(SparkClient client) throws Exception {
+        JobHandle.Listener<Integer> listener = newListener();
         JobHandle<Integer> future = client.submit(new AsyncSparkJob());
+        future.addListener(listener);
         future.get(TIMEOUT, TimeUnit.SECONDS);
         MetricsCollection metrics = future.getMetrics();
         assertEquals(1, metrics.getJobIds().size());
         assertTrue(metrics.getAllMetrics().executorRunTime > 0L);
+        verify(listener).onSparkJobStarted(same(future),
+          eq(metrics.getJobIds().iterator().next()));
 
+        JobHandle.Listener<Integer> listener2 = newListener();
         JobHandle<Integer> future2 = client.submit(new AsyncSparkJob());
+        future2.addListener(listener2);
         future2.get(TIMEOUT, TimeUnit.SECONDS);
         MetricsCollection metrics2 = future2.getMetrics();
         assertEquals(1, metrics2.getJobIds().size());
         assertFalse(Objects.equal(metrics.getJobIds(), metrics2.getJobIds()));
         assertTrue(metrics2.getAllMetrics().executorRunTime > 0L);
+        verify(listener2).onSparkJobStarted(same(future2),
+          eq(metrics2.getJobIds().iterator().next()));
       }
     });
   }
@@ -214,13 +258,20 @@ public class TestSparkClient {
     });
   }
 
+  private <T extends Serializable> JobHandle.Listener<T> newListener() {
+    @SuppressWarnings("unchecked")
+    JobHandle.Listener<T> listener =
+      (JobHandle.Listener<T>) mock(JobHandle.Listener.class);
+    return listener;
+  }
+
   private void runTest(boolean local, TestFunction test) throws Exception {
     Map<String, String> conf = createConf(local);
     SparkClientFactory.initialize(conf);
     SparkClient client = null;
     try {
       test.config(conf);
-      client = SparkClientFactory.createClient(conf);
+      client = SparkClientFactory.createClient(conf, HIVECONF);
       test.call(client);
     } finally {
       if (client != null) {
@@ -239,6 +290,15 @@ public class TestSparkClient {
 
   }
 
+  private static class ErrorJob implements Job<String> {
+
+    @Override
+    public String call(JobContext jc) {
+      throw new IllegalStateException("Hello");
+    }
+
+  }
+
   private static class SparkJob implements Job<Long> {
 
     @Override
@@ -332,6 +392,15 @@ public class TestSparkClient {
     }
 
   }
+
+  private static class SyncRpc implements Job<String> {
+
+    @Override
+    public String call(JobContext jc) {
+      return "Hello";
+    }
+
+  }
 
   private abstract static class TestFunction {
     abstract void call(SparkClient client) throws Exception;

Modified: hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java (original)
+++ hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java Mon Jan 26 18:38:31 2015
@@ -32,6 +32,7 @@ import io.netty.channel.embedded.Embedde
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.Future;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +50,7 @@ public class TestRpc {
 
   private Collection<Closeable> closeables;
   private Map<String, String> emptyConfig =
-      ImmutableMap.of(RpcConfiguration.RPC_CHANNEL_LOG_LEVEL_KEY, "DEBUG");
+      ImmutableMap.of(HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, "DEBUG");
 
   @Before
   public void setUp() {