You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iota.apache.org by to...@apache.org on 2016/11/29 23:29:09 UTC
[10/31] incubator-iota git commit: Removed scalastyle in ZMQPublisher
Removed scalastyle in ZMQPublisher
Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/7cc1745a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/7cc1745a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/7cc1745a
Branch: refs/heads/master
Commit: 7cc1745a5dcb8c200fd8d6087d9614807a881574
Parents: 1f0895c
Author: Shivansh <sh...@gmail.com>
Authored: Fri Nov 4 00:06:40 2016 +0530
Committer: Shivansh <sh...@gmail.com>
Committed: Fri Nov 4 00:06:40 2016 +0530
----------------------------------------------------------------------
.../iota/fey/performer/ZMQPublisher.scala | 40 +++++++++++---------
1 file changed, 22 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/7cc1745a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
----------------------------------------------------------------------
diff --git a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
index d7dbf90..252a8bc 100644
--- a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
+++ b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
@@ -32,24 +32,27 @@ class ZMQPublisher(override val params: Map[String, String] = Map.empty,
override val autoScale: Boolean = false) extends FeyGenericActor {
//-------default params----------
- var port: Int = 5559
+ val DEFAULT_PORT = 5559
+ var port: Int = DEFAULT_PORT
var target: String = "localhost"
+ val DEFAULT_LINGER = 200
+ val DEFAULT_HMW = 10
//-------class vars-------------------
var ctx: ZMQ.Context = null
var pub: ZMQ.Socket = null
var count: Int = 0
- override def onStart = {
+ override def onStart: Unit = {
log.info("Starting ZMQ Publisher")
try {
- _params_check()
+ checkParams()
ctx = ZMQ.context(1)
pub = ctx.socket(ZMQ.PUB)
- pub.setLinger(200)
- pub.setHWM(10)
+ pub.setLinger(DEFAULT_LINGER)
+ pub.setHWM(DEFAULT_HMW)
pub.connect("tcp://" + target + ":" + port)
}
catch {
@@ -57,23 +60,23 @@ class ZMQPublisher(override val params: Map[String, String] = Map.empty,
}
}
- override def onStop = {
+ override def onStop: Unit = {
pub.disconnect("tcp://" + target + ":" + port)
}
- override def onRestart(reason: Throwable) = {
+ override def onRestart(reason: Throwable): Unit = {
// Called after actor is up and running - after self restart
try {
- if (pub != null) {
+ if (Option(pub).isDefined) {
pub.close()
}
- if (ctx != null) {
+ if (Option(ctx).isDefined) {
ctx.close()
}
ctx = ZMQ.context(1)
pub = ctx.socket(ZMQ.PUB)
- pub.setLinger(200)
- pub.setHWM(10)
+ pub.setLinger(DEFAULT_LINGER)
+ pub.setHWM(DEFAULT_HMW)
pub.connect("tcp://" + target + ":" + port)
}
catch {
@@ -87,7 +90,7 @@ class ZMQPublisher(override val params: Map[String, String] = Map.empty,
case x => log.debug(s"Untreated $x")
}
- override def execute() = {
+ override def execute(): Unit = {
log.debug(s"Msg count: $count")
}
@@ -96,7 +99,7 @@ class ZMQPublisher(override val params: Map[String, String] = Map.empty,
message match {
case message: String =>
// Assuming each String message has only point data
- _zmq_send(s"$message")
+ sendZMQ(s"$message")
// case message: Map[String, (String,String,String,String)] =>
// val formatted_msgs: Array[String] = message.map(point => _format_messages(point._2)).toArray
@@ -106,20 +109,20 @@ class ZMQPublisher(override val params: Map[String, String] = Map.empty,
}
}
- def _format_messages(fields: (String, String, String, String)): String = {
+ def formatMessages(fields: (String, String, String, String)): String = {
// The tuple has the following elements: lrn, timestamp, value, type
// And we have to create a message with the format:
// DATA|cloud|lrn|timestamp|{"<type>" : <value>}
- "DATA|cloud|" + fields._1 + "|" + fields._2 + "|" + s"""{"${fields._3}":"${fields._4}"}"""
+ s"""DATA|cloud| ${fields._1}|${fields._2}|{"${fields._3}":"${fields._4}"}"""
}
- def _zmq_send(Message: String) = {
- log.debug(s"messsage =$Message")
+ def sendZMQ(Message: String): Unit = {
+ log.debug(s"message =$Message")
pub.send(Message)
count += 1
}
- def _params_check() = {
+ def checkParams(): Unit = {
if (params.contains("zmq_port")) {
port = params("zmq_port").toInt
}
@@ -132,3 +135,4 @@ class ZMQPublisher(override val params: Map[String, String] = Map.empty,
+