You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iota.apache.org by to...@apache.org on 2016/11/29 23:29:09 UTC

[10/31] incubator-iota git commit: Removed scalastyle warnings in ZMQPublisher

Removed scalastyle warnings in ZMQPublisher


Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/7cc1745a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/7cc1745a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/7cc1745a

Branch: refs/heads/master
Commit: 7cc1745a5dcb8c200fd8d6087d9614807a881574
Parents: 1f0895c
Author: Shivansh <sh...@gmail.com>
Authored: Fri Nov 4 00:06:40 2016 +0530
Committer: Shivansh <sh...@gmail.com>
Committed: Fri Nov 4 00:06:40 2016 +0530

----------------------------------------------------------------------
 .../iota/fey/performer/ZMQPublisher.scala       | 40 +++++++++++---------
 1 file changed, 22 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/7cc1745a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
----------------------------------------------------------------------
diff --git a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
index d7dbf90..252a8bc 100644
--- a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
+++ b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
@@ -32,24 +32,27 @@ class ZMQPublisher(override val params: Map[String, String] = Map.empty,
                    override val autoScale: Boolean = false) extends FeyGenericActor {
 
   //-------default params----------
-  var port: Int = 5559
+  val DEFAULT_PORT = 5559
+  var port: Int = DEFAULT_PORT
   var target: String = "localhost"
+  val DEFAULT_LINGER = 200
+  val DEFAULT_HMW = 10
 
   //-------class vars-------------------
   var ctx: ZMQ.Context = null
   var pub: ZMQ.Socket = null
   var count: Int = 0
 
-  override def onStart = {
+  override def onStart: Unit = {
     log.info("Starting ZMQ Publisher")
     try {
-      _params_check()
+      checkParams()
 
       ctx = ZMQ.context(1)
 
       pub = ctx.socket(ZMQ.PUB)
-      pub.setLinger(200)
-      pub.setHWM(10)
+      pub.setLinger(DEFAULT_LINGER)
+      pub.setHWM(DEFAULT_HMW)
       pub.connect("tcp://" + target + ":" + port)
     }
     catch {
@@ -57,23 +60,23 @@ class ZMQPublisher(override val params: Map[String, String] = Map.empty,
     }
   }
 
-  override def onStop = {
+  override def onStop: Unit = {
     pub.disconnect("tcp://" + target + ":" + port)
   }
 
-  override def onRestart(reason: Throwable) = {
+  override def onRestart(reason: Throwable): Unit = {
     // Called after actor is up and running - after self restart
     try {
-      if (pub != null) {
+      if (Option(pub).isDefined) {
         pub.close()
       }
-      if (ctx != null) {
+      if (Option(ctx).isDefined) {
         ctx.close()
       }
       ctx = ZMQ.context(1)
       pub = ctx.socket(ZMQ.PUB)
-      pub.setLinger(200)
-      pub.setHWM(10)
+      pub.setLinger(DEFAULT_LINGER)
+      pub.setHWM(DEFAULT_HMW)
       pub.connect("tcp://" + target + ":" + port)
     }
     catch {
@@ -87,7 +90,7 @@ class ZMQPublisher(override val params: Map[String, String] = Map.empty,
     case x => log.debug(s"Untreated $x")
   }
 
-  override def execute() = {
+  override def execute(): Unit = {
     log.debug(s"Msg count: $count")
   }
 
@@ -96,7 +99,7 @@ class ZMQPublisher(override val params: Map[String, String] = Map.empty,
     message match {
       case message: String =>
         // Assuming each String message has only point data
-        _zmq_send(s"$message")
+        sendZMQ(s"$message")
 
       //      case message: Map[String, (String,String,String,String)] =>
       //        val formatted_msgs: Array[String] = message.map(point => _format_messages(point._2)).toArray
@@ -106,20 +109,20 @@ class ZMQPublisher(override val params: Map[String, String] = Map.empty,
     }
   }
 
-  def _format_messages(fields: (String, String, String, String)): String = {
+  def formatMessages(fields: (String, String, String, String)): String = {
     // The tuple has the following elements: lrn, timestamp, value, type
     // And we have to create a message with the format:
     // DATA|cloud|lrn|timestamp|{"<type>" : <value>}
-    "DATA|cloud|" + fields._1 + "|" + fields._2 + "|" + s"""{"${fields._3}":"${fields._4}"}"""
+    s"""DATA|cloud| ${fields._1}|${fields._2}|{"${fields._3}":"${fields._4}"}"""
   }
 
-  def _zmq_send(Message: String) = {
-    log.debug(s"messsage =$Message")
+  def sendZMQ(Message: String): Unit = {
+    log.debug(s"message =$Message")
     pub.send(Message)
     count += 1
   }
 
-  def _params_check() = {
+  def checkParams(): Unit = {
     if (params.contains("zmq_port")) {
       port = params("zmq_port").toInt
     }
@@ -132,3 +135,4 @@ class ZMQPublisher(override val params: Map[String, String] = Map.empty,
 
 
 
+