You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2015/01/26 19:38:34 UTC
svn commit: r1654861 [4/4] - in /hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ hbase-handler/
itests/src/test/resources/
itests/util/src/main/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apac...
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_15.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_15.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_15.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_15.q.out Mon Jan 26 18:38:31 2015
@@ -74,8 +74,8 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
- Reducer 4 <- Map 3 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
+ Reducer 4 <- Map 3 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_16.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_16.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_16.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_16.q.out Mon Jan 26 18:38:31 2015
@@ -77,8 +77,8 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
- Reducer 4 <- Map 3 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
+ Reducer 4 <- Map 3 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_18.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_18.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_18.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_18.q.out Mon Jan 26 18:38:31 2015
@@ -72,8 +72,8 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
- Reducer 4 <- Map 3 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
+ Reducer 4 <- Map 3 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_19.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_19.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_19.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_19.q.out Mon Jan 26 18:38:31 2015
@@ -72,8 +72,8 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
- Reducer 4 <- Map 3 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
+ Reducer 4 <- Map 3 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -199,7 +199,7 @@ Retention: 0
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE false
- numFiles 2
+ numFiles 4
numRows -1
rawDataSize -1
totalSize 40
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_2.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_2.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_2.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_2.q.out Mon Jan 26 18:38:31 2015
@@ -74,7 +74,7 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -197,7 +197,7 @@ Retention: 0
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE false
- numFiles 3
+ numFiles 4
numRows -1
rawDataSize -1
totalSize 68
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_20.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_20.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_20.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_20.q.out Mon Jan 26 18:38:31 2015
@@ -70,8 +70,8 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
- Reducer 4 <- Map 3 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
+ Reducer 4 <- Map 3 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -203,7 +203,7 @@ Retention: 0
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE false
- numFiles 2
+ numFiles 4
numRows -1
rawDataSize -1
totalSize 40
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_21.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_21.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_21.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_21.q.out Mon Jan 26 18:38:31 2015
@@ -70,8 +70,8 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
- Reducer 4 <- Map 3 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
+ Reducer 4 <- Map 3 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -207,7 +207,7 @@ Retention: 0
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE false
- numFiles 2
+ numFiles 4
numRows -1
rawDataSize -1
totalSize 20
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_24.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_24.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_24.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_24.q.out Mon Jan 26 18:38:31 2015
@@ -66,8 +66,8 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
- Reducer 4 <- Map 3 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
+ Reducer 4 <- Map 3 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -199,7 +199,7 @@ Retention: 0
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE false
- numFiles 2
+ numFiles 4
numRows -1
rawDataSize -1
totalSize 60
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_25.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_25.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_25.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_25.q.out Mon Jan 26 18:38:31 2015
@@ -84,8 +84,8 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
- Reducer 4 <- Map 3 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
+ Reducer 4 <- Map 3 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -218,7 +218,7 @@ Protect Mode: None
#### A masked pattern was here ####
Partition Parameters:
COLUMN_STATS_ACCURATE false
- numFiles 2
+ numFiles 4
numRows -1
rawDataSize -1
totalSize 40
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_4.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_4.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_4.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_4.q.out Mon Jan 26 18:38:31 2015
@@ -73,8 +73,8 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
- Reducer 4 <- Map 3 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
+ Reducer 4 <- Map 3 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -245,7 +245,7 @@ Retention: 0
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE false
- numFiles 2
+ numFiles 4
numRows -1
rawDataSize -1
totalSize 40
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_5.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_5.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_5.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_5.q.out Mon Jan 26 18:38:31 2015
@@ -81,7 +81,7 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -249,7 +249,7 @@ Retention: 0
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE false
- numFiles 3
+ numFiles 4
numRows -1
rawDataSize -1
totalSize 68
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_6.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_6.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_6.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_6.q.out Mon Jan 26 18:38:31 2015
@@ -71,8 +71,8 @@ STAGE PLANS:
Stage: Stage-2
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
- Reducer 4 <- Map 3 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
+ Reducer 4 <- Map 3 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_7.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_7.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_7.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_7.q.out Mon Jan 26 18:38:31 2015
@@ -72,8 +72,8 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
- Reducer 4 <- Map 3 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
+ Reducer 4 <- Map 3 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -199,10 +199,10 @@ Retention: 0
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE false
- numFiles 2
+ numFiles 4
numRows -1
rawDataSize -1
- totalSize 178
+ totalSize 336
#### A masked pattern was here ####
# Storage Information
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_8.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_8.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_8.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_8.q.out Mon Jan 26 18:38:31 2015
@@ -78,7 +78,7 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -201,10 +201,10 @@ Retention: 0
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE false
- numFiles 3
+ numFiles 4
numRows -1
rawDataSize -1
- totalSize 271
+ totalSize 350
#### A masked pattern was here ####
# Storage Information
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_9.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_9.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_9.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/union_remove_9.q.out Mon Jan 26 18:38:31 2015
@@ -85,7 +85,7 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -255,10 +255,10 @@ Retention: 0
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE false
- numFiles 3
+ numFiles 4
numRows -1
rawDataSize -1
- totalSize 271
+ totalSize 350
#### A masked pattern was here ####
# Storage Information
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/vector_mapjoin_reduce.q.out Mon Jan 26 18:38:31 2015
@@ -73,7 +73,7 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_13.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_13.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_13.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_13.q.out Mon Jan 26 18:38:31 2015
@@ -72,7 +72,7 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
Reducer 3 <- Reducer 2 (SORT, 1)
#### A masked pattern was here ####
Vertices:
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_14.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_14.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_14.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_14.q.out Mon Jan 26 18:38:31 2015
@@ -72,7 +72,7 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
Reducer 3 <- Reducer 2 (SORT, 1)
#### A masked pattern was here ####
Vertices:
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_15.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_15.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_15.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_15.q.out Mon Jan 26 18:38:31 2015
@@ -68,7 +68,7 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
Reducer 3 <- Reducer 2 (SORT, 1)
#### A masked pattern was here ####
Vertices:
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_16.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_16.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_16.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_16.q.out Mon Jan 26 18:38:31 2015
@@ -50,7 +50,7 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_9.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_9.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_9.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/vectorization_9.q.out Mon Jan 26 18:38:31 2015
@@ -46,7 +46,7 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP, 1)
+ Reducer 2 <- Map 1 (GROUP, 2)
#### A masked pattern was here ####
Vertices:
Map 1
Modified: hive/trunk/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/spark/vectorized_shufflejoin.q.out Mon Jan 26 18:38:31 2015
@@ -14,7 +14,7 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 1), Map 4 (PARTITION-LEVEL SORT, 1)
+ Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
Reducer 3 <- Reducer 2 (GROUP, 1)
#### A masked pattern was here ####
Vertices:
Modified: hive/trunk/spark-client/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/pom.xml?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/pom.xml (original)
+++ hive/trunk/spark-client/pom.xml Mon Jan 26 18:38:31 2015
@@ -65,6 +65,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java Mon Jan 26 18:38:31 2015
@@ -121,6 +121,20 @@ abstract class BaseProtocol extends RpcD
}
+ protected static class JobStarted implements Serializable {
+
+ final String id;
+
+ JobStarted(String id) {
+ this.id = id;
+ }
+
+ JobStarted() {
+ this(null);
+ }
+
+ }
+
/**
* Inform the client that a new spark job has been submitted for the client job.
*/
@@ -138,4 +152,18 @@ abstract class BaseProtocol extends RpcD
}
}
+ protected static class SyncJobRequest<T extends Serializable> implements Serializable {
+
+ final Job<T> job;
+
+ SyncJobRequest(Job<T> job) {
+ this.job = job;
+ }
+
+ SyncJobRequest() {
+ this(null);
+ }
+
+ }
+
}
Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java Mon Jan 26 18:38:31 2015
@@ -55,4 +55,53 @@ public interface JobHandle<T extends Ser
*/
SparkCounters getSparkCounters();
+ /**
+ * Return the current state of the job.
+ */
+ State getState();
+
+ /**
+ * Add a listener to the job handle. If the job's state is not SENT, a callback for the
+ * corresponding state will be invoked immediately.
+ *
+ * @param l The listener to add.
+ */
+ void addListener(Listener<T> l);
+
+ /**
+ * The current state of the submitted job.
+ */
+ static enum State {
+ SENT,
+ QUEUED,
+ STARTED,
+ CANCELLED,
+ FAILED,
+ SUCCEEDED;
+ }
+
+ /**
+ * A listener for monitoring the state of the job in the remote context. Callbacks are called
+ * when the corresponding state change occurs.
+ */
+ static interface Listener<T extends Serializable> {
+
+ void onJobQueued(JobHandle<T> job);
+
+ void onJobStarted(JobHandle<T> job);
+
+ void onJobCancelled(JobHandle<T> job);
+
+ void onJobFailed(JobHandle<T> job, Throwable cause);
+
+ void onJobSucceeded(JobHandle<T> job, T result);
+
+ /**
+ * Called when a monitored Spark job is started on the remote context. This callback
+ * does not indicate a state change in the client job's status.
+ */
+ void onSparkJobStarted(JobHandle<T> job, int sparkJobId);
+
+ }
+
}
Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java Mon Jan 26 18:38:31 2015
@@ -17,15 +17,16 @@
package org.apache.hive.spark.client;
-import io.netty.util.concurrent.Promise;
-
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import io.netty.util.concurrent.Promise;
import org.apache.hive.spark.counter.SparkCounters;
@@ -34,28 +35,30 @@ import org.apache.hive.spark.counter.Spa
*/
class JobHandleImpl<T extends Serializable> implements JobHandle<T> {
- private final AtomicBoolean cancelled;
private final SparkClientImpl client;
private final String jobId;
private final MetricsCollection metrics;
private final Promise<T> promise;
private final List<Integer> sparkJobIds;
+ private final List<Listener> listeners;
+ private volatile State state;
private volatile SparkCounters sparkCounters;
JobHandleImpl(SparkClientImpl client, Promise<T> promise, String jobId) {
- this.cancelled = new AtomicBoolean();
this.client = client;
this.jobId = jobId;
this.promise = promise;
+ this.listeners = Lists.newLinkedList();
this.metrics = new MetricsCollection();
this.sparkJobIds = new CopyOnWriteArrayList<Integer>();
+ this.state = State.SENT;
this.sparkCounters = null;
}
/** Requests a running job to be cancelled. */
@Override
public boolean cancel(boolean mayInterrupt) {
- if (cancelled.compareAndSet(false, true)) {
+ if (changeState(State.CANCELLED)) {
client.cancel(jobId);
promise.cancel(mayInterrupt);
return true;
@@ -114,20 +117,116 @@ class JobHandleImpl<T extends Serializab
return sparkCounters;
}
+ @Override
+ public State getState() {
+ return state;
+ }
+
+ @Override
+ public void addListener(Listener l) {
+ synchronized (listeners) {
+ listeners.add(l);
+ // If current state is a final state, notify of Spark job IDs before notifying about the
+ // state transition.
+ if (state.ordinal() >= State.CANCELLED.ordinal()) {
+ for (Integer i : sparkJobIds) {
+ l.onSparkJobStarted(this, i);
+ }
+ }
+
+ fireStateChange(state, l);
+
+ // Otherwise, notify about Spark jobs after the state notification.
+ if (state.ordinal() < State.CANCELLED.ordinal()) {
+ for (Integer i : sparkJobIds) {
+ l.onSparkJobStarted(this, i);
+ }
+ }
+ }
+ }
+
public void setSparkCounters(SparkCounters sparkCounters) {
this.sparkCounters = sparkCounters;
}
@SuppressWarnings("unchecked")
void setSuccess(Object result) {
- promise.setSuccess((T) result);
+ // The synchronization here is not necessary, but tests depend on it.
+ synchronized (listeners) {
+ promise.setSuccess((T) result);
+ changeState(State.SUCCEEDED);
+ }
}
void setFailure(Throwable error) {
- promise.setFailure(error);
+ // The synchronization here is not necessary, but tests depend on it.
+ synchronized (listeners) {
+ promise.setFailure(error);
+ changeState(State.FAILED);
+ }
+ }
+
+ /**
+ * Changes the state of this job handle, making sure that illegal state transitions are ignored.
+ * Fires events appropriately.
+ *
+ * As a rule, state transitions can only occur if the new state is "higher" than the current
+ * state (i.e., has a higher ordinal number) and the current state is not a "final" state. "Final" states are
+ * CANCELLED, FAILED and SUCCEEDED, defined here in the code as having an ordinal number higher
+ * than the CANCELLED enum constant.
+ */
+ boolean changeState(State newState) {
+ synchronized (listeners) {
+ if (newState.ordinal() > state.ordinal() && state.ordinal() < State.CANCELLED.ordinal()) {
+ state = newState;
+ for (Listener l : listeners) {
+ fireStateChange(newState, l);
+ }
+ return true;
+ }
+ return false;
+ }
+ }
+
+ void addSparkJobId(int sparkJobId) {
+ synchronized (listeners) {
+ sparkJobIds.add(sparkJobId);
+ for (Listener l : listeners) {
+ l.onSparkJobStarted(this, sparkJobId);
+ }
+ }
+ }
+
+ private void fireStateChange(State s, Listener l) {
+ switch (s) {
+ case SENT:
+ break;
+ case QUEUED:
+ l.onJobQueued(this);
+ break;
+ case STARTED:
+ l.onJobStarted(this);
+ break;
+ case CANCELLED:
+ l.onJobCancelled(this);
+ break;
+ case FAILED:
+ l.onJobFailed(this, promise.cause());
+ break;
+ case SUCCEEDED:
+ try {
+ l.onJobSucceeded(this, promise.get());
+ } catch (Exception e) {
+ // Shouldn't really happen.
+ throw new IllegalStateException(e);
+ }
+ break;
+ default:
+ throw new IllegalStateException();
+ }
}
- /** Last attempt resort at preventing stray jobs from accumulating in SparkClientImpl. */
+ /** Last attempt at preventing stray jobs from accumulating in SparkClientImpl. */
@Override
protected void finalize() {
if (!isDone()) {
Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java Mon Jan 26 18:38:31 2015
@@ -77,9 +77,9 @@ public class RemoteDriver {
private static final Logger LOG = LoggerFactory.getLogger(RemoteDriver.class);
private final Map<String, JobWrapper<?>> activeJobs;
+ private final Object jcLock;
private final Object shutdownLock;
private final ExecutorService executor;
- private final JobContextImpl jc;
private final NioEventLoopGroup egroup;
private final Rpc clientRpc;
private final DriverProtocol protocol;
@@ -87,10 +87,14 @@ public class RemoteDriver {
// Used to queue up requests while the SparkContext is being created.
private final List<JobWrapper<?>> jobQueue = Lists.newLinkedList();
- private boolean running;
+ // jc is effectively final, but it has to be volatile since it's accessed by different
+ // threads while the constructor is running.
+ private volatile JobContextImpl jc;
+ private volatile boolean running;
private RemoteDriver(String[] args) throws Exception {
this.activeJobs = Maps.newConcurrentMap();
+ this.jcLock = new Object();
this.shutdownLock = new Object();
SparkConf conf = new SparkConf();
@@ -120,6 +124,7 @@ public class RemoteDriver {
Map<String, String> mapConf = Maps.newHashMap();
for (Tuple2<String, String> e : conf.getAll()) {
mapConf.put(e._1(), e._2());
+ LOG.debug("Remote Driver configured with: " + e._1() + "=" + e._2());
}
String secret = mapConf.get(SparkClientFactory.CONF_KEY_SECRET);
@@ -150,14 +155,20 @@ public class RemoteDriver {
try {
JavaSparkContext sc = new JavaSparkContext(conf);
sc.sc().addSparkListener(new ClientListener());
- jc = new JobContextImpl(sc);
+ synchronized (jcLock) {
+ jc = new JobContextImpl(sc);
+ jcLock.notifyAll();
+ }
} catch (Exception e) {
LOG.error("Failed to start SparkContext.", e);
shutdown(e);
+ synchronized (jcLock) {
+ jcLock.notifyAll();
+ }
throw e;
}
- synchronized (jobQueue) {
+ synchronized (jcLock) {
for (Iterator<JobWrapper<?>> it = jobQueue.iterator(); it.hasNext();) {
it.next().submit();
}
@@ -174,7 +185,7 @@ public class RemoteDriver {
}
private void submit(JobWrapper<?> job) {
- synchronized (jobQueue) {
+ synchronized (jcLock) {
if (jc != null) {
job.submit();
} else {
@@ -235,6 +246,10 @@ public class RemoteDriver {
clientRpc.call(new JobResult(jobId, result, error, counters));
}
+ void jobStarted(String jobId) {
+ clientRpc.call(new JobStarted(jobId));
+ }
+
void jobSubmitted(String jobId, int sparkJobId) {
LOG.debug("Send job({}/{}) submitted to Client.", jobId, sparkJobId);
clientRpc.call(new JobSubmitted(jobId, sparkJobId));
@@ -264,6 +279,35 @@ public class RemoteDriver {
submit(wrapper);
}
+ private Object handle(ChannelHandlerContext ctx, SyncJobRequest msg) throws Exception {
+ // In case the job context is not up yet, let's wait, since this is supposed to be a
+ // "synchronous" RPC.
+ if (jc == null) {
+ synchronized (jcLock) {
+ while (jc == null) {
+ jcLock.wait();
+ if (!running) {
+ throw new IllegalStateException("Remote context is shutting down.");
+ }
+ }
+ }
+ }
+
+ jc.setMonitorCb(new MonitorCallback() {
+ @Override
+ public void call(JavaFutureAction<?> future,
+ SparkCounters sparkCounters, Set<Integer> cachedRDDIds) {
+ throw new IllegalStateException(
+ "JobContext.monitor() is not available for synchronous jobs.");
+ }
+ });
+ try {
+ return msg.job.call(jc);
+ } finally {
+ jc.setMonitorCb(null);
+ }
+ }
+
}
private class JobWrapper<T extends Serializable> implements Callable<Void> {
@@ -286,6 +330,8 @@ public class RemoteDriver {
@Override
public Void call() throws Exception {
+ protocol.jobStarted(req.id);
+
try {
jc.setMonitorCb(new MonitorCallback() {
@Override
Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java Mon Jan 26 18:38:31 2015
@@ -38,6 +38,23 @@ public interface SparkClient extends Ser
<T extends Serializable> JobHandle<T> submit(Job<T> job);
/**
+ * Asks the remote context to run a job immediately.
+ * <p>
+ * Normally, the remote context will queue jobs and execute them based on how many worker
+ * threads have been configured. This method will run the submitted job in the same thread
+ * processing the RPC message, so that queueing does not apply.
+ * <p>
+ * It's recommended that this method only be used to run code that finishes quickly. This
+ * avoids interfering with the normal operation of the context.
+ * <p>
+ * Note: the {@link JobContext#monitor()} functionality is not available when using this method.
+ *
+ * @param job The job to execute.
+ * @return A future to monitor the result of the job.
+ */
+ <T extends Serializable> Future<T> run(Job<T> job);
+
+ /**
* Stops the remote context.
*
* Any pending jobs will be cancelled, and the remote context will be torn down.
Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java Mon Jan 26 18:38:31 2015
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.spark.client.rpc.RpcServer;
import org.apache.spark.SparkException;
@@ -67,12 +68,13 @@ public final class SparkClientFactory {
/**
* Instantiates a new Spark client.
*
- * @param conf Configuration for the remote Spark application.
+ * @param sparkConf Configuration for the remote Spark application, contains spark.* properties.
+ * @param hiveConf Configuration for Hive, contains hive.* properties.
*/
- public static synchronized SparkClient createClient(Map<String, String> conf)
+ public static synchronized SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf)
throws IOException, SparkException {
Preconditions.checkState(server != null, "initialize() not called.");
- return new SparkClientImpl(server, conf);
+ return new SparkClientImpl(server, sparkConf, hiveConf);
}
}
Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java Mon Jan 26 18:38:31 2015
@@ -40,7 +40,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.spark.client.rpc.Rpc;
+import org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.hive.spark.client.rpc.RpcServer;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkException;
@@ -67,6 +69,7 @@ class SparkClientImpl implements SparkCl
private static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
private final Map<String, String> conf;
+ private final HiveConf hiveConf;
private final AtomicInteger childIdGenerator;
private final Thread driverThread;
private final Map<String, JobHandleImpl<?>> jobs;
@@ -74,8 +77,9 @@ class SparkClientImpl implements SparkCl
private final ClientProtocol protocol;
private volatile boolean isAlive;
- SparkClientImpl(RpcServer rpcServer, Map<String, String> conf) throws IOException, SparkException {
+ SparkClientImpl(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf) throws IOException, SparkException {
this.conf = conf;
+ this.hiveConf = hiveConf;
this.childIdGenerator = new AtomicInteger();
this.jobs = Maps.newConcurrentMap();
@@ -116,13 +120,16 @@ class SparkClientImpl implements SparkCl
}
@Override
+ public <T extends Serializable> Future<T> run(Job<T> job) {
+ return protocol.run(job);
+ }
+
+ @Override
public void stop() {
if (isAlive) {
isAlive = false;
try {
- protocol.endSession().get(10, TimeUnit.SECONDS);
- } catch (TimeoutException te) {
- LOG.warn("Timed out waiting for driver to respond to stop request.");
+ protocol.endSession();
} catch (Exception e) {
LOG.warn("Exception while waiting for end session reply.", e);
} finally {
@@ -137,29 +144,29 @@ class SparkClientImpl implements SparkCl
LOG.debug("Interrupted before driver thread was finished.");
}
if (endTime - System.currentTimeMillis() <= 0) {
- LOG.debug("Shut down time out.");
+ LOG.warn("Timed out shutting down remote driver, interrupting...");
driverThread.interrupt();
}
}
@Override
public Future<?> addJar(URL url) {
- return submit(new AddJarJob(url.toString()));
+ return run(new AddJarJob(url.toString()));
}
@Override
public Future<?> addFile(URL url) {
- return submit(new AddFileJob(url.toString()));
+ return run(new AddFileJob(url.toString()));
}
@Override
public Future<Integer> getExecutorCount() {
- return submit(new GetExecutorCountJob());
+ return run(new GetExecutorCountJob());
}
@Override
public Future<Integer> getDefaultParallelism() {
- return submit(new GetDefaultParallelismJob());
+ return run(new GetDefaultParallelismJob());
}
void cancel(String jobId) {
@@ -296,6 +303,25 @@ class SparkClientImpl implements SparkCl
argv.add("org.apache.spark.deploy.SparkSubmit");
}
+ if (master.equals("yarn-cluster")) {
+ String executorCores = conf.get("spark.executor.cores");
+ if (executorCores != null) {
+ argv.add("--executor-cores");
+ argv.add(executorCores);
+ }
+
+ String executorMemory = conf.get("spark.executor.memory");
+ if (executorMemory != null) {
+ argv.add("--executor-memory");
+ argv.add(executorMemory);
+ }
+
+ String numOfExecutors = conf.get("spark.executor.instances");
+ if (numOfExecutors != null) {
+ argv.add("--num-executors");
+ argv.add(numOfExecutors);
+ }
+ }
argv.add("--properties-file");
argv.add(properties.getAbsolutePath());
@@ -313,6 +339,14 @@ class SparkClientImpl implements SparkCl
argv.add("--remote-port");
argv.add(serverPort);
+ // hive.spark.* keys are passed down to the RemoteDriver via --conf,
+ // as --properties-file contains the spark.* keys that are meant for SparkConf object.
+ for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
+ String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
+ argv.add("--conf");
+ argv.add(String.format("%s=%s", hiveSparkConfKey, value));
+ }
+
LOG.debug("Running client driver with argv: {}", Joiner.on(" ").join(argv));
ProcessBuilder pb = new ProcessBuilder(argv.toArray(new String[argv.size()]));
@@ -360,7 +394,7 @@ class SparkClientImpl implements SparkCl
<T extends Serializable> JobHandleImpl<T> submit(Job<T> job) {
final String jobId = UUID.randomUUID().toString();
final Promise<T> promise = driverRpc.createPromise();
- JobHandleImpl<T> handle = new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId);
+ final JobHandleImpl<T> handle = new JobHandleImpl<T>(SparkClientImpl.this, promise, jobId);
jobs.put(jobId, handle);
final io.netty.util.concurrent.Future<Void> rpc = driverRpc.call(new JobRequest(jobId, job));
@@ -371,7 +405,9 @@ class SparkClientImpl implements SparkCl
rpc.addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Void>>() {
@Override
public void operationComplete(io.netty.util.concurrent.Future<Void> f) {
- if (!f.isSuccess() && !promise.isDone()) {
+ if (f.isSuccess()) {
+ handle.changeState(JobHandle.State.QUEUED);
+ } else if (!promise.isDone()) {
promise.setFailure(f.cause());
}
}
@@ -379,16 +415,24 @@ class SparkClientImpl implements SparkCl
promise.addListener(new GenericFutureListener<Promise<T>>() {
@Override
public void operationComplete(Promise<T> p) {
- jobs.remove(jobId);
+ if (jobId != null) {
+ jobs.remove(jobId);
+ }
if (p.isCancelled() && !rpc.isDone()) {
rpc.cancel(true);
}
}
});
-
return handle;
}
+ <T extends Serializable> Future<T> run(Job<T> job) {
+ @SuppressWarnings("unchecked")
+ final io.netty.util.concurrent.Future<T> rpc = (io.netty.util.concurrent.Future<T>)
+ driverRpc.call(new SyncJobRequest(job), Serializable.class);
+ return rpc;
+ }
+
void cancel(String jobId) {
driverRpc.call(new CancelJob(jobId));
}
@@ -426,11 +470,20 @@ class SparkClientImpl implements SparkCl
}
}
+ private void handle(ChannelHandlerContext ctx, JobStarted msg) {
+ JobHandleImpl<?> handle = jobs.get(msg.id);
+ if (handle != null) {
+ handle.changeState(JobHandle.State.STARTED);
+ } else {
+ LOG.warn("Received event for unknown job {}", msg.id);
+ }
+ }
+
private void handle(ChannelHandlerContext ctx, JobSubmitted msg) {
JobHandleImpl<?> handle = jobs.get(msg.clientJobId);
if (handle != null) {
LOG.info("Received spark job ID: {} for {}", msg.sparkJobId, msg.clientJobId);
- handle.getSparkJobIds().add(msg.sparkJobId);
+ handle.addSparkJobId(msg.sparkJobId);
} else {
LOG.warn("Received spark job ID: {} for unknown job {}", msg.sparkJobId, msg.clientJobId);
}
Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java Mon Jan 26 18:38:31 2015
@@ -84,7 +84,7 @@ public class Rpc implements Closeable {
final String secret,
final RpcDispatcher dispatcher) throws Exception {
final RpcConfiguration rpcConf = new RpcConfiguration(config);
- int connectTimeoutMs = rpcConf.getConnectTimeoutMs();
+ int connectTimeoutMs = (int) rpcConf.getConnectTimeoutMs();
final ChannelFuture cf = new Bootstrap()
.group(eloop)
Modified: hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java (original)
+++ hive/trunk/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java Mon Jan 26 18:38:31 2015
@@ -21,9 +21,14 @@ import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
+import java.util.Arrays;
import java.util.Enumeration;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,63 +42,49 @@ public final class RpcConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(RpcConfiguration.class);
- /** Connection timeout for RPC clients. */
- public static final String CONNECT_TIMEOUT_MS_KEY = "hive.spark.client.connect.timeout.ms";
- private static final int CONNECT_TIMEOUT_MS_DEFAULT = 1000;
+ public static final ImmutableSet<String> HIVE_SPARK_RSC_CONFIGS = ImmutableSet.of(
+ HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
+ HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname,
+ HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
+ HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname,
+ HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname,
+ HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname
+ );
+ public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of(
+ HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
+ HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname
+ );
- /**
- * How long the server should wait for clients to connect back after they're
- * registered. Also used to time out the client waiting for the server to
- * reply to its "hello" message.
- */
- public static final String SERVER_CONNECT_TIMEOUT_MS_KEY = "hive.spark.client.server.connect.timeout.ms";
- private static final long SERVER_CONNECT_TIMEOUT_MS_DEFAULT = 10000L;
-
- /**
- * Number of bits of randomness in the generated client secrets. Rounded down
- * to the nearest multiple of 8.
- */
- public static final String SECRET_RANDOM_BITS_KEY = "hive.spark.client.secret.bits";
- private static final int SECRET_RANDOM_BITS_DEFAULT = 256;
-
- /** Hostname or IP address to advertise for the server. */
public static final String SERVER_LISTEN_ADDRESS_KEY = "hive.spark.client.server.address";
- /** Maximum number of threads to use for the RPC event loop. */
- public static final String RPC_MAX_THREADS_KEY = "hive.spark.client.rpc.threads";
- public static final int RPC_MAX_THREADS_DEFAULT = 8;
-
- /** Maximum message size. Default = 10MB. */
- public static final String RPC_MAX_MESSAGE_SIZE_KEY = "hive.spark.client.rpc.max.size";
- public static final int RPC_MAX_MESSAGE_SIZE_DEFAULT = 50 * 1024 * 1024;
-
- /** Channel logging level. */
- public static final String RPC_CHANNEL_LOG_LEVEL_KEY = "hive.spark.client.channel.log.level";
-
private final Map<String, String> config;
+ private static final HiveConf DEFAULT_CONF = new HiveConf();
+
public RpcConfiguration(Map<String, String> config) {
this.config = config;
}
- int getConnectTimeoutMs() {
- String value = config.get(CONNECT_TIMEOUT_MS_KEY);
- return value != null ? Integer.parseInt(value) : CONNECT_TIMEOUT_MS_DEFAULT;
+ long getConnectTimeoutMs() {
+ String value = config.get(HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname);
+ return value != null ? Integer.parseInt(value) : DEFAULT_CONF.getTimeVar(
+ HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS);
}
int getMaxMessageSize() {
- String value = config.get(RPC_MAX_MESSAGE_SIZE_KEY);
- return value != null ? Integer.parseInt(value) : RPC_MAX_MESSAGE_SIZE_DEFAULT;
+ String value = config.get(HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname);
+ return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.defaultIntVal;
}
long getServerConnectTimeoutMs() {
- String value = config.get(SERVER_CONNECT_TIMEOUT_MS_KEY);
- return value != null ? Long.parseLong(value) : SERVER_CONNECT_TIMEOUT_MS_DEFAULT;
+ String value = config.get(HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname);
+ return value != null ? Long.parseLong(value) : DEFAULT_CONF.getTimeVar(
+ HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT, TimeUnit.MILLISECONDS);
}
int getSecretBits() {
- String value = config.get(SECRET_RANDOM_BITS_KEY);
- return value != null ? Integer.parseInt(value) : SECRET_RANDOM_BITS_DEFAULT;
+ String value = config.get(HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname);
+ return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.defaultIntVal;
}
String getServerAddress() throws IOException {
@@ -133,12 +124,28 @@ public final class RpcConfiguration {
}
String getRpcChannelLogLevel() {
- return config.get(RPC_CHANNEL_LOG_LEVEL_KEY);
+ return config.get(HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname);
}
public int getRpcThreadCount() {
- String value = config.get(RPC_MAX_THREADS_KEY);
- return value != null ? Integer.parseInt(value) : RPC_MAX_THREADS_DEFAULT;
+ String value = config.get(HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname);
+ return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.defaultIntVal;
}
+
+ /**
+ * Utility method for a given RpcConfiguration key, to convert the value to milliseconds if it is a time value,
+ * and return it as a string in either case.
+ * @param conf hive configuration
+ * @param key Rpc configuration to lookup (hive.spark.*)
+ * @return string form of the value
+ */
+ public static String getValue(HiveConf conf, String key) {
+ if (HIVE_SPARK_TIME_CONFIGS.contains(key)) {
+ HiveConf.ConfVars confVar = HiveConf.getConfVars(key);
+ return String.valueOf(conf.getTimeVar(confVar, TimeUnit.MILLISECONDS));
+ } else {
+ return conf.get(key);
+ }
+ }
}
Modified: hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java (original)
+++ hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java Mon Jan 26 18:38:31 2015
@@ -17,40 +17,41 @@
package org.apache.hive.spark.client;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
+import java.io.Serializable;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+import com.google.common.io.ByteStreams;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.spark.counter.SparkCounters;
+import org.apache.spark.SparkException;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.junit.Test;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Strings;
-import com.google.common.io.ByteStreams;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
public class TestSparkClient {
// Timeouts are bad... mmmkay.
private static final long TIMEOUT = 20;
+ private static final HiveConf HIVECONF = new HiveConf();
private Map<String, String> createConf(boolean local) {
Map<String, String> conf = new HashMap<String, String>();
@@ -78,8 +79,19 @@ public class TestSparkClient {
runTest(true, new TestFunction() {
@Override
public void call(SparkClient client) throws Exception {
+ JobHandle.Listener<String> listener = newListener();
JobHandle<String> handle = client.submit(new SimpleJob());
+ handle.addListener(listener);
assertEquals("hello", handle.get(TIMEOUT, TimeUnit.SECONDS));
+
+ // Try an invalid state transition on the handle. This ensures that the actual state
+ // change we're interested in actually happened, since internally the handle serializes
+ // state changes.
+ assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT));
+
+ verify(listener).onJobQueued(handle);
+ verify(listener).onJobStarted(handle);
+ verify(listener).onJobSucceeded(same(handle), eq(handle.get()));
}
});
}
@@ -100,12 +112,36 @@ public class TestSparkClient {
runTest(true, new TestFunction() {
@Override
public void call(SparkClient client) throws Exception {
- JobHandle<String> handle = client.submit(new SimpleJob());
+ JobHandle.Listener<String> listener = newListener();
+ JobHandle<String> handle = client.submit(new ErrorJob());
+ handle.addListener(listener);
try {
handle.get(TIMEOUT, TimeUnit.SECONDS);
+ fail("Should have thrown an exception.");
} catch (ExecutionException ee) {
- assertTrue(ee.getCause() instanceof IllegalStateException);
+ assertTrue(ee.getCause() instanceof SparkException);
+ assertTrue(ee.getCause().getMessage().contains("IllegalStateException: Hello"));
}
+
+ // Try an invalid state transition on the handle. This ensures that the actual state
+ // change we're interested in actually happened, since internally the handle serializes
+ // state changes.
+ assertFalse(((JobHandleImpl<String>)handle).changeState(JobHandle.State.SENT));
+
+ verify(listener).onJobQueued(handle);
+ verify(listener).onJobStarted(handle);
+ verify(listener).onJobFailed(same(handle), any(Throwable.class));
+ }
+ });
+ }
+
+ @Test
+ public void testSyncRpc() throws Exception {
+ runTest(true, new TestFunction() {
+ @Override
+ public void call(SparkClient client) throws Exception {
+ Future<String> result = client.run(new SyncRpc());
+ assertEquals("Hello", result.get(TIMEOUT, TimeUnit.SECONDS));
}
});
}
@@ -126,18 +162,26 @@ public class TestSparkClient {
runTest(true, new TestFunction() {
@Override
public void call(SparkClient client) throws Exception {
+ JobHandle.Listener<Integer> listener = newListener();
JobHandle<Integer> future = client.submit(new AsyncSparkJob());
+ future.addListener(listener);
future.get(TIMEOUT, TimeUnit.SECONDS);
MetricsCollection metrics = future.getMetrics();
assertEquals(1, metrics.getJobIds().size());
assertTrue(metrics.getAllMetrics().executorRunTime > 0L);
+ verify(listener).onSparkJobStarted(same(future),
+ eq(metrics.getJobIds().iterator().next()));
+ JobHandle.Listener<Integer> listener2 = newListener();
JobHandle<Integer> future2 = client.submit(new AsyncSparkJob());
+ future2.addListener(listener2);
future2.get(TIMEOUT, TimeUnit.SECONDS);
MetricsCollection metrics2 = future2.getMetrics();
assertEquals(1, metrics2.getJobIds().size());
assertFalse(Objects.equal(metrics.getJobIds(), metrics2.getJobIds()));
assertTrue(metrics2.getAllMetrics().executorRunTime > 0L);
+ verify(listener2).onSparkJobStarted(same(future2),
+ eq(metrics2.getJobIds().iterator().next()));
}
});
}
@@ -214,13 +258,20 @@ public class TestSparkClient {
});
}
+ private <T extends Serializable> JobHandle.Listener<T> newListener() {
+ @SuppressWarnings("unchecked")
+ JobHandle.Listener<T> listener =
+ (JobHandle.Listener<T>) mock(JobHandle.Listener.class);
+ return listener;
+ }
+
private void runTest(boolean local, TestFunction test) throws Exception {
Map<String, String> conf = createConf(local);
SparkClientFactory.initialize(conf);
SparkClient client = null;
try {
test.config(conf);
- client = SparkClientFactory.createClient(conf);
+ client = SparkClientFactory.createClient(conf, HIVECONF);
test.call(client);
} finally {
if (client != null) {
@@ -239,6 +290,15 @@ public class TestSparkClient {
}
+ private static class ErrorJob implements Job<String> {
+
+ @Override
+ public String call(JobContext jc) {
+ throw new IllegalStateException("Hello");
+ }
+
+ }
+
private static class SparkJob implements Job<Long> {
@Override
@@ -332,6 +392,15 @@ public class TestSparkClient {
}
}
+
+ private static class SyncRpc implements Job<String> {
+
+ @Override
+ public String call(JobContext jc) {
+ return "Hello";
+ }
+
+ }
private abstract static class TestFunction {
abstract void call(SparkClient client) throws Exception;
Modified: hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
URL: http://svn.apache.org/viewvc/hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java?rev=1654861&r1=1654860&r2=1654861&view=diff
==============================================================================
--- hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java (original)
+++ hive/trunk/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java Mon Jan 26 18:38:31 2015
@@ -32,6 +32,7 @@ import io.netty.channel.embedded.Embedde
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +50,7 @@ public class TestRpc {
private Collection<Closeable> closeables;
private Map<String, String> emptyConfig =
- ImmutableMap.of(RpcConfiguration.RPC_CHANNEL_LOG_LEVEL_KEY, "DEBUG");
+ ImmutableMap.of(HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname, "DEBUG");
@Before
public void setUp() {