Posted to reviews@spark.apache.org by djalova <gi...@git.apache.org> on 2015/10/21 00:14:43 UTC

[GitHub] spark pull request: [SPARK-6328] [pyspark]

GitHub user djalova opened a pull request:

    https://github.com/apache/spark/pull/9186

    [SPARK-6328] [pyspark]

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/djalova/spark SPARK-6328

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9186.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #9186
    
----
commit a63ef33188c14796dc0ca662448443ca2f8bb423
Author: Daniel Jalova <dj...@us.ibm.com>
Date:   2015-10-06T00:12:31Z

    Added skeleton classes for listener.py

commit 5b3da813b5fb4d71de335db8de756a0f52f3bcf7
Author: Daniel Jalova <dj...@us.ibm.com>
Date:   2015-10-12T17:35:18Z

    Merge remote-tracking branch 'upstream/master' into SPARK-6328

commit f6c91e9b06e238b3d8e8671705124e1f42a44b73
Author: Daniel Jalova <dj...@us.ibm.com>
Date:   2015-10-12T17:37:16Z

    Added skeleton class for Python API streamingListener

commit 1e6a87e6adf940e72c0e9048130b349067c16b73
Author: Daniel Jalova <dj...@us.ibm.com>
Date:   2015-10-13T17:07:48Z

    Added listener.py to __init__.py

commit 2579f644bafc231a50f14ab814b14e2253bc991a
Author: Daniel Jalova <dj...@us.ibm.com>
Date:   2015-10-13T23:15:20Z

    Working implementation of StreamingListener for Python API. Need to implement StreamingListenerEvent

commit afad086208676578d586c2a4b8450cfb17d0eec2
Author: Daniel Jalova <dj...@us.ibm.com>
Date:   2015-10-14T19:19:37Z

    Modified signature for addStreamingListener to match Scala API, added comment.

commit 3523e1fd406c8f8ef0c65ef044ad5beaa14cf2e2
Author: Daniel Jalova <dj...@us.ibm.com>
Date:   2015-10-14T21:29:34Z

    Merge remote-tracking branch 'upstream/master' into SPARK-6328

commit 7d50848f19a2f000f5fb7c6f63ece799d39ddada
Author: Daniel Jalova <dj...@us.ibm.com>
Date:   2015-10-16T19:06:27Z

    Refactored StreamingListener methods and added getEventInfo.

commit 9d66e81211bd600bc5ff8940a1cb9abb3c624b16
Author: Daniel Jalova <dj...@us.ibm.com>
Date:   2015-10-20T19:39:16Z

    Added test_batch_info_reports() to streaming/tests.py

commit aa87c40aedb3cbaf3b3dce93ddd224dd01a1dd7c
Author: Daniel Jalova <dj...@us.ibm.com>
Date:   2015-10-20T21:31:15Z

    Fixed merge conflicts

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155710043
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45611/


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150713342
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-151298325
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156595996
  
    **[Test build #45906 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45906/consoleFull)** for PR 9186 at commit [`c941c3e`](https://github.com/apache/spark/commit/c941c3e5e457c8fe3fbf546c5e09aa954075be19).


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9186#discussion_r42782220
  
    --- Diff: python/pyspark/streaming/listener.py ---
    @@ -0,0 +1,75 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +__all__ = ["StreamingListener"]
    +
    +
    +class StreamingListener(object):
    +
    +    def __init__(self):
    +        pass
    +
    +    # Called when a receiver has been started.
    +    def onReceiverStarted(self, receiverStarted):
    +        pass
    +
    +    # Called when a receiver has reported an error.
    +    def onReceiverError(self, receiverError):
    +        pass
    +
    +    # Called when a receiver has been stopped.
    +    def onReceiverStopped(self, receiverStopped):
    +        pass
    +
    +    # Called when a batch of jobs has been submitted for processing.
    +    def onBatchSubmitted(self, batchSubmitted):
    +        pass
    +
    +    # Called when processing of a batch of jobs has started.
    +    def onBatchStarted(self, batchStarted):
    +        pass
    +
    +    # Called when processing of a batch of jobs has completed.
    +    def onBatchCompleted(self, batchCompleted):
    +        pass
    +
    +    # Called when processing of a job of a batch has started.
    +    def onOutputOperationStarted(self, outputOperationStarted):
    +        pass
    +
    +    # Called when processing of a job of a batch has completed
    +    def onOutputOperationCompleted(self, outputOperationCompleted):
    +        pass
    +
    +    def getEventInfo(self, event):
    --- End diff --
    
    Would it be better to use this method to return instances of the Python friendly classes for these Scala objects?
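A minimal sketch of what the question suggests, assuming the listener copies each JVM-side info object into a plain Python object before handing it to user code, instead of exposing a generic `getEventInfo`. `PyBatchInfo`, `from_java`, and the `_Fake*` stand-ins are hypothetical. The getter names (`batchTime().milliseconds()`, `submissionTime()`, `numRecords()`, `schedulingDelay()`, `processingDelay()`, `totalDelay()`) are the ones `test_batch_info_reports` calls in this PR; mapping -1 to `None` is an assumption.

```python
class PyBatchInfo(object):
    """Hypothetical Python-friendly snapshot of a JVM-side BatchInfo."""

    def __init__(self, batch_time_ms, submission_time, num_records,
                 scheduling_delay, processing_delay, total_delay):
        self.batch_time_ms = batch_time_ms
        self.submission_time = submission_time
        self.num_records = num_records
        self.scheduling_delay = scheduling_delay
        self.processing_delay = processing_delay
        self.total_delay = total_delay

    @staticmethod
    def _optional(value):
        # Assumption: the JVM side reports a not-yet-known delay as -1,
        # as test_batch_info_reports expects; expose that as None instead.
        return None if value < 0 else value

    @classmethod
    def from_java(cls, jinfo):
        # Copy every field once, so user callbacks never hold a Py4J proxy.
        return cls(
            batch_time_ms=jinfo.batchTime().milliseconds(),
            submission_time=jinfo.submissionTime(),
            num_records=jinfo.numRecords(),
            scheduling_delay=cls._optional(jinfo.schedulingDelay()),
            processing_delay=cls._optional(jinfo.processingDelay()),
            total_delay=cls._optional(jinfo.totalDelay()),
        )


# Hypothetical stand-ins for Py4J proxies so the sketch runs without a JVM.
class _FakeTime(object):
    def __init__(self, ms):
        self._ms = ms

    def milliseconds(self):
        return self._ms


class _FakeJavaBatchInfo(object):
    def batchTime(self):
        return _FakeTime(1000)

    def submissionTime(self):
        return 1005

    def numRecords(self):
        return 0

    def schedulingDelay(self):
        return -1

    def processingDelay(self):
        return -1

    def totalDelay(self):
        return -1


info = PyBatchInfo.from_java(_FakeJavaBatchInfo())
print(info.batch_time_ms, info.scheduling_delay)  # 1000 None
```

Copying eagerly costs a few Py4J round trips per event, but the resulting objects should stay usable after the callback returns, without a live gateway.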


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-151291830
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156515644
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45874/


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150075763
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150665420
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44248/


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156523410
  
    Sure, I'll change it to check for at least 1 batch.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-151626049
  
    **[Test build #44459 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44459/consoleFull)** for PR 9186 at commit [`47c12ed`](https://github.com/apache/spark/commit/47c12ed2d1b5e1f66d5a3df1999074e8eb1054db).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamingListenerEvent(object):`
       * `class StreamingListenerBatchSubmitted(StreamingListenerEvent):`
       * `class StreamingListenerBatchCompleted(StreamingListenerEvent):`
       * `class StreamingListenerBatchStarted(StreamingListenerEvent):`
       * `class StreamingListenerOutputOperationStarted(StreamingListenerEvent):`
       * `class StreamingListenerOutputOperationCompleted(StreamingListenerEvent):`
       * `class StreamingListenerReceieverStarted(StreamingListenerEvent):`
       * `class StreamingListenerReceiverError(StreamingListenerEvent):`
       * `class StreamingListenerReceiverStopped(StreamingListenerEvent):`
       * `class StreamingListener(object):`
       * `class StreamingListenerAdapter(StreamingListener):`
       * `class BatchInfo(object):`
       * `class OutputOperationInfo(object):`
       * `class ReceiverInfo(object):`
       * `class StreamInputInfo(object):`


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-149749028
  
    test this again


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-151473247
  
    I'm thinking maybe we should add a Java API for StreamingListener first. It would make this PR simpler, since most Java collections can be mapped to Python classes directly.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-151298326
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44377/


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156361543
  
    #9669 has been merged. Please merge with master and test the PR again.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155966668
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156588293
  
    @zsxwing The test right now doesn't even return any streamInputInfos or outputOperationInfos. The Scala tests for StreamingListener used a custom Receiver for that, which is not available in the Python API. That's why I only included the BatchInfos in the asserts. I manually tested my code using socketTextStream() to make sure that the other infos worked.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155709852
  
    **[Test build #45611 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45611/consoleFull)** for PR 9186 at commit [`5415389`](https://github.com/apache/spark/commit/5415389e95dbe642dab8f00acb2c57a08d8b97b3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamingListener(object):`
       * `class PythonStreamingListenerWrapper(listener: PythonStreamingListener)`


[GitHub] spark pull request: [SPARK-6328] [pyspark]

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-149718574
  
    Can one of the admins verify this patch?


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155593098
  
    I'm unsure what you mean. If a user wants to override one of the StreamingListener methods in Python, don't we want to make sure that the Python method properly overrides the Java method, so that when the JavaStreamingListener is notified, the Python method is invoked instead of the Java one?
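As far as I understand Py4J, a Python object can stand in for a Java interface (declared through a nested `class Java` with an `implements` list), and the JVM then calls the Python methods by name through Py4J's callback server. Ordinary Python method lookup therefore picks a subclass override when there is one, and the base class's no-op methods cover everything else. The sketch below simulates that dispatch without a JVM; `CountingListener` and `notify` are hypothetical, and `StreamingListener` is a trimmed copy of the skeleton from this PR.

```python
class StreamingListener(object):
    # Trimmed copy of the no-op skeleton in this PR's listener.py.
    def onBatchSubmitted(self, batchSubmitted):
        pass

    def onBatchCompleted(self, batchCompleted):
        pass


class CountingListener(StreamingListener):
    # Hypothetical user code: overrides only the callback it cares about.
    def __init__(self):
        self.completed = 0

    def onBatchCompleted(self, batchCompleted):
        self.completed += 1


def notify(listener, method_name, event):
    # Hypothetical stand-in for the JVM side: the callback is resolved by
    # method name on the Python object, so the subclass override wins and
    # non-overridden events fall through to the base class no-ops.
    getattr(listener, method_name)(event)


listener = CountingListener()
for name in ["onBatchSubmitted", "onBatchCompleted", "onBatchCompleted"]:
    notify(listener, name, event=None)
print(listener.completed)  # 2
```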


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156522465
  
    @djalova could you fix the test?
    ```
    [Running <class '__main__.StreamingListenerTests'>]
    test_batch_info_reports (__main__.StreamingListenerTests) ... FAIL
    
    ======================================================================
    FAIL: test_batch_info_reports (__main__.StreamingListenerTests)
    ----------------------------------------------------------------------
    Traceback (most recent call last):
      File "/home/jenkins/workspace/SparkPullRequestBuilder@2/python/pyspark/streaming/tests.py", line 444, in test_batch_info_reports
        self.assertEqual(len(batchInfosSubmitted), 4)
    AssertionError: 5 != 4
    
    ----------------------------------------------------------------------
    Ran 1 test in 4.768s
    
    FAILED (failures=1)
    ```


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156515420
  
    **[Test build #45874 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45874/consoleFull)** for PR 9186 at commit [`7cdaf37`](https://github.com/apache/spark/commit/7cdaf37856a7cb279ef1dbb3f7a490971448ac7a).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamingListener(object):`


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156545862
  
    **[Test build #45880 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45880/consoleFull)** for PR 9186 at commit [`64192d6`](https://github.com/apache/spark/commit/64192d661d7b7c10c67043a3134e541c0328cedf).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `public class JavaGradientBoostingClassificationExample `
       * `public class JavaGradientBoostingRegressionExample `
       * `public class JavaRandomForestClassificationExample `
       * `public class JavaRandomForestRegressionExample `
       * `class StreamingListener(object):`


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156625409
  
    Thanks @djalova LGTM


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155625169
  
    I imagine adding these two classes:
    ```
    trait PythonStreamingListener {
     ...
    }
    
    class PythonStreamingListenerWrapper(listener: PythonStreamingListener) extends JavaStreamingListener {
     ...
    }
    ```
    and then call `_jssc.addStreamingListener(self._jvm.JavaStreamingListenerWrapper(self._jvm.PythonStreamingListenerWrapper(pythonStreamingListener)))` on the Python side.
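The proposed layering can be simulated in plain Python to show which layer does what: the outer wrapper turns Scala-style events into Java-friendly ones, and the inner wrapper only forwards them to the user's Python object. All class names ending in `Sketch`, the dict-shaped event, and the `None` to -1 conversion are assumptions for illustration, not the actual Spark classes.

```python
class JavaStreamingListenerWrapperSketch(object):
    """Outer layer: adapts Scala-style events to Java-friendly ones."""

    def __init__(self, java_listener):
        self.java_listener = java_listener

    def onBatchCompleted(self, batch_time_ms, total_delay):
        # Assumption: a Scala Option (None here) becomes -1, matching what
        # test_batch_info_reports checks for a missing delay.
        java_event = {
            "batchTime": batch_time_ms,
            "totalDelay": -1 if total_delay is None else total_delay,
        }
        self.java_listener.onBatchCompleted(java_event)


class PythonStreamingListenerWrapperSketch(object):
    """Inner layer: forwards Java-friendly events to the Python listener."""

    def __init__(self, python_listener):
        self.python_listener = python_listener

    def onBatchCompleted(self, java_event):
        self.python_listener.onBatchCompleted(java_event)


class RecordingListener(object):
    """Hypothetical listener a user would write on the Python side."""

    def __init__(self):
        self.events = []

    def onBatchCompleted(self, batchCompleted):
        self.events.append(batchCompleted)


user_listener = RecordingListener()
# Same nesting as the proposed call:
# JavaStreamingListenerWrapper(PythonStreamingListenerWrapper(listener))
chain = JavaStreamingListenerWrapperSketch(
    PythonStreamingListenerWrapperSketch(user_listener))
chain.onBatchCompleted(batch_time_ms=1000, total_delay=None)
chain.onBatchCompleted(batch_time_ms=1500, total_delay=42)
print(user_listener.events)
```

Keeping all Scala-to-Java conversion in the outer layer means the Python-facing layer never has to deal with Scala types such as `Option`.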



[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9186#discussion_r44626086
  
    --- Diff: python/pyspark/streaming/tests.py ---
    @@ -403,6 +404,80 @@ def func(dstream):
             self._test_func(input, func, expected)
     
     
    +class StreamingListenerTests(PySparkStreamingTestCase):
    +
    +    duration = .5
    +
    +    class BatchInfoCollector(StreamingListener):
    +
    +        def __init__(self):
    +            super(StreamingListener, self).__init__()
    +            self.batchInfosCompleted = []
    +            self.batchInfosStarted = []
    +            self.batchInfosSubmitted = []
    +
    +        def onBatchSubmitted(self, batchSubmitted):
    +            self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
    +
    +        def onBatchStarted(self, batchStarted):
    +            self.batchInfosStarted.append(batchStarted.batchInfo())
    +
    +        def onBatchCompleted(self, batchCompleted):
    +            self.batchInfosCompleted.append(batchCompleted.batchInfo())
    +
    +    def test_batch_info_reports(self):
    +        batch_collector = self.BatchInfoCollector()
    +        self.ssc.addStreamingListener(batch_collector)
    +        input = [[1], [2], [3], [4]]
    +
    +        def func(dstream):
    +            return dstream.map(int)
    +        expected = [[1], [2], [3], [4]]
    +        self._test_func(input, func, expected)
    +
    +        batchInfosSubmitted = batch_collector.batchInfosSubmitted
    +        batchInfosStarted = batch_collector.batchInfosStarted
    +        batchInfosCompleted = batch_collector.batchInfosCompleted
    +
    +        self.wait_for(batchInfosCompleted, 4)
    +
    +        self.assertEqual(len(batchInfosSubmitted), 4)
    +        for info in batchInfosSubmitted:
    +            self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
    +            self.assertGreaterEqual(info.submissionTime(), 0)
    +            self.assertTrue(info.streamIdToInputInfo().isEmpty())
    +            self.assertFalse(info.outputOperationInfos().isEmpty())
    +            self.assertIsNotNone(info.outputOperationInfos().get(0))
    +            self.assertEqual(info.schedulingDelay(), -1)
    +            self.assertEqual(info.processingDelay(), -1)
    +            self.assertEqual(info.totalDelay(), -1)
    +            self.assertEqual(info.numRecords(), 0)
    +
    +        self.assertEqual(len(batchInfosStarted), 4)
    --- End diff --
    
    len(batchInfosSubmitted) may be greater than 4 since Streaming keeps launching batches.
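A sketch of the lower-bound style of assertion this comment points toward. The batch times are hypothetical data standing in for collected `BatchInfo` objects (a 0.5 s batch interval is assumed, matching `duration = .5`), including two extra batches generated after the input ran out.

```python
import unittest


class BatchCountTest(unittest.TestCase):
    # Hypothetical collected batch times (ms): the scheduler produced two
    # extra batches after the four input batches were consumed.
    batch_times = [500, 1000, 1500, 2000, 2500, 3000]

    def test_lower_bound(self):
        infos = self.batch_times
        # Exact equality would be racy; only require the four expected
        # batches to have been reported.
        self.assertGreaterEqual(len(infos), 4)
        # Check properties only on the batches we know about.
        for t in infos[:4]:
            self.assertGreaterEqual(t, 0)
        # Batch times should be unique and increasing.
        self.assertEqual(infos, sorted(set(infos)))


result = unittest.TestResult()
BatchCountTest("test_lower_bound").run(result)
print(result.wasSuccessful())  # True
```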




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-149732761
  
    Merged build started.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150075600
  
    **[Test build #44115 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44115/consoleFull)** for PR 9186 at commit [`aa87c40`](https://github.com/apache/spark/commit/aa87c40aedb3cbaf3b3dce93ddd224dd01a1dd7c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-149732702
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150717684
  
    **[Test build #44268 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44268/consoleFull)** for PR 9186 at commit [`233104d`](https://github.com/apache/spark/commit/233104d8cfc761eeaa9b3c21808f21209cbdac93).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamingListener(object):`
       * `class BatchInfo(object):`
       * `class OutputOperationInfo(object):`
       * `class ReceiverInfo(object):`




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155954931
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9186#discussion_r44694463
  
    --- Diff: python/pyspark/streaming/tests.py ---
    @@ -403,6 +404,80 @@ def func(dstream):
             self._test_func(input, func, expected)
     
     
    +class StreamingListenerTests(PySparkStreamingTestCase):
    +
    +    duration = .5
    +
    +    class BatchInfoCollector(StreamingListener):
    +
    +        def __init__(self):
    +            super(StreamingListener, self).__init__()
    +            self.batchInfosCompleted = []
    +            self.batchInfosStarted = []
    +            self.batchInfosSubmitted = []
    +
    +        def onBatchSubmitted(self, batchSubmitted):
    +            self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
    +
    +        def onBatchStarted(self, batchStarted):
    +            self.batchInfosStarted.append(batchStarted.batchInfo())
    +
    +        def onBatchCompleted(self, batchCompleted):
    +            self.batchInfosCompleted.append(batchCompleted.batchInfo())
    +
    +    def test_batch_info_reports(self):
    +        batch_collector = self.BatchInfoCollector()
    +        self.ssc.addStreamingListener(batch_collector)
    +        input = [[1], [2], [3], [4]]
    +
    +        def func(dstream):
    +            return dstream.map(int)
    +        expected = [[1], [2], [3], [4]]
    +        self._test_func(input, func, expected)
    +
    +        batchInfosSubmitted = batch_collector.batchInfosSubmitted
    +        batchInfosStarted = batch_collector.batchInfosStarted
    +        batchInfosCompleted = batch_collector.batchInfosCompleted
    +
    +        self.wait_for(batchInfosCompleted, 4)
    +
    +        self.assertEqual(len(batchInfosSubmitted), 4)
    --- End diff --
    
    OK, I'll change the assert to check that it receives at least one batch.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150717749
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156546057
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150713407
  
    The listener sometimes receives 3 batchCompleted events instead of 4 when I run the test. However, when I add a minor delay the test passes consistently. Can I get someone's opinion on this?
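One likely cause: listener events reach the listener asynchronously (Spark Streaming's listener bus delivers them on its own thread), so counting right after the batches finish can observe 3 completions instead of 4. Rather than a fixed delay, a test can poll with a deadline. This is a sketch; the `timeout` and `interval` parameters of this `wait_for` are hypothetical and not necessarily those of the helper used in the test under review.

```python
import threading
import time


def wait_for(collection, n, timeout=10.0, interval=0.01):
    """Poll until `collection` holds at least `n` items or `timeout`
    seconds pass. Returns True on success, False on timeout."""
    deadline = time.monotonic() + timeout
    while len(collection) < n:
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
    return True


# Simulate a listener bus that delivers events on another thread,
# slightly after the batches themselves have finished.
completed = []


def deliver_events():
    for batch_time in (500, 1000, 1500, 2000):
        time.sleep(0.02)  # simulated delivery lag
        completed.append(batch_time)


worker = threading.Thread(target=deliver_events)
worker.start()
ok = wait_for(completed, 4, timeout=5.0)
worker.join()
print(ok, len(completed))  # True 4
```

Polling returns as soon as the events arrive and only fails after the deadline, so the test is neither slowed down nor flaky the way a fixed sleep can be.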




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155648728
  
    I see, that worked. Should I keep the Python classes for the Infos or just have the user make Java method calls to get the values?
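To illustrate the trade-off in this question, here is a sketch of the "keep Python classes" option: copy the values out of the JVM proxy once into an immutable Python object. `FakeJavaTime` and `FakeJavaBatchInfo` are hypothetical stand-ins for Py4J proxies, and the `BatchInfo` namedtuple covers only an assumed subset of fields. The alternative is to hand users the proxy and let them call `batchTime().milliseconds()` and similar methods directly, which means less code to maintain but exposes Java-style accessors.

```python
from collections import namedtuple

# Python-side snapshot of a few BatchInfo fields (hypothetical subset).
BatchInfo = namedtuple(
    "BatchInfo", ["batch_time_ms", "submission_time", "scheduling_delay"])


class FakeJavaTime(object):
    """Stands in for a Py4J proxy of a JVM Time object."""

    def __init__(self, ms):
        self._ms = ms

    def milliseconds(self):
        return self._ms


class FakeJavaBatchInfo(object):
    """Stands in for a Py4J proxy of a JVM BatchInfo object."""

    def __init__(self, ms, submitted, sched_delay):
        self._time = FakeJavaTime(ms)
        self._submitted = submitted
        self._sched_delay = sched_delay

    def batchTime(self):
        return self._time

    def submissionTime(self):
        return self._submitted

    def schedulingDelay(self):
        return self._sched_delay


def to_python(jinfo):
    """Copy values out of the proxy once, so later reads do not need
    a round trip through the gateway."""
    return BatchInfo(
        batch_time_ms=jinfo.batchTime().milliseconds(),
        submission_time=jinfo.submissionTime(),
        scheduling_delay=jinfo.schedulingDelay(),
    )


info = to_python(FakeJavaBatchInfo(1500, 1447200000000, 12))
print(info.batch_time_ms, info.scheduling_delay)  # 1500 12
```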




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156593894
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45905/
    Test FAILed.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9186#discussion_r42777748
  
    --- Diff: python/pyspark/streaming/listener.py ---
    @@ -0,0 +1,75 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +__all__ = ["StreamingListener"]
    +
    +
    +class StreamingListener(object):
    +
    +    def __init__(self):
    +        pass
    +
    +    # Called when a receiver has been started.
    +    def onReceiverStarted(self, receiverStarted):
    +        pass
    +
    +    # Called when a receiver has reported an error.
    +    def onReceiverError(self, receiverError):
    +        pass
    +
    +    # Called when a receiver has been stopped.
    +    def onReceiverStopped(self, receiverStopped):
    +        pass
    +
    +    # Called when a batch of jobs has been submitted for processing.
    +    def onBatchSubmitted(self, batchSubmitted):
    +        pass
    +
    +    # Called when processing of a batch of jobs has started.
    +    def onBatchStarted(self, batchStarted):
    +        pass
    +
    +    # Called when processing of a batch of jobs has completed.
    +    def onBatchCompleted(self, batchCompleted):
    +        pass
    +
    +    # Called when processing of a job of a batch has started.
    +    def onOutputOperationStarted(self, outputOperationStarted):
    +        pass
    +
    +    # Called when processing of a job of a batch has completed
    +    def onOutputOperationCompleted(self, outputOperationCompleted):
    +        pass
    +
    +    def getEventInfo(self, event):
    --- End diff --
    
    I thought it would make it easier to see which StreamingListenerEvent is associated with each method, but I can remove it.
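If the goal is only to document which event each callback receives, a module-level table could serve that purpose without a `getEventInfo` method on the listener. This is a hypothetical sketch: the event names are the Scala `StreamingListenerEvent` subclasses in `org.apache.spark.streaming.scheduler`, while `EVENT_FOR_CALLBACK`, `dispatch`, and `Recorder` are illustrative names only.

```python
# Callback name -> Scala StreamingListenerEvent subclass it receives.
EVENT_FOR_CALLBACK = {
    "onReceiverStarted": "StreamingListenerReceiverStarted",
    "onReceiverError": "StreamingListenerReceiverError",
    "onReceiverStopped": "StreamingListenerReceiverStopped",
    "onBatchSubmitted": "StreamingListenerBatchSubmitted",
    "onBatchStarted": "StreamingListenerBatchStarted",
    "onBatchCompleted": "StreamingListenerBatchCompleted",
    "onOutputOperationStarted": "StreamingListenerOutputOperationStarted",
    "onOutputOperationCompleted": "StreamingListenerOutputOperationCompleted",
}

# Reverse lookup: event class name -> callback name.
CALLBACK_FOR_EVENT = {v: k for k, v in EVENT_FOR_CALLBACK.items()}


def dispatch(listener, event_class_name, event):
    """Call the listener method that handles `event_class_name`."""
    getattr(listener, CALLBACK_FOR_EVENT[event_class_name])(event)


class Recorder(object):
    """Example listener that records which callbacks fired."""

    def __init__(self):
        self.calls = []

    def onBatchStarted(self, event):
        self.calls.append(("onBatchStarted", event))

    def onBatchCompleted(self, event):
        self.calls.append(("onBatchCompleted", event))


r = Recorder()
dispatch(r, "StreamingListenerBatchStarted", "batch-1")
dispatch(r, "StreamingListenerBatchCompleted", "batch-1")
print(r.calls)
# [('onBatchStarted', 'batch-1'), ('onBatchCompleted', 'batch-1')]
```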




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150665419
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-151626307
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44459/
    Test PASSed.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155681209
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45599/
    Test FAILed.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155966552
  
    **[Test build #45690 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45690/consoleFull)** for PR 9186 at commit [`7cdaf37`](https://github.com/apache/spark/commit/7cdaf37856a7cb279ef1dbb3f7a490971448ac7a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamingListener(object):`




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150663302
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-153162764
  
    @zsxwing Can you take a look at my changes?




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155966669
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45690/
    Test PASSed.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155954947
  
    Merged build started.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9186#discussion_r44626089
  
    --- Diff: python/pyspark/streaming/tests.py ---
    @@ -403,6 +404,80 @@ def func(dstream):
             self._test_func(input, func, expected)
     
     
    +class StreamingListenerTests(PySparkStreamingTestCase):
    +
    +    duration = .5
    +
    +    class BatchInfoCollector(StreamingListener):
    +
    +        def __init__(self):
    +            super(StreamingListener, self).__init__()
    +            self.batchInfosCompleted = []
    +            self.batchInfosStarted = []
    +            self.batchInfosSubmitted = []
    +
    +        def onBatchSubmitted(self, batchSubmitted):
    +            self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
    +
    +        def onBatchStarted(self, batchStarted):
    +            self.batchInfosStarted.append(batchStarted.batchInfo())
    +
    +        def onBatchCompleted(self, batchCompleted):
    +            self.batchInfosCompleted.append(batchCompleted.batchInfo())
    +
    +    def test_batch_info_reports(self):
    +        batch_collector = self.BatchInfoCollector()
    +        self.ssc.addStreamingListener(batch_collector)
    +        input = [[1], [2], [3], [4]]
    +
    +        def func(dstream):
    +            return dstream.map(int)
    +        expected = [[1], [2], [3], [4]]
    +        self._test_func(input, func, expected)
    +
    +        batchInfosSubmitted = batch_collector.batchInfosSubmitted
    +        batchInfosStarted = batch_collector.batchInfosStarted
    +        batchInfosCompleted = batch_collector.batchInfosCompleted
    +
    +        self.wait_for(batchInfosCompleted, 4)
    +
    +        self.assertEqual(len(batchInfosSubmitted), 4)
    +        for info in batchInfosSubmitted:
    +            self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
    +            self.assertGreaterEqual(info.submissionTime(), 0)
    +            self.assertTrue(info.streamIdToInputInfo().isEmpty())
    +            self.assertFalse(info.outputOperationInfos().isEmpty())
    +            self.assertIsNotNone(info.outputOperationInfos().get(0))
    +            self.assertEqual(info.schedulingDelay(), -1)
    +            self.assertEqual(info.processingDelay(), -1)
    +            self.assertEqual(info.totalDelay(), -1)
    +            self.assertEqual(info.numRecords(), 0)
    +
    +        self.assertEqual(len(batchInfosStarted), 4)
    +        for info in batchInfosStarted:
    +            self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
    +            self.assertGreaterEqual(info.submissionTime(), 0)
    +            self.assertTrue(info.streamIdToInputInfo().isEmpty())
    +            self.assertFalse(info.outputOperationInfos().isEmpty())
    +            self.assertIsNotNone(info.outputOperationInfos().get(0))
    +            self.assertGreaterEqual(info.schedulingDelay(), 0)
    +            self.assertEqual(info.processingDelay(), -1)
    +            self.assertEqual(info.totalDelay(), -1)
    +            self.assertEqual(info.numRecords(), 0)
    +
    +        self.assertEqual(len(batchInfosCompleted), 4)
    --- End diff --
    
    len(batchInfosSubmitted) may be greater than 4 since Streaming keeps launching batches.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155572841
  
    @djalova, could you update this PR soon? This is very helpful for 1.6. If you don't have time now, I can take it over. The commit to master will still show up as yours.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-151293587
  
    **[Test build #44377 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44377/consoleFull)** for PR 9186 at commit [`79d70ab`](https://github.com/apache/spark/commit/79d70abe4d31f4b021ef9b35ed26464bed6fd16c).




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156591380
  
    Sorry, I wasn't reading your comment carefully. I'll make the update.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-151616357
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155941553
  
    Yeah, that is hard. We can figure that out in a later PR. Can you at least
    test that the contents of the maps in the BatchInfo can be accessed in
    Python? Those should be additional checks, along with the ones for the
    scheduling delay, etc.
    
    On Wed, Nov 11, 2015 at 2:03 PM, Daniel Jalova <no...@github.com>
    wrote:
    
    > There also doesn't seem to be a way to add receivers using Python. I've
    > been testing the code on my own machine using socketTextStream() but I
    > don't think that's suitable to put in a test.
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/9186#issuecomment-155923601>.
    >
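tdas asks for these checks to sit alongside the scheduling-delay checks. On the Scala side, `BatchInfo` derives its delays from optional timestamps, so a delay is undefined until every timestamp it depends on has been set. A minimal sketch of the same arithmetic on the Python side, assuming times in milliseconds and `None` for a missing timestamp; the class `BatchTimes` and its attribute names are hypothetical and not part of the PR:

```python
from typing import Optional


class BatchTimes(object):
    """Hypothetical holder for batch timestamps in milliseconds.

    Each derived delay is None whenever a timestamp it needs is missing,
    mirroring Option-valued delays on the Scala side.
    """

    def __init__(self, submission_time: int,
                 processing_start_time: Optional[int] = None,
                 processing_end_time: Optional[int] = None):
        self.submission_time = submission_time
        self.processing_start_time = processing_start_time
        self.processing_end_time = processing_end_time

    @property
    def scheduling_delay(self) -> Optional[int]:
        # Time the batch waited before processing started.
        if self.processing_start_time is None:
            return None
        return self.processing_start_time - self.submission_time

    @property
    def processing_delay(self) -> Optional[int]:
        # Time spent actually processing the batch.
        if self.processing_start_time is None or self.processing_end_time is None:
            return None
        return self.processing_end_time - self.processing_start_time

    @property
    def total_delay(self) -> Optional[int]:
        # Scheduling delay plus processing delay.
        s, p = self.scheduling_delay, self.processing_delay
        if s is None or p is None:
            return None
        return s + p


completed = BatchTimes(1000, 1020, 1500)
print(completed.scheduling_delay, completed.processing_delay, completed.total_delay)  # 20 480 500
submitted = BatchTimes(2000)
print(submitted.scheduling_delay)  # None
```

A test could then assert that the delays are non-negative for completed batches and `None` for batches that have only been submitted.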



[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156593862
  
    **[Test build #45905 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45905/consoleFull)** for PR 9186 at commit [`5349c82`](https://github.com/apache/spark/commit/5349c825de7ec1483b8d9c28480ec4071f75ea01).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamingListener(object):`


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155573413
  
    I'll work on it


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156501246
  
    **[Test build #45874 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45874/consoleFull)** for PR 9186 at commit [`7cdaf37`](https://github.com/apache/spark/commit/7cdaf37856a7cb279ef1dbb3f7a490971448ac7a).


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155863868
  
    Could you add some code to the tests to demonstrate that all fields in `JavaBatchInfo`, `JavaReceiverInfo` and `JavaOutputOperationInfo` can be accessed in Python? It's not necessary to check their contents (they are already verified in the Java tests); just accessing these fields should be fine. Moreover, it would be better if you could access the contents of the Map fields using Python dict syntax.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150125077
  
    I suggest adding some Python-friendly classes for `BatchInfo`, `ReceiverInfo` and `OutputOperationInfo`, and converting these Scala objects to them. Some Scala types, like `Option`, are hard to use in Python.
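A minimal sketch of the conversion this suggests for `Option` fields: unwrap each one once, when the Python-side object is built, so user code sees a plain value or `None`. It relies only on the `isEmpty()`/`get()` calls that the diff itself makes on Py4J proxies; `FakeOption` and `option_to_python` are hypothetical names, and `FakeOption` only stands in for a real proxy so the sketch runs without a JVM:

```python
class FakeOption(object):
    """Hypothetical stand-in for a Py4J proxy of scala.Option.

    FakeOption() models None and FakeOption(x) models Some(x); only the
    isEmpty()/get() methods are mimicked.
    """

    def __init__(self, *value):
        self._value = value

    def isEmpty(self):
        return len(self._value) == 0

    def get(self):
        if self.isEmpty():
            # Scala throws NoSuchElementException here; LookupError is the
            # closest built-in Python analogue.
            raise LookupError("None.get")
        return self._value[0]


def option_to_python(opt):
    """Convert an Option-like object to its value, or to None when empty."""
    return None if opt.isEmpty() else opt.get()


print(option_to_python(FakeOption(42)))  # 42
print(option_to_python(FakeOption()))    # None
print(option_to_python(FakeOption(0)))   # 0 (falsy values survive)
```

Testing `isEmpty()` rather than the truthiness of the unwrapped value keeps legitimate falsy values such as `0` from being turned into `None`.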


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155606846
  
    > I'm unsure what you mean. In the case that a user wants to override one of the StreamingListener methods in Python, don't we want to make sure that the Python method properly overrides the Java method, so that when the JavaStreamingListener is notified, the Python method is invoked instead of the Java method?
    
    Could you create a PythonStreamingListener interface on the Java side?
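A minimal sketch of why a Python base class with no-op callbacks lets a user override only the methods they care about. It assumes the JVM-side proxy ends up invoking the Python method by name, as Py4J callbacks do; the `class Java: implements = [...]` declaration from the diff is omitted because it needs a running gateway. `StreamingListenerBase`, `CompletedCounter` and `notify` are hypothetical names, and `notify` only simulates the JVM call:

```python
class StreamingListenerBase(object):
    """Hypothetical Python base class: every callback is a no-op by default.

    With Py4J, a class like this would also declare
        class Java:
            implements = ["<fully qualified Java interface name>"]
    so the JVM can call it through that interface.
    """

    def onBatchStarted(self, event):
        pass

    def onBatchCompleted(self, event):
        pass


class CompletedCounter(StreamingListenerBase):
    """User listener that overrides only the callback it cares about."""

    def __init__(self):
        super(CompletedCounter, self).__init__()
        self.completed = 0

    def onBatchCompleted(self, event):
        self.completed += 1


def notify(listener, method_name, event):
    # Simulates the JVM invoking an interface method on the Python object:
    # the method is looked up by name, so overrides take precedence and
    # non-overridden callbacks fall back to the inherited no-ops.
    getattr(listener, method_name)(event)


listener = CompletedCounter()
notify(listener, "onBatchStarted", object())    # inherited no-op
notify(listener, "onBatchCompleted", object())  # user override
print(listener.completed)  # 1
```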


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-149730804
  
    @zsxwing can you take a look. 


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9186#discussion_r43078607
  
    --- Diff: python/pyspark/streaming/listener.py ---
    @@ -0,0 +1,285 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +__all__ = ["StreamingListener"]
    +
    +
    +class StreamingListenerEvent(object):
    +
    +    def __init__(self):
    +        pass
    +
    +
    +class StreamingListenerBatchSubmitted(StreamingListenerEvent):
    +
    +    def __init__(self, batchInfo):
    +        super(StreamingListenerEvent, self).__init__()
    +        self.batchInfo = batchInfo
    +
    +
    +class StreamingListenerBatchCompleted(StreamingListenerEvent):
    +
    +    def __init__(self, batchInfo):
    +        super(StreamingListenerEvent, self).__init__()
    +        self.batchInfo = batchInfo
    +
    +
    +class StreamingListenerBatchStarted(StreamingListenerEvent):
    +
    +    def __init__(self, batchInfo):
    +        super(StreamingListenerEvent, self).__init__()
    +        self.batchInfo = batchInfo
    +
    +
    +class StreamingListenerOutputOperationStarted(StreamingListenerEvent):
    +
    +    def __init__(self, outputOperationInfo):
    +        super(StreamingListenerEvent, self).__init__()
    +        self.outputOperationInfo = outputOperationInfo
    +
    +
    +class StreamingListenerOutputOperationCompleted(StreamingListenerEvent):
    +
    +    def __init__(self, outputOperationInfo):
    +        super(StreamingListenerEvent, self).__init__()
    +        self.outputOperationInfo = outputOperationInfo
    +
    +
    +class StreamingListenerReceieverStarted(StreamingListenerEvent):
    +
    +    def __init__(self, receiverInfo):
    +        super(StreamingListenerEvent, self).__init__()
    +        self.receiverInfo = receiverInfo
    +
    +
    +class StreamingListenerReceiverError(StreamingListenerEvent):
    +
    +    def __init__(self, receiverInfo):
    +        super(StreamingListenerEvent, self).__init__()
    +        self.receiverInfo = receiverInfo
    +
    +
    +class StreamingListenerReceiverStopped(StreamingListenerEvent):
    +
    +    def __init__(self, receiverInfo):
    +        super(StreamingListenerEvent, self).__init__()
    +        self.receiverInfo = receiverInfo
    +
    +
    +class StreamingListener(object):
    +
    +    def __init__(self):
    +        pass
    +
    +    def onReceiverStarted(self, receiverStarted):
    +        """
    +        Called when a receiver has been started
    +        """
    +        pass
    +
    +    def onReceiverError(self, receiverError):
    +        """
    +        Called when a receiver has reported an error
    +        """
    +        pass
    +
    +    def onReceiverStopped(self, receiverStopped):
    +        """
    +        Called when a receiver has been stopped
    +        """
    +        pass
    +
    +    def onBatchSubmitted(self, batchSubmitted):
    +        """
    +        Called when a batch of jobs has been submitted for processing.
    +        """
    +        pass
    +
    +    def onBatchStarted(self, batchStarted):
    +        """
    +        Called when processing of a batch of jobs has started.
    +        """
    +        pass
    +
    +    def onBatchCompleted(self, batchCompleted):
    +        """
    +        Called when processing of a batch of jobs has completed.
    +        """
    +        pass
    +
    +    def onOutputOperationStarted(self, outputOperationStarted):
    +        """
    +        Called when processing of a job of a batch has started.
    +        """
    +        pass
    +
    +    def onOutputOperationCompleted(self, outputOperationCompleted):
    +        """
    +        Called when processing of a job of a batch has completed
    +        """
    +        pass
    +
    +    class Java:
    +        implements = ["org.apache.spark.streaming.scheduler.StreamingListener"]
    +
    +
    +class StreamingListenerAdapter(StreamingListener):
    +
    +    def __init__(self, streamingListener):
    +        super(StreamingListener, self).__init__()
    +        self.userStreamingListener = streamingListener
    +
    +    def onReceiverStarted(self, receiverStarted):
    +        receiver_info = ReceiverInfo(receiverStarted.receiverInfo())
    +        receiver_started = StreamingListenerReceieverStarted(receiver_info)
    +        self.userStreamingListener.onReceiverStarted(receiver_started)
    +
    +    def onReceiverError(self, receiverError):
    +        receiver_info = ReceiverInfo(receiverError.receiverInfo())
    +        receiver_error = StreamingListenerReceiverError(receiver_info)
    +        self.userStreamingListener.onReceiverError(receiver_error)
    +
    +    def onReceiverStopped(self, receiverStopped):
    +        receiver_info = ReceiverInfo(receiverStopped.receiverInfo())
    +        receiver_stopped = StreamingListenerReceiverStopped(receiver_info)
    +        self.userStreamingListener.onReceiverStopped(receiver_stopped)
    +
    +    def onBatchSubmitted(self, batchSubmitted):
    +        batch_info = BatchInfo(batchSubmitted.batchInfo())
    +        batch_submitted = StreamingListenerBatchSubmitted(batch_info)
    +        self.userStreamingListener.onBatchSubmitted(batch_submitted)
    +
    +    def onBatchStarted(self, batchStarted):
    +        batch_info = BatchInfo(batchStarted.batchInfo())
    +        batch_started = StreamingListenerBatchStarted(batch_info)
    +        self.userStreamingListener .onBatchStarted(batch_started)
    +
    +    def onBatchCompleted(self, batchCompleted):
    +        batch_info = BatchInfo(batchCompleted.batchInfo())
    +        batch_completed = StreamingListenerBatchCompleted(batch_info)
    +        self.userStreamingListener.onBatchCompleted(batch_completed)
    +
    +    def onOutputOperationStarted(self, outputOperationStarted):
    +        output_op_info = OutputOperationInfo(outputOperationStarted.outputOperationInfo())
    +        output_operation_started = StreamingListenerOutputOperationStarted(output_op_info)
    +        self.userStreamingListener.onOutputOperationStarted(output_operation_started)
    +
    +    def onOutputOperationCompleted(self, outputOperationCompleted):
    +        output_op_info = OutputOperationInfo(outputOperationCompleted.outputOperationInfo())
    +        output_operation_completed = StreamingListenerOutputOperationCompleted(output_op_info)
    +        self.userStreamingListener.onOutputOperationCompleted(output_operation_completed)
    +
    +
    +class BatchInfo(object):
    +
    +    def __init__(self, javaBatchInfo):
    +
    +        self.processingStartTime = None
    +        self.processingEndTime = None
    +
    +        self.batchTime = javaBatchInfo.batchTime()
    +        self.streamIdToInputInfo = self._map2dict(javaBatchInfo.streamIdToInputInfo())
    +
    +        self.submissionTime = javaBatchInfo.submissionTime()
    +        if javaBatchInfo.processingStartTime().isEmpty() is False:
    +            self.processingStartTime = javaBatchInfo.processingStartTime().get()
    +        if javaBatchInfo.processingEndTime().isEmpty() is False:
    +            self.processingEndTime = javaBatchInfo.processingEndTime().get()
    +
    +        self.outputOperationInfos = self._map2dict(javaBatchInfo.outputOperationInfos())
    --- End diff --
    
    `OutputOperationInfo` has Scala `Option` fields.
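In the quoted diff, each event subclass calls `super(StreamingListenerEvent, self).__init__()`, and `StreamingListenerAdapter` calls `super(StreamingListener, self).__init__()`. Because `super()` starts its lookup after the class it is given, those calls resolve past the immediate parent and skip its `__init__`. This is harmless only while the parent initializer is empty. A minimal sketch of the difference, with the base initializer given a body so the effect is observable; `SkipsBase` and `base_initialized` are hypothetical names used only for illustration:

```python
class StreamingListenerEvent(object):
    def __init__(self):
        # Observable side effect so the demo can tell whether this ran.
        self.base_initialized = True


class StreamingListenerBatchCompleted(StreamingListenerEvent):
    def __init__(self, batchInfo):
        # Name the class being defined: lookup continues at its parent.
        super(StreamingListenerBatchCompleted, self).__init__()
        self.batchInfo = batchInfo


class SkipsBase(StreamingListenerEvent):
    def __init__(self, batchInfo):
        # Pattern from the diff: lookup starts after StreamingListenerEvent,
        # i.e. at object, so the parent initializer never runs.
        super(StreamingListenerEvent, self).__init__()
        self.batchInfo = batchInfo


good = StreamingListenerBatchCompleted("info")
bad = SkipsBase("info")
print(hasattr(good, "base_initialized"))  # True
print(hasattr(bad, "base_initialized"))   # False
```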


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150070364
  
    **[Test build #44115 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44115/consoleFull)** for PR 9186 at commit [`aa87c40`](https://github.com/apache/spark/commit/aa87c40aedb3cbaf3b3dce93ddd224dd01a1dd7c).


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150665416
  
    **[Test build #44248 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44248/consoleFull)** for PR 9186 at commit [`0ac3df6`](https://github.com/apache/spark/commit/0ac3df63fc00d0fc6fe99f284496b9ad136666ef).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamingListener(object):`
       * `class BatchInfo(object):`
       * `class OutputOperationInfo(object):`
       * `class ReceiverInfo(object):`


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150663329
  
    Merged build started.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156593893
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-151616399
  
    Merged build started.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155686757
  
    Merged build started.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9186#discussion_r42926305
  
    --- Diff: python/pyspark/streaming/tests.py ---
    @@ -398,6 +399,69 @@ def func(dstream):
             self._test_func(input, func, expected)
     
     
    +class StreamingListenerTests(PySparkStreamingTestCase):
    +
    +    duration = .5
    +
    +    class BatchInfoCollector(StreamingListener):
    +
    +        def __init__(self):
    +            super(StreamingListener, self).__init__()
    +            self.batchInfosCompleted = []
    +            self.batchInfosStarted = []
    +            self.batchInfosSubmitted = []
    +
    +        def onBatchSubmitted(self, batchSubmitted):
    +            self.batchInfosSubmitted.append(self.getEventInfo(batchSubmitted))
    +
    +        def onBatchStarted(self, batchStarted):
    +            self.batchInfosStarted.append(self.getEventInfo(batchStarted))
    +
    +        def onBatchCompleted(self, batchCompleted):
    +            self.batchInfosCompleted.append(self.getEventInfo(batchCompleted))
    +
    +    def test_batch_info_reports(self):
    +        batch_collector = self.BatchInfoCollector()
    +        self.ssc.addStreamingListener(batch_collector)
    +        input = [[1], [2], [3], [4]]
    +
    +        def func(dstream):
    +            return dstream.map(int)
    +        expected = [[1], [2], [3], [4]]
    +        self._test_func(input, func, expected)
    +
    +        # Test occasionally fails without a delay
    +        time.sleep(.1)
    --- End diff --
    
    The listener sometimes receives 3 batchCompleted events instead of 4 when I run the test, even though it always gets the correct job output. However, when I add a short delay, the test passes consistently. Can I get someone's opinion on this?
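One plausible explanation is that the last `batchCompleted` event has not been delivered yet when the assertion runs. This assumes listener events are delivered on a different thread from the one that checks the job output. A fixed `time.sleep(.1)` hides that race only probabilistically. A minimal sketch of polling a condition until a deadline instead; `wait_until` is a hypothetical helper, not an existing PySpark test utility, and the background thread merely simulates late event delivery:

```python
import threading
import time


def wait_until(condition, timeout=10.0, interval=0.01):
    """Poll condition() until it returns True or timeout seconds elapse.

    Returns True on success and False on timeout, so the caller decides
    how to fail the test.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval)
    return condition()


# Simulated listener state, filled in slightly late by another thread.
completed = []


def deliver_events():
    for batch in range(4):
        time.sleep(0.02)
        completed.append(batch)


worker = threading.Thread(target=deliver_events)
worker.start()
assert wait_until(lambda: len(completed) == 4, timeout=5.0)
worker.join()
print(len(completed))  # 4
```

Unlike a fixed sleep, the wait ends as soon as the condition holds, and the generous timeout only matters when something is genuinely wrong.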


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156579740
  
    @djalova could you add the following checks?
    > I just meant you can access the Map here, e.g.:
    > 
    >     for streamId in info.streamIdToInputInfo():
    >         streamInputInfo = info.streamIdToInputInfo()[streamId]
    >         # access fields of streamInputInfo
    > 
    >     for outputOpId in info.outputOperationInfos():
    >         outputOperationInfo = info.outputOperationInfos()[outputOpId]
    >         # access fields of outputOperationInfo
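The quoted check relies on the map returned to Python supporting iteration over its keys and `[]` lookup. A minimal sketch of the same check factored into a helper, assuming Py4J's `JavaMap` behaves like a Python mapping. Here plain dicts and `SimpleNamespace` objects stand in for the Java objects, which would expose their fields as methods rather than attributes. `visit_map`, `check_input_info` and the attribute names on the stand-ins are all hypothetical:

```python
from types import SimpleNamespace


def visit_map(java_map, check):
    """Iterate a dict-like map by key and run check(key, value) on each entry.

    Returns the keys visited, in iteration order, so a test can assert
    that every expected entry was reachable.
    """
    visited = []
    for key in java_map:
        check(key, java_map[key])
        visited.append(key)
    return visited


# Hypothetical stand-in for what streamIdToInputInfo() might return.
stream_id_to_input_info = {
    0: SimpleNamespace(inputStreamId=0, numRecords=4),
    1: SimpleNamespace(inputStreamId=1, numRecords=0),
}


def check_input_info(stream_id, info):
    # Touch each field; the key and the value's own id should agree.
    assert info.inputStreamId == stream_id
    assert info.numRecords >= 0


print(sorted(visit_map(stream_id_to_input_info, check_input_info)))  # [0, 1]
```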
    



[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9186#discussion_r43078141
  
    --- Diff: python/pyspark/streaming/listener.py ---
    @@ -0,0 +1,285 @@
    +    def onBatchStarted(self, batchStarted):
    +        batch_info = BatchInfo(batchStarted.batchInfo())
    +        batch_started = StreamingListenerBatchStarted(batch_info)
    +        self.userStreamingListener .onBatchStarted(batch_started)
    --- End diff --
    
    nit: redundant space 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-149737738
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44014/
    Test FAILed.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155673623
  
    **[Test build #45599 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45599/consoleFull)** for PR 9186 at commit [`9e4e04a`](https://github.com/apache/spark/commit/9e4e04a6c369da13c3cc8e798f5a5b0e210b24d5).




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155923601
  
    There also doesn't seem to be a way to add receivers using Python. I've been testing the code on my own machine using socketTextStream() but I don't think that's suitable to put in a test.
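One way a test could avoid depending on an external process such as `nc` is to serve lines from an in-process TCP server on an OS-assigned port and point `socketTextStream()` at it. Below is a minimal standard-library sketch under that assumption. `LineServer` is a hypothetical helper, not part of PySpark, and it has not been checked against Spark's receiver behavior.

```python
import socket
import threading


class LineServer(object):
    """Hypothetical test helper: sends a fixed list of lines to the first
    client that connects, then closes the connection and the listening socket."""

    def __init__(self, lines, host="127.0.0.1"):
        self._lines = list(lines)
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.bind((host, 0))  # port 0: let the OS pick a free port
        self._sock.listen(1)        # a client may connect before accept() runs
        self.host = host
        self.port = self._sock.getsockname()[1]
        self._thread = threading.Thread(target=self._serve)
        self._thread.daemon = True  # never block interpreter exit

    def _serve(self):
        conn, _ = self._sock.accept()
        try:
            for line in self._lines:
                # newline-delimited, which is what a line-oriented reader expects
                conn.sendall((line + "\n").encode("utf-8"))
        finally:
            conn.close()
            self._sock.close()

    def start(self):
        self._thread.start()
        return self

    def join(self, timeout=None):
        self._thread.join(timeout)
```

In a streaming test one might write `server = LineServer(["1", "2"]).start()` followed by `ssc.socketTextStream(server.host, server.port)`. This sketch does not cover how the receiver behaves once the server closes the connection (for example, whether it tries to reconnect).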




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150069424
  
    Merged build started.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156602806
  
    **[Test build #45906 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45906/consoleFull)** for PR 9186 at commit [`c941c3e`](https://github.com/apache/spark/commit/c941c3e5e457c8fe3fbf546c5e09aa954075be19).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamingListener(object):`




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-151298181
  
    **[Test build #44377 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44377/consoleFull)** for PR 9186 at commit [`79d70ab`](https://github.com/apache/spark/commit/79d70abe4d31f4b021ef9b35ed26464bed6fd16c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamingListenerEvent(object):`
       * `class StreamingListenerBatchSubmitted(StreamingListenerEvent):`
       * `class StreamingListenerBatchCompleted(StreamingListenerEvent):`
       * `class StreamingListenerBatchStarted(StreamingListenerEvent):`
       * `class StreamingListenerOutputOperationStarted(StreamingListenerEvent):`
       * `class StreamingListenerOutputOperationCompleted(StreamingListenerEvent):`
       * `class StreamingListenerReceieverStarted(StreamingListenerEvent):`
       * `class StreamingListenerReceiverError(StreamingListenerEvent):`
       * `class StreamingListenerReceiverStopped(StreamingListenerEvent):`
       * `class StreamingListener(object):`
       * `class StreamingListenerAdapter(StreamingListener):`
       * `class BatchInfo(object):`
       * `class OutputOperationInfo(object):`
       * `class ReceiverInfo(object):`




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155583518
  
    @zsxwing Since JavaStreamingListener is an actual class and not an interface in the JVM, I can't use "implements" in py4j. How should I work around this?




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-149739513
  
    The test that failed was a flaky StreamingKMeansTest.
    Please retest. 




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156499802
  
    retest this please




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155586759
  
    > @zsxwing Since JavaStreamingListener is an actual class and not an interface, I can't use "implements" in py4j. How should I work around this? Should I change JavaStreamingListener from a class to a trait?
    
    `JavaStreamingListener` is a class to make sure we don't forget to add new APIs to it when adding anything to `StreamingListener`. Could you add a Python class and delegate all invocations to `JavaStreamingListener`?
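The delegation suggested here can be illustrated in plain Python. A wrapper subclasses the class-based listener and forwards every callback to a delegate object that only needs to provide the same methods. In the PR, the delegate role would be played by a Python object that py4j exposes through an interface it `implements` (compare the `class Java: implements = [...]` block in the diff). All names below are hypothetical stand-ins: no JVM or py4j is involved, and only three batch callbacks from the diff are modelled. This is a sketch of the pattern's shape, not the PR's implementation.

```python
class JavaStreamingListenerStandIn(object):
    """Stand-in for the class-based JVM listener: callbacks are no-ops."""

    def onBatchSubmitted(self, event):
        pass

    def onBatchStarted(self, event):
        pass

    def onBatchCompleted(self, event):
        pass


class DelegatingWrapper(JavaStreamingListenerStandIn):
    """Hypothetical wrapper: inherits from the class-based listener and
    forwards each callback unchanged to an interface-style delegate."""

    def __init__(self, delegate):
        super(DelegatingWrapper, self).__init__()
        self._delegate = delegate

    def onBatchSubmitted(self, event):
        self._delegate.onBatchSubmitted(event)

    def onBatchStarted(self, event):
        self._delegate.onBatchStarted(event)

    def onBatchCompleted(self, event):
        self._delegate.onBatchCompleted(event)


class RecordingListener(object):
    """Example user listener that records which callbacks fired."""

    def __init__(self):
        self.calls = []

    def onBatchSubmitted(self, event):
        self.calls.append(("submitted", event))

    def onBatchStarted(self, event):
        self.calls.append(("started", event))

    def onBatchCompleted(self, event):
        self.calls.append(("completed", event))
```

If the forwarding were done on the JVM side instead, the wrapper would presumably be a Scala subclass of `JavaStreamingListener` holding a reference to the py4j-backed object. The Python version only shows the structure.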




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9186#discussion_r44626076
  
    --- Diff: python/pyspark/streaming/tests.py ---
    @@ -403,6 +404,80 @@ def func(dstream):
             self._test_func(input, func, expected)
     
     
    +class StreamingListenerTests(PySparkStreamingTestCase):
    +
    +    duration = .5
    +
    +    class BatchInfoCollector(StreamingListener):
    +
    +        def __init__(self):
    +            super(StreamingListener, self).__init__()
    +            self.batchInfosCompleted = []
    +            self.batchInfosStarted = []
    +            self.batchInfosSubmitted = []
    +
    +        def onBatchSubmitted(self, batchSubmitted):
    +            self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
    +
    +        def onBatchStarted(self, batchStarted):
    +            self.batchInfosStarted.append(batchStarted.batchInfo())
    +
    +        def onBatchCompleted(self, batchCompleted):
    +            self.batchInfosCompleted.append(batchCompleted.batchInfo())
    +
    +    def test_batch_info_reports(self):
    +        batch_collector = self.BatchInfoCollector()
    +        self.ssc.addStreamingListener(batch_collector)
    +        input = [[1], [2], [3], [4]]
    +
    +        def func(dstream):
    +            return dstream.map(int)
    +        expected = [[1], [2], [3], [4]]
    +        self._test_func(input, func, expected)
    +
    +        batchInfosSubmitted = batch_collector.batchInfosSubmitted
    +        batchInfosStarted = batch_collector.batchInfosStarted
    +        batchInfosCompleted = batch_collector.batchInfosCompleted
    +
    +        self.wait_for(batchInfosCompleted, 4)
    +
    +        self.assertEqual(len(batchInfosSubmitted), 4)
    --- End diff --
    
    len(batchInfosSubmitted) may be greater than 4 since Streaming keeps launching batches.
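Following this comment, the exact-count assertion could be relaxed to a lower bound, combined with a polling helper that waits until at least `n` items have arrived or a timeout expires. This is a sketch under assumptions. The PR's own `wait_for` helper is not shown in this thread, so `wait_for_at_least` is a hypothetical stand-in, and its default timeout and poll interval are arbitrary.

```python
import time


def wait_for_at_least(items, n, timeout=10.0, interval=0.05):
    """Poll until len(items) >= n or the timeout expires.

    `items` is expected to be a list that another thread (e.g. a listener
    callback) appends to. Returns True if enough items arrived in time,
    False otherwise.
    """
    deadline = time.time() + timeout
    while len(items) < n:
        if time.time() >= deadline:
            return False
        time.sleep(interval)
    return True
```

In the test itself, `self.assertEqual(len(batchInfosSubmitted), 4)` would then become `self.assertGreaterEqual(len(batchInfosSubmitted), 4)`, because extra batches may have been submitted by the time the assertion runs.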




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-151619029
  
    **[Test build #44459 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44459/consoleFull)** for PR 9186 at commit [`47c12ed`](https://github.com/apache/spark/commit/47c12ed2d1b5e1f66d5a3df1999074e8eb1054db).




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150069007
  
    retest this please




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150713359
  
    Merged build started.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-149991651
  
    @tdas it looks like Jenkins didn't retest




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9186#discussion_r44626024
  
    --- Diff: python/pyspark/streaming/tests.py ---
    @@ -403,6 +404,80 @@ def func(dstream):
             self._test_func(input, func, expected)
     
     
    +class StreamingListenerTests(PySparkStreamingTestCase):
    +
    +    duration = .5
    +
    +    class BatchInfoCollector(StreamingListener):
    +
    +        def __init__(self):
    +            super(StreamingListener, self).__init__()
    +            self.batchInfosCompleted = []
    +            self.batchInfosStarted = []
    +            self.batchInfosSubmitted = []
    +
    +        def onBatchSubmitted(self, batchSubmitted):
    +            self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
    +
    +        def onBatchStarted(self, batchStarted):
    +            self.batchInfosStarted.append(batchStarted.batchInfo())
    +
    +        def onBatchCompleted(self, batchCompleted):
    +            self.batchInfosCompleted.append(batchCompleted.batchInfo())
    +
    +    def test_batch_info_reports(self):
    +        batch_collector = self.BatchInfoCollector()
    +        self.ssc.addStreamingListener(batch_collector)
    +        input = [[1], [2], [3], [4]]
    +
    +        def func(dstream):
    +            return dstream.map(int)
    +        expected = [[1], [2], [3], [4]]
    +        self._test_func(input, func, expected)
    +
    +        batchInfosSubmitted = batch_collector.batchInfosSubmitted
    +        batchInfosStarted = batch_collector.batchInfosStarted
    +        batchInfosCompleted = batch_collector.batchInfosCompleted
    +
    +        self.wait_for(batchInfosCompleted, 4)
    +
    +        self.assertEqual(len(batchInfosSubmitted), 4)
    +        for info in batchInfosSubmitted:
    +            self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
    +            self.assertGreaterEqual(info.submissionTime(), 0)
    +            self.assertTrue(info.streamIdToInputInfo().isEmpty())
    +            self.assertFalse(info.outputOperationInfos().isEmpty())
    +            self.assertIsNotNone(info.outputOperationInfos().get(0))
    +            self.assertEqual(info.schedulingDelay(), -1)
    +            self.assertEqual(info.processingDelay(), -1)
    +            self.assertEqual(info.totalDelay(), -1)
    +            self.assertEqual(info.numRecords(), 0)
    --- End diff --
    
    I just meant you can access the Map here, e.g.:
    ```
    for streamId in info.streamIdToInputInfo():
        streamInputInfo = info.streamIdToInputInfo()[streamId]
        # access fields of streamInputInfo
    
    for outputOpId in info.outputOperationInfos():
        outputOperationInfo = info.outputOperationInfos()[outputOpId]
        # access fields of outputOperationInfo
    ```




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156585296
  
    Sorry for my unclear comment. I just meant adding real code that accesses the fields of `streamInputInfo` and `outputOperationInfo`, such as:
    ```
    streamInputInfo.inputStreamId()
    streamInputInfo.numRecords()
    for key in streamInputInfo.metadata():
        streamInputInfo.metadata()[key]
    streamInputInfo.metadataDescription()
    ...
    ```




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156546059
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45880/
    Test PASSed.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150717751
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44268/
    Test PASSed.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156198315
  
    We should retest this PR after merging #9669.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156586652
  
    **[Test build #45905 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45905/consoleFull)** for PR 9186 at commit [`5349c82`](https://github.com/apache/spark/commit/5349c825de7ec1483b8d9c28480ec4071f75ea01).




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156602905
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45906/
    Test PASSed.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155710041
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156017667
  
    I just noticed that the Streaming Python unit tests cannot report failures: they always report `pass` even if a test fails. Investigating it.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155673160
  
    Merged build started.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-151291848
  
    Merged build started.




[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155908396
  
    > Moreover, it would be better if you can access the contents of Map fields using Python dict syntax.
    
    Do you have any suggestions for doing this? I was using wrapper classes for the XXXInfos to do this, but I'm not sure if there's a clean way to do it without them.
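If a wrapper class is used, one way to give it dict syntax without hand-writing every method is to subclass `collections.abc.Mapping` (the Python 3 import path). That base class derives `keys()`, `items()`, `get()`, `in` and equality from `__getitem__`, `__iter__` and `__len__`. The sketch assumes the wrapped object exposes Java-`Map`-style methods `get`, `containsKey`, `keySet` and `size`. `FakeJavaMap` and `MapView` are hypothetical names, and `FakeJavaMap` exists only so the example runs without a JVM. py4j's own collection conversion may make such a wrapper unnecessary, but that is not checked here.

```python
from collections.abc import Mapping


class FakeJavaMap(object):
    """Hypothetical stand-in for a JVM java.util.Map proxy."""

    def __init__(self, data):
        self._data = dict(data)

    def get(self, key):
        return self._data.get(key)

    def containsKey(self, key):
        return key in self._data

    def keySet(self):
        return list(self._data.keys())

    def size(self):
        return len(self._data)


class MapView(Mapping):
    """Read-only, dict-style view over an object with Java Map methods."""

    def __init__(self, jmap):
        self._jmap = jmap

    def __getitem__(self, key):
        # Map.get returns null for missing keys; Python expects KeyError.
        if not self._jmap.containsKey(key):
            raise KeyError(key)
        return self._jmap.get(key)

    def __iter__(self):
        return iter(self._jmap.keySet())

    def __len__(self):
        return self._jmap.size()
```

A Python `BatchInfo` wrapper could then, for example, return a `MapView` around the result of `streamIdToInputInfo()`, and callers could use `for streamId, inputInfo in view.items()`.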


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-151615522
  
    I've added your suggestions. Let me know if we should still implement a Java API for it.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156532648
  
    **[Test build #45880 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45880/consoleFull)** for PR 9186 at commit [`64192d6`](https://github.com/apache/spark/commit/64192d661d7b7c10c67043a3134e541c0328cedf).


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155955139
  
    **[Test build #45690 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45690/consoleFull)** for PR 9186 at commit [`7cdaf37`](https://github.com/apache/spark/commit/7cdaf37856a7cb279ef1dbb3f7a490971448ac7a).


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150665299
  
    **[Test build #44248 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44248/consoleFull)** for PR 9186 at commit [`0ac3df6`](https://github.com/apache/spark/commit/0ac3df63fc00d0fc6fe99f284496b9ad136666ef).


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9186#discussion_r42715990
  
    --- Diff: python/pyspark/streaming/listener.py ---
    @@ -0,0 +1,75 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +__all__ = ["StreamingListener"]
    +
    +
    +class StreamingListener(object):
    +
    +    def __init__(self):
    +        pass
    +
    +    # Called when a receiver has been started.
    +    def onReceiverStarted(self, receiverStarted):
    +        pass
    +
    +    # Called when a receiver has reported an error.
    +    def onReceiverError(self, receiverError):
    +        pass
    +
    +    # Called when a receiver has been stopped.
    +    def onReceiverStopped(self, receiverStopped):
    +        pass
    +
    +    # Called when a batch of jobs has been submitted for processing.
    +    def onBatchSubmitted(self, batchSubmitted):
    +        pass
    +
    +    # Called when processing of a batch of jobs has started.
    +    def onBatchStarted(self, batchStarted):
    +        pass
    +
    +    # Called when processing of a batch of jobs has completed.
    +    def onBatchCompleted(self, batchCompleted):
    +        pass
    +
    +    # Called when processing of a job of a batch has started.
    +    def onOutputOperationStarted(self, outputOperationStarted):
    +        pass
    +
    +    # Called when processing of a job of a batch has completed
    +    def onOutputOperationCompleted(self, outputOperationCompleted):
    +        pass
    +
    +    def getEventInfo(self, event):
    --- End diff --
    
    Is this method necessary?
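A minimal sketch of how user code would consume the skeleton in this diff: because every callback in the base class is a no-op, a subclass overrides only the callbacks it needs and works with the event argument directly. The base class here is a trimmed stand-in for the diff's `StreamingListener` so the example runs without PySpark; `CompletedBatchCounter` and the `object()` placeholder events are hypothetical.

```python
class StreamingListener(object):
    """Trimmed stand-in for the base class in the diff: every callback
    defaults to a no-op, so subclasses override only what they need."""

    def onReceiverError(self, receiverError):
        pass

    def onBatchSubmitted(self, batchSubmitted):
        pass

    def onBatchCompleted(self, batchCompleted):
        pass


class CompletedBatchCounter(StreamingListener):
    """Hypothetical user listener that counts completed batches."""

    def __init__(self):
        super(CompletedBatchCounter, self).__init__()
        self.completed = 0

    def onBatchCompleted(self, batchCompleted):
        # Only this callback is overridden; the event is used as passed in.
        self.completed += 1


listener = CompletedBatchCounter()
listener.onBatchSubmitted(object())   # inherited no-op, nothing happens
listener.onBatchCompleted(object())
listener.onBatchCompleted(object())
```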


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-151626302
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9186#discussion_r43078501
  
    --- Diff: python/pyspark/streaming/listener.py ---
    @@ -0,0 +1,285 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +__all__ = ["StreamingListener"]
    +
    +
    +class StreamingListenerEvent(object):
    +
    +    def __init__(self):
    +        pass
    +
    +
    +class StreamingListenerBatchSubmitted(StreamingListenerEvent):
    +
    +    def __init__(self, batchInfo):
    +        super(StreamingListenerBatchSubmitted, self).__init__()
    +        self.batchInfo = batchInfo
    +
    +
    +class StreamingListenerBatchCompleted(StreamingListenerEvent):
    +
    +    def __init__(self, batchInfo):
    +        super(StreamingListenerBatchCompleted, self).__init__()
    +        self.batchInfo = batchInfo
    +
    +
    +class StreamingListenerBatchStarted(StreamingListenerEvent):
    +
    +    def __init__(self, batchInfo):
    +        super(StreamingListenerBatchStarted, self).__init__()
    +        self.batchInfo = batchInfo
    +
    +
    +class StreamingListenerOutputOperationStarted(StreamingListenerEvent):
    +
    +    def __init__(self, outputOperationInfo):
    +        super(StreamingListenerOutputOperationStarted, self).__init__()
    +        self.outputOperationInfo = outputOperationInfo
    +
    +
    +class StreamingListenerOutputOperationCompleted(StreamingListenerEvent):
    +
    +    def __init__(self, outputOperationInfo):
    +        super(StreamingListenerOutputOperationCompleted, self).__init__()
    +        self.outputOperationInfo = outputOperationInfo
    +
    +
    +class StreamingListenerReceieverStarted(StreamingListenerEvent):
    +
    +    def __init__(self, receiverInfo):
    +        super(StreamingListenerReceieverStarted, self).__init__()
    +        self.receiverInfo = receiverInfo
    +
    +
    +class StreamingListenerReceiverError(StreamingListenerEvent):
    +
    +    def __init__(self, receiverInfo):
    +        super(StreamingListenerReceiverError, self).__init__()
    +        self.receiverInfo = receiverInfo
    +
    +
    +class StreamingListenerReceiverStopped(StreamingListenerEvent):
    +
    +    def __init__(self, receiverInfo):
    +        super(StreamingListenerReceiverStopped, self).__init__()
    +        self.receiverInfo = receiverInfo
    +
    +
    +class StreamingListener(object):
    +
    +    def __init__(self):
    +        pass
    +
    +    def onReceiverStarted(self, receiverStarted):
    +        """
    +        Called when a receiver has been started
    +        """
    +        pass
    +
    +    def onReceiverError(self, receiverError):
    +        """
    +        Called when a receiver has reported an error
    +        """
    +        pass
    +
    +    def onReceiverStopped(self, receiverStopped):
    +        """
    +        Called when a receiver has been stopped
    +        """
    +        pass
    +
    +    def onBatchSubmitted(self, batchSubmitted):
    +        """
    +        Called when a batch of jobs has been submitted for processing.
    +        """
    +        pass
    +
    +    def onBatchStarted(self, batchStarted):
    +        """
    +        Called when processing of a batch of jobs has started.
    +        """
    +        pass
    +
    +    def onBatchCompleted(self, batchCompleted):
    +        """
    +        Called when processing of a batch of jobs has completed.
    +        """
    +        pass
    +
    +    def onOutputOperationStarted(self, outputOperationStarted):
    +        """
    +        Called when processing of a job of a batch has started.
    +        """
    +        pass
    +
    +    def onOutputOperationCompleted(self, outputOperationCompleted):
    +        """
    +        Called when processing of a job of a batch has completed
    +        """
    +        pass
    +
    +    class Java:
    +        implements = ["org.apache.spark.streaming.scheduler.StreamingListener"]
    +
    +
    +class StreamingListenerAdapter(StreamingListener):
    +
    +    def __init__(self, streamingListener):
    +        super(StreamingListenerAdapter, self).__init__()
    +        self.userStreamingListener = streamingListener
    +
    +    def onReceiverStarted(self, receiverStarted):
    +        receiver_info = ReceiverInfo(receiverStarted.receiverInfo())
    +        receiver_started = StreamingListenerReceieverStarted(receiver_info)
    +        self.userStreamingListener.onReceiverStarted(receiver_started)
    +
    +    def onReceiverError(self, receiverError):
    +        receiver_info = ReceiverInfo(receiverError.receiverInfo())
    +        receiver_error = StreamingListenerReceiverError(receiver_info)
    +        self.userStreamingListener.onReceiverError(receiver_error)
    +
    +    def onReceiverStopped(self, receiverStopped):
    +        receiver_info = ReceiverInfo(receiverStopped.receiverInfo())
    +        receiver_stopped = StreamingListenerReceiverStopped(receiver_info)
    +        self.userStreamingListener.onReceiverStopped(receiver_stopped)
    +
    +    def onBatchSubmitted(self, batchSubmitted):
    +        batch_info = BatchInfo(batchSubmitted.batchInfo())
    +        batch_submitted = StreamingListenerBatchSubmitted(batch_info)
    +        self.userStreamingListener.onBatchSubmitted(batch_submitted)
    +
    +    def onBatchStarted(self, batchStarted):
    +        batch_info = BatchInfo(batchStarted.batchInfo())
    +        batch_started = StreamingListenerBatchStarted(batch_info)
    +        self.userStreamingListener.onBatchStarted(batch_started)
    +
    +    def onBatchCompleted(self, batchCompleted):
    +        batch_info = BatchInfo(batchCompleted.batchInfo())
    +        batch_completed = StreamingListenerBatchCompleted(batch_info)
    +        self.userStreamingListener.onBatchCompleted(batch_completed)
    +
    +    def onOutputOperationStarted(self, outputOperationStarted):
    +        output_op_info = OutputOperationInfo(outputOperationStarted.outputOperationInfo())
    +        output_operation_started = StreamingListenerOutputOperationStarted(output_op_info)
    +        self.userStreamingListener.onOutputOperationStarted(output_operation_started)
    +
    +    def onOutputOperationCompleted(self, outputOperationCompleted):
    +        output_op_info = OutputOperationInfo(outputOperationCompleted.outputOperationInfo())
    +        output_operation_completed = StreamingListenerOutputOperationCompleted(output_op_info)
    +        self.userStreamingListener.onOutputOperationCompleted(output_operation_completed)
    +
    +
    +class BatchInfo(object):
    +
    +    def __init__(self, javaBatchInfo):
    +
    +        self.processingStartTime = None
    +        self.processingEndTime = None
    +
    +        self.batchTime = javaBatchInfo.batchTime()
    +        self.streamIdToInputInfo = self._map2dict(javaBatchInfo.streamIdToInputInfo())
    --- End diff --
    
    `StreamInputInfo` has a Scala `Map` field.
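One reading of this comment is that `_map2dict` converts only the outer map: the `StreamInputInfo` values it returns still carry a Scala `Map` field that Python code cannot index with `[]`. A minimal sketch of converting both levels, assuming Py4J proxies that expose Scala's `keysIterator()` and `apply()`, and assuming the nested field is reached through `metadata()` alongside `inputStreamId()` and `numRecords()`. All function and fake class names are hypothetical.

```python
def scala_map_to_dict(scala_map):
    """Copy a Py4J proxy of a Scala Map into a plain Python dict.

    Assumes the proxy exposes the Scala Map methods keysIterator() and apply().
    """
    result = {}
    it = scala_map.keysIterator()
    while it.hasNext():
        key = it.next()
        result[key] = scala_map.apply(key)
    return result


def stream_input_info_to_dict(info):
    """Hypothetical conversion of a single StreamInputInfo proxy.

    The accessor names are assumptions; the point is that the nested
    Map field is converted as well.
    """
    return {
        "inputStreamId": info.inputStreamId(),
        "numRecords": info.numRecords(),
        "metadata": scala_map_to_dict(info.metadata()),
    }


def convert_stream_id_to_input_info(scala_map):
    """Convert the outer map, then every value, so no JVM Map remains."""
    return {stream_id: stream_input_info_to_dict(info)
            for stream_id, info in scala_map_to_dict(scala_map).items()}


# Stand-ins for the Py4J proxies, for local testing only.
class FakeScalaIterator(object):
    def __init__(self, values):
        self._values = list(values)

    def hasNext(self):
        return len(self._values) > 0

    def next(self):
        return self._values.pop(0)


class FakeScalaMap(object):
    def __init__(self, data):
        self._data = dict(data)

    def keysIterator(self):
        return FakeScalaIterator(self._data.keys())

    def apply(self, key):
        return self._data[key]


class FakeStreamInputInfo(object):
    def __init__(self, stream_id, num_records, metadata):
        self._stream_id = stream_id
        self._num_records = num_records
        self._metadata = FakeScalaMap(metadata)

    def inputStreamId(self):
        return self._stream_id

    def numRecords(self):
        return self._num_records

    def metadata(self):
        return self._metadata


outer = FakeScalaMap({0: FakeStreamInputInfo(0, 42, {"source": "socket"})})
converted = convert_stream_id_to_input_info(outer)
```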


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155681175
  
    **[Test build #45599 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45599/consoleFull)** for PR 9186 at commit [`9e4e04a`](https://github.com/apache/spark/commit/9e4e04a6c369da13c3cc8e798f5a5b0e210b24d5).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamingListener(object):`
       * `class PythonStreamingListenerWrapper(listener: PythonStreamingListener)`


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155686739
  
    Merged build triggered.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150069416
  
    Merged build triggered.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155662537
  
    > I see, that worked. Should I keep the Python classes for the Infos or just have the user make Java method calls to get the values?
    
    Just let the user use Java XXXInfos directly.
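Under this suggestion the Python side stays thin: user code calls the Java getters itself instead of reading attributes from Python wrapper classes. A minimal sketch, assuming the event passed to `onBatchCompleted` is a Py4J proxy whose `batchInfo()` returns an object with a `numRecords()` method. `BatchRecordCounter` and the fake classes are hypothetical, and the fakes exist only so the example runs without a JVM; a real listener would subclass the `StreamingListener` proposed in this PR.

```python
class BatchRecordCounter(object):
    """Hypothetical listener that reads values straight off the Java objects.

    With Py4J proxies, fields are read through method calls such as
    batchCompleted.batchInfo().numRecords(); these accessor names are
    assumptions.
    """

    def __init__(self):
        self.records_per_batch = []

    def onBatchCompleted(self, batchCompleted):
        info = batchCompleted.batchInfo()  # Java BatchInfo proxy, no wrapper
        self.records_per_batch.append(info.numRecords())


# Stand-ins for the Py4J proxies, for local testing only.
class FakeBatchInfo(object):
    def __init__(self, num_records):
        self._num_records = num_records

    def numRecords(self):
        return self._num_records


class FakeBatchCompleted(object):
    def __init__(self, num_records):
        self._info = FakeBatchInfo(num_records)

    def batchInfo(self):
        return self._info


listener = BatchRecordCounter()
listener.onBatchCompleted(FakeBatchCompleted(10))
listener.onBatchCompleted(FakeBatchCompleted(25))
```

The trade-off is that user code uses Java-style `()` calls rather than Python attributes, and each call on a real proxy goes through the Py4J gateway to the JVM.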


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156602904
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155889295
  
    Just a heads-up: if you can add this very soon, I can merge it for the 1.6 release.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by djalova <gi...@git.apache.org>.
Github user djalova commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155615807
  
    > Could you create a PythonStreamingListener interface on the Java side?
    
    I could do that, but I would either have to make it extend StreamingListener, which could be a bit messy, or create a new addStreamingListener() in JavaStreamingContext that takes a PythonStreamingListener.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-149730735
  
    this is ok to test.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155690123
  
    **[Test build #45611 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45611/consoleFull)** for PR 9186 at commit [`5415389`](https://github.com/apache/spark/commit/5415389e95dbe642dab8f00acb2c57a08d8b97b3).


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-156515638
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-149737737
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155681206
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-149737692
  
    **[Test build #44014 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44014/consoleFull)** for PR 9186 at commit [`aa87c40`](https://github.com/apache/spark/commit/aa87c40aedb3cbaf3b3dce93ddd224dd01a1dd7c).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `class StreamingListener(object):`


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155255131
  
    #9420 has been merged. Could you please update your PR to use the JavaStreamingListener?


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-149734857
  
    **[Test build #44014 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44014/consoleFull)** for PR 9186 at commit [`aa87c40`](https://github.com/apache/spark/commit/aa87c40aedb3cbaf3b3dce93ddd224dd01a1dd7c).


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150714210
  
    **[Test build #44268 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44268/consoleFull)** for PR 9186 at commit [`233104d`](https://github.com/apache/spark/commit/233104d8cfc761eeaa9b3c21808f21209cbdac93).


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9186#discussion_r44563303
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala ---
    @@ -18,6 +18,82 @@
     package org.apache.spark.streaming.api.java
     
     import org.apache.spark.streaming.Time
    +import org.apache.spark.streaming.scheduler.StreamingListener
    +
    +private[streaming] trait PythonStreamingListener{
    +
    +  /** Called when a receiver has been started */
    +  def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted) { }
    +
    +  /** Called when a receiver has reported an error */
    +  def onReceiverError(receiverError: JavaStreamingListenerReceiverError) { }
    +
    +  /** Called when a receiver has been stopped */
    +  def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped) { }
    +
    +  /** Called when a batch of jobs has been submitted for processing. */
    +  def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted) { }
    +
    +  /** Called when processing of a batch of jobs has started.  */
    +  def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted) { }
    +
    +  /** Called when processing of a batch of jobs has completed. */
    +  def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted) { }
    +
    +  /** Called when processing of a job of a batch has started. */
    +  def onOutputOperationStarted(
    +      outputOperationStarted: JavaStreamingListenerOutputOperationStarted) { }
    +
    +  /** Called when processing of a job of a batch has completed. */
    +  def onOutputOperationCompleted(
    +      outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted) { }
    +}
    +
    +class PythonStreamingListenerWrapper(listener: PythonStreamingListener)
    --- End diff --
    
    private
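The trait in the diff gives every callback an empty default body, so a listener overrides only the events it cares about. A wrapper class then presumably adapts it to the underlying `StreamingListener` (the wrapper's body is cut off at "End diff"). Below is a minimal pure-Python sketch of that no-op-base-plus-adapter pattern. It assumes nothing about PySpark's real classes. `ListenerBase`, `ListenerWrapper`, `post` and `CompletedBatchCounter` are hypothetical names, and events are plain dicts rather than the `JavaStreamingListener*` event objects.

```python
# Hypothetical sketch of the pattern in the diff: a base class whose callbacks
# are all no-ops, plus an adapter that forwards named events to the listener.


class ListenerBase(object):
    """Every callback does nothing by default; subclasses override what they need."""

    def onReceiverStarted(self, event):
        pass

    def onReceiverError(self, event):
        pass

    def onReceiverStopped(self, event):
        pass

    def onBatchSubmitted(self, event):
        pass

    def onBatchStarted(self, event):
        pass

    def onBatchCompleted(self, event):
        pass

    def onOutputOperationStarted(self, event):
        pass

    def onOutputOperationCompleted(self, event):
        pass


class ListenerWrapper(object):
    """Adapter: receives (event_name, payload) pairs and dispatches them."""

    def __init__(self, listener):
        self._listener = listener

    def post(self, event_name, payload):
        # Look up e.g. "onBatchCompleted" on the wrapped listener and call it.
        # An unknown event name raises AttributeError.
        callback = getattr(self._listener, event_name)
        callback(payload)


class CompletedBatchCounter(ListenerBase):
    """Overrides a single callback; all others keep the no-op default."""

    def __init__(self):
        super(CompletedBatchCounter, self).__init__()
        self.completed = 0

    def onBatchCompleted(self, event):
        self.completed += 1


counter = CompletedBatchCounter()
wrapper = ListenerWrapper(counter)
wrapper.post("onBatchSubmitted", {"batchTime": 1000})  # falls through to the no-op
wrapper.post("onBatchCompleted", {"batchTime": 1000})
print(counter.completed)  # prints 1
```

Python has no counterpart to Scala's `private[streaming]`. The usual convention for an internal helper like the wrapper is a leading underscore in its name.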


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155864069
  
    BTW, LGTM for the new Python APIs. 


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9186#discussion_r43113846
  
    --- Diff: python/pyspark/streaming/tests.py ---
    @@ -398,6 +399,69 @@ def func(dstream):
             self._test_func(input, func, expected)
     
     
    +class StreamingListenerTests(PySparkStreamingTestCase):
    +
    +    duration = .5
    +
    +    class BatchInfoCollector(StreamingListener):
    +
    +        def __init__(self):
    +            super(StreamingListener, self).__init__()
    +            self.batchInfosCompleted = []
    +            self.batchInfosStarted = []
    +            self.batchInfosSubmitted = []
    +
    +        def onBatchSubmitted(self, batchSubmitted):
    +            self.batchInfosSubmitted.append(self.getEventInfo(batchSubmitted))
    +
    +        def onBatchStarted(self, batchStarted):
    +            self.batchInfosStarted.append(self.getEventInfo(batchStarted))
    +
    +        def onBatchCompleted(self, batchCompleted):
    +            self.batchInfosCompleted.append(self.getEventInfo(batchCompleted))
    +
    +    def test_batch_info_reports(self):
    +        batch_collector = self.BatchInfoCollector()
    +        self.ssc.addStreamingListener(batch_collector)
    +        input = [[1], [2], [3], [4]]
    +
    +        def func(dstream):
    +            return dstream.map(int)
    +        expected = [[1], [2], [3], [4]]
    +        self._test_func(input, func, expected)
    +
    +        # Test occasionally fails without a delay
    +        time.sleep(.1)
    --- End diff --
    
    Take a look at `PySparkStreamingTestCase.wait_for`. You can use it to wait for the expected results or time out, instead of sleeping for a fixed interval.
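Polling until a condition holds or a deadline passes avoids the flakiness of a fixed `time.sleep(.1)`: the test continues as soon as the listener events arrive, and fails cleanly if they never do. Below is a minimal sketch of that idea. `wait_until`, `ListenerStub` and `ListenerTests` are hypothetical names. The sketch does not reproduce `PySparkStreamingTestCase.wait_for`. A background thread stands in for asynchronous listener callbacks. The collector also calls `super()` with its own class. The diff's `super(StreamingListener, self)` starts method lookup after `StreamingListener`, so it skips `StreamingListener.__init__`.

```python
import threading
import time


def wait_until(condition, timeout=10.0, interval=0.05):
    """Poll condition() until it is true or `timeout` seconds elapse.

    Hypothetical helper for illustration; not PySpark's wait_for.
    Returns True if the condition became true, False on timeout.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        if condition():
            return True
        time.sleep(interval)
    return bool(condition())  # final check right at the deadline


class ListenerStub(object):
    """Stand-in for a listener base class (hypothetical)."""

    def __init__(self):
        self.base_initialized = True


class ListenerTests(object):
    class BatchInfoCollector(ListenerStub):
        def __init__(self):
            # Name the subclass itself so ListenerStub.__init__ actually runs;
            # super(ListenerStub, self) would skip it.
            super(ListenerTests.BatchInfoCollector, self).__init__()
            self.batchInfosCompleted = []

        def onBatchCompleted(self, batchCompleted):
            self.batchInfosCompleted.append(batchCompleted)


collector = ListenerTests.BatchInfoCollector()


def deliver_events():
    # Simulate listener callbacks arriving asynchronously on another thread.
    for batch_time in [1, 2, 3, 4]:
        time.sleep(0.01)
        collector.onBatchCompleted(batch_time)


worker = threading.Thread(target=deliver_events)
worker.start()
ok = wait_until(lambda: len(collector.batchInfosCompleted) == 4, timeout=5.0)
worker.join()
print(ok, collector.batchInfosCompleted)  # prints True [1, 2, 3, 4]
```

The timeout bounds how long a broken test can hang, and the polling interval trades CPU wakeups against latency. Both values here are arbitrary.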


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-150075765
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44115/


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-155673149
  
    Merged build triggered.


[GitHub] spark pull request: [SPARK-6328] [Python] Python API for Streaming...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9186#issuecomment-153166260
  
    I just sent #9420 to add a Java version of StreamingListener. It will make this PR much easier.
