Posted to dev@bahir.apache.org by ckadner <gi...@git.apache.org> on 2016/07/16 01:51:44 UTC

[GitHub] bahir pull request #10: [BAHIR-24] fix MQTT Python code, examples, add tests

GitHub user ckadner opened a pull request:

    https://github.com/apache/bahir/pull/10

    [BAHIR-24] fix MQTT Python code, examples, add tests

    [BAHIR-24: Fix MQTT Python code](https://issues.apache.org/jira/browse/BAHIR-24)
    
    
    **Changes in this PR:**
    
    - remove unnecessary files from `streaming-mqtt/python` (`__init__.py`, `dstream.py`)
    - update all `*.py` files for the modified project structure `pyspark.streaming.mqtt` --> `mqtt` (see open question 1 below)
    - add the test cases that were left out of the import and add a shell script to run them (compare to [spark-packages/dstream-mqtt](https://github.com/spark-packages/dstream-mqtt/tree/master/python-tests))
      - `streaming-mqtt/python-tests/run-python-tests.sh`
      - `streaming-mqtt/python-tests/tests.py`
    - modify `MQTTTestUtils.scala` to limit the required disk storage space
    - modify the `bin/run-example` script to set up `PYTHONPATH` so that the Python examples can run
    - update the Spark version we are building against from `2.0.0-SNAPSHOT` to `2.0.1-SNAPSHOT`
    
     
    **Open questions:**
    
    1. Should we preserve the original PySpark package structure (pre-Spark 2.0) so users with existing PySpark-MQTT programs don't need to change their import statements? i.e. 
    
      - `from pyspark.streaming.mqtt import MQTTUtils` vs 
      - `from mqtt import MQTTUtils`
    
    2. Should we use the `--py-files` option with `spark-submit` instead of setting up the `PYTHONPATH` in the `bin/run-example` script? I did not, for these reasons:
    
      - using the `--py-files` option with individual `*.py` files requires changing the example Python scripts so that the import statements come after SparkContext initialization
      - alternatively, the `--py-files` option requires creating a zip with all the required `*.py` files, but then we should change our packaged binary jar files to include the Python sources at root level so that users can use `--py-files spark-streaming-mqtt_2.11-2.0.0-SNAPSHOT.jar` without having to create another zip file
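
On open question 1, one possible middle ground is an import fallback in user programs, so the same script runs against either package layout. This is only a minimal sketch: `import_first` is a hypothetical helper, not part of Bahir or PySpark, and it assumes the two module paths named in the question.

```python
import importlib


def import_first(*module_names):
    """Return the first module in module_names that can be imported."""
    errors = []
    for name in module_names:
        try:
            return importlib.import_module(name)
        except ImportError as exc:
            # Remember why this candidate failed and try the next one.
            errors.append("%s: %s" % (name, exc))
    raise ImportError("none of the candidate modules could be imported: "
                      + "; ".join(errors))


# In an MQTT program one might try the pre-2.0 path first, then the new one:
#   MQTTUtils = import_first("pyspark.streaming.mqtt", "mqtt").MQTTUtils
```

The cost of this approach is that every user script carries the shim, which is the burden the question is trying to avoid in the first place.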
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ckadner/bahir BAHIR-24_MQTT_Python_fixes

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir/pull/10.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10
    
----
commit 817b959fb19f8c3fba88844d5a2664a7490f0bde
Author: Christian Kadner <ck...@us.ibm.com>
Date:   2016-07-16T00:49:40Z

    [BAHIR-24] fix MQTT Python code, examples, add tests

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] bahir pull request #10: [BAHIR-24] fix MQTT Python code, examples, add tests

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/10#discussion_r71425377
  
    --- Diff: streaming-mqtt/python-tests/tests.py ---
    @@ -0,0 +1,213 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +import sys
    +import time
    +import random
    +
    +if sys.version_info[:2] <= (2, 6):
    +    try:
    +        import unittest2 as unittest
    +    except ImportError:
    +        sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier')
    +        sys.exit(1)
    +else:
    +    import unittest
    +
    +from pyspark.context import SparkConf, SparkContext, RDD
    +from pyspark.streaming.context import StreamingContext
    +from mqtt import MQTTUtils
    +
    +
    +class PySparkStreamingTestCase(unittest.TestCase):
    +
    +    timeout = 10  # seconds
    --- End diff --
    
    Hella legit - continues to make me sad every time I see this in the Python code but nothing to be done about this :(



[GitHub] bahir pull request #10: [BAHIR-24] fix MQTT Python code, examples, add tests

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/10#discussion_r71425100
  
    --- Diff: streaming-mqtt/python/mqtt.py ---
    @@ -38,19 +38,26 @@ def createStream(ssc, brokerUrl, topic,
             :param storageLevel:  RDD storage level.
             :return: A DStream object
             """
    +        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
    +
             try:
    -            helper = ssc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
    -        except TypeError as e:
    -            if str(e) == "'JavaPackage' object is not callable":
    +            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
    --- End diff --
    
    After 07cb323e7a128b87ef265ddc66f033365d9de463 in Spark you _should_ be able to skip this hack and use the standard `_jvm` reference instead. Then, in your exception handling, replace the search for `ClassNotFoundException` with a check for "'JavaPackage' object is not callable" inside the `TypeError` handler.
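
The `TypeError` check described in this comment can be illustrated without a JVM. In the sketch below, the `JavaPackage` class is a plain Python stand-in (not py4j's real class) and `load_helper` is a hypothetical name; the only detail taken from the diff is the message text being matched.

```python
class JavaPackage(object):
    """Stand-in for an unresolved JVM class reference; illustration only.

    It defines no __call__, so calling an instance raises
    TypeError("'JavaPackage' object is not callable").
    """


def load_helper(factory):
    """Call factory() and translate the 'missing class' TypeError."""
    try:
        return factory()
    except TypeError as e:
        if str(e) == "'JavaPackage' object is not callable":
            # The class was not found on the JVM side; give a clearer error.
            raise RuntimeError("MQTT helper class not found on the JVM class path")
        raise  # any other TypeError is a real bug and should propagate
```

Matching on the message text is brittle, so the handler re-raises anything it does not recognise instead of swallowing it.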



[GitHub] bahir issue #10: [BAHIR-24] fix MQTT Python code, examples, add tests

Posted by ckadner <gi...@git.apache.org>.
Github user ckadner commented on the issue:

    https://github.com/apache/bahir/pull/10
  
    @holdenk thank you for your review comments! I just pushed an update addressing all but ...
    
    > Hella legit - continues to make me sad every time I see this in the Python code but nothing to be done about this :(



[GitHub] bahir pull request #10: [BAHIR-24] fix MQTT Python code, examples, add tests

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/10#discussion_r71425516
  
    --- Diff: streaming-mqtt/python-tests/tests.py ---
    @@ -0,0 +1,213 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +import sys
    +import time
    +import random
    +
    +if sys.version_info[:2] <= (2, 6):
    +    try:
    +        import unittest2 as unittest
    +    except ImportError:
    +        sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier')
    +        sys.exit(1)
    +else:
    +    import unittest
    +
    +from pyspark.context import SparkConf, SparkContext, RDD
    +from pyspark.streaming.context import StreamingContext
    +from mqtt import MQTTUtils
    +
    +
    +class PySparkStreamingTestCase(unittest.TestCase):
    --- End diff --
    
    You _could_ consider using the PySpark base test case here instead of copying it - might break on update but could be good to reduce copypasta.
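
A rough sketch of what reusing the base class might look like. It assumes PySpark ships its streaming test base class as `pyspark.streaming.tests.PySparkStreamingTestCase`, which may not hold for every release; the fallback class, the `MQTTStreamTests` name, and the attribute values are illustrative only.

```python
import unittest

try:
    # Assumed location of PySpark's own streaming test base class;
    # it may move or disappear between Spark releases.
    from pyspark.streaming.tests import PySparkStreamingTestCase
except ImportError:
    # Fallback so this module still imports where PySpark's tests are absent.
    class PySparkStreamingTestCase(unittest.TestCase):
        timeout = 10  # seconds
        duration = 0.5


class MQTTStreamTests(PySparkStreamingTestCase):
    # Override only what differs from the base class (illustrative values).
    timeout = 20  # seconds
    duration = 1

    def test_settings(self):
        self.assertGreater(self.timeout, self.duration)
```

The fallback branch is the trade-off mentioned in the comment: less copied code, but a dependency on a module layout that Spark does not promise to keep stable.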



[GitHub] bahir issue #10: [BAHIR-24] fix MQTT Python code, examples, add tests

Posted by holdenk <gi...@git.apache.org>.
Github user holdenk commented on the issue:

    https://github.com/apache/bahir/pull/10
  
    So for the open questions:
    
    I would build them as a jar, or possibly build this into the standard Bahir jar, rather than updating the `PYTHONPATH`, because you don't necessarily want people to have to install this on all of the nodes.
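
Since a jar is a zip archive, the packaging idea can be sketched with the standard library alone. This is a minimal illustration, not the Bahir build: `bundle_python_sources` and the `demo_mqtt` module are made-up names, and a real build would more likely do this in Maven.

```python
import os
import sys
import tempfile
import zipfile


def bundle_python_sources(src_dir, archive_path):
    """Zip every *.py file under src_dir, keeping paths relative to src_dir."""
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(src_dir):
            for name in sorted(files):
                if name.endswith(".py"):
                    full = os.path.join(root, name)
                    # Store sources at the archive root so they are importable.
                    archive.write(full, os.path.relpath(full, src_dir))
    return archive_path


if __name__ == "__main__":
    # Demo with a throwaway module in a temporary directory.
    work = tempfile.mkdtemp()
    src = os.path.join(work, "src")
    os.mkdir(src)
    with open(os.path.join(src, "demo_mqtt.py"), "w") as f:
        f.write("GREETING = 'hello'\n")
    archive = bundle_python_sources(src, os.path.join(work, "demo.zip"))
    sys.path.insert(0, archive)  # Python can import directly from zip archives
    import demo_mqtt
    print(demo_mqtt.GREETING)
```

An archive built this way is the kind of file one would pass to `spark-submit --py-files`, so the sources travel with the job instead of being installed on every node.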



Re: [GitHub] bahir issue #10: [BAHIR-24] fix MQTT Python code, examples, add tests

Posted by Steve Loughran <st...@gmail.com>.
On 16 July 2016 at 03:23, ckadner <gi...@git.apache.org> wrote:

> Github user ckadner commented on the issue:
>
>     https://github.com/apache/bahir/pull/10
>
>     **How to test these changes:**
>
>     (1) Test that the example `mqtt_wordcount.py` is running fine:
>     ```console
>     BAHIR_HOME="${HOME}/Projects/bahir"
>     export SPARK_HOME="${HOME}/Runtimes/spark/spark-2.0.0-preview-bin-hadoop2.7"
>     export PYTHONPATH="${BAHIR_HOME}/streaming-mqtt/python"
>
>     cd "${BAHIR_HOME}"
>     mvn clean install -pl streaming-mqtt
>
>     ${SPARK_HOME}/bin/spark-submit \
>         --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.0.1-SNAPSHOT \
>         streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py \
>         tcp://localhost:1883 foo
>     ```
>     ```
>     ...
>     16/07/15 19:16:47 WARN AbstractHandler: No Server set for org.spark_project.jetty.server.handler.ErrorHandler@1e8d73e4
>     /Users/ckadner/Runtimes/spark/spark-2.0.0-preview-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
>     -------------------------------------------
>     Time: 2016-07-15 19:16:49
>     -------------------------------------------
>
>     -------------------------------------------
>     Time: 2016-07-15 19:16:50
>     -------------------------------------------
>
>     -------------------------------------------
>     Time: 2016-07-15 19:16:51
>     -------------------------------------------
>     ...
>     ```
>
>     (2) Test the `bin/run-example` script:
>     ```console
>     export SPARK_HOME="${HOME}/Runtimes/spark/spark-2.0.0-preview-bin-hadoop2.7"
>
>     cd ~/Projects/bahir
>     # mvn clean install -pl streaming-mqtt
>
>     bin/run-example \
>         streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py \
>         tcp://localhost:1883 foo
>     ```
>     ```
>     ...
>     16/07/15 19:21:19 WARN AbstractHandler: No Server set for org.spark_project.jetty.server.handler.ErrorHandler@64bc10e5
>     /Users/ckadner/Runtimes/spark/spark-2.0.0-preview-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
>     -------------------------------------------
>     Time: 2016-07-15 19:21:21
>     -------------------------------------------
>
>     -------------------------------------------
>     Time: 2016-07-15 19:21:22
>     -------------------------------------------
>
>     -------------------------------------------
>     Time: 2016-07-15 19:21:23
>     -------------------------------------------
>
>     ^Z
>     [1]+  Stopped                 bin/run-example streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py tcp://localhost:1883 foo
>     ...
>     ```
>
>
>     (3) Test that the error message contains the correct Spark and Scala versions when the Bahir MQTT package is missing or was not resolved:
>     ```console
>     BAHIR_HOME="${HOME}/Projects/bahir"
>     export SPARK_HOME="${HOME}/Runtimes/spark/spark-2.0.0-preview-bin-hadoop2.7"
>     export PYTHONPATH="${BAHIR_HOME}/streaming-mqtt/python"
>
>     ${SPARK_HOME}/bin/spark-submit \
>         ${BAHIR_HOME}/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py \
>         tcp://localhost:1883 foo
>     ```
>     ```
>     ________________________________________________________________________________________________
>
>       Spark Streaming's MQTT libraries not found in class path. Try one of the following.
>
>       1. Include the MQTT library and its dependencies with in the
>          spark-submit command as
>
>          ${SPARK_HOME}/bin/spark-submit --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.0.0 ...
>
>       2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
>          Group Id = org.apache.bahir, Artifact Id = spark-streaming-mqtt, Version = 2.0.0.
>          Then, include the jar in the spark-submit command as
>
>          ${SPARK_HOME}/bin/spark-submit --jars <spark-streaming-mqtt.jar> ...
>     ________________________________________________________________________________________________
>     ```

[GitHub] bahir issue #10: [BAHIR-24] fix MQTT Python code, examples, add tests

Posted by ckadner <gi...@git.apache.org>.
Github user ckadner commented on the issue:

    https://github.com/apache/bahir/pull/10
  
    **How to test these changes:**
    
    (1) Test that the example `mqtt_wordcount.py` is running fine:
    ```console
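    # Use ${HOME}: a tilde inside double quotes is not expanded by the shell
    BAHIR_HOME="${HOME}/Projects/bahir"
    export SPARK_HOME="${HOME}/Runtimes/spark/spark-2.0.0-preview-bin-hadoop2.7"
    export PYTHONPATH="${BAHIR_HOME}/streaming-mqtt/python"
    
    cd "${BAHIR_HOME}"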
    mvn clean install -pl streaming-mqtt
    
    ${SPARK_HOME}/bin/spark-submit \
        --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.0.1-SNAPSHOT \
        streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py \
        tcp://localhost:1883 foo
    ```
    ```
    ...
    16/07/15 19:16:47 WARN AbstractHandler: No Server set for org.spark_project.jetty.server.handler.ErrorHandler@1e8d73e4
    /Users/ckadner/Runtimes/spark/spark-2.0.0-preview-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
    -------------------------------------------
    Time: 2016-07-15 19:16:49
    -------------------------------------------
    
    -------------------------------------------
    Time: 2016-07-15 19:16:50
    -------------------------------------------
    
    -------------------------------------------
    Time: 2016-07-15 19:16:51
    -------------------------------------------
    ...
    ```
    
    (2) Test the `bin/run-example` script:
    ```console
    export SPARK_HOME="${HOME}/Runtimes/spark/spark-2.0.0-preview-bin-hadoop2.7"
    
    cd ~/Projects/bahir
    # mvn clean install -pl streaming-mqtt
    
    bin/run-example \
        streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py \
        tcp://localhost:1883 foo
    ```
    ```
    ...
    16/07/15 19:21:19 WARN AbstractHandler: No Server set for org.spark_project.jetty.server.handler.ErrorHandler@64bc10e5
    /Users/ckadner/Runtimes/spark/spark-2.0.0-preview-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
    -------------------------------------------
    Time: 2016-07-15 19:21:21
    -------------------------------------------
    
    -------------------------------------------
    Time: 2016-07-15 19:21:22
    -------------------------------------------
    
    -------------------------------------------
    Time: 2016-07-15 19:21:23
    -------------------------------------------
    
    ^Z
    [1]+  Stopped                 bin/run-example streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py tcp://localhost:1883 foo
    ...
    ```
    
    
    (3) Test that the error message contains the correct Spark and Scala versions when the Bahir MQTT package is missing or was not resolved:
    ```console
    BAHIR_HOME="${HOME}/Projects/bahir"
    export SPARK_HOME="${HOME}/Runtimes/spark/spark-2.0.0-preview-bin-hadoop2.7"
    export PYTHONPATH="${BAHIR_HOME}/streaming-mqtt/python"
    
    ${SPARK_HOME}/bin/spark-submit \
        ${BAHIR_HOME}/streaming-mqtt/examples/src/main/python/streaming/mqtt_wordcount.py \
        tcp://localhost:1883 foo
    ```
    ```
    ________________________________________________________________________________________________
    
      Spark Streaming's MQTT libraries not found in class path. Try one of the following.
    
      1. Include the MQTT library and its dependencies with in the
         spark-submit command as
    
         ${SPARK_HOME}/bin/spark-submit --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.0.0 ...
    
      2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
         Group Id = org.apache.bahir, Artifact Id = spark-streaming-mqtt, Version = 2.0.0.
         Then, include the jar in the spark-submit command as
    
         ${SPARK_HOME}/bin/spark-submit --jars <spark-streaming-mqtt.jar> ...
    ________________________________________________________________________________________________
    ```
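
Step (3) checks that the message names the right versions. One way such a message could be parameterised, and then checked in a test, is sketched below. The function name and the default Scala version are assumptions for illustration; the wording is adapted from the output above and is not the actual `mqtt.py` source.

```python
def missing_mqtt_message(spark_version, scala_version="2.11"):
    """Build a help text naming the Maven coordinates for the given versions."""
    coords = "org.apache.bahir:spark-streaming-mqtt_%s:%s" % (
        scala_version, spark_version)
    return (
        "Spark Streaming's MQTT libraries not found in class path. "
        "Try one of the following.\n\n"
        "  1. Include the MQTT library and its dependencies in the\n"
        "     spark-submit command as\n\n"
        "     ${SPARK_HOME}/bin/spark-submit --packages %s ...\n\n"
        "  2. Download the JAR of the artifact from Maven Central,\n"
        "     Group Id = org.apache.bahir, Artifact Id = spark-streaming-mqtt, "
        "Version = %s.\n"
        "     Then, include the jar in the spark-submit command as\n\n"
        "     ${SPARK_HOME}/bin/spark-submit --jars <spark-streaming-mqtt.jar> ...\n"
    ) % (coords, spark_version)
```

With a function like this, the manual check in step (3) could become an assertion that the expected coordinates appear in the returned text.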



[GitHub] bahir pull request #10: [BAHIR-24] fix MQTT Python code, examples, add tests

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/bahir/pull/10

