You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/09/23 16:08:44 UTC

spark git commit: [SPARK-20448][DOCS] Document how FileInputDStream works with object storage

Repository: spark
Updated Branches:
  refs/heads/master 04975a68b -> c792aff03


[SPARK-20448][DOCS] Document how FileInputDStream works with object storage

Change-Id: I88c272444ca734dc2cbc2592607c11287b90a383

## What changes were proposed in this pull request?

The documentation on File DStreams is enhanced to

1. Detail the exact timestamp logic for examining directories and files.
1. Detail how object stores different from filesystems, and so how using them as a source of data should be treated with caution, possibly publishing data to the store differently (direct PUTs as opposed to stage + rename)

## How was this patch tested?

n/a

Author: Steve Loughran <st...@hortonworks.com>

Closes #17743 from steveloughran/cloud/SPARK-20448-document-dstream-blobstore.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c792aff0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c792aff0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c792aff0

Branch: refs/heads/master
Commit: c792aff0367f1f9dc7338e0caa52842cf6b78678
Parents: 04975a6
Author: Steve Loughran <st...@hortonworks.com>
Authored: Sat Sep 23 17:08:41 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat Sep 23 17:08:41 2017 +0100

----------------------------------------------------------------------
 docs/streaming-programming-guide.md | 120 +++++++++++++++++++++++++------
 1 file changed, 99 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c792aff0/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index bc200cd..868acc4 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -615,35 +615,113 @@ which creates a DStream from text
 data received over a TCP socket connection. Besides sockets, the StreamingContext API provides
 methods for creating DStreams from files as input sources.
 
-- **File Streams:** For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as:
+#### File Streams
+{:.no_toc}
 
-    <div class="codetabs">
-    <div data-lang="scala" markdown="1">
-        streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
-    </div>
-    <div data-lang="java" markdown="1">
-		streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
-    </div>
-    <div data-lang="python" markdown="1">
-		streamingContext.textFileStream(dataDirectory)
-    </div>
-    </div>
+For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as
+via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`.
 
-	Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). Note that
+File streams do not require running a receiver so there is no need to allocate any cores for receiving file data.
 
-     + The files must have the same data format.
-     + The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into
-     the data directory.
-     + Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
+For simple text files, the easiest method is `StreamingContext.textFileStream(dataDirectory)`.
 
-	For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores.
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
+{% endhighlight %}
+For text files
 
-	<span class="badge" style="background-color: grey">Python API</span> `fileStream` is not available in the Python API, only	`textFileStream` is	available.
+{% highlight scala %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+</div>
 
-- **Streams based on Custom Receivers:** DStreams can be created with data streams received through custom receivers. See the [Custom Receiver
+<div data-lang="java" markdown="1">
+{% highlight java %}
+streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
+{% endhighlight %}
+For text files
+
+{% highlight java %}
+streamingContext.textFileStream(dataDirectory);
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+`fileStream` is not available in the Python API; only `textFileStream` is available.
+{% highlight python %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+</div>
+
+</div>
+
+##### How Directories are Monitored
+{:.no_toc}
+
+Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory.
+
+   * A simple directory can be monitored, such as `"hdfs://namenode:8040/logs/"`.
+     All files directly under such a path will be processed as they are discovered.
+   + A [POSIX glob pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02) can be supplied, such as
+     `"hdfs://namenode:8040/logs/2017/*"`.
+     Here, the DStream will consist of all files in the directories
+     matching the pattern.
+     That is: it is a pattern of directories, not of files in directories.
+   + All files must be in the same data format.
+   * A file is considered part of a time period based on its modification time,
+     not its creation time.
+   + Once processed, changes to a file within the current window will not cause the file to be reread.
+     That is: *updates are ignored*.
+   + The more files under a directory, the longer it will take to
+     scan for changes — even if no files have been modified.
+   * If a wildcard is used to identify directories, such as `"hdfs://namenode:8040/logs/2016-*"`,
+     renaming an entire directory to match the path will add the directory to the list of
+     monitored directories. Only the files in the directory whose modification time is
+     within the current window will be included in the stream.
+   + Calling [`FileSystem.setTimes()`](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#setTimes-org.apache.hadoop.fs.Path-long-long-)
+     to fix the timestamp is a way to have the file picked up in a later window, even if its contents have not changed.
+
+
+##### Using Object Stores as a source of data
+{:.no_toc}
+
+"Full" Filesystems such as HDFS tend to set the modification time on their files as soon
+as the output stream is created.
+When a file is opened, even before data has been completely written,
+it may be included in the `DStream` - after which updates to the file within the same window
+will be ignored. That is: changes may be missed, and data omitted from the stream.
+
+To guarantee that changes are picked up in a window, write the file
+to an unmonitored directory, then, immediately after the output stream is closed,
+rename it into the destination directory.
+Provided the renamed file appears in the scanned destination directory during the window
+of its creation, the new data will be picked up.
+
+In contrast, Object Stores such as Amazon S3 and Azure Storage usually have slow rename operations, as the
+data is actually copied.
+Furthermore, renamed object may have the time of the `rename()` operation as its modification time, so
+may not be considered part of the window which the original create time implied they were.
+
+Careful testing is needed against the target object store to verify that the timestamp behavior
+of the store is consistent with that expected by Spark Streaming. It may be
+that writing directly into a destination directory is the appropriate strategy for
+streaming data via the chosen object store.
+
+For more details on this topic, consult the [Hadoop Filesystem Specification](https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-common/filesystem/introduction.html).
+
+#### Streams based on Custom Receivers
+{:.no_toc}
+
+DStreams can be created with data streams received through custom receivers. See the [Custom Receiver
   Guide](streaming-custom-receivers.html) for more details.
 
-- **Queue of RDDs as a Stream:** For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.
+#### Queue of RDDs as a Stream
+{:.no_toc}
+
+For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.
 
 For more details on streams from sockets and files, see the API documentations of the relevant functions in
 [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) for


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org