Posted to reviews@spark.apache.org by freeman-lab <gi...@git.apache.org> on 2014/12/25 20:46:18 UTC

[GitHub] spark pull request: [SPARK-4969] [STREAMING] [PYTHON] Add binaryRe...

GitHub user freeman-lab opened a pull request:

    https://github.com/apache/spark/pull/3803

    [SPARK-4969] [STREAMING] [PYTHON] Add binaryRecords to streaming

    In Spark 1.2 we added a `binaryRecords` input method for loading flat binary data. This format is useful for numerical array data, e.g. in scientific computing applications. This PR adds support for the same format in Streaming applications, where it is similarly useful, especially for streaming time series or sensor data.
    
    Summary of additions
    - adding `binaryRecordsStream` to Spark Streaming 
    - exposing `binaryRecordsStream` in the new PySpark Streaming
    - new unit tests in Scala and Python
    
    This required adding an optional Hadoop configuration param to `fileStream` and `FileInputStream`, but was otherwise straightforward.
    
    @tdas @davies
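The flat binary format described above is simply a concatenation of fixed-length records, so turning a file (or a streamed chunk) into byte arrays is a matter of slicing at `recordLength` boundaries. A minimal plain-Python sketch of that splitting — illustrative only, not the actual Spark implementation; `split_binary_records` is a hypothetical helper:

```python
import struct

def split_binary_records(data: bytes, record_length: int) -> list:
    """Split a flat binary payload into fixed-length records.

    Conceptually what binaryRecords / binaryRecordsStream yield:
    one byte array per record, each exactly record_length bytes.
    """
    if len(data) % record_length != 0:
        raise ValueError("payload is not a whole number of records")
    return [data[i:i + record_length]
            for i in range(0, len(data), record_length)]

# Example: three float64 samples from a hypothetical sensor stream.
payload = struct.pack("<3d", 1.0, 2.5, -4.25)
records = split_binary_records(payload, record_length=8)
values = [struct.unpack("<d", r)[0] for r in records]
```

In the streaming case the same slicing happens per new file, with `recordLength` fixed when the stream is created.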

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/freeman-lab/spark streaming-binary-records

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/3803.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3803
    
----
commit 8550c2619aba22b40dc109171b395522ccfaaf08
Author: freeman <th...@gmail.com>
Date:   2014-12-25T09:31:31Z

    Expose additional argument combination

commit ecef0eb8d4bf30627e5b35c40c2f4204e1670390
Author: freeman <th...@gmail.com>
Date:   2014-12-25T09:34:49Z

    Add binaryRecordsStream to python

commit fe4e803f8810c19aac02e7c8927af1d08b2f0a94
Author: freeman <th...@gmail.com>
Date:   2014-12-25T09:35:12Z

    Add binaryRecordStream to Java API

commit 36cb0fd576abb20b9c3210774ec9ff0471e2cf48
Author: freeman <th...@gmail.com>
Date:   2014-12-25T09:35:41Z

    Add binaryRecordsStream to scala

commit 23dd69f318aedbf12cab10380a50d94ce8c3ca92
Author: freeman <th...@gmail.com>
Date:   2014-12-25T09:35:52Z

    Tests for binaryRecordsStream

commit 9398bcb615c6cbf033b796c0837c99aba83303b4
Author: freeman <th...@gmail.com>
Date:   2014-12-25T09:40:06Z

    Expose optional hadoop configuration

commit 28bff9bab7be7c2f614a011f6b68e2103234c1df
Author: freeman <th...@gmail.com>
Date:   2014-12-25T10:02:42Z

    Fix missing arg

commit 8b70fbcf785074c7cde873cf10e8d5f0ea9e3979
Author: freeman <th...@gmail.com>
Date:   2014-12-25T10:03:01Z

    Reorganization

commit 2843e9de60f23bbce3ac185c09b8575a7513fe0d
Author: freeman <th...@gmail.com>
Date:   2014-12-25T17:43:20Z

    Add params to docstring

commit 94d90d0fbc576c4e475bb0a053e6c35d53152cf4
Author: freeman <th...@gmail.com>
Date:   2014-12-25T17:44:09Z

    Spelling

commit 1c739aa67a006a62a6ee8f294ff60568f9031476
Author: freeman <th...@gmail.com>
Date:   2014-12-25T17:48:04Z

    Simpler default arg handling

commit 029d49c143c7bed603db3ca43b44d212de516df8
Author: freeman <th...@gmail.com>
Date:   2014-12-25T17:50:42Z

    Formatting

commit a4324a38f8155f6b3e776326925af61f16a2fdfb
Author: freeman <th...@gmail.com>
Date:   2014-12-25T17:56:45Z

    Line length

commit d3e75b2bad2ba5048b36300cfd61b7cb5c39414b
Author: freeman <th...@gmail.com>
Date:   2014-12-25T19:29:06Z

    Add tests in python

commit becb34474fd165ee8aae9d207532869bce3ef743
Author: freeman <th...@gmail.com>
Date:   2014-12-25T19:31:07Z

    Formatting

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72785812
  
    LGTM. Just two minor nits. 




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72750244
  
      [Test build #26680 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26680/consoleFull) for   PR 3803 at commit [`eba925c`](https://github.com/apache/spark/commit/eba925c125341a1ed694c5f3ac3ab3914488f880).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72800036
  
    Thanks Jeremy! Good clean patch!
    On Feb 3, 2015 10:45 PM, "UCB AMPLab" <no...@github.com> wrote:
    
    > Test PASSed.
    > Refer to this link for build results (access rights to CI server needed):
    > https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26725/
    > Test PASSed.
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/3803#issuecomment-72799040>.
    >





[GitHub] spark pull request: [SPARK-4969] [STREAMING] [PYTHON] Add binaryRe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-68111519
  
      [Test build #24822 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24822/consoleFull) for   PR 3803 at commit [`becb344`](https://github.com/apache/spark/commit/becb34474fd165ee8aae9d207532869bce3ef743).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FileInputDStream[K, V, F <: NewInputFormat[K,V]](`





[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r22287997
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -373,6 +393,25 @@ class StreamingContext private[streaming] (
       }
     
       /**
    +   * Create an input stream that monitors a Hadoop-compatible filesystem
    +   * for new files and reads them as flat binary files, assuming a fixed length per record,
    +   * generating one byte array per record. Files must be written to the monitored directory
    +   * by "moving" them from another location within the same file system. File names
    +   * starting with . are ignored.
    +   * @param directory HDFS directory to monitor for new file
    +   * @param recordLength length of each record in bytes
    +   */
    +  def binaryRecordsStream(
    +      directory: String,
    +      recordLength: Int): DStream[Array[Byte]] = {
    +    val conf = sc_.hadoopConfiguration
    +    conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
    +    val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](directory, conf)
    +    val data = br.map{ case (k, v) => v.getBytes}
    --- End diff --
    
    Maybe it's not an issue since we're using FixedLengthBinaryInputFormat, but even if it isn't we should have a comment explaining why it's correct or a defensive check that `getBytes` returns an array of the expected length.
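For context on the concern above: Hadoop's `BytesWritable.getBytes` returns the backing buffer, which can be longer than the valid data (`getLength`) because buffers are grown and reused. A toy Python simulation of that pitfall and the kind of defensive check being discussed — `PaddedBuffer` and `record_from` are hypothetical stand-ins, not Hadoop APIs:

```python
class PaddedBuffer:
    """Toy stand-in for Hadoop's BytesWritable: the backing array may be
    larger than the valid data, because buffers are grown and reused."""

    def __init__(self, data: bytes, capacity: int):
        assert capacity >= len(data)
        self._backing = bytearray(data) + bytearray(capacity - len(data))
        self._length = len(data)

    def get_bytes(self) -> bytes:
        # Like BytesWritable.getBytes(): returns the whole backing array,
        # padding included.
        return bytes(self._backing)

    def get_length(self) -> int:
        return self._length


def record_from(buf: PaddedBuffer, record_length: int) -> bytes:
    raw = buf.get_bytes()
    if len(raw) != record_length:
        # Defensive path: keep only the valid prefix.
        raw = raw[:buf.get_length()]
    assert len(raw) == record_length, "record has unexpected length"
    return raw

# A reused buffer holding an 8-byte record inside a 16-byte backing array.
buf = PaddedBuffer(b"ABCDEFGH", capacity=16)
padded = buf.get_bytes()          # 16 bytes: padding included
record = record_from(buf, 8)      # 8 bytes: valid data only
```

Whether the check is needed here hinges on whether FixedLengthBinaryInputFormat always backs its BytesWritable with an exact-length array, which is the question the thread goes on to settle.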




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72799033
  
      [Test build #26725 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26725/consoleFull) for   PR 3803 at commit [`b676534`](https://github.com/apache/spark/commit/b676534067a626260b6921ba17a04b6e03ff587a).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FileInputDStream[K, V, F <: NewInputFormat[K,V]](`





[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r24061094
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala ---
    @@ -210,6 +211,20 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
       }
     
       /**
    +   * :: Experimental ::
    +   *
    +   * Create an input stream that monitors a Hadoop-compatible filesystem
    +   * for new files and reads them as flat binary files with fixed record lengths,
    +   * yielding byte arrays
    +   * @param directory HDFS directory to monitor for new files
    +   * @param recordLength The length at which to split the records
    +   */
    --- End diff --
    
    Shouldn't this have the note as well?




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r22495402
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
    @@ -233,6 +236,47 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         }
       }
     
    +  def testBinaryRecordsStream() {
    +    var ssc: StreamingContext = null
    +    val testDir: File = null
    +    try {
    +      val testDir = Utils.createTempDir()
    +
    +      Thread.sleep(1000)
    +      // Set up the streaming context and input streams
    +      val newConf = conf.clone.set(
    +        "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
    --- End diff --
    
    Ok great, I'll wait for your PR to be merged and then refactor this test accordingly.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-68797365
  
    Thanks for the review! I'll wait for @JoshRosen 's PR to merge and then update the test here. And will wait for your thoughts on the `getBytes` issue. Otherwise, I think everything's addressed.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r22776172
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -373,6 +393,25 @@ class StreamingContext private[streaming] (
       }
     
       /**
    +   * Create an input stream that monitors a Hadoop-compatible filesystem
    +   * for new files and reads them as flat binary files, assuming a fixed length per record,
    +   * generating one byte array per record. Files must be written to the monitored directory
    +   * by "moving" them from another location within the same file system. File names
    +   * starting with . are ignored.
    +   * @param directory HDFS directory to monitor for new file
    +   * @param recordLength length of each record in bytes
    +   */
    +  def binaryRecordsStream(
    +      directory: String,
    +      recordLength: Int): DStream[Array[Byte]] = {
    +    val conf = sc_.hadoopConfiguration
    +    conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
    +    val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](directory, conf)
    +    val data = br.map{ case (k, v) => v.getBytes}
    --- End diff --
    
    Probably no need to make a copy if this is actually safe (it would just harm performance), so I think it would be fine to add a comment.  I suppose we could also do an assert to check that the array isn't padded, since that's unlikely to be expensive.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r22496437
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -373,6 +393,25 @@ class StreamingContext private[streaming] (
       }
     
       /**
    +   * Create an input stream that monitors a Hadoop-compatible filesystem
    +   * for new files and reads them as flat binary files, assuming a fixed length per record,
    +   * generating one byte array per record. Files must be written to the monitored directory
    +   * by "moving" them from another location within the same file system. File names
    +   * starting with . are ignored.
    +   * @param directory HDFS directory to monitor for new file
    +   * @param recordLength length of each record in bytes
    +   */
    +  def binaryRecordsStream(
    +      directory: String,
    +      recordLength: Int): DStream[Array[Byte]] = {
    +    val conf = sc_.hadoopConfiguration
    +    conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
    +    val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](directory, conf)
    +    val data = br.map{ case (k, v) => v.getBytes}
    --- End diff --
    
    Thanks for flagging this, I wasn't aware of that behavior of ``getBytes``. I think that, as you suggest, both here and in ``binaryRecords()`` it's not a problem in practice. The BytesWritable that comes from the FixedLengthBinaryInputFormat will always be backed by a Byte array that's of the fixed length. For consistency and good practice I'm happy to make the change from #2712 both here and in the other method. Or we just add a comment. Let me know which you'd prefer. One concern might be performance effects, as mentioned in the other PR? @sryza might have thoughts. 




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72302378
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26449/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r22776187
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
    @@ -233,6 +236,47 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         }
       }
     
    +  def testBinaryRecordsStream() {
    +    var ssc: StreamingContext = null
    +    val testDir: File = null
    +    try {
    +      val testDir = Utils.createTempDir()
    +
    +      Thread.sleep(1000)
    +      // Set up the streaming context and input streams
    +      val newConf = conf.clone.set(
    +        "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
    --- End diff --
    
    My PR has been merged, so I think this should be unblocked now.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72302376
  
      [Test build #26449 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26449/consoleFull) for   PR 3803 at commit [`14bca9a`](https://github.com/apache/spark/commit/14bca9aff82b7323a624c4e05a46b60dd140ec99).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FileInputDStream[K, V, F <: NewInputFormat[K,V]](`





[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r23981497
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -361,6 +363,25 @@ class StreamingContext private[streaming] (
     
       /**
        * Create a input stream that monitors a Hadoop-compatible filesystem
    +   * for new files and reads them using the given key-value types and input format.
    +   * Files must be written to the monitored directory by "moving" them from another
    +   * location within the same file system. File names starting with . are ignored.
    +   * @param directory HDFS directory to monitor for new file
    +   * @param conf Hadoop configuration
    +   * @tparam K Key type for reading HDFS file
    +   * @tparam V Value type for reading HDFS file
    +   * @tparam F Input format for reading HDFS file
    +   */
    +  def fileStream[
    --- End diff --
    
    Sorry, meant exposing them to `binaryRecordsStream`. That's calling `fileStream`, and to include `conf` as an arg we'll also need to either specify `filter` and `newFilesOnly` or pass defaults, right?
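The plumbing question here is a common one: to thread a new optional `conf` through an existing call chain, the intermediate parameters either get spelled out at the call site or fall back to their defaults. A hypothetical Python sketch of that default-forwarding pattern (the names mirror the Scala API loosely; none of this is the actual Spark code):

```python
from typing import Callable, Optional

def default_filter(path: str) -> bool:
    # Mirrors the documented behavior: file names starting with '.' are ignored.
    return not path.rsplit("/", 1)[-1].startswith(".")

def file_stream(directory: str,
                path_filter: Callable[[str], bool] = default_filter,
                new_files_only: bool = True,
                conf: Optional[dict] = None) -> dict:
    # Hypothetical stand-in: the real fileStream builds a FileInputDStream.
    return {"directory": directory,
            "filter": path_filter,
            "new_files_only": new_files_only,
            "conf": conf or {}}

def binary_records_stream(directory: str, record_length: int) -> dict:
    # Pass the record-length setting through while leaving path_filter and
    # new_files_only at their defaults, as discussed above.
    return file_stream(directory, conf={"record.length": record_length})

stream = binary_records_stream("/data/incoming", record_length=8)
```

In Scala the same effect comes from an overload or default parameter values on `fileStream`; either way `binaryRecordsStream` only has to name the one argument it cares about.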




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72612066
  
    The Python parts look good to me, thanks!




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r23881453
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -372,6 +392,33 @@ class StreamingContext private[streaming] (
       }
     
       /**
    +   * Create an input stream that monitors a Hadoop-compatible filesystem
    +   * for new files and reads them as flat binary files, assuming a fixed length per record,
    +   * generating one byte array per record. Files must be written to the monitored directory
    +   * by "moving" them from another location within the same file system. File names
    +   * starting with . are ignored.
    +   *
    +   * '''Note:''' Normally getBytes returns an array padded with extra values,
    +   * but the FixedLengthBinaryInputFormat ensures that it will always be backed
    +   * by a byte array of the correct length (the recordLength)
    +   *
    +   * @param directory HDFS directory to monitor for new file
    +   * @param recordLength length of each record in bytes
    +   */
    +  def binaryRecordsStream(
    +      directory: String,
    +      recordLength: Int): DStream[Array[Byte]] = {
    +    val conf = sc_.hadoopConfiguration
    +    conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
    +    val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](directory, conf)
    +    val data = br.map{ case (k, v) =>
    +      val bytes = v.getBytes
    +      assert(bytes.length == recordLength, "Byte array does not have correct length")
    +      bytes}
    --- End diff --
    
    Same style nit applies in this file, too.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r22287877
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
    @@ -233,6 +236,47 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         }
       }
     
    +  def testBinaryRecordsStream() {
    +    var ssc: StreamingContext = null
    +    val testDir: File = null
    +    try {
    +      val testDir = Utils.createTempDir()
    +
    +      Thread.sleep(1000)
    +      // Set up the streaming context and input streams
    +      val newConf = conf.clone.set(
    +        "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
    --- End diff --
    
    It looks like this is based on the FileInputStream test, which is known to be flaky.  I have a PR open which rewrites that test to not depend on SystemClock / Thread.sleep(): #3801.  Therefore, if we want to have this style of test, then this PR should block until my PR is merged so that it can use the new test utilities that I added.
    
    Here's the relevant change from my PR: https://github.com/apache/spark/pull/3801/files#diff-4
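The flakiness being fixed comes from coupling the test to wall-clock time: `Thread.sleep` plus a real `SystemClock` makes batch boundaries nondeterministic on a loaded CI machine. The standard cure is a manually advanced clock that the test controls. A toy Python sketch of the pattern (hypothetical classes, not Spark's ManualClock):

```python
class ManualClock:
    """A clock the test advances explicitly, instead of sleeping."""
    def __init__(self):
        self._now = 0

    def now(self) -> int:
        return self._now

    def advance(self, ms: int) -> None:
        self._now += ms


class BatchScheduler:
    """Toy scheduler: fires one batch per interval_ms of clock time."""
    def __init__(self, clock: ManualClock, interval_ms: int):
        self.clock = clock
        self.interval_ms = interval_ms
        self.last_fired = 0
        self.batches = []

    def tick(self) -> None:
        # Fire every batch the clock has passed since the last call.
        while self.clock.now() - self.last_fired >= self.interval_ms:
            self.last_fired += self.interval_ms
            self.batches.append(self.last_fired)

clock = ManualClock()
sched = BatchScheduler(clock, interval_ms=1000)
clock.advance(3500)   # no real waiting: the test controls time
sched.tick()
```

Because time only moves when the test says so, the batch schedule is fully deterministic and no `sleep` calls are needed.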




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72394487
  
      [Test build #26476 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26476/consoleFull) for   PR 3803 at commit [`14bca9a`](https://github.com/apache/spark/commit/14bca9aff82b7323a624c4e05a46b60dd140ec99).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FileInputDStream[K, V, F <: NewInputFormat[K,V]](`





[GitHub] spark pull request: [SPARK-4969] [STREAMING] [PYTHON] Add binaryRe...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-68111521
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24822/
    Test FAILed.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72391820
  
      [Test build #26476 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26476/consoleFull) for   PR 3803 at commit [`14bca9a`](https://github.com/apache/spark/commit/14bca9aff82b7323a624c4e05a46b60dd140ec99).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r24063473
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -671,7 +674,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
           classOf[LongWritable],
           classOf[BytesWritable],
           conf=conf)
    -    val data = br.map{ case (k, v) => v.getBytes}
    +    val data = br.map { case (k, v) =>
    +      val bytes = v.getBytes
    +      assert(bytes.length == recordLength, "Byte array does not have correct length")
    +      bytes
    --- End diff --
    
    Do you mean something more than these notes we're adding? I just clarified the notes a bit to make it obvious the check is on the byte array.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r23881377
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -671,7 +675,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
           classOf[LongWritable],
           classOf[BytesWritable],
           conf=conf)
    -    val data = br.map{ case (k, v) => v.getBytes}
    +    val data = br.map{ case (k, v) =>
    --- End diff --
    
    Super-minor style nit, but I think our house-style here is to have a space before the opening `{` and to place the closing `}` on its own line.




[GitHub] spark pull request: [SPARK-4969] [STREAMING] [PYTHON] Add binaryRe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-68111695
  
      [Test build #24823 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24823/consoleFull) for   PR 3803 at commit [`fcb915c`](https://github.com/apache/spark/commit/fcb915c2fbba80b9d7b765425e203b0e3796c59d).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r23886182
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -657,6 +657,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
        *
        * Load data from a flat binary file, assuming the length of each record is constant.
        *
    +   * '''Note:''' Normally getBytes returns an array padded with extra values,
    --- End diff --
    
    Hm, it's explicitly part of `FixedLengthBinaryInputFormat` insofar as that method constructs a `BytesWritable` backed by a byte array of the given fixed record length ([here](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala#L111-123)). Does that count? In either case, I can expand the comment to avoid confusion.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72292766
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26435/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r23980541
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -361,6 +363,25 @@ class StreamingContext private[streaming] (
     
       /**
        * Create a input stream that monitors a Hadoop-compatible filesystem
    +   * for new files and reads them using the given key-value types and input format.
    +   * Files must be written to the monitored directory by "moving" them from another
    +   * location within the same file system. File names starting with . are ignored.
    +   * @param directory HDFS directory to monitor for new file
    +   * @param conf Hadoop configuration
    +   * @tparam K Key type for reading HDFS file
    +   * @tparam V Value type for reading HDFS file
    +   * @tparam F Input format for reading HDFS file
    +   */
    +  def fileStream[
    --- End diff --
    
    Fine with me, though would require wiring the extra arguments (`filter` and `newFilesOnly`) through to all the binary record methods (because they'll also need the `conf`). Should we expose those arguments in `binaryRecords`? Or just use their defaults?
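    [Editor's note: the question above is whether to forward `filter` and `newFilesOnly` everywhere or rely on defaults. Keyword defaults make the latter cheap, as this hypothetical Python sketch shows — the names and signature below are illustrative only, not the actual Spark API.]

    ```python
    def file_stream(directory,
                    path_filter=lambda path: not path.startswith("."),
                    new_files_only=True,
                    conf=None):
        """Illustrative stand-in for the fileStream overload under discussion:
        the extra arguments keep sensible defaults, so a binary-records caller
        only needs to supply the directory and the Hadoop conf."""
        return {
            "directory": directory,
            "hidden_filtered": not path_filter(".hidden"),
            "new_files_only": new_files_only,
            "has_conf": conf is not None,
        }

    # A binary-records caller passes just the directory and conf;
    # filter and new_files_only fall back to their defaults.
    result = file_stream("/data/records", conf={"recordLength": 8})
    ```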




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-68753824
  
    The Python part looks good to me, thanks.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72286100
  
      [Test build #26435 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26435/consoleFull) for   PR 3803 at commit [`9a3715a`](https://github.com/apache/spark/commit/9a3715a1e6a71040d234da52bf848b0bb109a591).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/3803




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r24061014
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -671,7 +674,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
           classOf[LongWritable],
           classOf[BytesWritable],
           conf=conf)
    -    val data = br.map{ case (k, v) => v.getBytes}
    +    val data = br.map { case (k, v) =>
    +      val bytes = v.getBytes
    +      assert(bytes.length == recordLength, "Byte array does not have correct length")
    +      bytes
    --- End diff --
    
    nit: Is this something that the user should be made aware of in the docs?




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-68803760
  
      [Test build #25066 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25066/consoleFull) for   PR 3803 at commit [`317b6d1`](https://github.com/apache/spark/commit/317b6d1dc45f0706987c3258beaa64be08df4b3c).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FileInputDStream[K, V, F <: NewInputFormat[K,V]](`





[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-71929328
  
    Great, thanks @JoshRosen, will finish this up ASAP!




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-68795630
  
      [Test build #25066 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25066/consoleFull) for   PR 3803 at commit [`317b6d1`](https://github.com/apache/spark/commit/317b6d1dc45f0706987c3258beaa64be08df4b3c).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r22287887
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
    @@ -233,6 +236,47 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         }
       }
     
    +  def testBinaryRecordsStream() {
    --- End diff --
    
    Also, since this is only called from one place, I'd just inline this code in the `test("binary records stream")` function rather than defining a whole new function.




[GitHub] spark pull request: [SPARK-4969] [STREAMING] [PYTHON] Add binaryRe...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-68113298
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24823/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72799040
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26725/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72793369
  
      [Test build #26725 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26725/consoleFull) for   PR 3803 at commit [`b676534`](https://github.com/apache/spark/commit/b676534067a626260b6921ba17a04b6e03ff587a).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72394489
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26476/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r22287961
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -373,6 +393,25 @@ class StreamingContext private[streaming] (
       }
     
       /**
    +   * Create an input stream that monitors a Hadoop-compatible filesystem
    +   * for new files and reads them as flat binary files, assuming a fixed length per record,
    +   * generating one byte array per record. Files must be written to the monitored directory
    +   * by "moving" them from another location within the same file system. File names
    +   * starting with . are ignored.
    +   * @param directory HDFS directory to monitor for new file
    +   * @param recordLength length of each record in bytes
    +   */
    +  def binaryRecordsStream(
    +      directory: String,
    +      recordLength: Int): DStream[Array[Byte]] = {
    +    val conf = sc_.hadoopConfiguration
    +    conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
    +    val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](directory, conf)
    +    val data = br.map{ case (k, v) => v.getBytes}
    --- End diff --
    
    Actually, it looks like the same bug is present in the new `binaryRecords()` method in Spark core.
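    [Editor's note: for readers following along, the fixed-length format at issue simply carves a flat byte buffer into `recordLength`-sized chunks. A toy model of that behavior — not the actual `FixedLengthBinaryInputFormat` implementation:]

    ```python
    def to_records(data: bytes, record_length: int) -> list:
        # Each record occupies exactly record_length bytes, so the buffer
        # must contain a whole number of records.
        if record_length <= 0:
            raise ValueError("record length must be positive")
        if len(data) % record_length != 0:
            raise ValueError("data is not a whole number of records")
        return [data[i:i + record_length]
                for i in range(0, len(data), record_length)]

    records = to_records(b"\x01\x02\x03\x04\x05\x06", 2)  # three 2-byte records
    ```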




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r23881764
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -657,6 +657,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
        *
        * Load data from a flat binary file, assuming the length of each record is constant.
        *
    +   * '''Note:''' Normally getBytes returns an array padded with extra values,
    --- End diff --
    
    `getBytes` on what?





[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72285668
  
    @JoshRosen I finished the refactored tests and added better handling of the `getBytes` based on your suggestion.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72292757
  
      [Test build #26435 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26435/consoleFull) for   PR 3803 at commit [`9a3715a`](https://github.com/apache/spark/commit/9a3715a1e6a71040d234da52bf848b0bb109a591).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FileInputDStream[K, V, F <: NewInputFormat[K,V]](`





[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-71928829
  
    /bump, now that my test-refactoring PR has been merged.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r24064149
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -671,7 +674,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
           classOf[LongWritable],
           classOf[BytesWritable],
           conf=conf)
    -    val data = br.map{ case (k, v) => v.getBytes}
    +    val data = br.map { case (k, v) =>
    +      val bytes = v.getBytes
    +      assert(bytes.length == recordLength, "Byte array does not have correct length")
    +      bytes
    --- End diff --
    
    I meant: should the user be told that the system can throw an error when the records are not of the expected size? I don't have any strong feelings on this, just wondering.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r24064747
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -671,7 +674,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
           classOf[LongWritable],
           classOf[BytesWritable],
           conf=conf)
    -    val data = br.map{ case (k, v) => v.getBytes}
    +    val data = br.map { case (k, v) =>
    +      val bytes = v.getBytes
    +      assert(bytes.length == recordLength, "Byte array does not have correct length")
    +      bytes
    --- End diff --
    
    Gotcha, I think it's ok as is then. Given what `FixedLengthInputFormat` is doing, this is more of a defensive assertion; it's not something the user should hit due to inappropriate input.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r23881511
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
    @@ -237,7 +240,14 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
       /** Generate one RDD from an array of files */
       private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
         val fileRDDs = files.map(file =>{
    -      val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
    +      val rdd = conf match {
    +        case Some(config) => context.sparkContext.newAPIHadoopFile(file,
    --- End diff --
    
    Another minor style thing, but I'd put the `file` on its own line, like the other arguments.
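    The `conf match` dispatch quoted in the diff above boils down to "use the
    caller-supplied Hadoop configuration if one was given, otherwise fall back to
    the default path". An illustrative Python sketch (all names here are made up;
    the real dispatch lives in `FileInputDStream.filesToRDD` and calls
    `SparkContext.newAPIHadoopFile`):

    ```python
    def files_to_rdd(file_path, load_with_conf, load_default, conf=None):
        # Mirror of the Scala `conf match { case Some(config) => ... }`:
        # prefer an explicit configuration, else use the context default.
        if conf is not None:
            return load_with_conf(file_path, conf)
        return load_default(file_path)

    # Toy loaders standing in for the two newAPIHadoopFile overloads:
    with_conf = lambda f, c: (f, c)
    default = lambda f: (f, "default-conf")

    assert files_to_rdd("part-0", with_conf, default) == ("part-0", "default-conf")
    assert files_to_rdd("part-0", with_conf, default, conf="custom") == ("part-0", "custom")
    ```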




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r23881983
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -657,6 +657,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
        *
        * Load data from a flat binary file, assuming the length of each record is constant.
        *
    +   * '''Note:''' Normally getBytes returns an array padded with extra values,
    --- End diff --
    
    This is referring to the `BytesWritable` values: their `getBytes` methods are technically allowed to return padded byte arrays, so in general callers should take this into account, but in this case we are guaranteeing that `BytesWritable.getBytes` will return a non-padded array.
    
    Actually, this raises a good point: the fact that we're returning un-padded arrays is an implementation detail, so maybe we don't want to make it part of our API contract (unless it's explicitly part of the FixedLengthBinaryInputFormat contract).





[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r22287941
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -373,6 +393,25 @@ class StreamingContext private[streaming] (
       }
     
       /**
    +   * Create an input stream that monitors a Hadoop-compatible filesystem
    +   * for new files and reads them as flat binary files, assuming a fixed length per record,
    +   * generating one byte array per record. Files must be written to the monitored directory
    +   * by "moving" them from another location within the same file system. File names
    +   * starting with . are ignored.
    +   * @param directory HDFS directory to monitor for new file
    +   * @param recordLength length of each record in bytes
    +   */
    +  def binaryRecordsStream(
    +      directory: String,
    +      recordLength: Int): DStream[Array[Byte]] = {
    +    val conf = sc_.hadoopConfiguration
    +    conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
    +    val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](directory, conf)
    +    val data = br.map{ case (k, v) => v.getBytes}
    --- End diff --
    
    This is a subtly-incorrect usage of `getBytes`, since `getBytes` returns a padded byte array; you need to copy / slice out the subarray with the data using `v.getLength`. See [HADOOP-6298: "BytesWritable#getBytes is a bad name that leads to programming mistakes"](https://issues.apache.org/jira/browse/HADOOP-6298) for more details.
    
    We've hit this problem before in other parts of Spark:
    
    - https://issues.apache.org/jira/browse/SPARK-3121
    - https://issues.apache.org/jira/browse/SPARK-4901
    
    Here's a PR which shows the correct usage: #2712
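    The padding trap can be shown without Hadoop at all. A hedged Python model
    (the variable names are illustrative stand-ins for `BytesWritable.getBytes`
    and `BytesWritable.getLength`):

    ```python
    # Model a BytesWritable whose backing array is longer than its logical
    # length -- exactly the situation HADOOP-6298 warns about.
    backing = b"record" + b"\x00" * 10   # what getBytes may return (padded)
    logical_length = 6                   # what getLength reports

    wrong = backing                      # keeps the zero padding
    right = backing[:logical_length]     # copy out only the valid prefix

    assert right == b"record"
    assert len(wrong) > logical_length   # padding would corrupt the record
    assert len(right) == logical_length
    ```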




[GitHub] spark pull request: [SPARK-4969] [STREAMING] [PYTHON] Add binaryRe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-68111500
  
      [Test build #24822 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24822/consoleFull) for   PR 3803 at commit [`becb344`](https://github.com/apache/spark/commit/becb34474fd165ee8aae9d207532869bce3ef743).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72391803
  
    Jenkins, retest this please




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r23966407
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -361,6 +363,25 @@ class StreamingContext private[streaming] (
     
       /**
        * Create a input stream that monitors a Hadoop-compatible filesystem
    +   * for new files and reads them using the given key-value types and input format.
    +   * Files must be written to the monitored directory by "moving" them from another
    +   * location within the same file system. File names starting with . are ignored.
    +   * @param directory HDFS directory to monitor for new file
    +   * @param conf Hadoop configuration
    +   * @tparam K Key type for reading HDFS file
    +   * @tparam V Value type for reading HDFS file
    +   * @tparam F Input format for reading HDFS file
    +   */
    +  def fileStream[
    --- End diff --
    
    Can you make these parameters a superset of the parameters in the other fileStream? Otherwise it seems like people can either use a conf or use a filter, but not both.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72761356
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26680/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r23966451
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala ---
    @@ -210,6 +211,18 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
       }
     
       /**
    --- End diff --
    
    Please expose the new version of `fileStream` in `JavaStreamingContext`.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r23980723
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---
    @@ -361,6 +363,25 @@ class StreamingContext private[streaming] (
     
       /**
        * Create a input stream that monitors a Hadoop-compatible filesystem
    +   * for new files and reads them using the given key-value types and input format.
    +   * Files must be written to the monitored directory by "moving" them from another
    +   * location within the same file system. File names starting with . are ignored.
    +   * @param directory HDFS directory to monitor for new file
    +   * @param conf Hadoop configuration
    +   * @tparam K Key type for reading HDFS file
    +   * @tparam V Value type for reading HDFS file
    +   * @tparam F Input format for reading HDFS file
    +   */
    +  def fileStream[
    --- End diff --
    
    Why the wiring to binaryRecords? Those two parameters are only relevant to
    the DStream, so there is nothing to pass on down to the RDDs.
    On Feb 2, 2015 7:30 PM, "Jeremy Freeman" <no...@github.com> wrote:
    
    > In
    > streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
    > <https://github.com/apache/spark/pull/3803#discussion_r23980541>:
    >
    > > @@ -361,6 +363,25 @@ class StreamingContext private[streaming] (
    > >
    > >    /**
    > >     * Create a input stream that monitors a Hadoop-compatible filesystem
    > > +   * for new files and reads them using the given key-value types and input format.
    > > +   * Files must be written to the monitored directory by "moving" them from another
    > > +   * location within the same file system. File names starting with . are ignored.
    > > +   * @param directory HDFS directory to monitor for new file
    > > +   * @param conf Hadoop configuration
    > > +   * @tparam K Key type for reading HDFS file
    > > +   * @tparam V Value type for reading HDFS file
    > > +   * @tparam F Input format for reading HDFS file
    > > +   */
    > > +  def fileStream[
    >
    > Fine with me, though would require wiring the extra arguments (filter and
    > newFilesOnly) through to all the binary record methods (because they'll
    > also need the conf). Should we expose those arguments in binaryRecords?
    > Or just use their defaults?
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/3803/files#r23980541>.
    >




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r22495425
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
    @@ -233,6 +236,47 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         }
       }
     
    +  def testBinaryRecordsStream() {
    --- End diff --
    
    Good catch, fixed.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72300666
  
      [Test build #26449 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26449/consoleFull) for   PR 3803 at commit [`14bca9a`](https://github.com/apache/spark/commit/14bca9aff82b7323a624c4e05a46b60dd140ec99).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r23966691
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -657,6 +657,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
        *
        * Load data from a flat binary file, assuming the length of each record is constant.
        *
    +   * '''Note:''' Normally getBytes returns an array padded with extra values,
    --- End diff --
    
    Does the end user ever need to call getBytes() given that this RDD is of type Array[Byte]? Does the user even need to know that this feature uses FixedLengthBinaryInputFormat underneath? If they don't, then this note can be simplified and stated in terms of the RDD, something like "Each record in this DStream/RDD is a byte array of exactly recordLength bytes, with no padding."




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r23881470
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala ---
    @@ -210,6 +210,18 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
       }
     
       /**
    +   * Create an input stream that monitors a Hadoop-compatible filesystem
    +   * for new files and reads them as flat binary files with fixed record lengths,
    +   * yielding byte arrays
    +   * @param directory HDFS directory to monitor for new files
    +   * @param recordLength The length at which to split the records
    +   */
    +
    --- End diff --
    
    This looks like an extra line of blank whitespace.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-68803767
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25066/
    Test PASSed.




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72761349
  
      [Test build #26680 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26680/consoleFull) for   PR 3803 at commit [`eba925c`](https://github.com/apache/spark/commit/eba925c125341a1ed694c5f3ac3ab3914488f880).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FileInputDStream[K, V, F <: NewInputFormat[K,V]](`





[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3803#discussion_r24063184
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala ---
    @@ -210,6 +211,20 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
       }
     
       /**
    +   * :: Experimental ::
    +   *
    +   * Create an input stream that monitors a Hadoop-compatible filesystem
    +   * for new files and reads them as flat binary files with fixed record lengths,
    +   * yielding byte arrays
    +   * @param directory HDFS directory to monitor for new files
    +   * @param recordLength The length at which to split the records
    +   */
    --- End diff --
    
    Thanks, added!




[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72793042
  
    Thanks for the detailed look @tdas! Think I addressed both nits.




[GitHub] spark pull request: [SPARK-4969] [STREAMING] [PYTHON] Add binaryRe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-68113297
  
      [Test build #24823 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24823/consoleFull) for   PR 3803 at commit [`fcb915c`](https://github.com/apache/spark/commit/fcb915c2fbba80b9d7b765425e203b0e3796c59d).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class FileInputDStream[K, V, F <: NewInputFormat[K,V]](`





[GitHub] spark pull request: [SPARK-4969][STREAMING][PYTHON] Add binaryReco...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/3803#issuecomment-72291526
  
    @tdas pointed out that these new methods should be marked as `@Experimental`, since `binaryRecords` is experimental.

