You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:57 UTC
[47/49] incubator-gearpump git commit: GEARPUMP-11, fix code style
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
index 8847df2..0996381 100644
--- a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
+++ b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,17 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.gearpump.cluster.master
import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+/** Master status. Synced means all masters are live and synced. */
object MasterStatus {
type Type = String
val Synced = "synced"
val UnSynced = "unsynced"
}
-
case class MasterNode(host: String, port: Int) {
def toTuple: (String, Int) = {
(host, port)
@@ -33,18 +34,18 @@ case class MasterNode(host: String, port: Int) {
}
/**
- * Master information for REST API call
+ * Master information returned for REST API call
*/
case class MasterSummary(
- leader: MasterNode,
- cluster: List[MasterNode],
- aliveFor: Long,
- logFile: String,
- jarStore: String,
- masterStatus: MasterStatus.Type,
- homeDirectory: String,
- activities: List[MasterActivity],
- jvmName: String,
- historyMetricsConfig: HistoryMetricsConfig = null)
+ leader: MasterNode,
+ cluster: List[MasterNode],
+ aliveFor: Long,
+ logFile: String,
+ jarStore: String,
+ masterStatus: MasterStatus.Type,
+ homeDirectory: String,
+ activities: List[MasterActivity],
+ jvmName: String,
+ historyMetricsConfig: HistoryMetricsConfig = null)
case class MasterActivity(time: Long, event: String)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
index da17829..b25162e 100644
--- a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
+++ b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,50 +15,66 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.gearpump.cluster.scheduler
import akka.actor.ActorRef
-import io.gearpump.WorkerId
-case class Resource(slots : Int) {
- def +(other : Resource): Resource = Resource(slots + other.slots)
+import io.gearpump.cluster.worker.WorkerId
+
+case class Resource(slots: Int) {
- def -(other : Resource): Resource = Resource(slots - other.slots)
+ // scalastyle:off spaces.after.plus
+ def +(other: Resource): Resource = Resource(slots + other.slots)
+ // scalastyle:on spaces.after.plus
- def >(other : Resource): Boolean = slots > other.slots
+ def -(other: Resource): Resource = Resource(slots - other.slots)
- def >=(other : Resource): Boolean = !(this < other)
+ def >(other: Resource): Boolean = slots > other.slots
- def <(other : Resource): Boolean = slots < other.slots
+ def >=(other: Resource): Boolean = !(this < other)
- def <=(other : Resource): Boolean = !(this > other)
+ def <(other: Resource): Boolean = slots < other.slots
- def equals(other : Resource): Boolean = slots == other.slots
+ def <=(other: Resource): Boolean = !(this > other)
- def isEmpty: Boolean = slots == 0
+ def isEmpty: Boolean = {
+ slots == 0
+ }
}
-object Priority extends Enumeration{
+/**
+ * Each streaming job can have a priority; a job with higher priority
+ * is allocated resources earlier than jobs with lower priority.
+ */
+object Priority extends Enumeration {
type Priority = Value
val LOW, NORMAL, HIGH = Value
}
-object Relaxation extends Enumeration{
+/**
+ * Relaxation.ONEWORKER means only resources (slots) from that worker will be accepted by
+ * the requesting application job.
+ */
+object Relaxation extends Enumeration {
type Relaxation = Value
// Option ONEWORKER allow user to schedule a task on specific worker.
val ANY, ONEWORKER, SPECIFICWORKER = Value
}
-import Relaxation._
-import Priority._
-case class ResourceRequest(resource: Resource, workerId: WorkerId, priority: Priority = NORMAL, relaxation: Relaxation = ANY, executorNum: Int = 1)
+import io.gearpump.cluster.scheduler.Priority._
+import io.gearpump.cluster.scheduler.Relaxation._
+
+case class ResourceRequest(
+ resource: Resource, workerId: WorkerId, priority: Priority = NORMAL,
+ relaxation: Relaxation = ANY, executorNum: Int = 1)
-case class ResourceAllocation(resource : Resource, worker : ActorRef, workerId : WorkerId)
+case class ResourceAllocation(resource: Resource, worker: ActorRef, workerId: WorkerId)
object Resource {
- def empty = new Resource(0)
+ def empty: Resource = new Resource(0)
- def min(res1: Resource, res2: Resource) = if (res1.slots < res2.slots) res1 else res2
+ def min(res1: Resource, res2: Resource): Resource = if (res1.slots < res2.slots) res1 else res2
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala b/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala
index a7e7dd2..8581467 100644
--- a/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala
+++ b/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,37 +15,42 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.gearpump.cluster.worker
import com.typesafe.config.Config
-import io.gearpump.util.RichProcess
+
import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.util.RichProcess
/**
- * ExecutorProcessLauncher is used to launch a process for Executor using given parameters.
- * User can implement this interface to decide the behavior of launching a process.
- * Set "gearpump.worker.executor-process-launcher" to your implemented class name.
- */
+ * ExecutorProcessLauncher is used to launch a process for Executor using given parameters.
+ *
+ * User can implement this interface to decide the behavior of launching a process.
+ * Set "gearpump.worker.executor-process-launcher" to your implemented class name.
+ */
trait ExecutorProcessLauncher {
val config: Config
/**
- * This function will launch a process for Executor using given parameters.
- * @param appId The appId of the executor to be launched
- * @param executorId The executorId of the executor to be launched
- * @param resource The resource allocated for that executor
- * @param options The command options
- * @param classPath The classpath of the process
- * @param mainClass The main class of the process
- * @param arguments The rest arguments
- */
- def createProcess(appId: Int, executorId:Int, resource: Resource, config: Config, options : Array[String],
- classPath : Array[String], mainClass : String, arguments : Array[String]): RichProcess
+ * This function launches a process for Executor using given parameters.
+ *
+ * @param appId The appId of the executor to be launched
+ * @param executorId The executorId of the executor to be launched
+ * @param resource The resource allocated for that executor
+ * @param options The command options
+ * @param classPath The classpath of the process
+ * @param mainClass The main class of the process
+ * @param arguments The remaining arguments
+ */
+ def createProcess(
+ appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
+ classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess
/**
- * This function will clean resources for a launched process.
- * @param appId The appId of the launched executor
- * @param executorId The executorId of launched executor
- */
+ * This function will clean resources for a launched process.
+ * @param appId The appId of the launched executor
+ * @param executorId The executorId of launched executor
+ */
def cleanProcess(appId: Int, executorId: Int): Unit
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala
new file mode 100644
index 0000000..24c6ad2
--- /dev/null
+++ b/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.cluster.worker
+
+/**
+ * WorkerId is used to uniquely track a worker machine.
+ *
+ * @param sessionId sessionId is assigned by Master node for easy tracking. It is possible that
+ * sessionId is **NOT** unique, so always use WorkerId for comparison.
+ * @param registerTime the timestamp when a worker node registers itself with the master node
+ */
+case class WorkerId(sessionId: Int, registerTime: Long)
+
+object WorkerId {
+ val unspecified: WorkerId = new WorkerId(-1, 0L)
+
+ def render(workerId: WorkerId): String = {
+ workerId.registerTime + "_" + workerId.sessionId
+ }
+
+ def parse(str: String): WorkerId = {
+ val pair = str.split("_")
+ new WorkerId(pair(1).toInt, pair(0).toLong)
+ }
+
+ implicit val workerIdOrdering: Ordering[WorkerId] = {
+ new Ordering[WorkerId] {
+
+ /** Compare timestamp first, then id */
+ override def compare(x: WorkerId, y: WorkerId): Int = {
+ if (x.registerTime < y.registerTime) {
+ -1
+ } else if (x.registerTime == y.registerTime) {
+ if (x.sessionId < y.sessionId) {
+ -1
+ } else if (x.sessionId == y.sessionId) {
+ 0
+ } else {
+ 1
+ }
+ } else {
+ 1
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
index ca700dc..cdf2d03 100644
--- a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
+++ b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,32 +15,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.gearpump.cluster.worker
-import akka.actor.ActorRef
-import io.gearpump.WorkerId
+package io.gearpump.cluster.worker
import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
/**
* Worker summary information for REST API.
*/
case class WorkerSummary(
- workerId: WorkerId,
- state: String,
- actorPath: String,
- aliveFor: Long,
- logFile: String,
- executors: Array[ExecutorSlots],
- totalSlots: Int,
- availableSlots: Int,
- homeDirectory: String,
- jvmName: String,
- // Id used to uniquely identity this worker process in low level resource manager like YARN.
- resourceManagerContainerId: String,
- historyMetricsConfig: HistoryMetricsConfig = null)
+ workerId: WorkerId,
+ state: String,
+ actorPath: String,
+ aliveFor: Long,
+ logFile: String,
+ executors: Array[ExecutorSlots],
+ totalSlots: Int,
+ availableSlots: Int,
+ homeDirectory: String,
+ jvmName: String,
+ // Id used to uniquely identify this worker process in low level resource manager like YARN.
+ resourceManagerContainerId: String,
+ historyMetricsConfig: HistoryMetricsConfig = null)
-object WorkerSummary{
- def empty = WorkerSummary(WorkerId.unspecified, "", "", 0L, "", Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "")
+object WorkerSummary {
+ def empty: WorkerSummary = {
+ WorkerSummary(WorkerId.unspecified, "", "", 0L, "",
+ Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "")
+ }
}
case class ExecutorSlots(appId: Int, executorId: Int, slots: Int)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala b/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala
index ba01c94..54d5431 100644
--- a/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala
+++ b/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,17 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.gearpump.jarstore
import java.io.File
import java.net.URI
import java.util.ServiceLoader
+import scala.collection.JavaConverters._
-import akka.actor.{ActorSystem, ActorRefFactory}
+import akka.actor.ActorSystem
import com.typesafe.config.Config
-import io.gearpump.util.{Constants, Util}
-import scala.collection.JavaConverters._
+import io.gearpump.util.{Constants, Util}
case class FilePath(path: String)
@@ -39,7 +40,7 @@ trait JarStoreService {
* Like "hdfs" for HDFS file system, and "file" for a local
* file system.
*/
- val scheme : String
+ val scheme: String
/**
* Init the Jar Store.
@@ -47,13 +48,14 @@ trait JarStoreService {
def init(config: Config, system: ActorSystem)
/**
- * This function will copy the local file to the remote JarStore, called from client side.
+ * This function will copy the local file to the remote JarStore, called from client side.
* @param localFile The local file
*/
def copyFromLocal(localFile: File): FilePath
/**
- * This function will copy the remote file to local file system, called from client side.
+ * This function will copy the remote file to local file system, called from client side.
+ *
* @param localFile The destination of file path
* @param remotePath The remote file path from JarStore
*/
@@ -64,7 +66,8 @@ object JarStoreService {
/**
* Get a active JarStoreService by specifying a scheme.
- * Please see config [[Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for more
+ *
+ * Please see config [[io.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for more
* information.
*/
def get(config: Config): JarStoreService = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala b/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala
index 269f8f3..3a581fb 100644
--- a/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala
+++ b/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,15 +18,14 @@
package io.gearpump.metrics
+import scala.collection.JavaConverters._
+
import akka.actor.{ActorRef, ActorSystem}
-import io.gearpump.metrics.Metrics.{Histogram => HistogramData, Meter => MeterData, Counter => CounterData, Gauge => GaugeData}
-import io.gearpump.codahale.metrics.MetricRegistry
-import akka.actor.ActorSystem
-import io.gearpump.codahale.metrics.{Gauge => CodaGauge}
+import io.gearpump.codahale.metrics.{Gauge => CodaGauge, MetricRegistry}
+import io.gearpump.metrics.Metrics.{Counter => CounterData, Gauge => GaugeData, Histogram => HistogramData, Meter => MeterData}
import io.gearpump.metrics.MetricsReporterService.ReportTo
import io.gearpump.util.LogUtil
-import scala.collection.JavaConverters._
/**
* A reporter class for logging metrics values to a remote actor periodically
@@ -34,7 +33,7 @@ import scala.collection.JavaConverters._
class AkkaReporter(
system: ActorSystem,
registry: MetricRegistry)
- extends ReportTo{
+ extends ReportTo {
private val LOG = LogUtil.getLogger(getClass)
LOG.info("Start Metrics AkkaReporter")
@@ -57,7 +56,7 @@ class AkkaReporter(
s.get95thPercentile, s.get99thPercentile, s.get999thPercentile)
}
- meters.entrySet().asScala.foreach{pair =>
+ meters.entrySet().asScala.foreach { pair =>
val key = pair.getKey
val value = pair.getValue
to ! MeterData(key,
@@ -67,7 +66,7 @@ class AkkaReporter(
getRateUnit)
}
- gauges.entrySet().asScala.foreach {kv =>
+ gauges.entrySet().asScala.foreach { kv =>
val value = kv.getValue.asInstanceOf[CodaGauge[Number]].getValue.longValue()
to ! GaugeData(kv.getKey, value)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/Counter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Counter.scala b/core/src/main/scala/io/gearpump/metrics/Counter.scala
index e0c9c57..70c7bae 100644
--- a/core/src/main/scala/io/gearpump/metrics/Counter.scala
+++ b/core/src/main/scala/io/gearpump/metrics/Counter.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,9 +21,9 @@ package io.gearpump.metrics
import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter}
/**
- * sampleRate: take a data point for every sampleRate...
+ * @see io.gearpump.codahale.metrics.Counter
*/
-class Counter(val name : String, counter : CodaHaleCounter, sampleRate : Int = 1) {
+class Counter(val name: String, counter: CodaHaleCounter, sampleRate: Int = 1) {
private var sampleCount = 0L
private var toBeIncremented = 0L
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/Histogram.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Histogram.scala b/core/src/main/scala/io/gearpump/metrics/Histogram.scala
index 2ad06c3..4673050 100644
--- a/core/src/main/scala/io/gearpump/metrics/Histogram.scala
+++ b/core/src/main/scala/io/gearpump/metrics/Histogram.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,23 +21,23 @@ package io.gearpump.metrics
import io.gearpump.codahale.metrics.{Histogram => CodaHaleHistogram}
/**
- * sampleRate: take a data point for every sampleRate...
+ * @see io.gearpump.codahale.metrics.Histogram
*/
-class Histogram(val name : String, hisgram : CodaHaleHistogram, sampleRate : Int = 1) {
+class Histogram(val name: String, histogram: CodaHaleHistogram, sampleRate: Int = 1) {
private var sampleCount = 0L
def update(value: Long) {
sampleCount += 1
- if (null != hisgram && sampleCount % sampleRate == 0) {
- hisgram.update(value)
+ if (null != histogram && sampleCount % sampleRate == 0) {
+ histogram.update(value)
}
}
- def getMean() : Double = {
- hisgram.getSnapshot.getMean
+ def getMean(): Double = {
+ histogram.getSnapshot.getMean
}
- def getStdDev() : Double = {
- hisgram.getSnapshot.getStdDev
+ def getStdDev(): Double = {
+ histogram.getSnapshot.getStdDev
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala b/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala
index 606a033..28d420a 100644
--- a/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala
+++ b/core/src/main/scala/io/gearpump/metrics/JvmMetricsSet.scala
@@ -1,11 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package io.gearpump.metrics
import java.util
-
-import io.gearpump.codahale.metrics.jvm.{ThreadStatesGaugeSet, MemoryUsageGaugeSet}
-import io.gearpump.codahale.metrics.{Gauge, Metric, MetricSet}
import scala.collection.JavaConverters._
+import io.gearpump.codahale.metrics.jvm.{MemoryUsageGaugeSet, ThreadStatesGaugeSet}
+import io.gearpump.codahale.metrics.{Metric, MetricSet}
+
class JvmMetricsSet(name: String) extends MetricSet {
override def getMetrics: util.Map[String, Metric] = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/Meter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Meter.scala b/core/src/main/scala/io/gearpump/metrics/Meter.scala
index 660029c..ca79a37 100644
--- a/core/src/main/scala/io/gearpump/metrics/Meter.scala
+++ b/core/src/main/scala/io/gearpump/metrics/Meter.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,7 +20,8 @@ package io.gearpump.metrics
import io.gearpump.codahale.metrics.{Meter => CodaHaleMeter}
-class Meter(val name : String, meter : CodaHaleMeter, sampleRate : Int = 1) {
+/** See io.gearpump.codahale.metrics.Meter */
+class Meter(val name: String, meter: CodaHaleMeter, sampleRate: Int = 1) {
private var sampleCount = 0L
private var toBeMarked = 0L
@@ -37,7 +38,7 @@ class Meter(val name : String, meter : CodaHaleMeter, sampleRate : Int = 1) {
}
}
- def getOneMinuteRate() : Double = {
+ def getOneMinuteRate(): Double = {
meter.getOneMinuteRate
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/Metrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/Metrics.scala b/core/src/main/scala/io/gearpump/metrics/Metrics.scala
index ba9a59f..aad1af0 100644
--- a/core/src/main/scala/io/gearpump/metrics/Metrics.scala
+++ b/core/src/main/scala/io/gearpump/metrics/Metrics.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,39 +18,40 @@
package io.gearpump.metrics
+import scala.collection.JavaConverters._
import akka.actor._
+import org.slf4j.Logger
+
import io.gearpump.codahale.metrics._
import io.gearpump.metrics
import io.gearpump.util.LogUtil
-import org.slf4j.Logger
-
-import scala.collection.JavaConverters._
+/** Metric objects registry */
class Metrics(sampleRate: Int) extends Extension {
val registry = new MetricRegistry()
- def meter(name : String) = {
+ def meter(name: String): metrics.Meter = {
new metrics.Meter(name, registry.meter(name), sampleRate)
}
- def histogram(name : String) = {
+ def histogram(name: String): Histogram = {
new Histogram(name, registry.histogram(name), sampleRate)
}
- def histogram(name : String, sampleRate: Int) = {
+ def histogram(name: String, sampleRate: Int): Histogram = {
new Histogram(name, registry.histogram(name), sampleRate)
}
- def counter(name : String) = {
+ def counter(name: String): Counter = {
new Counter(name, registry.counter(name), sampleRate)
}
def register(set: MetricSet): Unit = {
val names = registry.getNames
- val metrics = set.getMetrics.asScala.filterKeys {key => !names.contains(key)}
- metrics.foreach{kv =>
+ val metrics = set.getMetrics.asScala.filterKeys { key => !names.contains(key) }
+ metrics.foreach { kv =>
registry.register(kv._1, kv._2)
}
}
@@ -88,10 +89,10 @@ object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider {
}
}
- case class Histogram
- (name: String, mean: Double,
- stddev: Double, median: Double,
- p95: Double, p99: Double, p999: Double)
+ case class Histogram (
+ name: String, mean: Double,
+ stddev: Double, median: Double,
+ p95: Double, p99: Double, p999: Double)
extends MetricType
case class Counter(name: String, value: Long) extends MetricType
@@ -118,7 +119,7 @@ object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider {
override def get(system: ActorSystem): Metrics = super.get(system)
- override def lookup = Metrics
+ override def lookup: ExtensionId[Metrics] = Metrics
override def createExtension(system: ExtendedActorSystem): Metrics = {
val metricsEnabled = system.settings.config.getBoolean(GEARPUMP_METRIC_ENABLED)
@@ -133,27 +134,27 @@ object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider {
}
class DummyMetrics extends Metrics(1) {
- override def register(set: MetricSet) = Unit
+ override def register(set: MetricSet): Unit = Unit
private val meter = new metrics.Meter("", null) {
- override def mark() = Unit
- override def mark(n: Long) = Unit
+ override def mark(): Unit = Unit
+ override def mark(n: Long): Unit = Unit
override def getOneMinuteRate(): Double = 0
}
private val histogram = new metrics.Histogram("", null) {
- override def update(value: Long) = Unit
- override def getMean() : Double = 0
- override def getStdDev() : Double = 0
+ override def update(value: Long): Unit = Unit
+ override def getMean(): Double = 0
+ override def getStdDev(): Double = 0
}
private val counter = new metrics.Counter("", null) {
- override def inc() = Unit
- override def inc(n: Long) = Unit
+ override def inc(): Unit = Unit
+ override def inc(n: Long): Unit = Unit
}
- override def meter(name : String) = meter
- override def histogram(name : String) = histogram
- override def counter(name : String) = counter
+ override def meter(name: String): metrics.Meter = meter
+ override def histogram(name: String): metrics.Histogram = histogram
+ override def counter(name: String): metrics.Counter = counter
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala b/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala
index 6c4c34f..f52a060 100644
--- a/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala
+++ b/core/src/main/scala/io/gearpump/metrics/MetricsAggregator.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,18 +18,15 @@
package io.gearpump.metrics
-import java.util
-
import io.gearpump.cluster.MasterToClient.HistoryMetricsItem
/**
- * Will aggregate a full set of metrics into a smaller set
+ * Aggregates a larger set of metrics into a smaller set
*
* Sub Class must implement a constructor with signature like this:
- * MetricsAggregator(config: Config)
- *
- *
+ * MetricsAggregator(config: Config)
*/
trait MetricsAggregator {
- def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem]
+ def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
+ : List[HistoryMetricsItem]
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala b/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala
index c5be041..05decdd 100644
--- a/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala
+++ b/core/src/main/scala/io/gearpump/metrics/MetricsReporterService.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,16 +20,22 @@ package io.gearpump.metrics
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
+import scala.concurrent.duration._
+
+import akka.actor.{Actor, ActorRef}
-import akka.actor.{ActorRef, ExtendedActorSystem, Actor}
-import io.gearpump.codahale.metrics.{Slf4jReporter, MetricFilter, ScheduledReporter}
-import io.gearpump.codahale.metrics.graphite.{GraphiteReporter, Graphite}
-import io.gearpump.metrics.Metrics.{ReportMetrics, DemandMoreMetrics}
-import io.gearpump.metrics.MetricsReporterService.{ReportTo}
+import io.gearpump.codahale.metrics.graphite.{Graphite, GraphiteReporter}
+import io.gearpump.codahale.metrics.{MetricFilter, Slf4jReporter}
+import io.gearpump.metrics.Metrics.{DemandMoreMetrics, ReportMetrics}
+import io.gearpump.metrics.MetricsReporterService.ReportTo
import io.gearpump.util.Constants._
import io.gearpump.util.LogUtil
-import scala.concurrent.duration._
+/**
+ * Reports the metrics data somewhere, like Ganglia, a remote Akka actor, log files...
+ *
+ * @param metrics Holds a list of metrics object.
+ */
class MetricsReporterService(metrics: Metrics) extends Actor {
private val LOG = LogUtil.getLogger(getClass)
@@ -40,14 +46,15 @@ class MetricsReporterService(metrics: Metrics) extends Actor {
implicit val dispatcher = context.dispatcher
def receive: Receive = {
+ // The subscriber is demanding more messages.
case DemandMoreMetrics(subscriber) => {
reporter.report(subscriber)
- context.system.scheduler.scheduleOnce(reportInterval milliseconds,
+ context.system.scheduler.scheduleOnce(reportInterval.milliseconds,
subscriber, ReportMetrics)
}
}
- def startGraphiteReporter = {
+ def startGraphiteReporter(): ReportTo = {
val graphiteHost = system.settings.config.getString(GEARPUMP_METRIC_GRAPHITE_HOST)
val graphitePort = system.settings.config.getInt(GEARPUMP_METRIC_GRAPHITE_PORT)
@@ -64,7 +71,7 @@ class MetricsReporterService(metrics: Metrics) extends Actor {
}
}
- def startSlf4jReporter = {
+ def startSlf4jReporter(): ReportTo = {
new ReportTo {
val reporter = Slf4jReporter.forRegistry(metrics.registry)
.convertRatesTo(TimeUnit.SECONDS)
@@ -77,7 +84,7 @@ class MetricsReporterService(metrics: Metrics) extends Actor {
}
}
- def startAkkaReporter = {
+ def startAkkaReporter(): ReportTo = {
new AkkaReporter(system, metrics.registry)
}
@@ -85,16 +92,18 @@ class MetricsReporterService(metrics: Metrics) extends Actor {
val reporterType = system.settings.config.getString(GEARPUMP_METRIC_REPORTER)
LOG.info(s"Metrics reporter is enabled, using $reporterType reporter")
val reporter = reporterType match {
- case "graphite" => startGraphiteReporter
- case "logfile" => startSlf4jReporter
- case "akka" => startAkkaReporter
+ case "graphite" => startGraphiteReporter()
+ case "logfile" => startSlf4jReporter()
+ case "akka" => startAkkaReporter()
}
reporter
}
}
object MetricsReporterService {
- trait ReportTo{
+
+ /** Target that the user wants to report the metrics data to */
+ trait ReportTo {
def report(to: ActorRef): Unit
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/package.scala b/core/src/main/scala/io/gearpump/package.scala
index 1ed94a7..1877651 100644
--- a/core/src/main/scala/io/gearpump/package.scala
+++ b/core/src/main/scala/io/gearpump/package.scala
@@ -1,50 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package io
package object gearpump {
type TimeStamp = Long
val LatestTime = -1
-
- /**
- * WorkerId is used to uniquely track a worker machine.
- *
- * @param sessionId sessionId is assigned by Master node for easy tracking. It is possible that
- * sessionId is **NOT** unique, so always use WorkerId for comparison.
- * @param registerTime the timestamp when a worker node register itself to master node
- */
- case class WorkerId(sessionId: Int, registerTime: Long)
-
- object WorkerId {
- val unspecified: WorkerId = new WorkerId(-1, 0L)
-
- def render(workerId: WorkerId): String = {
- workerId.registerTime + "_" + workerId.sessionId
- }
-
- def parse(str: String): WorkerId = {
- val pair = str.split("_")
- new WorkerId(pair(1).toInt, pair(0).toLong)
- }
-
- implicit val workerIdOrdering: Ordering[WorkerId] = {
- new Ordering[WorkerId] {
-
- /** Compare timestamp first, then id */
- override def compare(x: WorkerId, y: WorkerId): Int = {
- if (x.registerTime < y.registerTime) {
- -1
- } else if (x.registerTime == y.registerTime) {
- if (x.sessionId < y.sessionId) {
- -1
- } else if (x.sessionId == y.sessionId) {
- 0
- } else {
- 1
- }
- } else {
- 1
- }
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
index dba02ee..0b9c57e 100644
--- a/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
+++ b/core/src/main/scala/io/gearpump/partitioner/BroadcastPartitioner.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,11 +20,13 @@ package io.gearpump.partitioner
import io.gearpump.Message
+/** Used by storm module to broadcast message to all downstream tasks */
class BroadcastPartitioner extends MulticastPartitioner {
private var lastPartitionNum = -1
private var partitions = Array.empty[Int]
- override def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
+ override def getPartitions(
+ msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
if (partitionNum != lastPartitionNum) {
partitions = (0 until partitionNum).toArray
lastPartitionNum = partitionNum
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala
index 3ed6dd4..062fc10 100644
--- a/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala
+++ b/core/src/main/scala/io/gearpump/partitioner/CoLocationPartitioner.scala
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package io.gearpump.partitioner
import io.gearpump.Message
@@ -7,7 +25,7 @@ import io.gearpump.Message
* And each task in current processor will co-locate with task of last processor
*/
class CoLocationPartitioner extends UnicastPartitioner {
- override def getPartition(msg : Message, partitionNum : Int, currentPartitionId: Int) : Int = {
+ override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
currentPartitionId
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala
index 5e4c7bc..6ba0cd6 100644
--- a/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala
+++ b/core/src/main/scala/io/gearpump/partitioner/HashPartitioner.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -26,7 +26,7 @@ import io.gearpump.Message
* same hash code after serialization and deserialization.
*/
class HashPartitioner extends UnicastPartitioner {
- override def getPartition(msg : Message, partitionNum : Int, currentPartitionId: Int) : Int = {
+ override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
(msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
index 6285bb7..69104c7 100644
--- a/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
+++ b/core/src/main/scala/io/gearpump/partitioner/Partitioner.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,22 +18,34 @@
package io.gearpump.partitioner
-import io.gearpump.Message
import scala.reflect.ClassTag
+
import org.apache.commons.lang.SerializationUtils
+import io.gearpump.Message
+
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), the partitioner decides how ONE
+ * task of upstream processor A sends messages to several tasks of downstream processor B.
+ */
sealed trait Partitioner extends Serializable
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does
+ * ONE-task {@literal ->} ONE-task mapping.
+ */
trait UnicastPartitioner extends Partitioner {
- /**
+
+ /**
+ * Gets the SINGLE downstream processor task index to send message to.
*
- * @param msg
- * @param partitionNum
- * @param currentPartitionId, used when the downstream processor want to share the same
- * partition id,
- * @return
+ * @param msg Message you want to send
+ * @param partitionNum How many tasks the downstream processor has.
+ * @param upstreamTaskIndex Index of the upstream task that triggers the getPartition() call.
+ *
+ * @return ONE task index of downstream processor.
*/
- def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int
+ def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int
def getPartition(msg: Message, partitionNum: Int): Int = {
getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
@@ -41,14 +53,20 @@ trait UnicastPartitioner extends Partitioner {
}
trait MulticastPartitioner extends Partitioner {
- def getPartitions(msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int]
+
+ /**
+ * Gets a list of downstream processor task indexes to send message to.
+ *
+ * @param upstreamTaskIndex Current sender task's task index.
+ *
+ */
+ def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int]
def getPartitions(msg: Message, partitionNum: Int): Array[Int] = {
getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
}
}
-
sealed trait PartitionerFactory {
def name: String
@@ -56,26 +74,32 @@ sealed trait PartitionerFactory {
def partitioner: Partitioner
}
-class PartitionerObject(private [this] val _partitioner: Partitioner) extends PartitionerFactory with Serializable {
+/** Stores the Partitioner in an object. To use it, the user needs to deserialize the object */
+class PartitionerObject(private[this] val _partitioner: Partitioner)
+ extends PartitionerFactory with Serializable {
override def name: String = partitioner.getClass.getName
- override def partitioner: Partitioner = SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner]
+ override def partitioner: Partitioner = {
+ SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner]
+ }
}
-class PartitionerByClassName(partitionerClass: String) extends PartitionerFactory with Serializable {
- override def name: String = partitionerClass
+/** Stores the partitioner as a class name; the user needs to instantiate a new instance */
+class PartitionerByClassName(partitionerClass: String)
+ extends PartitionerFactory with Serializable {
- override def partitioner: Partitioner = Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner]
+ override def name: String = partitionerClass
+ override def partitioner: Partitioner = {
+ Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner]
+ }
}
-
/**
- * @param partitionerFactory
+ * @param partitionerFactory How we construct a Partitioner.
*/
case class PartitionerDescription(partitionerFactory: PartitionerFactory)
-
object Partitioner {
val UNKNOWN_PARTITION_ID = -1
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala
index 035c9d2..ff962fa 100644
--- a/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala
+++ b/core/src/main/scala/io/gearpump/partitioner/ShuffleGroupingPartitioner.scala
@@ -15,11 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.gearpump.partitioner
-import io.gearpump.Message
import scala.util.Random
+import io.gearpump.Message
+
/**
* The idea of ShuffleGroupingPartitioner is derived from Storm.
* Messages are randomly distributed across the downstream's tasks in a way such that
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala b/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala
index d0d3c39..6b3c26e 100644
--- a/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala
+++ b/core/src/main/scala/io/gearpump/partitioner/ShufflePartitioner.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,17 +23,16 @@ import java.util.Random
import io.gearpump.Message
/**
- * Round Robin partition the data.
+ * Round Robin partition the data to downstream processor tasks.
*/
class ShufflePartitioner extends UnicastPartitioner {
private var seed = 0
private var count = 0
-
- override def getPartition(msg : Message, partitionNum : Int, currentPartitionId: Int) : Int = {
+ override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
if (seed == 0) {
- seed = newSeed
+ seed = newSeed()
}
val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum
@@ -41,5 +40,5 @@ class ShufflePartitioner extends UnicastPartitioner {
result
}
- def newSeed = new Random().nextInt()
+ private def newSeed(): Int = new Random().nextInt()
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/security/Authenticator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/security/Authenticator.scala b/core/src/main/scala/io/gearpump/security/Authenticator.scala
index afb13a9..73bc8e1 100644
--- a/core/src/main/scala/io/gearpump/security/Authenticator.scala
+++ b/core/src/main/scala/io/gearpump/security/Authenticator.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,23 +17,21 @@
*/
package io.gearpump.security
-import io.gearpump.security.Authenticator.AuthenticationResult
-
import scala.concurrent.{ExecutionContext, Future}
+import io.gearpump.security.Authenticator.AuthenticationResult
/**
* Authenticator for UI dashboard.
*
* Sub Class must implement a constructor with signature like this:
- * this(config: Config)
- *
+ * this(config: Config)
*/
trait Authenticator {
- // TODO: Change the signature to return more attributes of user
- // credentials...
- def authenticate(user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult]
+ // TODO: Change the signature to return more attributes of user credentials...
+ def authenticate(
+ user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult]
}
object Authenticator {
@@ -45,22 +43,25 @@ object Authenticator {
def permissionLevel: Int
}
- val UnAuthenticated = new AuthenticationResult{
+ val UnAuthenticated = new AuthenticationResult {
override val authenticated = false
override val permissionLevel = -1
}
- val Guest = new AuthenticationResult{
+ /** Guest can view but has no permission to submit app or write */
+ val Guest = new AuthenticationResult {
override val authenticated = true
override val permissionLevel = 1000
}
- val User = new AuthenticationResult{
+ /** User can submit app, kill app, but has no permission to add or remove machines */
+ val User = new AuthenticationResult {
override val authenticated = true
override val permissionLevel = 1000 + Guest.permissionLevel
}
- val Admin = new AuthenticationResult{
+ /** Super user */
+ val Admin = new AuthenticationResult {
override val authenticated = true
override val permissionLevel = 1000 + User.permissionLevel
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala b/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala
index 3ecfd88..0743a3f 100644
--- a/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala
+++ b/core/src/main/scala/io/gearpump/security/ConfigFileBasedAuthenticator.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,10 +18,12 @@
package io.gearpump.security
+import scala.concurrent.{ExecutionContext, Future}
+
+import com.typesafe.config.Config
+
import io.gearpump.security.Authenticator.AuthenticationResult
import io.gearpump.security.ConfigFileBasedAuthenticator._
-import com.typesafe.config.Config
-import scala.concurrent.{ExecutionContext, Future}
object ConfigFileBasedAuthenticator {
@@ -30,7 +32,9 @@ object ConfigFileBasedAuthenticator {
private val USERS = ROOT + "." + "users"
private val GUESTS = ROOT + "." + "guests"
- private case class Credentials(admins: Map[String, String], users: Map[String, String], guests: Map[String, String]) {
+ private case class Credentials(
+ admins: Map[String, String], users: Map[String, String], guests: Map[String, String]) {
+
def verify(user: String, password: String): AuthenticationResult = {
if (admins.contains(user)) {
if (verify(user, password, admins)) {
@@ -70,26 +74,32 @@ object ConfigFileBasedAuthenticator {
* users have limited permission to submit an application and etc..
* guests can not submit/kill applications, but can view the application status.
*
- * see conf/gear.conf section gearpump.ui-security.config-file-based-authenticator to find information
- * about how to configure this authenticator.
+ * see conf/gear.conf section gearpump.ui-security.config-file-based-authenticator to find
+ * information about how to configure this authenticator.
*
* [Security consideration]
- * It will keep one-way sha1 digest of password instead of password itself. The original password is NOT
- * kept in any way, so generally it is safe.
+ * It will keep one-way sha1 digest of password instead of password itself. The original password is
+ * NOT kept in any way, so generally it is safe.
*
- * digesting flow (from original password to digest):
- * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) -> base64Encode
*
- * verification user input password with stored digest:
- * base64Decode -> extract salt -> do sha1(salt, password) -> generate digest: salt + sha1 ->
- * compare the generated digest with the stored digest.
+ * digesting flow (from original password to digest):
+ * {{{
+ * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) ->
+ * base64Encode.
+ * }}}
*
+ * Verifying user input password against the stored digest:
+ * {{{
+ * base64Decode -> extract salt -> do sha1(salt, password) -> generate digest:
+ * salt + sha1 -> compare the generated digest with the stored digest.
+ * }}}
*/
class ConfigFileBasedAuthenticator(config: Config) extends Authenticator {
private val credentials = loadCredentials(config)
- override def authenticate(user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult] = {
+ override def authenticate(user: String, password: String, ec: ExecutionContext)
+ : Future[AuthenticationResult] = {
implicit val ctx = ec
Future {
credentials.verify(user, password)
@@ -97,13 +107,13 @@ class ConfigFileBasedAuthenticator(config: Config) extends Authenticator {
}
private def loadCredentials(config: Config): Credentials = {
- val admins = configToMap(config, ADMINS)
- val users = configToMap(config, USERS)
+ val admins = configToMap(config, ADMINS)
+ val users = configToMap(config, USERS)
val guests = configToMap(config, GUESTS)
new Credentials(admins, users, guests)
}
- private def configToMap(config : Config, path: String) = {
+ private def configToMap(config: Config, path: String) = {
import scala.collection.JavaConverters._
config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/security/PasswordUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/security/PasswordUtil.scala b/core/src/main/scala/io/gearpump/security/PasswordUtil.scala
index f8eafdd..9bf40d2 100644
--- a/core/src/main/scala/io/gearpump/security/PasswordUtil.scala
+++ b/core/src/main/scala/io/gearpump/security/PasswordUtil.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,9 +19,10 @@
package io.gearpump.security
import java.security.MessageDigest
-import sun.misc.{BASE64Decoder, BASE64Encoder}
import scala.util.Try
+import sun.misc.{BASE64Decoder, BASE64Encoder}
+
/**
* Util to verify whether user input password is valid or not.
* It use sha1 to do the digesting.
@@ -30,9 +31,11 @@ object PasswordUtil {
private val SALT_LENGTH = 8
/**
- * verification user input password with stored digest:
+ * Verifies user input password with stored digest:
+ * {{{
* base64Decode -> extract salt -> do sha1(salt, password) ->
* generate digest: salt + sha1 -> compare the generated digest with the stored digest.
+ * }}}
*/
def verify(password: String, stored: String): Boolean = {
Try {
@@ -45,7 +48,10 @@ object PasswordUtil {
}
/**
* digesting flow (from original password to digest):
- * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) -> base64Encode
+ * {{{
+ * random salt byte array of length 8 ->
+ * byte array of (salt + sha1(salt, password)) -> base64Encode
+ * }}}
*/
def hash(password: String): String = {
// Salt generation 64 bits long
@@ -66,8 +72,8 @@ object PasswordUtil {
}
private def base64Encode(data: Array[Byte]): String = {
- val endecoder = new BASE64Encoder()
- endecoder.encode(data)
+ val endecoder = new BASE64Encoder()
+ endecoder.encode(data)
}
private def base64Decode(data: String): Array[Byte] = {
@@ -75,13 +81,14 @@ object PasswordUtil {
decoder.decodeBuffer(data)
}
- private def help = {
+ // scalastyle:off println
+ private def help() = {
Console.println("usage: gear io.gearpump.security.PasswordUtil -password <your password>")
}
def main(args: Array[String]): Unit = {
if (args.length != 2 || args(0) != "-password") {
- help
+ help()
} else {
val pass = args(1)
val result = hash(pass)
@@ -90,4 +97,5 @@ object PasswordUtil {
Console.println(result)
}
}
+ // scalastyle:on println
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala
index a69ab43..cb9d563 100644
--- a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala
+++ b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializationFramework.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,12 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.gearpump.serializer
import akka.actor.ExtendedActorSystem
+
import io.gearpump.cluster.UserConfig
-class FastKryoSerializationFramework extends SerializationFramework{
+/**
+ * A built-in serializer framework using kryo
+ *
+ * NOTE: The Kryo here is a shaded version by Gearpump
+ */
+class FastKryoSerializationFramework extends SerializationFramework {
private var system: ExtendedActorSystem = null
private lazy val pool = new ThreadLocal[Serializer]() {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala
index 817cd84..57b7b5e 100644
--- a/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala
+++ b/core/src/main/scala/io/gearpump/serializer/FastKryoSerializer.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,13 +19,14 @@
package io.gearpump.serializer
import akka.actor.ExtendedActorSystem
+
import io.gearpump.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy
+import io.gearpump.objenesis.strategy.StdInstantiatorStrategy
import io.gearpump.romix.serialization.kryo.KryoSerializerWrapper
import io.gearpump.serializer.FastKryoSerializer.KryoSerializationException
import io.gearpump.util.LogUtil
-import io.gearpump.objenesis.strategy.StdInstantiatorStrategy
-class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer{
+class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer {
private val LOG = LogUtil.getLogger(getClass)
private val config = system.settings.config
@@ -37,7 +38,7 @@ class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer{
kryo.setInstantiatorStrategy(strategy)
private val kryoClazz = new GearpumpSerialization(config).customize(kryo)
- override def serialize(message: Any) : Array[Byte] = {
+ override def serialize(message: Any): Array[Byte] = {
try {
kryoSerializer.toBinary(message)
} catch {
@@ -72,7 +73,7 @@ class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer{
}
}
- override def deserialize(msg : Array[Byte]): Any = {
+ override def deserialize(msg: Array[Byte]): Any = {
kryoSerializer.fromBinary(msg)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala b/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala
index 41ccaa4..a7eb6cf 100644
--- a/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala
+++ b/core/src/main/scala/io/gearpump/serializer/GearpumpSerialization.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,17 @@
package io.gearpump.serializer
-import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
import com.typesafe.config.Config
-import io.gearpump.util.{Constants, LogUtil}
import org.slf4j.Logger
+import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
+import io.gearpump.util.{Constants, LogUtil}
+
class GearpumpSerialization(config: Config) {
private val LOG: Logger = LogUtil.getLogger(getClass)
- def customize(kryo: Kryo): Unit = {
+ def customize(kryo: Kryo): Unit = {
val serializationMap = configToMap(config, Constants.GEARPUMP_SERIALIZERS)
@@ -37,21 +38,22 @@ class GearpumpSerialization(config: Config) {
if (value == null || value.isEmpty) {
- //Use default serializer for this class type
+ // Use default serializer for this class type
kryo.register(keyClass)
} else {
val valueClass = Class.forName(value)
- val register = kryo.register(keyClass, valueClass.newInstance().asInstanceOf[KryoSerializer[_]])
+ val register = kryo.register(keyClass,
+ valueClass.newInstance().asInstanceOf[KryoSerializer[_]])
LOG.debug(s"Registering ${keyClass}, id: ${register.getId}")
}
}
kryo.setReferences(false)
- // require the user to register the class first before using
+ // Requires the user to register the class first before using
kryo.setRegistrationRequired(true)
}
- private final def configToMap(config : Config, path: String) = {
+ private final def configToMap(config: Config, path: String) = {
import scala.collection.JavaConverters._
config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala b/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala
index d466ccf..4947dcc 100644
--- a/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala
+++ b/core/src/main/scala/io/gearpump/serializer/SerializationFramework.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,15 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.gearpump.serializer
import akka.actor.ExtendedActorSystem
+
import io.gearpump.cluster.UserConfig
/**
* User are allowed to use a customized serialization framework by extending this
* interface.
- *
*/
trait SerializationFramework {
def init(system: ExtendedActorSystem, config: UserConfig)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/serializer/Serializer.scala b/core/src/main/scala/io/gearpump/serializer/Serializer.scala
index 02e30db..ff8b147 100644
--- a/core/src/main/scala/io/gearpump/serializer/Serializer.scala
+++ b/core/src/main/scala/io/gearpump/serializer/Serializer.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package io.gearpump.serializer
/**
* User defined message serializer
*/
trait Serializer {
- def serialize(message: Any) : Array[Byte]
+ def serialize(message: Any): Array[Byte]
- def deserialize(msg : Array[Byte]): Any
+ def deserialize(msg: Array[Byte]): Any
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/Express.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/Express.scala b/core/src/main/scala/io/gearpump/transport/Express.scala
index e8f1b8e..101b841 100644
--- a/core/src/main/scala/io/gearpump/transport/Express.scala
+++ b/core/src/main/scala/io/gearpump/transport/Express.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,20 @@
package io.gearpump.transport
+import scala.collection.immutable.LongMap
+import scala.concurrent._
+
import akka.actor._
import akka.agent.Agent
+import org.slf4j.Logger
+
import io.gearpump.transport.netty.Client.Close
import io.gearpump.transport.netty.{Context, TaskMessage}
import io.gearpump.util.LogUtil
-import org.slf4j.Logger
-
-import scala.collection.immutable.LongMap
-import scala.concurrent._
trait ActorLookupById {
+
+ /** Look up the actor ref for a local task actor by providing a TaskId (TaskId.toLong) */
def lookupLocalActor(id: Long): Option[ActorRef]
}
@@ -40,8 +43,9 @@ trait ActorLookupById {
*/
class Express(val system: ExtendedActorSystem) extends Extension with ActorLookupById {
- import io.gearpump.transport.Express._
import system.dispatcher
+
+ import io.gearpump.transport.Express._
val localActorMap = Agent(LongMap.empty[ActorRef])
val remoteAddressMap = Agent(Map.empty[Long, HostPort])
@@ -59,15 +63,16 @@ class Express(val system: ExtendedActorSystem) extends Extension with ActorLooku
LOG.info(s"binding to netty server $localHost")
system.registerOnTermination(new Runnable {
- override def run = context.close
+ override def run(): Unit = context.close()
})
(context, serverPort, localHost)
}
- def unregisterLocalActor(id : Long) : Unit = {
+ def unregisterLocalActor(id: Long): Unit = {
localActorMap.sendOff(_ - id)
}
+ /** Start Netty client actors to connect to remote machines */
def startClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
val clientsToClose = remoteClientMap.get().filterKeys(!hostPorts.contains(_)).keySet
closeClients(clientsToClose)
@@ -85,7 +90,7 @@ class Express(val system: ExtendedActorSystem) extends Extension with ActorLooku
def closeClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
remoteClientMap.alter { map =>
- map.filterKeys(hostPorts.contains).foreach{ hostAndClient =>
+ map.filterKeys(hostPorts.contains).foreach { hostAndClient =>
val (_, client) = hostAndClient
client ! Close
}
@@ -93,36 +98,38 @@ class Express(val system: ExtendedActorSystem) extends Extension with ActorLooku
}
}
- def registerLocalActor(id : Long, actor: ActorRef): Unit = {
+ def registerLocalActor(id: Long, actor: ActorRef): Unit = {
LOG.info(s"RegisterLocalActor: $id, actor: ${actor.path.name}")
init
localActorMap.sendOff(_ + (id -> actor))
}
- def lookupLocalActor(id: Long) = localActorMap.get().get(id)
+ def lookupLocalActor(id: Long): Option[ActorRef] = localActorMap.get().get(id)
- def lookupRemoteAddress(id : Long) = remoteAddressMap.get().get(id)
+ def lookupRemoteAddress(id: Long): Option[HostPort] = remoteAddressMap.get().get(id)
- //transport to remote address
+ /** Send message to remote task */
def transport(taskMessage: TaskMessage, remote: HostPort): Unit = {
val remoteClient = remoteClientMap.get.get(remote)
if (remoteClient.isDefined) {
remoteClient.get.tell(taskMessage, Actor.noSender)
} else {
- val errorMsg = s"Clients has not been launched properly before transporting messages, the destination is $remote"
+ val errorMsg = s"Clients has not been launched properly before transporting messages, " +
+ s"the destination is $remote"
LOG.error(errorMsg)
throw new Exception(errorMsg)
}
}
}
+/** A customized transport layer implemented as an Akka extension */
object Express extends ExtensionId[Express] with ExtensionIdProvider {
val LOG: Logger = LogUtil.getLogger(getClass)
override def get(system: ActorSystem): Express = super.get(system)
- override def lookup = Express
+ override def lookup: ExtensionId[Express] = Express
override def createExtension(system: ExtendedActorSystem): Express = new Express(system)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/HostPort.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/HostPort.scala b/core/src/main/scala/io/gearpump/transport/HostPort.scala
index 72da203..40c4342 100644
--- a/core/src/main/scala/io/gearpump/transport/HostPort.scala
+++ b/core/src/main/scala/io/gearpump/transport/HostPort.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,7 +25,7 @@ case class HostPort(host: String, port: Int) {
}
object HostPort {
- def apply(address : String) : HostPort = {
+ def apply(address: String): HostPort = {
val hostAndPort = address.split(":")
new HostPort(hostAndPort(0), hostAndPort(1).toInt)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/netty/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/Client.scala b/core/src/main/scala/io/gearpump/transport/netty/Client.scala
index 2ebf09d..d5960ad 100644
--- a/core/src/main/scala/io/gearpump/transport/netty/Client.scala
+++ b/core/src/main/scala/io/gearpump/transport/netty/Client.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,18 +23,22 @@ import java.nio.channels.ClosedChannelException
import java.util
import java.util.Random
import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
+import scala.language.implicitConversions
import akka.actor.Actor
-import io.gearpump.transport.HostPort
-import io.gearpump.util.LogUtil
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel._
import org.slf4j.Logger
-import scala.concurrent.duration.FiniteDuration
-import scala.language.implicitConversions
+import io.gearpump.transport.HostPort
+import io.gearpump.util.LogUtil
-class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) extends Actor {
+/**
+ * Netty client implemented as an actor; on the other side, there is a Netty server actor.
+ * All messages sent to this actor will be forwarded to the remote machine.
+ */
+class Client(conf: NettyConfig, factory: ChannelFactory, hostPort: HostPort) extends Actor {
import io.gearpump.transport.netty.Client._
val name = s"netty-client-$hostPort"
@@ -42,7 +46,7 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex
private final var bootstrap: ClientBootstrap = null
private final val random: Random = new Random
private val serializer = conf.newTransportSerializer
- private var channel : Channel = null
+ private var channel: Channel = null
var batch = new util.ArrayList[TaskMessage]
@@ -52,25 +56,26 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex
self ! Connect(0)
}
- def receive = messageHandler orElse connectionHandler
+ def receive: Receive = messageHandler orElse connectionHandler
- def messageHandler : Receive = {
+ def messageHandler: Receive = {
case msg: TaskMessage =>
batch.add(msg)
- case flush @ Flush(flushChannel) =>
+ case flush@Flush(flushChannel) =>
if (channel != flushChannel) {
- Unit //Drop, as it belong to old channel flush message
+ Unit // Drop, as it belongs to an old channel's flush message
} else if (batch.size > 0 && flushChannel.isWritable) {
send(flushChannel, batch.iterator)
batch.clear()
self ! flush
} else {
import context.dispatcher
- context.system.scheduler.scheduleOnce(new FiniteDuration(conf.flushCheckInterval, TimeUnit.MILLISECONDS), self, flush)
+ context.system.scheduler.scheduleOnce(
+ new FiniteDuration(conf.flushCheckInterval, TimeUnit.MILLISECONDS), self, flush)
}
}
- def connectionHandler : Receive = {
+ def connectionHandler: Receive = {
case ChannelReady(channel) =>
this.channel = channel
self ! Flush(channel)
@@ -90,12 +95,12 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex
context.become(closed)
}
- def closed : Receive = {
- case msg : AnyRef =>
+ def closed: Receive = {
+ case msg: AnyRef =>
LOG.error(s"This client $name is closed, drop any message ${msg.getClass.getSimpleName}...")
}
- private def connect(tries: Int) : Unit = {
+ private def connect(tries: Int): Unit = {
LOG.info(s"netty client try to connect to $name, tries: $tries")
if (tries <= conf.max_retries) {
val remote_addr = new InetSocketAddress(hostPort.host, hostPort.port)
@@ -107,7 +112,9 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex
LOG.error(s"failed to connect to $name, reason: ${ex.getMessage}, class: ${ex.getClass}")
current.close()
import context.dispatcher
- context.system.scheduler.scheduleOnce(new FiniteDuration(getSleepTimeMs(tries), TimeUnit.MILLISECONDS), self, Connect(tries + 1))
+ context.system.scheduler.scheduleOnce(
+ new FiniteDuration(
+ getSleepTimeMs(tries), TimeUnit.MILLISECONDS), self, Connect(tries + 1))
}
} else {
LOG.error(s"fail to connect to a remote host $name after retied $tries ...")
@@ -144,7 +151,7 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex
batch = null
}
- override def postStop() = {
+ override def postStop(): Unit = {
close()
}
@@ -154,9 +161,10 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex
if (channel.isOpen) {
channel.close
}
- LOG.error(s"failed to send requests to ${channel.getRemoteAddress} ${ex.getClass.getSimpleName}")
+ LOG.error(s"failed to send requests " +
+ s"to ${channel.getRemoteAddress} ${ex.getClass.getSimpleName}")
if (!ex.isInstanceOf[ClosedChannelException]) {
- LOG.error(ex.getMessage, ex)
+ LOG.error(ex.getMessage, ex)
}
self ! CompareAndReconnectIfEqual(channel)
}
@@ -175,18 +183,17 @@ class Client(conf: NettyConfig, factory: ChannelFactory, hostPort : HostPort) ex
private def isChannelWritable = (null != channel) && channel.isWritable
}
-
object Client {
val LOG: Logger = LogUtil.getLogger(getClass)
- //Reconnect if current channel equals channel
+ // Reconnect if current channel equals channel
case class CompareAndReconnectIfEqual(channel: Channel)
case class Connect(tries: Int)
- case class ChannelReady(chanel : Channel)
+ case class ChannelReady(chanel: Channel)
case object Close
- case class Flush(channel : Channel)
+ case class Flush(channel: Channel)
class ClientErrorHandler(name: String) extends SimpleChannelUpstreamHandler {
@@ -210,7 +217,9 @@ object Client {
}
}
- implicit def channelFutureToChannelFutureOps(channel: ChannelFuture): ChannelFutureOps = new ChannelFutureOps(channel)
+ implicit def channelFutureToChannelFutureOps(channel: ChannelFuture): ChannelFutureOps = {
+ new ChannelFutureOps(channel)
+ }
class ChannelFutureOps(channelFuture: ChannelFuture) {