Posted to reviews@bahir.apache.org by lukasz-antoniak <gi...@git.apache.org> on 2018/11/28 06:04:13 UTC

[GitHub] bahir pull request #71: [BAHIR-66] Switch to Java binding for ZeroMQ

GitHub user lukasz-antoniak opened a pull request:

    https://github.com/apache/bahir/pull/71

    [BAHIR-66] Switch to Java binding for ZeroMQ

    Let me briefly describe the story behind this pull request.
    
    Initially, I just wanted to implement an integration test for BAHIR-66. Google pointed me to JeroMQ, which provides an official ZeroMQ binding for Java and does not require native libraries. I decided to give it a try, but quickly realized that the _akka-zeromq_ module (a transitive dependency of the current Bahir master) is not compatible with JeroMQ. The Akka team also wanted to move to JeroMQ (https://github.com/akka/akka/issues/13856), but in the end decided to remove the _akka-zeromq_ project completely (https://github.com/akka/akka/issues/15864, https://www.lightbend.com/blog/akka-roadmap-update-2014). Given that _akka-zeromq_ does not support the latest version of the ZeroMQ protocol and that further development may be delayed, I decided to refactor the _streaming-zeromq_ implementation to use JeroMQ.
    
    The change brings several benefits, such as support for the PUB-SUB and PUSH-PULL messaging patterns, the ability to bind the socket on either end of the communication channel (see test cases), and subscription to multiple channels. JeroMQ seems quite reliable, and reconnection is handled out of the box: we can even start the ZeroMQ subscriber and have it try to connect to the remote socket before the other end has created and bound that socket. While I tried to preserve backward compatibility of the method signatures, there was no easy way to support the Akka API and the business logic that users could put there (e.g. _akka.actor.ActorSystem_).
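Subscribing to multiple channels works because a ZeroMQ SUB socket filters by prefix: a message is delivered if its first frame starts with at least one subscribed topic, and an empty subscription matches everything. The sketch below models only that matching rule in plain Scala and makes no JeroMQ calls. `SubscriptionFilterSketch`, `accepts` and `deliver` are hypothetical names used for illustration. With JeroMQ itself, each topic would be registered on the socket with `socket.subscribe(topicBytes)`.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical model of ZeroMQ SUB-side topic filtering (plain Scala, no JeroMQ).
object SubscriptionFilterSketch {
  // Encodes a topic or payload string as UTF-8 bytes.
  def utf8(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)

  // A message is accepted if its first frame starts with at least one subscribed
  // prefix; an empty prefix therefore matches every message.
  def accepts(subscriptions: Seq[Array[Byte]], firstFrame: Array[Byte]): Boolean =
    subscriptions.exists(prefix => firstFrame.startsWith(prefix))

  // Keeps only the multipart messages (arrays of frames) a subscriber would receive.
  def deliver(
      subscriptions: Seq[Array[Byte]],
      messages: Seq[Array[Array[Byte]]]): Seq[Array[Array[Byte]]] =
    messages.filter(frames => frames.nonEmpty && accepts(subscriptions, frames(0)))
}
```

Note that prefix matching means a subscription to `foo` also receives messages published under `foobar`.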
    
    Comments welcome. I hope the pull request makes sense and that you will like a Spark integration with ZeroMQ that does not depend on the Akka framework.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lukasz-antoniak/bahir BAHIR-66

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir/pull/71.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #71
    
----
commit 48a82946240ab36834e2d1eeab2c007daad9c53a
Author: Lukasz Antoniak <lu...@...>
Date:   2018-11-27T14:58:42Z

    [BAHIR-66] Switch to Java binding for ZeroMQ

----


---

[GitHub] bahir pull request #71: [BAHIR-66] Switch to Java binding for ZeroMQ

Posted by lukasz-antoniak <gi...@git.apache.org>.
Github user lukasz-antoniak commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/71#discussion_r237692301
  
    --- Diff: streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala ---
    @@ -17,147 +17,75 @@
     
     package org.apache.spark.streaming.zeromq
     
    +import java.lang.{Iterable => JIterable}
    +import java.util.{List => JList}
    +
     import scala.collection.JavaConverters._
     import scala.reflect.ClassTag
     
    -import akka.actor.{ActorSystem, Props, SupervisorStrategy}
    -import akka.util.ByteString
    -import akka.zeromq.Subscribe
    -
    -import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0}
    +import org.apache.spark.api.java.function.{Function => JFunction}
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.StreamingContext
    -import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
     import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
     import org.apache.spark.streaming.dstream.ReceiverInputDStream
     
     object ZeroMQUtils {
       /**
    -   * Create an input stream that receives messages pushed by a zeromq publisher.
    -   * @param ssc StreamingContext object
    -   * @param publisherUrl Url of remote zeromq publisher
    -   * @param subscribe Topic to subscribe to
    -   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
    +   * Create an input stream that receives messages pushed by a ZeroMQ publisher.
    +   * @param ssc Streaming context
    +   * @param publisherUrl URL of remote ZeroMQ publisher
    +   * @param connect When positive, connector will try to establish connectivity with remote server.
    +   *                Otherwise, it attempts to create and bind local socket.
    +   * @param topics List of topics to subscribe
    +   * @param bytesToObjects ZeroMQ stream publishes sequence of frames for each topic
        *                       and each frame has sequence of byte thus it needs the converter
    --- End diff --
    
    Renamed the parameter. Initially I wanted to keep the source as close as possible to the current master. I also provided a default message converter supporting text payloads, and another variant of the method that creates the input stream.
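The two variants described in this reply can be sketched without Spark. One entry point takes a caller-supplied converter. The other falls back to a default that skips the topic frame and decodes the remaining frames as UTF-8 text. This is a hypothetical illustration only. `ConverterVariantsSketch`, `decode` and `textConverter` are made-up names, and a plain `Seq` of messages stands in for the input stream, so it is not the actual `ZeroMQUtils` API.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical sketch; names are illustrative, not the Bahir API.
object ConverterVariantsSketch {
  // Converts one multipart ZeroMQ message (its frames) into zero or more records.
  type Converter[T] = Array[Array[Byte]] => Iterable[T]

  // Default converter for text payloads: skip the topic frame (frame 0) and
  // decode every remaining frame as UTF-8.
  val textConverter: Converter[String] =
    frames => frames.drop(1).map(new String(_, StandardCharsets.UTF_8)).toSeq

  // Variant 1: the caller supplies its own converter.
  def decode[T](messages: Seq[Array[Array[Byte]]], converter: Converter[T]): Seq[T] =
    messages.flatMap(converter)

  // Variant 2: text messages, using the default converter.
  def decode(messages: Seq[Array[Array[Byte]]]): Seq[String] =
    decode(messages, textConverter)
}
```

Treating frame 0 as the topic follows the example publisher, which sends the topic first and the payload second. Whether the real default converter drops it is an assumption here.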


---

[GitHub] bahir issue #71: [BAHIR-66] Switch to Java binding for ZeroMQ

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on the issue:

    https://github.com/apache/bahir/pull/71
  
    Thanks, LGTM.


---

[GitHub] bahir pull request #71: [BAHIR-66] Switch to Java binding for ZeroMQ

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/71#discussion_r237031835
  
    --- Diff: streaming-zeromq/pom.xml ---
    @@ -45,22 +45,18 @@
           <version>${spark.version}</version>
           <scope>provided</scope>
         </dependency>
    +    <dependency>
    +      <groupId>org.zeromq</groupId>
    +      <artifactId>jeromq</artifactId>
    +      <version>0.4.3</version>
    +    </dependency>
         <dependency>
    --- End diff --
    
    Could you also please remove the previous dependencies and add the new one to the appropriate license files?


---

[GitHub] bahir pull request #71: [BAHIR-66] Switch to Java binding for ZeroMQ

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/71#discussion_r237038699
  
    --- Diff: streaming-zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala ---
    +   * @param bytesToObjects ZeroMQ stream publishes sequence of frames for each topic
         *                       and each frame has sequence of byte thus it needs the converter
    --- End diff --
    
    Can this be hidden from the consumer, at least by default, while still allowing them to configure a more complex function if needed (or perhaps just the character encoding to use)? Otherwise, maybe use a better name, like the one you used in the examples (messageConverter).
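Here is one way to address this request, sketched under assumptions. The converter stays a plain function, but consumers can build the default text converter for whatever encoding they need, and UTF-8 is used when nothing is configured. `CharsetConverterSketch`, `textConverter` and `defaultTextConverter` are hypothetical names and are not part of Bahir or JeroMQ.

```scala
import java.nio.charset.{Charset, StandardCharsets}

// Hypothetical sketch: a default text converter whose encoding is configurable.
object CharsetConverterSketch {
  type Converter[T] = Array[Array[Byte]] => Iterable[T]

  // Builds a converter that skips frame 0 (the topic) and decodes every
  // remaining frame with the given charset.
  def textConverter(charset: Charset): Converter[String] =
    frames => frames.drop(1).map(new String(_, charset)).toSeq

  // Used when the consumer configures nothing.
  val defaultTextConverter: Converter[String] = textConverter(StandardCharsets.UTF_8)
}
```

A consumer who needs more than text decoding can still pass any function of type `Array[Array[Byte]] => Iterable[T]`, which keeps the flexibility the comment asks for.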


---

[GitHub] bahir pull request #71: [BAHIR-66] Switch to Java binding for ZeroMQ

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/bahir/pull/71


---

[GitHub] bahir pull request #71: [BAHIR-66] Switch to Java binding for ZeroMQ

Posted by lukasz-antoniak <gi...@git.apache.org>.
Github user lukasz-antoniak commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/71#discussion_r237691415
  
    --- Diff: streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala ---
    @@ -15,105 +15,123 @@
      * limitations under the License.
      */
     
    -// scalastyle:off println awaitresult
     package org.apache.spark.examples.streaming.zeromq
     
     import scala.concurrent.Await
     import scala.concurrent.duration.Duration
     import scala.language.implicitConversions
    +import scala.util.Random
     
    -import akka.actor.ActorSystem
    -import akka.actor.actorRef2Scala
    -import akka.util.ByteString
    -import akka.zeromq._
    -import akka.zeromq.Subscribe
     import org.apache.log4j.{Level, Logger}
    +import org.zeromq.ZContext
    +import org.zeromq.ZMQ
    +import org.zeromq.ZMQException
    +import org.zeromq.ZMsg
     
     import org.apache.spark.SparkConf
     import org.apache.spark.streaming.{Seconds, StreamingContext}
    -import org.apache.spark.streaming.zeromq._
    +import org.apache.spark.streaming.zeromq.ZeroMQUtils
     
     /**
    - * A simple publisher for demonstration purposes, repeatedly publishes random Messages
    - * every one second.
    + * Simple publisher for demonstration purposes,
    + * repeatedly publishes random messages every one second.
      */
     object SimpleZeroMQPublisher {
    -
       def main(args: Array[String]): Unit = {
         if (args.length < 2) {
    -      System.err.println("Usage: SimpleZeroMQPublisher <zeroMQUrl> <topic> ")
    +      // scalastyle:off println
    +      System.err.println("Usage: SimpleZeroMQPublisher <zeroMqUrl> <topic>")
    +      // scalastyle:on println
           System.exit(1)
         }
     
         val Seq(url, topic) = args.toSeq
    -    val acs: ActorSystem = ActorSystem()
    -
    -    val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url))
    -    implicit def stringToByteString(x: String): ByteString = ByteString(x)
    -    val messages: List[ByteString] = List("words ", "may ", "count ")
    -    while (true) {
    -      Thread.sleep(1000)
    -      pubSocket ! ZMQMessage(ByteString(topic) :: messages)
    -    }
    -    Await.result(acs.whenTerminated, Duration.Inf)
    +    val context = new ZContext
    +    val socket = context.createSocket(ZMQ.PUB)
    +    socket.bind(url)
    +
    +    val zmqThread = new Thread(new Runnable {
    +      def run() {
    +        val messages = List("words", "may", "count infinitely")
    +        val random = new Random
    +        while (!Thread.currentThread.isInterrupted) {
    +          try {
    +            Thread.sleep(random.nextInt(1000))
    +            val msg1 = new ZMsg
    +            msg1.add(topic.getBytes)
    +            msg1.add(messages(random.nextInt(messages.size)).getBytes)
    +            msg1.send(socket)
    +          } catch {
    +            case e: ZMQException if ZMQ.Error.ETERM.getCode == e.getErrorCode =>
    +              Thread.currentThread.interrupt()
    +            case e: InterruptedException =>
    +            case e: Throwable => throw e
    +          }
    +        }
    +      }
    +    })
    +
    +    sys.addShutdownHook( {
    +      context.destroy()
    +      zmqThread.interrupt()
    +      zmqThread.join()
    +    } )
    +
    +    zmqThread.start()
       }
     }
     
    -// scalastyle:off
     /**
    - * A sample wordcount with ZeroMQStream stream
    - *
    - * To work with zeroMQ, some native libraries have to be installed.
    - * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide]
    - * (http://www.zeromq.org/intro:get-the-software)
    + * Sample word count with ZeroMQ stream.
      *
    - * Usage: ZeroMQWordCount <zeroMQurl> <topic>
    - *   <zeroMQurl> and <topic> describe where zeroMq publisher is running.
    + * Usage: ZeroMQWordCount <zeroMqUrl> <topic>
    + *   <zeroMqUrl> describes where ZeroMQ publisher is running
    + *   <topic> defines logical message type
      *
    - * To run this example locally, you may run publisher as
    + * To run this example locally, you may start publisher as:
      *    `$ bin/run-example \
      *      org.apache.spark.examples.streaming.zeromq.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo`
    - * and run the example as
    + * and run the example as:
      *    `$ bin/run-example \
      *      org.apache.spark.examples.streaming.zeromq.ZeroMQWordCount tcp://127.0.0.1:1234 foo`
      */
    -// scalastyle:on
     object ZeroMQWordCount {
       def main(args: Array[String]) {
         if (args.length < 2) {
    -      System.err.println("Usage: ZeroMQWordCount <zeroMQurl> <topic>")
    +      // scalastyle:off println
    +      System.err.println("Usage: ZeroMQWordCount <zeroMqUrl> <topic>")
    +      // scalastyle:on println
           System.exit(1)
         }
     
    -    // Set logging level if log4j not configured (override by adding log4j.properties to classpath)
    -    if (!Logger.getRootLogger.getAllAppenders.hasMoreElements) {
    -      Logger.getRootLogger.setLevel(Level.WARN)
    -    }
    +    // Set logging level if log4j not configured (override by adding log4j.properties to classpath).
    +    Logger.getRootLogger.setLevel(Level.WARN)
     
         val Seq(url, topic) = args.toSeq
         val sparkConf = new SparkConf().setAppName("ZeroMQWordCount")
     
    -    // check Spark configuration for master URL, set it to local if not configured
    +    // Check Spark configuration for master URL, set it to local if not present.
         if (!sparkConf.contains("spark.master")) {
           sparkConf.setMaster("local[2]")
         }
     
    -    // Create the context and set the batch size
    +    // Create the context and set the batch size.
         val ssc = new StreamingContext(sparkConf, Seconds(2))
     
    -    def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
    +    def bytesToString(bytes: Array[Array[Byte]]) = {
    +      Seq(new String(bytes(1), zmq.ZMQ.CHARSET))
    --- End diff --
    
    Added a text message converter supporting the most common scenario.


---

[GitHub] bahir pull request #71: [BAHIR-66] Switch to Java binding for ZeroMQ

Posted by lukasz-antoniak <gi...@git.apache.org>.
Github user lukasz-antoniak commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/71#discussion_r237691550
  
    --- Diff: streaming-zeromq/pom.xml ---
    +    <dependency>
    +      <groupId>org.zeromq</groupId>
    +      <artifactId>jeromq</artifactId>
    +      <version>0.4.3</version>
    +    </dependency>
    --- End diff --
    
    Which files are you referring to? I could not locate them.


---

[GitHub] bahir pull request #71: [BAHIR-66] Switch to Java binding for ZeroMQ

Posted by lresende <gi...@git.apache.org>.
Github user lresende commented on a diff in the pull request:

    https://github.com/apache/bahir/pull/71#discussion_r237029598
  
    --- Diff: streaming-zeromq/examples/src/main/scala/org/apache/spark/examples/streaming/zeromq/ZeroMQWordCount.scala ---
    +    def bytesToString(bytes: Array[Array[Byte]]) = {
    +      Seq(new String(bytes(1), zmq.ZMQ.CHARSET))
    --- End diff --
    
    How about exposing a constant from ZeroMQUtils instead of exposing something from the Java API? This would make it more flexible and independent.
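Here is a minimal sketch of this suggestion, assuming UTF-8 as the default encoding. A utility object owns a public `Charset` constant, and the example's converter reads it instead of importing `zmq.ZMQ.CHARSET` from JeroMQ's `zmq` package. `ZeroMQTextSketch.DefaultCharset` and `WordCountConverterSketch` are hypothetical names. The converter also returns an empty result for a message without a payload frame, where `bytes(1)` would throw.

```scala
import java.nio.charset.{Charset, StandardCharsets}

// Hypothetical stand-in for a utility object that owns the constant.
object ZeroMQTextSketch {
  // Public constant so callers need not reference JeroMQ's zmq.ZMQ.CHARSET.
  val DefaultCharset: Charset = StandardCharsets.UTF_8
}

object WordCountConverterSketch {
  // Mirrors the example's bytesToString (payload is frame 1, after the topic),
  // but takes the charset from the constant above and yields nothing for a
  // message that has no payload frame instead of throwing.
  def bytesToString(bytes: Array[Array[Byte]]): Seq[String] =
    if (bytes.length > 1) Seq(new String(bytes(1), ZeroMQTextSketch.DefaultCharset))
    else Seq.empty
}
```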


---