You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/02/16 20:32:36 UTC

spark git commit: SPARK-5795 [STREAMING] api.java.JavaPairDStream.saveAsNewAPIHadoopFiles may not friendly to java

Repository: spark
Updated Branches:
  refs/heads/master 9baac56cc -> 8e25373ce


SPARK-5795 [STREAMING] api.java.JavaPairDStream.saveAsNewAPIHadoopFiles may not friendly to java

Revise JavaPairDStream API declaration on saveAs Hadoop methods, to allow it to be called directly as intended.

CC tdas for review

Author: Sean Owen <so...@cloudera.com>

Closes #4608 from srowen/SPARK-5795 and squashes the following commits:

36f1ead [Sean Owen] Add code that shows compile problem and fix
036bd27 [Sean Owen] Revise JavaPairDStream API declaration on saveAs Hadoop methods, to allow it to be called directly as intended.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e25373c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e25373c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e25373c

Branch: refs/heads/master
Commit: 8e25373ce72061d3b6a353259ec627606afa4a5f
Parents: 9baac56
Author: Sean Owen <so...@cloudera.com>
Authored: Mon Feb 16 19:32:31 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Feb 16 19:32:31 2015 +0000

----------------------------------------------------------------------
 .../streaming/api/java/JavaPairDStream.scala    | 20 ++++++++++----------
 .../apache/spark/streaming/JavaAPISuite.java    | 18 ++++++++++++++++++
 2 files changed, 28 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8e25373c/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index de124cf..bd01789 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -726,7 +726,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
    * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
    */
-  def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) {
+  def saveAsHadoopFiles(prefix: String, suffix: String) {
     dstream.saveAsHadoopFiles(prefix, suffix)
   }
 
@@ -734,12 +734,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
    * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
    */
-  def saveAsHadoopFiles(
+  def saveAsHadoopFiles[F <: OutputFormat[_, _]](
       prefix: String,
       suffix: String,
       keyClass: Class[_],
       valueClass: Class[_],
-      outputFormatClass: Class[_ <: OutputFormat[_, _]]) {
+      outputFormatClass: Class[F]) {
     dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
   }
 
@@ -747,12 +747,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
    * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
    */
-  def saveAsHadoopFiles(
+  def saveAsHadoopFiles[F <: OutputFormat[_, _]](
       prefix: String,
       suffix: String,
       keyClass: Class[_],
       valueClass: Class[_],
-      outputFormatClass: Class[_ <: OutputFormat[_, _]],
+      outputFormatClass: Class[F],
       conf: JobConf) {
     dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
   }
@@ -761,7 +761,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
    * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
    */
-  def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) {
+  def saveAsNewAPIHadoopFiles(prefix: String, suffix: String) {
     dstream.saveAsNewAPIHadoopFiles(prefix, suffix)
   }
 
@@ -769,12 +769,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
    * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
    */
-  def saveAsNewAPIHadoopFiles(
+  def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[_, _]](
       prefix: String,
       suffix: String,
       keyClass: Class[_],
       valueClass: Class[_],
-      outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
+      outputFormatClass: Class[F]) {
     dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
   }
 
@@ -782,12 +782,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
    * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
    */
-  def saveAsNewAPIHadoopFiles(
+  def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[_, _]](
       prefix: String,
       suffix: String,
       keyClass: Class[_],
       valueClass: Class[_],
-      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+      outputFormatClass: Class[F],
       conf: Configuration = new Configuration) {
     dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8e25373c/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 2df8cf6..57302ff 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1828,4 +1828,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
 
     return expected;
   }
+
+  // SPARK-5795: no logic assertions, just testing that intended API invocations compile
+  private void compileSaveAsJavaAPI(JavaPairDStream<LongWritable,Text> pds) {
+    pds.saveAsNewAPIHadoopFiles(
+        "", "", LongWritable.class, Text.class,
+        org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
+    pds.saveAsHadoopFiles(
+        "", "", LongWritable.class, Text.class,
+        org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
+    // Checks that a previous common workaround for this API still compiles
+    pds.saveAsNewAPIHadoopFiles(
+        "", "", LongWritable.class, Text.class,
+        (Class) org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
+    pds.saveAsHadoopFiles(
+        "", "", LongWritable.class, Text.class,
+        (Class) org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org