Posted to commits@spark.apache.org by pw...@apache.org on 2015/01/21 08:37:51 UTC

spark git commit: [SPARK-5297][Streaming] Fix Java file stream type erasure problem

Repository: spark
Updated Branches:
  refs/heads/master ec5b0f2ce -> 424d8c6ff


[SPARK-5297][Streaming] Fix Java file stream type erasure problem

The current Java file stream doesn't support custom key/value types because the type information is lost to erasure; details can be seen in [SPARK-5297](https://issues.apache.org/jira/browse/SPARK-5297). This patch fixes the problem by building the correct `ClassTag` from the user-supplied `Class[_]`.
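
For reference, this is roughly how the fixed API is called from the Java side. A minimal sketch, not part of the commit: the `jssc` handle and the directory path are assumed, and the Hadoop `Writable` types stand in for any custom key/value classes.

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;

    // The Class arguments carry the type information that Java's generics erase,
    // so the Scala side can build real ClassTags instead of ClassTag[AnyRef].
    JavaPairInputDStream<LongWritable, Text> stream = jssc.fileStream(
        "/data/input", LongWritable.class, Text.class, TextInputFormat.class);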

Author: jerryshao <sa...@intel.com>

Closes #4101 from jerryshao/SPARK-5297 and squashes the following commits:

e022ca3 [jerryshao] Add Mima exclusion
ecd61b8 [jerryshao] Fix Java fileInputStream type erasure problem


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/424d8c6f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/424d8c6f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/424d8c6f

Branch: refs/heads/master
Commit: 424d8c6ffff42e4231cc1088b7e69e3c0f5e6b56
Parents: ec5b0f2
Author: jerryshao <sa...@intel.com>
Authored: Tue Jan 20 23:37:47 2015 -0800
Committer: Patrick Wendell <pa...@databricks.com>
Committed: Tue Jan 20 23:37:47 2015 -0800

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |  4 ++
 .../api/java/JavaStreamingContext.scala         | 53 ++++++++++++---
 .../apache/spark/streaming/JavaAPISuite.java    | 70 ++++++++++++++++++--
 3 files changed, 112 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/424d8c6f/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 95fef23..127973b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -86,6 +86,10 @@ object MimaExcludes {
             // SPARK-5270
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.api.java.JavaRDDLike.isEmpty")
+          ) ++ Seq(
+            // SPARK-5297 Java FileStream does not work with custom key/value types
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream")
           )
 
         case v if v.startsWith("1.2") =>

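A note on the exclusion: the old `fileStream(directory)` overload is removed in favor of variants that take explicit `Class` parameters, which changes the method's binary signature, so MiMa reports the old method as a `MissingMethodProblem`. The filter above marks that break as intentional. Sketched as Java-style signatures (illustrative, not generated output):

    // old (removed): fileStream(String directory)
    // new:           fileStream(String directory, Class<K> kClass, Class<V> vClass, Class<F> fClass)
    //                fileStream(String directory, Class<K> kClass, Class<V> vClass, Class<F> fClass,
    //                           Function<Path, Boolean> filter, boolean newFilesOnly)
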
http://git-wip-us.apache.org/repos/asf/spark/blob/424d8c6f/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index d8695b8..9a2254b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,14 +17,15 @@
 
 package org.apache.spark.streaming.api.java
 
+import java.lang.{Boolean => JBoolean}
+import java.io.{Closeable, InputStream}
+import java.util.{List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
-import java.io.{Closeable, InputStream}
-import java.util.{List => JList, Map => JMap}
-
 import akka.actor.{Props, SupervisorStrategy}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark.{SparkConf, SparkContext}
@@ -250,22 +251,54 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * Files must be written to the monitored directory by "moving" them from another
    * location within the same file system. File names starting with . are ignored.
   * @param directory HDFS directory to monitor for new files
+   * @param kClass class of key for reading HDFS file
+   * @param vClass class of value for reading HDFS file
+   * @param fClass class of input format for reading HDFS file
    * @tparam K Key type for reading HDFS file
    * @tparam V Value type for reading HDFS file
    * @tparam F Input format for reading HDFS file
    */
   def fileStream[K, V, F <: NewInputFormat[K, V]](
-      directory: String): JavaPairInputDStream[K, V] = {
-    implicit val cmk: ClassTag[K] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
-    implicit val cmv: ClassTag[V] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
-    implicit val cmf: ClassTag[F] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
+      directory: String,
+      kClass: Class[K],
+      vClass: Class[V],
+      fClass: Class[F]): JavaPairInputDStream[K, V] = {
+    implicit val cmk: ClassTag[K] = ClassTag(kClass)
+    implicit val cmv: ClassTag[V] = ClassTag(vClass)
+    implicit val cmf: ClassTag[F] = ClassTag(fClass)
     ssc.fileStream[K, V, F](directory)
   }
 
   /**
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
+   * @param directory HDFS directory to monitor for new files
+   * @param kClass class of key for reading HDFS file
+   * @param vClass class of value for reading HDFS file
+   * @param fClass class of input format for reading HDFS file
+   * @param filter Function to filter paths to process
+   * @param newFilesOnly Whether to process only new files and ignore existing files in the directory
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[K, V, F <: NewInputFormat[K, V]](
+      directory: String,
+      kClass: Class[K],
+      vClass: Class[V],
+      fClass: Class[F],
+      filter: JFunction[Path, JBoolean],
+      newFilesOnly: Boolean): JavaPairInputDStream[K, V] = {
+    implicit val cmk: ClassTag[K] = ClassTag(kClass)
+    implicit val cmv: ClassTag[V] = ClassTag(vClass)
+    implicit val cmf: ClassTag[F] = ClassTag(fClass)
+    def fn = (x: Path) => filter.call(x).booleanValue()
+    ssc.fileStream[K, V, F](directory, fn, newFilesOnly)
+  }
+
+  /**
    * Create an input stream with any arbitrary user implemented actor receiver.
    * @param props Props object defining creation of the actor
    * @param name Name of the actor

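The second overload also exposes the path filter and newFilesOnly flag that the Scala API already had. A minimal usage sketch from the Java side, written against the signature added above; the `jssc` handle and the ".txt" naming convention are assumptions for illustration:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;

    // Only pick up .txt files, and only those that appear after the stream starts.
    JavaPairInputDStream<LongWritable, Text> stream = jssc.fileStream(
        "/data/input", LongWritable.class, Text.class, TextInputFormat.class,
        new Function<Path, Boolean>() {
          @Override
          public Boolean call(Path path) {
            return path.getName().endsWith(".txt");
          }
        },
        true); // newFilesOnly: ignore files already in the directory
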
http://git-wip-us.apache.org/repos/asf/spark/blob/424d8c6f/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 12cc0de..d92e7fe 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,13 +17,20 @@
 
 package org.apache.spark.streaming;
 
+import java.io.*;
+import java.lang.Iterable;
+import java.nio.charset.Charset;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import scala.Tuple2;
 
 import org.junit.Assert;
+import static org.junit.Assert.*;
 import org.junit.Test;
-import java.io.*;
-import java.util.*;
-import java.lang.Iterable;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
@@ -1743,13 +1750,66 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
       StorageLevel.MEMORY_ONLY());
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testTextFileStream() throws IOException {
+    File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"));
+    List<List<String>> expected = fileTestPrepare(testDir);
+
+    JavaDStream<String> input = ssc.textFileStream(testDir.toString());
+    JavaTestUtils.attachTestOutputStream(input);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @SuppressWarnings("unchecked")
   @Test
-  public void testTextFileStream() {
-    JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
+  public void testFileStream() throws IOException {
+    File testDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"));
+    List<List<String>> expected = fileTestPrepare(testDir);
+
+    JavaPairInputDStream<LongWritable, Text> inputStream = ssc.fileStream(
+      testDir.toString(),
+      LongWritable.class,
+      Text.class,
+      TextInputFormat.class,
+      new Function<Path, Boolean>() {
+        @Override
+        public Boolean call(Path v1) throws Exception {
+          return Boolean.TRUE;
+        }
+      },
+      true);
+
+    JavaDStream<String> test = inputStream.map(
+      new Function<Tuple2<LongWritable, Text>, String>() {
+        @Override
+        public String call(Tuple2<LongWritable, Text> v1) throws Exception {
+          return v1._2().toString();
+        }
+    });
+
+    JavaTestUtils.attachTestOutputStream(test);
+    List<List<String>> result = JavaTestUtils.runStreams(ssc, 1, 1);
+
+    assertOrderInvariantEquals(expected, result);
   }
 
   @Test
   public void testRawSocketStream() {
     JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);
   }
+
+  private List<List<String>> fileTestPrepare(File testDir) throws IOException {
+    File existingFile = new File(testDir, "0");
+    Files.write("0\n", existingFile, Charset.forName("UTF-8"));
+    assertTrue(existingFile.setLastModified(1000) && existingFile.lastModified() == 1000);
+
+    List<List<String>> expected = Arrays.asList(
+      Arrays.asList("0")
+    );
+
+    return expected;
+  }
 }

