You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/02/16 20:32:36 UTC
spark git commit: SPARK-5795 [STREAMING]
api.java.JavaPairDStream.saveAsNewAPIHadoopFiles may not be friendly to Java
Repository: spark
Updated Branches:
refs/heads/master 9baac56cc -> 8e25373ce
SPARK-5795 [STREAMING] api.java.JavaPairDStream.saveAsNewAPIHadoopFiles may not be friendly to Java
Revise JavaPairDStream API declaration on saveAs Hadoop methods, to allow it to be called directly as intended.
CC tdas for review
Author: Sean Owen <so...@cloudera.com>
Closes #4608 from srowen/SPARK-5795 and squashes the following commits:
36f1ead [Sean Owen] Add code that shows compile problem and fix
036bd27 [Sean Owen] Revise JavaPairDStream API declaration on saveAs Hadoop methods, to allow it to be called directly as intended.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e25373c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e25373c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e25373c
Branch: refs/heads/master
Commit: 8e25373ce72061d3b6a353259ec627606afa4a5f
Parents: 9baac56
Author: Sean Owen <so...@cloudera.com>
Authored: Mon Feb 16 19:32:31 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Feb 16 19:32:31 2015 +0000
----------------------------------------------------------------------
.../streaming/api/java/JavaPairDStream.scala | 20 ++++++++++----------
.../apache/spark/streaming/JavaAPISuite.java | 18 ++++++++++++++++++
2 files changed, 28 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8e25373c/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index de124cf..bd01789 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -726,7 +726,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) {
+ def saveAsHadoopFiles(prefix: String, suffix: String) {
dstream.saveAsHadoopFiles(prefix, suffix)
}
@@ -734,12 +734,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsHadoopFiles(
+ def saveAsHadoopFiles[F <: OutputFormat[_, _]](
prefix: String,
suffix: String,
keyClass: Class[_],
valueClass: Class[_],
- outputFormatClass: Class[_ <: OutputFormat[_, _]]) {
+ outputFormatClass: Class[F]) {
dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
}
@@ -747,12 +747,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsHadoopFiles(
+ def saveAsHadoopFiles[F <: OutputFormat[_, _]](
prefix: String,
suffix: String,
keyClass: Class[_],
valueClass: Class[_],
- outputFormatClass: Class[_ <: OutputFormat[_, _]],
+ outputFormatClass: Class[F],
conf: JobConf) {
dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
@@ -761,7 +761,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) {
+ def saveAsNewAPIHadoopFiles(prefix: String, suffix: String) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix)
}
@@ -769,12 +769,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsNewAPIHadoopFiles(
+ def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[_, _]](
prefix: String,
suffix: String,
keyClass: Class[_],
valueClass: Class[_],
- outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
+ outputFormatClass: Class[F]) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
}
@@ -782,12 +782,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
- def saveAsNewAPIHadoopFiles(
+ def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[_, _]](
prefix: String,
suffix: String,
keyClass: Class[_],
valueClass: Class[_],
- outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+ outputFormatClass: Class[F],
conf: Configuration = new Configuration) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8e25373c/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 2df8cf6..57302ff 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1828,4 +1828,22 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
return expected;
}
+
+ // SPARK-5795: no logic assertions, just testing that intended API invocations compile
+ private void compileSaveAsJavaAPI(JavaPairDStream<LongWritable,Text> pds) {
+ pds.saveAsNewAPIHadoopFiles(
+ "", "", LongWritable.class, Text.class,
+ org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
+ pds.saveAsHadoopFiles(
+ "", "", LongWritable.class, Text.class,
+ org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
+ // Checks that a previous common workaround for this API still compiles
+ pds.saveAsNewAPIHadoopFiles(
+ "", "", LongWritable.class, Text.class,
+ (Class) org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
+ pds.saveAsHadoopFiles(
+ "", "", LongWritable.class, Text.class,
+ (Class) org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org