You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/04/21 12:03:23 UTC

spark git commit: [SPARK-8393][STREAMING] JavaStreamingContext#awaitTermination() throws non-declared InterruptedException

Repository: spark
Updated Branches:
  refs/heads/master cb51680d2 -> 8bd05c9db


[SPARK-8393][STREAMING] JavaStreamingContext#awaitTermination() throws non-declared InterruptedException

## What changes were proposed in this pull request?

`JavaStreamingContext.awaitTermination` methods should be declared as `throws[InterruptedException]` so that this exception can be handled in Java code. Note this is not just a doc change, but an API change, since now (in Java) the method has a checked exception to handle. All await-like methods in Java APIs behave this way, so seems worthwhile for 2.0.

## How was this patch tested?

Jenkins tests

Author: Sean Owen <so...@cloudera.com>

Closes #12418 from srowen/SPARK-8393.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bd05c9d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bd05c9d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bd05c9d

Branch: refs/heads/master
Commit: 8bd05c9db2e9c1c77fd06d490e5d4136acd6821c
Parents: cb51680
Author: Sean Owen <so...@cloudera.com>
Authored: Thu Apr 21 11:03:16 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Apr 21 11:03:16 2016 +0100

----------------------------------------------------------------------
 .../spark/examples/mllib/JavaStreamingTestExample.java       | 2 +-
 .../apache/spark/examples/streaming/JavaCustomReceiver.java  | 2 +-
 .../spark/examples/streaming/JavaDirectKafkaWordCount.java   | 8 +++++---
 .../apache/spark/examples/streaming/JavaFlumeEventCount.java | 2 +-
 .../apache/spark/examples/streaming/JavaKafkaWordCount.java  | 2 +-
 .../spark/examples/streaming/JavaNetworkWordCount.java       | 2 +-
 .../examples/streaming/JavaRecoverableNetworkWordCount.java  | 2 +-
 .../spark/examples/streaming/JavaSqlNetworkWordCount.java    | 2 +-
 .../examples/streaming/JavaStatefulNetworkWordCount.java     | 2 +-
 .../spark/examples/streaming/JavaKinesisWordCountASL.java    | 4 +---
 .../spark/streaming/api/java/JavaStreamingContext.scala      | 2 ++
 11 files changed, 16 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8bd05c9d/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
index 984909c..df90199 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
@@ -58,7 +58,7 @@ public class JavaStreamingTestExample {
 
   private static int timeoutCounter = 0;
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     if (args.length != 3) {
       System.err.println("Usage: JavaStreamingTestExample " +
         "<dataDir> <batchDuration> <numBatchesTimeout>");

http://git-wip-us.apache.org/repos/asf/spark/blob/8bd05c9d/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index 4544ad2..1cba565 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -58,7 +58,7 @@ import java.util.regex.Pattern;
 public class JavaCustomReceiver extends Receiver<String> {
   private static final Pattern SPACE = Pattern.compile(" ");
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println("Usage: JavaCustomReceiver <hostname> <port>");
       System.exit(1);

http://git-wip-us.apache.org/repos/asf/spark/blob/8bd05c9d/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
index 769b21c..ed118f8 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -21,6 +21,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Arrays;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 import scala.Tuple2;
@@ -47,7 +49,7 @@ import org.apache.spark.streaming.Durations;
 public final class JavaDirectKafkaWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
           "  <brokers> is a list of one or more Kafka brokers\n" +
@@ -64,8 +66,8 @@ public final class JavaDirectKafkaWordCount {
     SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
 
-    HashSet<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
-    HashMap<String, String> kafkaParams = new HashMap<>();
+    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
+    Map<String, String> kafkaParams = new HashMap<>();
     kafkaParams.put("metadata.broker.list", brokers);
 
     // Create direct kafka stream with brokers and topics

http://git-wip-us.apache.org/repos/asf/spark/blob/8bd05c9d/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
index bae4b78..33c0a2d 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
@@ -43,7 +43,7 @@ public final class JavaFlumeEventCount {
   private JavaFlumeEventCount() {
   }
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     if (args.length != 2) {
       System.err.println("Usage: JavaFlumeEventCount <host> <port>");
       System.exit(1);

http://git-wip-us.apache.org/repos/asf/spark/blob/8bd05c9d/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
index 655da68..8a5fd53 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -57,7 +57,7 @@ public final class JavaKafkaWordCount {
   private JavaKafkaWordCount() {
   }
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     if (args.length < 4) {
       System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
       System.exit(1);

http://git-wip-us.apache.org/repos/asf/spark/blob/8bd05c9d/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index 5761da6..7a8fe99 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -48,7 +48,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
 public final class JavaNetworkWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
       System.exit(1);

http://git-wip-us.apache.org/repos/asf/spark/blob/8bd05c9d/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index e5fb2bf..0563149 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -183,7 +183,7 @@ public final class JavaRecoverableNetworkWordCount {
     return ssc;
   }
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     if (args.length != 4) {
       System.err.println("You arguments were " + Arrays.asList(args));
       System.err.println(

http://git-wip-us.apache.org/repos/asf/spark/blob/8bd05c9d/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
index 4b9d9ef..7aa8862 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
@@ -53,7 +53,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
 public final class JavaSqlNetworkWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
       System.exit(1);

http://git-wip-us.apache.org/repos/asf/spark/blob/8bd05c9d/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index 4230dab..ed36df8 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -50,7 +50,7 @@ import org.apache.spark.streaming.api.java.*;
 public class JavaStatefulNetworkWordCount {
   private static final Pattern SPACE = Pattern.compile(" ");
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println("Usage: JavaStatefulNetworkWordCount <hostname> <port>");
       System.exit(1);

http://git-wip-us.apache.org/repos/asf/spark/blob/8bd05c9d/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
index 0e43e92..d40bd3f 100644
--- a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
+++ b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.regex.Pattern;
 
 import com.amazonaws.regions.RegionUtils;
-import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function2;
@@ -81,9 +80,8 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn
  */
 public final class JavaKinesisWordCountASL { // needs to be public for access from run-example
   private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
-  private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     // Check that all required args were passed in.
     if (args.length != 3) {
       System.err.println(

http://git-wip-us.apache.org/repos/asf/spark/blob/8bd05c9d/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 922e4a5..7e78fa1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -558,6 +558,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * Wait for the execution to stop. Any exceptions that occurs during the execution
    * will be thrown in this thread.
    */
+  @throws[InterruptedException]
   def awaitTermination(): Unit = {
     ssc.awaitTermination()
   }
@@ -570,6 +571,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * @return `true` if it's stopped; or throw the reported error during the execution; or `false`
    *         if the waiting time elapsed before returning from the method.
    */
+  @throws[InterruptedException]
   def awaitTerminationOrTimeout(timeout: Long): Boolean = {
     ssc.awaitTerminationOrTimeout(timeout)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org