You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/31 19:13:18 UTC
[15/16] git commit: Fixed comments and long lines based on comments on PR 289.
Fixed comments and long lines based on comments on PR 289.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/fcd17a1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/fcd17a1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/fcd17a1e
Branch: refs/heads/master
Commit: fcd17a1e8ef1d0f106e845f4de99533d61cd8695
Parents: 271e323
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Dec 31 02:01:45 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Dec 31 02:01:45 2013 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/rdd/RDDCheckpointData.scala | 3 ++-
.../scala/org/apache/spark/streaming/Checkpoint.scala | 4 ++--
.../org/apache/spark/streaming/StreamingContext.scala | 9 +++++++--
.../streaming/api/java/JavaStreamingContext.scala | 13 ++++++++-----
4 files changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fcd17a1e/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 091a6fd..642daba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -91,7 +91,8 @@ private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
}
// Save to file, and reload it as an RDD
- val broadcastedConf = rdd.context.broadcast(new SerializableWritable(rdd.context.hadoopConfiguration))
+ val broadcastedConf = rdd.context.broadcast(
+ new SerializableWritable(rdd.context.hadoopConfiguration))
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (newRDD.partitions.size != rdd.partitions.size) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fcd17a1e/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 0f9a719..4960a85 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -81,8 +81,8 @@ class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends
attempts += 1
try {
logDebug("Saving checkpoint for time " + checkpointTime + " to file '" + file + "'")
- // This is inherently thread unsafe .. so alleviating it by writing to '.new' and
- // then doing moves : which should be pretty fast.
+ // This is inherently thread unsafe, so alleviating it by writing to '.new' and
+ // then moving it to the final file
val fos = fs.create(writeFile)
fos.write(bytes)
fos.close()
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fcd17a1e/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6628fdc..8898fdc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -371,7 +371,8 @@ class StreamingContext private (
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
- * File names starting with . are ignored.
+ * Files must be written to the monitored directory by "moving" them from another
+ * location within the same file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
@@ -390,6 +391,8 @@ class StreamingContext private (
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
+ * Files must be written to the monitored directory by "moving" them from another
+ * location within the same file system.
* @param directory HDFS directory to monitor for new file
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
@@ -410,7 +413,9 @@ class StreamingContext private (
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
- * as Text and input format as TextInputFormat). File names starting with . are ignored.
+ * as Text and input format as TextInputFormat). Files must be written to the
+ * monitored directory by "moving" them from another location within the same
+ * file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): DStream[String] = {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fcd17a1e/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 78d318c..aad0d93 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -256,9 +256,11 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
- * as Text and input format as TextInputFormat). File names starting with . are ignored.
+ * as Text and input format as TextInputFormat). Files must be written to the
+ * monitored directory by "moving" them from another location within the same
+ * file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): JavaDStream[String] = {
@@ -300,9 +302,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Creates a input stream that monitors a Hadoop-compatible filesystem
+ * Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
- * File names starting with . are ignored.
+ * Files must be written to the monitored directory by "moving" them from another
+ * location within the same file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
@@ -331,7 +334,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
- * Creates a input stream from a Flume source.
+ * Create an input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
*/