You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iota.apache.org by to...@apache.org on 2016/07/11 18:46:15 UTC
incubator-iota git commit: renamed Performers -> performers
Repository: incubator-iota
Updated Branches:
refs/heads/master 3d38f1af8 -> f31dfc48f
renamed Performers -> performers
Project: http://git-wip-us.apache.org/repos/asf/incubator-iota/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-iota/commit/f31dfc48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-iota/tree/f31dfc48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-iota/diff/f31dfc48
Branch: refs/heads/master
Commit: f31dfc48faae7fa9e7708f7c9c51759e4eb0125f
Parents: 3d38f1a
Author: tonyfaustini <to...@yahoo.com>
Authored: Mon Jul 11 11:46:06 2016 -0700
Committer: tonyfaustini <to...@yahoo.com>
Committed: Mon Jul 11 11:46:06 2016 -0700
----------------------------------------------------------------------
Performers/NOTICE | 0
.../fey/FeyConfiguration/feyConfiguration | 5 -
Performers/fey/FeyJARRepo/Readme.md | 1 -
Performers/scalastyle-config.xml | 117 ----------------
.../apache/iota/fey/performer/Application.scala | 39 ------
.../apache/iota/fey/performer/Heartbeat.scala | 56 --------
.../apache/iota/fey/performer/Timestamp.scala | 56 --------
.../fey/performer/ActorParamsNotSatisfied.scala | 20 ---
.../apache/iota/fey/performer/Application.scala | 36 -----
.../iota/fey/performer/UnknownException.scala | 19 ---
.../iota/fey/performer/ZMQPublisher.scala | 134 -------------------
.../iota/fey/performer/ZMQSubscriber.scala | 124 -----------------
performers/NOTICE | 0
.../fey/FeyConfiguration/feyConfiguration | 5 +
performers/fey/FeyJARRepo/Readme.md | 1 +
performers/scalastyle-config.xml | 117 ++++++++++++++++
.../apache/iota/fey/performer/Application.scala | 39 ++++++
.../apache/iota/fey/performer/Heartbeat.scala | 56 ++++++++
.../apache/iota/fey/performer/Timestamp.scala | 56 ++++++++
.../fey/performer/ActorParamsNotSatisfied.scala | 20 +++
.../apache/iota/fey/performer/Application.scala | 36 +++++
.../iota/fey/performer/UnknownException.scala | 19 +++
.../iota/fey/performer/ZMQPublisher.scala | 134 +++++++++++++++++++
.../iota/fey/performer/ZMQSubscriber.scala | 124 +++++++++++++++++
24 files changed, 607 insertions(+), 607 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/Performers/NOTICE
----------------------------------------------------------------------
diff --git a/Performers/NOTICE b/Performers/NOTICE
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/Performers/fey/FeyConfiguration/feyConfiguration
----------------------------------------------------------------------
diff --git a/Performers/fey/FeyConfiguration/feyConfiguration b/Performers/fey/FeyConfiguration/feyConfiguration
deleted file mode 100644
index 4e2b650..0000000
--- a/Performers/fey/FeyConfiguration/feyConfiguration
+++ /dev/null
@@ -1,5 +0,0 @@
-fey-global-configuration{
- json-repository = "/Users/tonyfaustini19/IntelliJProjects/apache-incubator-iota/performers/fey/FeyJSONRepo"
- jar-repository = "/Users/tonyfaustini19/IntelliJProjects/apache-incubator-iota/performers/fey/FeyJARRepo"
- enable-checkpoint=false
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/Performers/fey/FeyJARRepo/Readme.md
----------------------------------------------------------------------
diff --git a/Performers/fey/FeyJARRepo/Readme.md b/Performers/fey/FeyJARRepo/Readme.md
deleted file mode 100644
index 2fd9f95..0000000
--- a/Performers/fey/FeyJARRepo/Readme.md
+++ /dev/null
@@ -1 +0,0 @@
-TBD
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/Performers/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/Performers/scalastyle-config.xml b/Performers/scalastyle-config.xml
deleted file mode 100644
index 7e3596f..0000000
--- a/Performers/scalastyle-config.xml
+++ /dev/null
@@ -1,117 +0,0 @@
-<scalastyle>
- <name>Scalastyle standard configuration</name>
- <check level="warning" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.file.FileLengthChecker" enabled="true">
- <parameters>
- <parameter name="maxFileLength"><![CDATA[800]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
- <parameters>
- <parameter name="header"><![CDATA[// Copyright (C) 2011-2012 the original author or authors.
-// See the LICENCE.txt file distributed with this work for additional
-// information regarding copyright ownership.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
- <parameters>
- <parameter name="maxLineLength"><![CDATA[160]]></parameter>
- <parameter name="tabSize"><![CDATA[4]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
- <parameters>
- <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
- <parameters>
- <parameter name="maxParameters"><![CDATA[8]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true">
- <parameters>
- <parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.file.RegexChecker" enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[println]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true">
- <parameters>
- <parameter name="maxTypes"><![CDATA[30]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true">
- <parameters>
- <parameter name="maximum"><![CDATA[10]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
- <parameters>
- <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
- <parameter name="doubleLineAllowed"><![CDATA[false]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
- <parameters>
- <parameter name="maxLength"><![CDATA[50]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true">
- <parameters>
- <parameter name="maxMethods"><![CDATA[30]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
-</scalastyle>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala
----------------------------------------------------------------------
diff --git a/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala b/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala
deleted file mode 100644
index a574c26..0000000
--- a/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iota.fey.performer
-
-
-import akka.actor.{ActorSystem, Props}
-import org.apache.iota.fey.FeyGenericActor.PROCESS
-import scala.concurrent.duration._
-
-object Application extends App {
-
- println("Starting")
-
- implicit val system = ActorSystem("STREAM-RUN")
-
- val timestamp = system.actorOf(Props(classOf[Timestamp], Map.empty, 1.minutes, Map.empty, 1.seconds, "", "", false), name = "TIMESTAMP")
-
- timestamp ! PROCESS("Stream it")
-
- val heartbeat = system.actorOf(Props(classOf[Heartbeat], Map.empty, 1.minutes, Map.empty, 1.seconds, "", "", false), name = "HEARTBEAT")
-
- heartbeat ! PROCESS("Stream it")
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala
----------------------------------------------------------------------
diff --git a/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala b/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala
deleted file mode 100644
index ad16632..0000000
--- a/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iota.fey.performer
-
-import akka.actor.ActorRef
-import org.apache.iota.fey.FeyGenericActor
-
-import scala.collection.immutable.Map
-import scala.concurrent.duration._
-
-class Heartbeat(override val params: Map[String, String] = Map.empty,
- override val backoff: FiniteDuration = 1.minutes,
- override val connectTo: Map[String, ActorRef] = Map.empty,
- override val schedulerTimeInterval: FiniteDuration = 30.seconds,
- override val orchestrationName: String = "",
- override val orchestrationID: String = "",
- override val autoScale: Boolean = false) extends FeyGenericActor {
-
- override def onStart = {
- }
-
- override def onStop = {
- }
-
- override def onRestart(reason: Throwable) = {
- // Called after actor is up and running - after self restart
- }
-
- override def customReceive: Receive = {
- case x => log.info(s"Untreated $x")
- }
-
- override def processMessage[T](message: T, sender: ActorRef): Unit = {
- }
-
- override def execute() = {
- log.info("Alive")
- propagateMessage("Alive")
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala
----------------------------------------------------------------------
diff --git a/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala b/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala
deleted file mode 100644
index 3f22688..0000000
--- a/Performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iota.fey.performer
-
-import akka.actor.ActorRef
-import org.apache.iota.fey.FeyGenericActor
-
-import scala.collection.immutable.Map
-import scala.concurrent.duration._
-
-class Timestamp(override val params: Map[String, String] = Map.empty,
- override val backoff: FiniteDuration = 1.minutes,
- override val connectTo: Map[String, ActorRef] = Map.empty,
- override val schedulerTimeInterval: FiniteDuration = 30.seconds,
- override val orchestrationName: String = "",
- override val orchestrationID: String = "",
- override val autoScale: Boolean = false) extends FeyGenericActor {
-
- override def onStart = {
- }
-
- override def onStop = {
- }
-
- override def onRestart(reason: Throwable) = {
- // Called after actor is up and running - after self restart
- }
-
- override def customReceive: Receive = {
- case x => log.info(s"Untreated $x")
- }
-
- override def processMessage[T](message: T, sender: ActorRef): Unit = {
- }
-
- override def execute() = {
- val ts = java.lang.System.currentTimeMillis()
- log.info(ts.toString)
- propagateMessage(ts.toString)
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ActorParamsNotSatisfied.scala
----------------------------------------------------------------------
diff --git a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ActorParamsNotSatisfied.scala b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ActorParamsNotSatisfied.scala
deleted file mode 100644
index 8b6359f..0000000
--- a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ActorParamsNotSatisfied.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iota.fey.performer
-
-
-case class ActorParamsNotSatisfied(message: String) extends Exception(message)
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala
----------------------------------------------------------------------
diff --git a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala
deleted file mode 100644
index d99a826..0000000
--- a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iota.fey.performer
-
-import akka.actor.{ActorSystem, Props}
-import org.apache.iota.fey.FeyGenericActor.PROCESS
-import scala.concurrent.duration._
-
-object Application extends App {
-
- //println("Starting")
-
- //implicit val system = ActorSystem("ZMQ-RUN")
-
- //val publish = system.actorOf(Props(classOf[ZMQPublisher], Map.empty,1.minutes, Map.empty, 1.seconds,"","",false ), name = "PUBLISH")
-
- //publish ! PROCESS("Publish it")
-
- // val subscribe = system.actorOf(Props(classOf[ZMQPublisher], Map.empty,1.minutes, Map.empty, 1.seconds,"","",false ), name = "SUBSCRIBE")
- //
- // subscribe ! PROCESS("Subscribe to it")
-}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/UnknownException.scala
----------------------------------------------------------------------
diff --git a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/UnknownException.scala b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/UnknownException.scala
deleted file mode 100644
index 032eb27..0000000
--- a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/UnknownException.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iota.fey.performer
-
-case class UnknownException(message: String) extends Exception(message)
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
----------------------------------------------------------------------
diff --git a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
deleted file mode 100644
index 4c43d45..0000000
--- a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iota.fey.performer
-
-import akka.actor.ActorRef
-import org.apache.iota.fey.FeyGenericActor
-import org.zeromq.ZMQ
-
-import scala.concurrent.duration._
-
-
-class ZMQPublisher(override val params: Map[String, String] = Map.empty,
- override val backoff: FiniteDuration = 1.minutes,
- override val connectTo: Map[String, ActorRef] = Map.empty,
- override val schedulerTimeInterval: FiniteDuration = 2.seconds,
- override val orchestrationName: String = "",
- override val orchestrationID: String = "",
- override val autoScale: Boolean = false) extends FeyGenericActor {
-
- //-------default params----------
- var port: Int = 5559
- var target: String = "localhost"
-
- //-------class vars-------------------
- var ctx: ZMQ.Context = null
- var pub: ZMQ.Socket = null
- var count: Int = 0
-
- override def onStart = {
- log.info("Starting ZMQ Publisher")
- try {
- _params_check()
-
- ctx = ZMQ.context(1)
-
- pub = ctx.socket(ZMQ.PUB)
- pub.setLinger(200)
- pub.setHWM(10)
- pub.connect("tcp://" + target + ":" + port)
- }
- catch {
- case e: ActorParamsNotSatisfied => throw e
- }
- }
-
- override def onStop = {
- pub.disconnect("tcp://" + target + ":" + port)
- }
-
- override def onRestart(reason: Throwable) = {
- // Called after actor is up and running - after self restart
- try {
- if (pub != null) {
- pub.close()
- }
- if (ctx != null) {
- ctx.close()
- }
- ctx = ZMQ.context(1)
- pub = ctx.socket(ZMQ.PUB)
- pub.setLinger(200)
- pub.setHWM(10)
- pub.connect("tcp://" + target + ":" + port)
- }
- catch {
- case e: ActorParamsNotSatisfied => throw e
- case default: Throwable => throw new UnknownException("onRestart failed because of an exception")
- }
-
- }
-
- override def customReceive: Receive = {
- case x => log.info(s"Untreated $x")
- }
-
- override def execute() = {
- log.info(s"Msg count: $count")
- }
-
- override def processMessage[T](message: T, sender: ActorRef): Unit = {
- //log.info(message.asInstanceOf[String])
- message match {
- case message: String =>
- // Assuming each String message has only point data
- _zmq_send(s"$message")
-
- // case message: Map[String, (String,String,String,String)] =>
- // val formatted_msgs: Array[String] = message.map(point => _format_messages(point._2)).toArray
- // formatted_msgs.foreach(x => _zmq_send(x))
-
- case _ => log.error("Ignoring this message as format not expected")
- }
- }
-
- def _format_messages(fields: (String, String, String, String)): String = {
- // The tuple has the following elements: lrn, timestamp, value, type
- // And we have to create a message with the format:
- // DATA|cloud|lrn|timestamp|{"<type>" : <value>}
- "DATA|cloud|" + fields._1 + "|" + fields._2 + "|" + s"""{"${fields._3}":"${fields._4}"}"""
- }
-
- def _zmq_send(Message: String) = {
- log.info(s"messsage =$Message")
- pub.send(Message)
- count += 1
- }
-
- def _params_check() = {
- if (params.contains("zmq_port")) {
- port = params("zmq_port").toInt
- }
- if (params.contains("zmq_target")) {
- target = params("zmq_target")
- }
- }
-
-}
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala
----------------------------------------------------------------------
diff --git a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala b/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala
deleted file mode 100644
index 3decb70..0000000
--- a/Performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.iota.fey.performer
-
-import akka.actor.ActorRef
-import org.apache.iota.fey.FeyGenericActor
-import org.zeromq.ZMQ
-
-import scala.concurrent.duration._
-
-class ZMQSubscriber(override val params: Map[String, String] = Map.empty,
- override val backoff: FiniteDuration = 1.minutes,
- override val connectTo: Map[String, ActorRef] = Map.empty,
- override val schedulerTimeInterval: FiniteDuration = 2.seconds,
- override val orchestrationName: String = "",
- override val orchestrationID: String = "",
- override val autoScale: Boolean = false) extends FeyGenericActor {
-
- //-------default params----------
- var port: Int = 5563
- var target: String = "localhost"
- val topic_filter: String = "DATA"
-
- //-------class vars-------------------
- var ctx: ZMQ.Context = null
- var pub: ZMQ.Socket = null
- var count: Int = 0
-
- override def onStart = {
- log.info("Starting ZMQ Subscriber")
- try {
-
- _params_check()
-
- // Prepare our context and subscriber
- ctx = ZMQ.context(1)
- val subscriber = ctx.socket(ZMQ.SUB)
-
- subscriber.bind(s"tcp://$target:$port")
- subscriber.subscribe(topic_filter.getBytes())
- while (true) {
- // Read envelope with address
- val address = new String(subscriber.recv(0))
- // Read message contents
- val contents = new String(subscriber.recv(0))
- log.info(s"HERE IT IS $address : $contents")
- count += 1
- }
- }
- catch {
- case e: ActorParamsNotSatisfied => throw e
- }
- }
-
- override def onStop = {
- pub.disconnect("tcp://" + target + ":" + port)
- pub.close()
- ctx.close()
- pub = null
- ctx = null
- }
-
- override def onRestart(reason: Throwable) = {
- // Called after actor is up and running - after self restart
- try {
- if (pub != null) {
- pub.close()
- }
- if (ctx != null) {
- ctx.close()
- }
- }
- catch {
- case e: ActorParamsNotSatisfied => throw e
- case default: Throwable => throw new UnknownException("onRestart failed because of an exception")
- }
-
- }
-
- override def customReceive: Receive = {
- case x => log.info(s"Untreated $x")
- }
-
- override def execute() = {
- log.info(s"Msg count: $count")
- }
-
- override def processMessage[T](message: T, sender: ActorRef): Unit = {
- message match {
- case _ => log.info("Ignoring this message as format not expected")
- }
- }
-
- def _format_messages(fields: (String, String, String, String)): String = {
- // The tuple has the following elements: lrn, timestamp, value, type
- // And we have to create a message with the format:
- // DATA|cloud|lrn|timestamp|{"<type>" : <value>}
- "DATA|cloud|" + fields._1 + "|" + fields._2 + "|" + s"""{"${fields._3}":"${fields._4}"}"""
- }
-
- def _params_check() = {
- if (params.contains("zmq_port")) {
- port = params("zmq_port").toInt
- }
- if (params.contains("zmq_target")) {
- target = params("zmq_target")
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/performers/NOTICE
----------------------------------------------------------------------
diff --git a/performers/NOTICE b/performers/NOTICE
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/performers/fey/FeyConfiguration/feyConfiguration
----------------------------------------------------------------------
diff --git a/performers/fey/FeyConfiguration/feyConfiguration b/performers/fey/FeyConfiguration/feyConfiguration
new file mode 100644
index 0000000..4e2b650
--- /dev/null
+++ b/performers/fey/FeyConfiguration/feyConfiguration
@@ -0,0 +1,5 @@
+fey-global-configuration{
+ json-repository = "/Users/tonyfaustini19/IntelliJProjects/apache-incubator-iota/performers/fey/FeyJSONRepo"
+ jar-repository = "/Users/tonyfaustini19/IntelliJProjects/apache-incubator-iota/performers/fey/FeyJARRepo"
+ enable-checkpoint=false
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/performers/fey/FeyJARRepo/Readme.md
----------------------------------------------------------------------
diff --git a/performers/fey/FeyJARRepo/Readme.md b/performers/fey/FeyJARRepo/Readme.md
new file mode 100644
index 0000000..2fd9f95
--- /dev/null
+++ b/performers/fey/FeyJARRepo/Readme.md
@@ -0,0 +1 @@
+TBD
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/performers/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/performers/scalastyle-config.xml b/performers/scalastyle-config.xml
new file mode 100644
index 0000000..7e3596f
--- /dev/null
+++ b/performers/scalastyle-config.xml
@@ -0,0 +1,117 @@
+<scalastyle>
+ <name>Scalastyle standard configuration</name>
+ <check level="warning" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.FileLengthChecker" enabled="true">
+ <parameters>
+ <parameter name="maxFileLength"><![CDATA[800]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
+ <parameters>
+ <parameter name="header"><![CDATA[// Copyright (C) 2011-2012 the original author or authors.
+// See the LICENCE.txt file distributed with this work for additional
+// information regarding copyright ownership.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
+ <parameters>
+ <parameter name="maxLineLength"><![CDATA[160]]></parameter>
+ <parameter name="tabSize"><![CDATA[4]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
+ <parameters>
+ <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
+ <parameters>
+ <parameter name="maxParameters"><![CDATA[8]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true">
+ <parameters>
+ <parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.RegexChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[println]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true">
+ <parameters>
+ <parameter name="maxTypes"><![CDATA[30]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true">
+ <parameters>
+ <parameter name="maximum"><![CDATA[10]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
+ <parameters>
+ <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
+ <parameter name="doubleLineAllowed"><![CDATA[false]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
+ <parameters>
+ <parameter name="maxLength"><![CDATA[50]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true">
+ <parameters>
+ <parameter name="maxMethods"><![CDATA[30]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
+</scalastyle>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala
----------------------------------------------------------------------
diff --git a/performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala b/performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala
new file mode 100644
index 0000000..a574c26
--- /dev/null
+++ b/performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey.performer
+
+
+import akka.actor.{ActorSystem, Props}
+import org.apache.iota.fey.FeyGenericActor.PROCESS
+import scala.concurrent.duration._
+
+/**
+ * Stand-alone launcher for the stream performers: creates the "STREAM-RUN"
+ * actor system, starts one Timestamp and one Heartbeat actor, and sends each
+ * of them a PROCESS("Stream it") message.
+ */
+object Application extends App {
+
+  println("Starting")
+
+  implicit val system = ActorSystem("STREAM-RUN")
+
+  // Creates an actor of the given performer class under `actorName`; every
+  // performer is started with the same positional constructor arguments.
+  private def launch(performer: Class[_], actorName: String) =
+    system.actorOf(
+      Props(performer, Map.empty, 1.minutes, Map.empty, 1.seconds, "", "", false),
+      name = actorName)
+
+  val timestamp = launch(classOf[Timestamp], "TIMESTAMP")
+  timestamp ! PROCESS("Stream it")
+
+  val heartbeat = launch(classOf[Heartbeat], "HEARTBEAT")
+  heartbeat ! PROCESS("Stream it")
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala
----------------------------------------------------------------------
diff --git a/performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala b/performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala
new file mode 100644
index 0000000..ad16632
--- /dev/null
+++ b/performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey.performer
+
+import akka.actor.ActorRef
+import org.apache.iota.fey.FeyGenericActor
+
+import scala.collection.immutable.Map
+import scala.concurrent.duration._
+
+/**
+ * Performer whose `execute()` logs the text "Alive" and propagates that same
+ * text. The start/stop/restart hooks and `processMessage` are no-ops, and any
+ * message reaching `customReceive` is only logged.
+ */
+class Heartbeat(override val params: Map[String, String] = Map.empty,
+                override val backoff: FiniteDuration = 1.minutes,
+                override val connectTo: Map[String, ActorRef] = Map.empty,
+                override val schedulerTimeInterval: FiniteDuration = 30.seconds,
+                override val orchestrationName: String = "",
+                override val orchestrationID: String = "",
+                override val autoScale: Boolean = false) extends FeyGenericActor {
+
+  // Nothing to set up.
+  override def onStart: Unit = ()
+
+  // Nothing to tear down.
+  override def onStop: Unit = ()
+
+  // Called after actor is up and running - after self restart
+  override def onRestart(reason: Throwable): Unit = ()
+
+  override def customReceive: Receive = {
+    case unhandled => log.info(s"Untreated $unhandled")
+  }
+
+  // Incoming messages are ignored.
+  override def processMessage[T](message: T, sender: ActorRef): Unit = ()
+
+  override def execute() = {
+    val beat = "Alive"
+    log.info(beat)
+    propagateMessage(beat)
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala
----------------------------------------------------------------------
diff --git a/performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala b/performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala
new file mode 100644
index 0000000..3f22688
--- /dev/null
+++ b/performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iota.fey.performer
+
+import akka.actor.ActorRef
+import org.apache.iota.fey.FeyGenericActor
+
+import scala.collection.immutable.Map
+import scala.concurrent.duration._
+
+/**
+ * Performer whose `execute()` reads the current epoch time in milliseconds,
+ * logs it as text and propagates that same text. The start/stop/restart hooks
+ * and `processMessage` are no-ops, and any message reaching `customReceive`
+ * is only logged.
+ */
+class Timestamp(override val params: Map[String, String] = Map.empty,
+                override val backoff: FiniteDuration = 1.minutes,
+                override val connectTo: Map[String, ActorRef] = Map.empty,
+                override val schedulerTimeInterval: FiniteDuration = 30.seconds,
+                override val orchestrationName: String = "",
+                override val orchestrationID: String = "",
+                override val autoScale: Boolean = false) extends FeyGenericActor {
+
+  // Nothing to set up.
+  override def onStart: Unit = ()
+
+  // Nothing to tear down.
+  override def onStop: Unit = ()
+
+  // Called after actor is up and running - after self restart
+  override def onRestart(reason: Throwable): Unit = ()
+
+  override def customReceive: Receive = {
+    case unhandled => log.info(s"Untreated $unhandled")
+  }
+
+  // Incoming messages are ignored.
+  override def processMessage[T](message: T, sender: ActorRef): Unit = ()
+
+  override def execute() = {
+    // Render once so the logged and the propagated text are the same value.
+    val now = System.currentTimeMillis().toString
+    log.info(now)
+    propagateMessage(now)
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ActorParamsNotSatisfied.scala
----------------------------------------------------------------------
diff --git a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ActorParamsNotSatisfied.scala b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ActorParamsNotSatisfied.scala
new file mode 100644
index 0000000..8b6359f
--- /dev/null
+++ b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ActorParamsNotSatisfied.scala
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iota.fey.performer
+
+/** Exception carrying only a message; the ZMQ performers catch and rethrow it unchanged. Presumably signals missing/invalid actor params - confirm where it is raised. */
+case class ActorParamsNotSatisfied(message: String) extends Exception(message)
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala
----------------------------------------------------------------------
diff --git a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala
new file mode 100644
index 0000000..d99a826
--- /dev/null
+++ b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iota.fey.performer
+
+import akka.actor.{ActorSystem, Props}
+import org.apache.iota.fey.FeyGenericActor.PROCESS
+import scala.concurrent.duration._
+
+object Application extends App {
+ // Every statement below is commented out, so running this object currently does nothing.
+ //println("Starting")
+
+ //implicit val system = ActorSystem("ZMQ-RUN")
+
+ //val publish = system.actorOf(Props(classOf[ZMQPublisher], Map.empty,1.minutes, Map.empty, 1.seconds,"","",false ), name = "PUBLISH")
+
+ //publish ! PROCESS("Publish it")
+ // NOTE(review): the SUBSCRIBE example below instantiates ZMQPublisher - probably meant ZMQSubscriber; confirm before re-enabling.
+ // val subscribe = system.actorOf(Props(classOf[ZMQPublisher], Map.empty,1.minutes, Map.empty, 1.seconds,"","",false ), name = "SUBSCRIBE")
+ //
+ // subscribe ! PROCESS("Subscribe to it")
+}
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/performers/zmq/src/main/scala/org/apache/iota/fey/performer/UnknownException.scala
----------------------------------------------------------------------
diff --git a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/UnknownException.scala b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/UnknownException.scala
new file mode 100644
index 0000000..032eb27
--- /dev/null
+++ b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/UnknownException.scala
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iota.fey.performer
+/** Exception carrying only a message; the ZMQ performers' onRestart throws it when any Throwable other than ActorParamsNotSatisfied is caught. */
+case class UnknownException(message: String) extends Exception(message)
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
----------------------------------------------------------------------
diff --git a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
new file mode 100644
index 0000000..4c43d45
--- /dev/null
+++ b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iota.fey.performer
+
+import akka.actor.ActorRef
+import org.apache.iota.fey.FeyGenericActor
+import org.zeromq.ZMQ
+
+import scala.concurrent.duration._
+
+
+/**
+ * Fey performer that forwards every String message it receives to a ZeroMQ
+ * PUB socket connected to `tcp://<target>:<port>`.
+ *
+ * Recognised entries in `params`:
+ *  - `zmq_port`   : port to connect to (default 5559); must parse as an Int
+ *  - `zmq_target` : host to connect to (default "localhost")
+ */
+class ZMQPublisher(override val params: Map[String, String] = Map.empty,
+                   override val backoff: FiniteDuration = 1.minutes,
+                   override val connectTo: Map[String, ActorRef] = Map.empty,
+                   override val schedulerTimeInterval: FiniteDuration = 2.seconds,
+                   override val orchestrationName: String = "",
+                   override val orchestrationID: String = "",
+                   override val autoScale: Boolean = false) extends FeyGenericActor {
+
+  //-------default params----------
+  var port: Int = 5559
+  var target: String = "localhost"
+
+  //-------class vars-------------------
+  // ctx and pub stay null until openSocket() has run and are reset to null by closeSocket().
+  var ctx: ZMQ.Context = null
+  var pub: ZMQ.Socket = null
+  var count: Int = 0
+
+  // Socket options applied to every PUB socket this performer opens.
+  private val lingerMillis = 200
+  private val highWaterMark = 10
+
+  /** Reads the optional params, then opens the context and the PUB socket. */
+  override def onStart: Unit = {
+    log.info("Starting ZMQ Publisher")
+    _params_check()
+    openSocket()
+  }
+
+  /**
+   * Releases the socket and the context. Safe to call when onStart never
+   * completed: closeSocket() skips whatever was not opened.
+   */
+  override def onStop: Unit = {
+    closeSocket()
+  }
+
+  /**
+   * Drops whatever socket/context is still held and opens a fresh pair.
+   *
+   * @throws UnknownException when closing or reopening fails with a non-fatal
+   *                          error; the original error is attached as the cause
+   */
+  override def onRestart(reason: Throwable): Unit = {
+    // Called after actor is up and running - after self restart
+    try {
+      closeSocket()
+      openSocket()
+    }
+    catch {
+      case e: ActorParamsNotSatisfied => throw e
+      // NonFatal lets OutOfMemoryError, InterruptedException and the like propagate untouched.
+      case scala.util.control.NonFatal(e) =>
+        throw new UnknownException("onRestart failed because of an exception").initCause(e)
+    }
+  }
+
+  override def customReceive: Receive = {
+    case x => log.info(s"Untreated $x")
+  }
+
+  /** Logs how many messages have been handed to the socket so far. */
+  override def execute(): Unit = {
+    log.info(s"Msg count: $count")
+  }
+
+  /** Publishes String messages; anything else is logged as an error and dropped. */
+  override def processMessage[T](message: T, sender: ActorRef): Unit = {
+    message match {
+      // Assuming each String message has only point data
+      case text: String => _zmq_send(text)
+
+      // case message: Map[String, (String,String,String,String)] =>
+      //   val formatted_msgs: Array[String] = message.map(point => _format_messages(point._2)).toArray
+      //   formatted_msgs.foreach(x => _zmq_send(x))
+
+      case _ => log.error("Ignoring this message as format not expected")
+    }
+  }
+
+  /**
+   * Builds one pipe-delimited record: DATA|cloud|<_1>|<_2>|{"<_3>":"<_4>"}.
+   *
+   * The tuple is documented as (lrn, timestamp, value, type).
+   * NOTE(review): the earlier comment promised {"<type>" : <value>}, but _3 is
+   * emitted as the key and _4 as the value - confirm the intended order.
+   */
+  def _format_messages(fields: (String, String, String, String)): String = {
+    s"""DATA|cloud|${fields._1}|${fields._2}|{"${fields._3}":"${fields._4}"}"""
+  }
+
+  /** Logs the message, hands it to the PUB socket and bumps the counter. */
+  def _zmq_send(Message: String): Unit = {
+    log.info(s"messsage =$Message")
+    pub.send(Message)
+    count += 1
+  }
+
+  /**
+   * Applies the optional `zmq_port` / `zmq_target` overrides from `params`.
+   *
+   * @throws ActorParamsNotSatisfied when `zmq_port` is present but is not an integer
+   */
+  def _params_check(): Unit = {
+    params.get("zmq_port").foreach { raw =>
+      port = scala.util.Try(raw.toInt).getOrElse(
+        throw ActorParamsNotSatisfied(s"zmq_port must be an integer but was '$raw'"))
+    }
+    params.get("zmq_target").foreach { host => target = host }
+  }
+
+  // Opens a context plus a PUB socket and connects it to the configured endpoint.
+  private def openSocket(): Unit = {
+    ctx = ZMQ.context(1)
+    pub = ctx.socket(ZMQ.PUB)
+    pub.setLinger(lingerMillis)
+    pub.setHWM(highWaterMark)
+    pub.connect(s"tcp://$target:$port")
+  }
+
+  // Closes the socket before the context and forgets both, so a later
+  // onStop/onRestart cannot close the same handle twice.
+  private def closeSocket(): Unit = {
+    try {
+      Option(pub).foreach(_.close())
+      Option(ctx).foreach(_.close())
+    } finally {
+      pub = null
+      ctx = null
+    }
+  }
+
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-iota/blob/f31dfc48/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala
----------------------------------------------------------------------
diff --git a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala
new file mode 100644
index 0000000..3decb70
--- /dev/null
+++ b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iota.fey.performer
+
+import akka.actor.ActorRef
+import org.apache.iota.fey.FeyGenericActor
+import org.zeromq.ZMQ
+
+import scala.concurrent.duration._
+
+/**
+ * Fey performer that binds a ZeroMQ SUB socket on `tcp://<target>:<port>`,
+ * subscribes to the "DATA" prefix and logs every two-frame message it reads.
+ *
+ * Recognised entries in `params`:
+ *  - `zmq_port`   : port to bind (default 5563); must parse as an Int
+ *  - `zmq_target` : host/interface to bind (default "localhost")
+ */
+class ZMQSubscriber(override val params: Map[String, String] = Map.empty,
+                    override val backoff: FiniteDuration = 1.minutes,
+                    override val connectTo: Map[String, ActorRef] = Map.empty,
+                    override val schedulerTimeInterval: FiniteDuration = 2.seconds,
+                    override val orchestrationName: String = "",
+                    override val orchestrationID: String = "",
+                    override val autoScale: Boolean = false) extends FeyGenericActor {
+
+  //-------default params----------
+  var port: Int = 5563
+  var target: String = "localhost"
+  val topic_filter: String = "DATA"
+
+  //-------class vars-------------------
+  // `pub` keeps its historical name but holds the SUB socket opened in onStart.
+  // Both fields are null while nothing is open.
+  var ctx: ZMQ.Context = null
+  var pub: ZMQ.Socket = null
+  var count: Int = 0
+
+  /**
+   * Reads the optional params, binds the SUB socket and then receives forever.
+   *
+   * NOTE(review): the receive loop never returns, so this hook blocks its
+   * calling thread for as long as the socket keeps delivering. Moving the
+   * reads elsewhere needs the FeyGenericActor scheduling contract, which is
+   * not visible here - confirm before restructuring.
+   */
+  override def onStart: Unit = {
+    log.info("Starting ZMQ Subscriber")
+    _params_check()
+
+    // Prepare our context and subscriber. The socket is stored in the field
+    // so that onStop/onRestart can actually close it.
+    ctx = ZMQ.context(1)
+    val subscriber = ctx.socket(ZMQ.SUB)
+    pub = subscriber
+
+    subscriber.bind(s"tcp://$target:$port")
+    subscriber.subscribe(topic_filter.getBytes())
+    while (true) {
+      // Read envelope with address
+      val address = new String(subscriber.recv(0))
+      // Read message contents
+      val contents = new String(subscriber.recv(0))
+      log.info(s"HERE IT IS $address : $contents")
+      count += 1
+    }
+  }
+
+  /** Releases the socket and the context; a no-op for whatever was never opened. */
+  override def onStop: Unit = {
+    closeResources()
+  }
+
+  /**
+   * Releases whatever socket/context is still held.
+   *
+   * @throws UnknownException when closing fails with a non-fatal error; the
+   *                          original error is attached as the cause
+   */
+  override def onRestart(reason: Throwable): Unit = {
+    // Called after actor is up and running - after self restart
+    try {
+      closeResources()
+    }
+    catch {
+      case e: ActorParamsNotSatisfied => throw e
+      // NonFatal lets OutOfMemoryError, InterruptedException and the like propagate untouched.
+      case scala.util.control.NonFatal(e) =>
+        throw new UnknownException("onRestart failed because of an exception").initCause(e)
+    }
+  }
+
+  override def customReceive: Receive = {
+    case x => log.info(s"Untreated $x")
+  }
+
+  /** Logs how many messages have been read so far. */
+  override def execute(): Unit = {
+    log.info(s"Msg count: $count")
+  }
+
+  /** This performer consumes nothing from other actors: every message is logged and dropped. */
+  override def processMessage[T](message: T, sender: ActorRef): Unit = {
+    log.info("Ignoring this message as format not expected")
+  }
+
+  /**
+   * Builds one pipe-delimited record: DATA|cloud|<_1>|<_2>|{"<_3>":"<_4>"}.
+   *
+   * The tuple is documented as (lrn, timestamp, value, type).
+   * NOTE(review): the earlier comment promised {"<type>" : <value>}, but _3 is
+   * emitted as the key and _4 as the value - confirm the intended order.
+   */
+  def _format_messages(fields: (String, String, String, String)): String = {
+    s"""DATA|cloud|${fields._1}|${fields._2}|{"${fields._3}":"${fields._4}"}"""
+  }
+
+  /**
+   * Applies the optional `zmq_port` / `zmq_target` overrides from `params`.
+   *
+   * @throws ActorParamsNotSatisfied when `zmq_port` is present but is not an integer
+   */
+  def _params_check(): Unit = {
+    params.get("zmq_port").foreach { raw =>
+      port = scala.util.Try(raw.toInt).getOrElse(
+        throw ActorParamsNotSatisfied(s"zmq_port must be an integer but was '$raw'"))
+    }
+    params.get("zmq_target").foreach { host => target = host }
+  }
+
+  // Closes the socket before the context (a context with open sockets may
+  // block on close) and forgets both so nothing is closed twice.
+  private def closeResources(): Unit = {
+    try {
+      Option(pub).foreach(_.close())
+      Option(ctx).foreach(_.close())
+    } finally {
+      pub = null
+      ctx = null
+    }
+  }
+
+}
\ No newline at end of file