You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:53 UTC
[43/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala b/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala
new file mode 100644
index 0000000..87db442
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import java.util
+import scala.collection.JavaConverters._
+
+import io.gearpump.codahale.metrics.jvm.{MemoryUsageGaugeSet, ThreadStatesGaugeSet}
+import io.gearpump.codahale.metrics.{Metric, MetricSet}
+
+class JvmMetricsSet(name: String) extends MetricSet {
+
+  /** Publishes chosen JVM gauges as "<name>:memory.<key>" and "<name>:thread.<key>". */
+  override def getMetrics: util.Map[String, Metric] = {
+    val memoryGauges = new MemoryUsageGaugeSet().getMetrics.asScala
+    val threadGauges = new ThreadStatesGaugeSet().getMetrics.asScala
+
+    // Only this subset of the gauges offered by the two gauge sets is exposed.
+    val memoryKeys = Seq(
+      "total.used", "total.committed", "total.max", "heap.used", "heap.committed", "heap.max")
+    val threadKeys = Seq("count", "daemon.count")
+
+    val memoryEntries = memoryKeys.map(key => s"$name:memory.$key" -> memoryGauges(key))
+    val threadEntries = threadKeys.map(key => s"$name:thread.$key" -> threadGauges(key))
+    (memoryEntries ++ threadEntries).toMap.asJava
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala b/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala
new file mode 100644
index 0000000..6d89456
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import io.gearpump.codahale.metrics.{Meter => CodaHaleMeter}
+
+/** See io.gearpump.codahale.metrics.Meter */
+class Meter(val name: String, meter: CodaHaleMeter, sampleRate: Int = 1) {
+  private var sampleCount = 0L // number of mark(n) calls seen so far
+  private var toBeMarked = 0L // amount accumulated since the last flush to the wrapped meter
+
+  def mark(): Unit = { // recorded immediately; does not go through the sampling of mark(n)
+    if (null != meter) meter.mark(1)
+  }
+
+  def mark(n: Long): Unit = { // buffers n and flushes the total on every sampleRate-th call
+    toBeMarked += n
+    sampleCount += 1
+    if (null != meter && sampleCount % sampleRate == 0) {
+      meter.mark(toBeMarked)
+      toBeMarked = 0
+    }
+  }
+
+  def getOneMinuteRate(): Double = { // reports 0 when no underlying meter was supplied
+    if (null != meter) meter.getOneMinuteRate else 0
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala b/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala
new file mode 100644
index 0000000..54d8c6a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import scala.collection.JavaConverters._
+
+import akka.actor._
+import org.slf4j.Logger
+
+import io.gearpump.codahale.metrics._
+import org.apache.gearpump.metrics
+import org.apache.gearpump.util.LogUtil
+
+/** Metric objects registry */
+class Metrics(sampleRate: Int) extends Extension {
+
+  val registry = new MetricRegistry()
+
+  /** Wraps the registry meter called `name`, using this instance's sample rate. */
+  def meter(name: String): metrics.Meter =
+    new metrics.Meter(name, registry.meter(name), sampleRate)
+
+  /** Wraps the registry histogram called `name`, using this instance's sample rate. */
+  def histogram(name: String): Histogram =
+    new Histogram(name, registry.histogram(name), sampleRate)
+
+  /** Wraps the registry histogram called `name`, using the caller-supplied sample rate. */
+  def histogram(name: String, sampleRate: Int): Histogram =
+    new Histogram(name, registry.histogram(name), sampleRate)
+
+  /** Wraps the registry counter called `name`, using this instance's sample rate. */
+  def counter(name: String): Counter =
+    new Counter(name, registry.counter(name), sampleRate)
+
+  /** Adds each metric of `set` whose name the registry does not contain yet. */
+  def register(set: MetricSet): Unit = {
+    val knownNames = registry.getNames
+    for ((key, metric) <- set.getMetrics.asScala if !knownNames.contains(key)) {
+      registry.register(key, metric)
+    }
+  }
+}
+
+object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider {
+
+ val LOG: Logger = LogUtil.getLogger(getClass)
+ import org.apache.gearpump.util.Constants._
+
+ sealed trait MetricType {
+ def name: String
+ }
+
+  object MetricType {
+    /** Spreads `obj` over five slots; only the slot of its concrete type is non-null. */
+    def unapply(obj: MetricType): Option[(Histogram, Counter, Meter, Timer, Gauge)] = {
+      val slots: (Histogram, Counter, Meter, Timer, Gauge) = obj match {
+        case histogram: Histogram => (histogram, null, null, null, null)
+        case counter: Counter => (null, counter, null, null, null)
+        case meter: Meter => (null, null, meter, null, null)
+        case timer: Timer => (null, null, null, timer, null)
+        case gauge: Gauge => (null, null, null, null, gauge)
+      }
+      Some(slots)
+    }
+
+    /**
+     * Returns the first non-null argument, checked in the order histogram, counter,
+     * meter, timer, gauge; yields null when every argument is null.
+     */
+    def apply(h: Histogram, c: Counter, m: Meter, t: Timer, g: Gauge): MetricType = {
+      val candidates = Seq[MetricType](h, c, m, t, g)
+      candidates.find(_ != null).orNull
+    }
+  }
+
+ case class Histogram (
+ name: String, mean: Double,
+ stddev: Double, median: Double,
+ p95: Double, p99: Double, p999: Double)
+ extends MetricType
+
+ case class Counter(name: String, value: Long) extends MetricType
+
+ case class Meter(
+ name: String, count: Long, meanRate: Double,
+ m1: Double, rateUnit: String)
+ extends MetricType
+
+ case class Timer(
+ name: String, count: Long, min: Double, max: Double,
+ mean: Double, stddev: Double, median: Double,
+ p75: Double, p95: Double, p98: Double,
+ p99: Double, p999: Double, meanRate: Double,
+ m1: Double, m5: Double, m15: Double,
+ rateUnit: String, durationUnit: String)
+ extends MetricType
+
+ case class Gauge(name: String, value: Long) extends MetricType
+
+ case object ReportMetrics
+
+ case class DemandMoreMetrics(subscriber: ActorRef)
+
+ override def get(system: ActorSystem): Metrics = super.get(system)
+
+ override def lookup: ExtensionId[Metrics] = Metrics
+
+  override def createExtension(system: ExtendedActorSystem): Metrics = {
+    // Both settings are looked up before the decision is made, so the sample rate is
+    // read even when metrics turn out to be disabled.
+    val config = system.settings.config
+    val enabled = config.getBoolean(GEARPUMP_METRIC_ENABLED)
+    LOG.info(s"Metrics is enabled..., $enabled")
+    val rate = config.getInt(GEARPUMP_METRIC_SAMPLE_RATE)
+
+    // A disabled system gets a DummyMetrics instead of a real Metrics instance.
+    if (enabled) new Metrics(rate) else new DummyMetrics
+  }
+
+  class DummyMetrics extends Metrics(1) { // no-op stand-in: every factory returns a shared dummy
+    override def register(set: MetricSet): Unit = ()
+    private val meter = new metrics.Meter("", null) {
+      override def mark(): Unit = ()
+      override def mark(n: Long): Unit = ()
+      override def getOneMinuteRate(): Double = 0
+    }
+
+    private val histogram = new metrics.Histogram("", null) {
+      override def update(value: Long): Unit = ()
+      override def getMean(): Double = 0
+      override def getStdDev(): Double = 0
+    }
+
+    private val counter = new metrics.Counter("", null) {
+      override def inc(): Unit = ()
+      override def inc(n: Long): Unit = ()
+    }
+
+    override def meter(name: String): metrics.Meter = meter
+    override def histogram(name: String): metrics.Histogram = histogram
+    override def histogram(name: String, sampleRate: Int): metrics.Histogram = histogram
+    override def counter(name: String): metrics.Counter = counter
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/MetricsAggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/MetricsAggregator.scala b/core/src/main/scala/org/apache/gearpump/metrics/MetricsAggregator.scala
new file mode 100644
index 0000000..08dad57
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/MetricsAggregator.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
+
+/**
+ * Aggregates a larger set of metrics into a smaller set
+ *
+ * Sub Class must implement a constructor with signature like this:
+ * MetricsAggregator(config: Config)
+ */
+trait MetricsAggregator {
+ def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
+ : List[HistoryMetricsItem]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala b/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala
new file mode 100644
index 0000000..9b506af
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import java.net.InetSocketAddress
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration._
+
+import akka.actor.{Actor, ActorRef}
+
+import io.gearpump.codahale.metrics.graphite.{Graphite, GraphiteReporter}
+import io.gearpump.codahale.metrics.{MetricFilter, Slf4jReporter}
+import org.apache.gearpump.metrics.Metrics.{DemandMoreMetrics, ReportMetrics}
+import org.apache.gearpump.metrics.MetricsReporterService.ReportTo
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Reports the metrics data to the configured target: Graphite, an Akka actor, or log files.
+ *
+ * @param metrics Holds the registry of metric objects to report.
+ */
+class MetricsReporterService(metrics: Metrics) extends Actor {
+
+ private val LOG = LogUtil.getLogger(getClass)
+ private implicit val system = context.system
+
+ private val reportInterval = system.settings.config.getInt(GEARPUMP_METRIC_REPORT_INTERVAL)
+ private val reporter = getReporter
+ implicit val dispatcher = context.dispatcher
+
+ def receive: Receive = {
+ // The subscriber is demanding more messages.
+ case DemandMoreMetrics(subscriber) => {
+ reporter.report(subscriber)
+ context.system.scheduler.scheduleOnce(reportInterval.milliseconds,
+ subscriber, ReportMetrics)
+ }
+ }
+
+ def startGraphiteReporter(): ReportTo = {
+ val graphiteHost = system.settings.config.getString(GEARPUMP_METRIC_GRAPHITE_HOST)
+ val graphitePort = system.settings.config.getInt(GEARPUMP_METRIC_GRAPHITE_PORT)
+
+ val graphite = new Graphite(new InetSocketAddress(graphiteHost, graphitePort))
+ LOG.info(s"reporting to $graphiteHost, $graphitePort")
+ new ReportTo {
+ private val reporter = GraphiteReporter.forRegistry(metrics.registry)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .filter(MetricFilter.ALL)
+ .build(graphite)
+
+ override def report(to: ActorRef): Unit = reporter.report()
+ }
+ }
+
+ def startSlf4jReporter(): ReportTo = {
+ new ReportTo {
+ val reporter = Slf4jReporter.forRegistry(metrics.registry)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .filter(MetricFilter.ALL)
+ .outputTo(LOG)
+ .build()
+
+ override def report(to: ActorRef): Unit = reporter.report()
+ }
+ }
+
+ def startAkkaReporter(): ReportTo = {
+ new AkkaReporter(system, metrics.registry)
+ }
+
+  def getReporter: ReportTo = { // builds the reporter named by the GEARPUMP_METRIC_REPORTER setting
+    val reporterType = system.settings.config.getString(GEARPUMP_METRIC_REPORTER)
+    LOG.info(s"Metrics reporter is enabled, using $reporterType reporter")
+    reporterType match {
+      case "graphite" => startGraphiteReporter()
+      case "logfile" => startSlf4jReporter()
+      case "akka" => startAkkaReporter()
+      case other => throw new IllegalArgumentException(s"Unknown metrics reporter: $other")
+    }
+  }
+}
+
+object MetricsReporterService {
+
+ /** Target where user want to report the metrics data to */
+ trait ReportTo {
+ def report(to: ActorRef): Unit
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/package.scala b/core/src/main/scala/org/apache/gearpump/package.scala
new file mode 100644
index 0000000..b1118d3
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache
+
+package object gearpump {
+ type TimeStamp = Long
+ val LatestTime = -1
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
new file mode 100644
index 0000000..99cbcb6
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.partitioner
+
+import org.apache.gearpump.Message
+
+/** Used by storm module to broadcast message to all downstream tasks */
+class BroadcastPartitioner extends MulticastPartitioner {
+ private var lastPartitionNum = -1
+ private var partitions = Array.empty[Int]
+
+ override def getPartitions(
+ msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
+ if (partitionNum != lastPartitionNum) {
+ partitions = (0 until partitionNum).toArray
+ lastPartitionNum = partitionNum
+ }
+ partitions
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
new file mode 100644
index 0000000..5a3eec4
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.partitioner
+
+import org.apache.gearpump.Message
+
+/**
+ * Will have the same parallelism with last processor
+ * And each task in current processor will co-locate with task of last processor
+ */
+class CoLocationPartitioner extends UnicastPartitioner {
+ override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+ currentPartitionId
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
new file mode 100644
index 0000000..ee684a9
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.partitioner
+
+import org.apache.gearpump.Message
+
+/**
+ * Only makes sense when the message payload implements hashCode().
+ * Otherwise it falls back to Object.hashCode(), which does not return
+ * the same hash code after serialization and deserialization.
+ */
+class HashPartitioner extends UnicastPartitioner {
+ override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+ (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
new file mode 100644
index 0000000..d68fa65
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.partitioner
+
+import scala.reflect.ClassTag
+
+import org.apache.commons.lang.SerializationUtils
+
+import org.apache.gearpump.Message
+
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), partitioner decide how ONE task
+ * of upstream processor A send to several tasks of downstream processor B.
+ */
+sealed trait Partitioner extends Serializable
+
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does
+ * ONE-task {@literal ->} ONE-task mapping.
+ */
+trait UnicastPartitioner extends Partitioner {
+
+ /**
+ * Gets the SINGLE downstream processor task index to send message to.
+ *
+ * @param msg Message you want to send
+ * @param partitionNum How many tasks does the downstream processor have.
+ * @param upstreamTaskIndex Upstream task's task index who trigger the getPartition() call.
+ *
+ * @return ONE task index of downstream processor.
+ */
+ def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int
+
+ def getPartition(msg: Message, partitionNum: Int): Int = {
+ getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
+ }
+}
+
+trait MulticastPartitioner extends Partitioner {
+
+ /**
+ * Gets a list of downstream processor task indexes to send message to.
+ *
+ * @param upstreamTaskIndex Current sender task's task index.
+ *
+ */
+ def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int]
+
+ def getPartitions(msg: Message, partitionNum: Int): Array[Int] = {
+ getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
+ }
+}
+
+sealed trait PartitionerFactory {
+
+ def name: String
+
+ def partitioner: Partitioner
+}
+
+/** Stores the Partitioner in an object. To use it, the user needs to deserialize the object. */
+class PartitionerObject(private[this] val _partitioner: Partitioner)
+  extends PartitionerFactory with Serializable {
+  // The class name is read off the wrapped instance directly; no copy is needed for that.
+  override def name: String = _partitioner.getClass.getName
+
+  // Hands out a fresh copy on every call, produced by SerializationUtils.clone.
+  override def partitioner: Partitioner =
+    SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner]
+}
+
+/** Stores the partitioner by class name; each use instantiates the class reflectively. */
+class PartitionerByClassName(partitionerClass: String)
+ extends PartitionerFactory with Serializable {
+
+ override def name: String = partitionerClass
+ override def partitioner: Partitioner = {
+ Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner]
+ }
+}
+
+/**
+ * @param partitionerFactory How we construct a Partitioner.
+ */
+case class PartitionerDescription(partitionerFactory: PartitionerFactory)
+
+object Partitioner {
+ val UNKNOWN_PARTITION_ID = -1
+
+ def apply[T <: Partitioner](implicit clazz: ClassTag[T]): PartitionerDescription = {
+ PartitionerDescription(new PartitionerByClassName(clazz.runtimeClass.getName))
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
new file mode 100644
index 0000000..55ef614
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.partitioner
+
+import scala.util.Random
+
+import org.apache.gearpump.Message
+
+/**
+ * The idea of ShuffleGroupingPartitioner is derived from Storm.
+ * Messages are randomly distributed across the downstream's tasks in a way such that
+ * each task is guaranteed to get an equal number of messages.
+ */
+class ShuffleGroupingPartitioner extends UnicastPartitioner {
+  private val random = new Random
+  private var index = -1 // position of the most recently returned entry in `partitions`
+  private var partitions = Vector.empty[Int] // current shuffled order of 0 until partitionNum
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    index += 1
+    if (partitions.length != partitionNum) { // first call, or the downstream task count changed
+      index = 0
+      partitions = random.shuffle(0.until(partitionNum).toVector)
+    } else if (index >= partitionNum) { // finished one full pass: reshuffle and start over
+      index = 0
+      partitions = random.shuffle(partitions)
+    }
+    partitions(index)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
new file mode 100644
index 0000000..5c66d66
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.partitioner
+
+import java.util.Random
+
+import org.apache.gearpump.Message
+
+/**
+ * Round Robin partition the data to downstream processor tasks.
+ */
+class ShufflePartitioner extends UnicastPartitioner {
+ private var seed = 0 // 0 doubles as the "not yet seeded" marker
+ private var count = 0 // incremented once per getPartition call
+
+ override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+ // Draw the random starting offset on first use, i.e. while seed is still 0.
+ if (seed == 0) {
+ seed = newSeed()
+ }
+ // Masking with Integer.MAX_VALUE clears the sign bit, so the dividend is never negative.
+ val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum
+ count = count + 1
+ result
+ }
+
+ private def newSeed(): Int = new Random().nextInt()
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/security/Authenticator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/security/Authenticator.scala b/core/src/main/scala/org/apache/gearpump/security/Authenticator.scala
new file mode 100644
index 0000000..497506b
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/security/Authenticator.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.security
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.gearpump.security.Authenticator.AuthenticationResult
+
+/**
+ * Authenticator for UI dashboard.
+ *
+ * Sub Class must implement a constructor with signature like this:
+ * this(config: Config)
+ */
+trait Authenticator {
+
+ // TODO: Change the signature to return more attributes of user credentials...
+ /**
+ * Checks the given credentials.
+ *
+ * @param user name of the user to authenticate
+ * @param password password supplied for that user
+ * @param ec execution context handed to the implementation
+ * @return a future holding the authentication outcome and permission level
+ */
+ def authenticate(
+ user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult]
+}
+
+/** Holds the authentication result type and the predefined results. */
+object Authenticator {
+
+ /** Outcome of an authentication attempt. */
+ trait AuthenticationResult {
+
+ /** Whether the credentials were accepted. */
+ def authenticated: Boolean
+
+ /** Numeric permission level; among the predefined results a larger value is a stronger role. */
+ def permissionLevel: Int
+ }
+
+ /** Result for rejected credentials; permission level is -1. */
+ val UnAuthenticated = new AuthenticationResult {
+ override val authenticated = false
+ override val permissionLevel = -1
+ }
+
+ /** Guest can view but has no permission to submit app or write */
+ val Guest = new AuthenticationResult {
+ override val authenticated = true
+ override val permissionLevel = 1000
+ }
+
+ /** User can submit app, kill app, but has no permission to add or remove machines */
+ val User = new AuthenticationResult {
+ override val authenticated = true
+ // 1000 above Guest, i.e. 2000.
+ override val permissionLevel = 1000 + Guest.permissionLevel
+ }
+
+ /** Super user */
+ val Admin = new AuthenticationResult {
+ override val authenticated = true
+ // 1000 above User, i.e. 3000.
+ override val permissionLevel = 1000 + User.permissionLevel
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticator.scala b/core/src/main/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticator.scala
new file mode 100644
index 0000000..110fd8c
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticator.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.security
+
+import scala.concurrent.{ExecutionContext, Future}
+
+import com.typesafe.config.Config
+
+import org.apache.gearpump.security.Authenticator.AuthenticationResult
+import org.apache.gearpump.security.ConfigFileBasedAuthenticator._
+
+object ConfigFileBasedAuthenticator {
+
+  // Configuration paths of the per-role "user name -> password digest" tables.
+  private val ROOT = "gearpump.ui-security.config-file-based-authenticator"
+  private val ADMINS = s"$ROOT.admins"
+  private val USERS = s"$ROOT.users"
+  private val GUESTS = s"$ROOT.guests"
+
+  /**
+   * Password digests of all known accounts, grouped by role.
+   *
+   * Each map goes from user name to the stored password digest.
+   */
+  private case class Credentials(
+      admins: Map[String, String], users: Map[String, String], guests: Map[String, String]) {
+
+    /**
+     * Resolves the role of a user.
+     *
+     * Roles are probed in the order admins, users, guests. The first table that contains
+     * the user name decides the outcome: a matching password yields that role, a wrong
+     * password yields UnAuthenticated without looking at the remaining tables. Unknown
+     * user names are UnAuthenticated as well.
+     */
+    def verify(user: String, password: String): AuthenticationResult = {
+      val roles: List[(Map[String, String], AuthenticationResult)] = List(
+        admins -> Authenticator.Admin,
+        users -> Authenticator.User,
+        guests -> Authenticator.Guest)
+
+      roles.collectFirst {
+        case (accounts, granted) if accounts.contains(user) =>
+          if (verify(user, password, accounts)) granted else Authenticator.UnAuthenticated
+      }.getOrElse(Authenticator.UnAuthenticated)
+    }
+
+    /** Checks the password against the digest stored for user; user must be a key of map. */
+    private def verify(user: String, password: String, map: Map[String, String]): Boolean =
+      PasswordUtil.verify(password, map(user))
+  }
+}
+
+/**
+ * UI dashboard authenticator driven by the configuration file.
+ *
+ * Accounts fall into three categories: admins, users, and guests.
+ * admins have unlimited permission, like shutting down a cluster or adding/removing machines.
+ * users have limited permission, such as submitting an application.
+ * guests can not submit/kill applications, but can view the application status.
+ *
+ * See conf/gear.conf section gearpump.ui-security.config-file-based-authenticator to find
+ * information about how to configure this authenticator.
+ *
+ * [Security consideration]
+ * The configuration holds a salted one-way sha1 digest of each password instead of the
+ * password itself; the original password is not kept.
+ *
+ * Digesting flow (from original password to digest):
+ * {{{
+ * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) ->
+ * base64Encode.
+ * }}}
+ *
+ * Verification of a user supplied password against the stored digest:
+ * {{{
+ * base64Decode -> extract salt -> do sha1(salt, password) -> generate digest:
+ * salt + sha1 -> compare the generated digest with the stored digest.
+ * }}}
+ */
+class ConfigFileBasedAuthenticator(config: Config) extends Authenticator {
+
+  // Read once at construction time from the three role sections of the configuration.
+  private val credentials = loadCredentials(config)
+
+  /** Verifies the credentials inside a Future that runs on the supplied execution context. */
+  override def authenticate(user: String, password: String, ec: ExecutionContext)
+    : Future[AuthenticationResult] = {
+    Future(credentials.verify(user, password))(ec)
+  }
+
+  /** Builds the credential tables from the admins, users and guests configuration sections. */
+  private def loadCredentials(config: Config): Credentials = {
+    Credentials(
+      admins = configToMap(config, ADMINS),
+      users = configToMap(config, USERS),
+      guests = configToMap(config, GUESTS))
+  }
+
+  /** Flattens the configuration object at path into a key -> string value map. */
+  private def configToMap(config: Config, path: String): Map[String, String] = {
+    import scala.collection.JavaConverters._
+    val entries = config.getConfig(path).root.unwrapped.asScala.toMap
+    entries.map { case (name, digest) => name -> digest.toString }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/security/PasswordUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/security/PasswordUtil.scala b/core/src/main/scala/org/apache/gearpump/security/PasswordUtil.scala
new file mode 100644
index 0000000..25b68c6
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/security/PasswordUtil.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.security
+
+import java.security.MessageDigest
+import scala.util.Try
+
+import sun.misc.{BASE64Decoder, BASE64Encoder}
+
+/**
+ * Util to hash passwords and to verify whether a user input password is valid or not.
+ * It uses sha1 to do the digesting.
+ */
+object PasswordUtil {
+  private val SALT_LENGTH = 8
+
+  /**
+   * Verifies user input password with stored digest:
+   * {{{
+   * base64Decode -> extract salt -> do sha1(salt, password) ->
+   * generate digest: salt + sha1 -> compare the generated digest with the stored digest.
+   * }}}
+   *
+   * @param password plain-text password supplied by the user
+   * @param stored base64 digest as produced by hash(password)
+   * @return true only when hashing password with the salt found in stored reproduces
+   *         stored exactly; false on mismatch or when any step throws (for example
+   *         when stored is too short to contain a salt)
+   */
+  def verify(password: String, stored: String): Boolean = {
+    Try {
+      val decoded = base64Decode(stored)
+      val salt = new Array[Byte](SALT_LENGTH)
+      // Throws when decoded holds fewer than SALT_LENGTH bytes; Try turns that into false.
+      Array.copy(decoded, 0, salt, 0, SALT_LENGTH)
+
+      // MessageDigest.isEqual does not stop at the first differing byte, so the time
+      // taken does not reveal how much of the digest an attacker has guessed correctly.
+      MessageDigest.isEqual(
+        hash(password, salt).getBytes("UTF-8"), stored.getBytes("UTF-8"))
+    }.getOrElse(false)
+  }
+
+  /**
+   * digesting flow (from original password to digest):
+   * {{{
+   * random salt byte array of length 8 ->
+   * byte array of (salt + sha1(salt, password)) -> base64Encode
+   * }}}
+   *
+   * @param password plain-text password to digest
+   * @return base64 text of the salt followed by the digest; differs between calls
+   *         because a fresh salt is drawn every time
+   */
+  def hash(password: String): String = {
+    // Salt generation 64 bits long, drawn from a cryptographically strong generator so
+    // that salts cannot be predicted from earlier ones.
+    val salt = new Array[Byte](SALT_LENGTH)
+    new java.security.SecureRandom().nextBytes(salt)
+    hash(password, salt)
+  }
+
+  /**
+   * Computes base64(salt ++ sha1(sha1(salt ++ utf8(password)))).
+   *
+   * The layout must not change: digests already stored in configuration files are
+   * verified by recomputing exactly this value.
+   */
+  private def hash(password: String, salt: Array[Byte]): String = {
+    val digest = MessageDigest.getInstance("SHA-1")
+    digest.update(salt)
+    val firstPass = digest.digest(password.getBytes("UTF-8"))
+    digest.reset()
+    val secondPass = digest.digest(firstPass)
+    base64Encode(salt ++ secondPass)
+  }
+
+  private def base64Encode(data: Array[Byte]): String = {
+    val encoder = new BASE64Encoder()
+    encoder.encode(data)
+  }
+
+  private def base64Decode(data: String): Array[Byte] = {
+    val decoder = new BASE64Decoder()
+    decoder.decodeBuffer(data)
+  }
+
+  // scalastyle:off println
+  private def help() = {
+    Console.println("usage: gear org.apache.gearpump.security.PasswordUtil -password " +
+      "<your password>")
+  }
+
+  /** Command line entry: prints the digest of the password given after "-password". */
+  def main(args: Array[String]): Unit = {
+    if (args.length != 2 || args(0) != "-password") {
+      help()
+    } else {
+      val pass = args(1)
+      val result = hash(pass)
+      Console.println("Here is the hashed password")
+      Console.println("==============================")
+      Console.println(result)
+    }
+  }
+  // scalastyle:on println
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializationFramework.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializationFramework.scala b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializationFramework.scala
new file mode 100644
index 0000000..6295faf
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializationFramework.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.serializer
+
+import akka.actor.ExtendedActorSystem
+
+import org.apache.gearpump.cluster.UserConfig
+
+/**
+ * A built-in serialization framework using kryo
+ *
+ * NOTE: The Kryo here is a shaded version by Gearpump
+ */
+class FastKryoSerializationFramework extends SerializationFramework {
+  // Assigned by init(); stays null until then.
+  private var actorSystem: ExtendedActorSystem = null
+
+  // One FastKryoSerializer per thread, built the first time that thread calls get().
+  private lazy val serializers = new ThreadLocal[Serializer]() {
+    override def initialValue(): Serializer = new FastKryoSerializer(actorSystem)
+  }
+
+  /** Remembers the actor system used to build serializers; config is not used. */
+  override def init(system: ExtendedActorSystem, config: UserConfig): Unit = {
+    actorSystem = system
+  }
+
+  /** Returns the serializer bound to the calling thread. */
+  override def get(): Serializer = serializers.get()
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala
new file mode 100644
index 0000000..ed1e347
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.serializer
+
+import akka.actor.ExtendedActorSystem
+
+import io.gearpump.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy
+import io.gearpump.objenesis.strategy.StdInstantiatorStrategy
+import io.gearpump.romix.serialization.kryo.KryoSerializerWrapper
+import org.apache.gearpump.serializer.FastKryoSerializer.KryoSerializationException
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Serializer that delegates to a KryoSerializerWrapper created for the given actor system.
+ *
+ * On construction the wrapped kryo instance gets a DefaultInstantiatorStrategy with a
+ * StdInstantiatorStrategy fallback and is then customized by GearpumpSerialization using
+ * the actor system's configuration.
+ */
+class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer {
+
+ private val LOG = LogUtil.getLogger(getClass)
+ private val config = system.settings.config
+
+ private val kryoSerializer = new KryoSerializerWrapper(system)
+ private val kryo = kryoSerializer.kryo
+ val strategy = new DefaultInstantiatorStrategy
+ strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy)
+ kryo.setInstantiatorStrategy(strategy)
+ // Evaluated for its side effect of customizing kryo.
+ // NOTE(review): the stored value is never read in this class.
+ private val kryoClazz = new GearpumpSerialization(config).customize(kryo)
+
+ /**
+ * Serializes message to bytes.
+ *
+ * An IllegalArgumentException raised by the wrapper is logged and rethrown as a
+ * KryoSerializationException whose message explains how to register a serializer.
+ */
+ override def serialize(message: Any): Array[Byte] = {
+ try {
+ kryoSerializer.toBinary(message)
+ } catch {
+ case ex: java.lang.IllegalArgumentException =>
+ // NOTE(review): clazz is not used below.
+ val clazz = message.getClass
+ val error = s"""
+ | ${ex.getMessage}
+ |You can also register the class by providing a configuration with serializer
+ |defined,
+ |
+ |gearpump{
+ | serializers {
+ | ## Follow this format when adding new serializer for new message types
+ | # "yourpackage.YourClass" = "yourpackage.YourSerializerForThisClass"
+ |
+ | ## If you intend to use default serializer for this class, then you can write this
+ | # "yourpackage.YourClass" = ""
+ | }
+ |}
+ |
+ |If you want to register the serializer globally, you need to change
+ |gear.conf on every worker in the cluster; if you only want to register
+ |the serializer for a single streaming application, you need to create
+ |a file under conf/ named application.conf, and add the above configuration
+ |into application.conf. To verify whether the configuration is effective,
+ |you can browser your UI http://{UI Server Host}:8090/api/v1.0/app/{appId}/config,
+ |and check whether your custom serializer is added.
+ """.stripMargin
+
+ LOG.error(error, ex)
+ throw new KryoSerializationException(error, ex)
+ }
+ }
+
+ /** Deserializes msg through the wrapped serializer; exceptions are not translated here. */
+ override def deserialize(msg: Array[Byte]): Any = {
+ kryoSerializer.fromBinary(msg)
+ }
+}
+
+object FastKryoSerializer {
+ /** Exception carrying a message and an optional cause (null when none is given). */
+ class KryoSerializationException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala b/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala
new file mode 100644
index 0000000..f9c6299
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.serializer
+
+import com.typesafe.config.Config
+import org.slf4j.Logger
+
+import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
+import org.apache.gearpump.util.{Constants, LogUtil}
+
+/**
+ * Applies the serializer registrations found in the configuration to a Kryo instance.
+ */
+class GearpumpSerialization(config: Config) {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  /**
+   * Registers every class listed under Constants.GEARPUMP_SERIALIZERS with kryo.
+   *
+   * An entry with an empty value is registered with kryo's default serializer; otherwise
+   * the value is taken as the class name of the serializer to instantiate and register.
+   * Afterwards reference tracking is switched off and registration is made mandatory.
+   */
+  def customize(kryo: Kryo): Unit = {
+    val registrations = configToMap(config, Constants.GEARPUMP_SERIALIZERS)
+
+    registrations.foreach { case (className, serializerName) =>
+      val messageClass = Class.forName(className)
+      val useDefault = serializerName == null || serializerName.isEmpty
+
+      if (useDefault) {
+        // Use default serializer for this class type
+        kryo.register(messageClass)
+      } else {
+        val serializerClass = Class.forName(serializerName)
+        val register = kryo.register(messageClass,
+          serializerClass.newInstance().asInstanceOf[KryoSerializer[_]])
+        LOG.debug(s"Registering ${messageClass}, id: ${register.getId}")
+      }
+    }
+
+    kryo.setReferences(false)
+
+    // Requires the user to register the class first before using
+    kryo.setRegistrationRequired(true)
+  }
+
+  /** Flattens the configuration object at path into a key -> string value map. */
+  private final def configToMap(config: Config, path: String): Map[String, String] = {
+    import scala.collection.JavaConverters._
+    val entries = config.getConfig(path).root.unwrapped.asScala.toMap
+    entries.map { case (name, raw) => name -> raw.toString }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/SerializationFramework.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/serializer/SerializationFramework.scala b/core/src/main/scala/org/apache/gearpump/serializer/SerializationFramework.scala
new file mode 100644
index 0000000..995ba1f
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/serializer/SerializationFramework.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.serializer
+
+import akka.actor.ExtendedActorSystem
+
+import org.apache.gearpump.cluster.UserConfig
+
+/**
+ * Users are allowed to use a customized serialization framework by extending this
+ * interface.
+ */
+trait SerializationFramework {
+
+  /**
+   * Initializes the framework.
+   *
+   * The result type is spelled out as Unit; the declaration previously relied on
+   * procedure syntax, which newer Scala compilers deprecate.
+   *
+   * @param system actor system handed to the framework
+   * @param config user configuration handed to the framework
+   */
+  def init(system: ExtendedActorSystem, config: UserConfig): Unit
+
+  /**
+   * Need to be thread safe
+   *
+   * Get a serializer to use.
+   * Note: this method can be called in a multi-thread environment. It's the
+   * responsibility of SerializationFramework Developer to assure this method
+   * is thread safe.
+   *
+   * To be thread-safe, one recommendation would be using a thread local pool
+   * to maintain reference to Serializer of same thread.
+   */
+  def get(): Serializer
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/serializer/Serializer.scala b/core/src/main/scala/org/apache/gearpump/serializer/Serializer.scala
new file mode 100644
index 0000000..7c0f4bf
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/serializer/Serializer.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.serializer
+
+/**
+ * User defined message serializer
+ */
+trait Serializer {
+ /** Converts message to its binary form. */
+ def serialize(message: Any): Array[Byte]
+
+ /** Rebuilds a message from its binary form. */
+ def deserialize(msg: Array[Byte]): Any
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/Express.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/Express.scala b/core/src/main/scala/org/apache/gearpump/transport/Express.scala
new file mode 100644
index 0000000..ef1e8d9
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/Express.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport
+
+import scala.collection.immutable.LongMap
+import scala.concurrent._
+
+import akka.actor._
+import akka.agent.Agent
+import org.slf4j.Logger
+
+import org.apache.gearpump.transport.netty.Client.Close
+import org.apache.gearpump.transport.netty.{TaskMessage, Context}
+import org.apache.gearpump.util.LogUtil
+
+/** Resolves a numeric id to a local actor reference. */
+trait ActorLookupById {
+
+ /** Lookup actor ref for local task actor by providing a TaskId (TaskId.toLong) */
+ def lookupLocalActor(id: Long): Option[ActorRef]
+}
+
+/**
+ * Custom networking layer.
+ *
+ * It will translate long sender/receiver address to shorter ones to reduce
+ * the network overhead.
+ */
+class Express(val system: ExtendedActorSystem) extends Extension with ActorLookupById {
+
+ import system.dispatcher
+
+ import org.apache.gearpump.transport.Express._
+ // Local actors keyed by the id passed to registerLocalActor.
+ val localActorMap = Agent(LongMap.empty[ActorRef])
+ // Id -> remote address; only read (never updated) inside this class.
+ val remoteAddressMap = Agent(Map.empty[Long, HostPort])
+
+ // Remote address -> client actor obtained from context.connect.
+ val remoteClientMap = Agent(Map.empty[HostPort, ActorRef])
+
+ val conf = system.settings.config
+
+ // Destructures the result of init; the first access to any of the three forces init.
+ lazy val (context, serverPort, localHost) = init
+
+ // Creates the transport Context, binds it under the name "netty-server" and registers a
+ // termination hook that closes the context. Being a lazy val, the body runs at most once.
+ lazy val init = {
+ LOG.info(s"Start Express init ...${system.name}")
+ val context = new Context(system, conf)
+ val serverPort = context.bind("netty-server", this)
+ val localHost = HostPort(system.provider.getDefaultAddress.host.get, serverPort)
+ LOG.info(s"binding to netty server $localHost")
+
+ system.registerOnTermination(new Runnable {
+ override def run(): Unit = context.close()
+ })
+ (context, serverPort, localHost)
+ }
+
+ /** Queues the removal of id from the local actor map. */
+ def unregisterLocalActor(id: Long): Unit = {
+ localActorMap.sendOff(_ - id)
+ }
+
+ /** Start Netty client actors to connect to remote machines */
+ def startClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
+ // Clients whose address is not requested any more are closed first.
+ val clientsToClose = remoteClientMap.get().filterKeys(!hostPorts.contains(_)).keySet
+ closeClients(clientsToClose)
+ // NOTE(review): the accumulator `future` is never used, so the value returned is the
+ // future of the alter issued for the last host (or the empty-map seed when hostPorts
+ // is empty) - confirm this is intended.
+ hostPorts.toList.foldLeft(Future(Map.empty[HostPort, ActorRef])) { (future, hostPort) =>
+ remoteClientMap.alter { map =>
+ // Connect only to addresses that do not have a client yet.
+ if (!map.contains(hostPort)) {
+ val actor = context.connect(hostPort)
+ map + (hostPort -> actor)
+ } else {
+ map
+ }
+ }
+ }
+ }
+
+ /** Sends Close to the clients of the given addresses and drops them from the client map. */
+ def closeClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
+ remoteClientMap.alter { map =>
+ map.filterKeys(hostPorts.contains).foreach { hostAndClient =>
+ val (_, client) = hostAndClient
+ client ! Close
+ }
+ map -- hostPorts
+ }
+ }
+
+ /** Forces init and queues the addition of id -> actor to the local actor map. */
+ def registerLocalActor(id: Long, actor: ActorRef): Unit = {
+ LOG.info(s"RegisterLocalActor: $id, actor: ${actor.path.name}")
+ // Referenced only to trigger the lazy initialization above.
+ init
+ localActorMap.sendOff(_ + (id -> actor))
+ }
+
+ /** Looks id up in the current value of the local actor map. */
+ def lookupLocalActor(id: Long): Option[ActorRef] = localActorMap.get().get(id)
+
+ /** Looks id up in the current value of the remote address map. */
+ def lookupRemoteAddress(id: Long): Option[HostPort] = remoteAddressMap.get().get(id)
+
+ /**
+ * Send message to remote task
+ *
+ * Throws an Exception when no client is registered for remote in remoteClientMap.
+ */
+ def transport(taskMessage: TaskMessage, remote: HostPort): Unit = {
+
+ val remoteClient = remoteClientMap.get.get(remote)
+ if (remoteClient.isDefined) {
+ remoteClient.get.tell(taskMessage, Actor.noSender)
+ } else {
+ val errorMsg = s"Clients has not been launched properly before transporting messages, " +
+ s"the destination is $remote"
+ LOG.error(errorMsg)
+ throw new Exception(errorMsg)
+ }
+ }
+}
+
+/** A customized transport layer by using Akka extension */
+object Express extends ExtensionId[Express] with ExtensionIdProvider {
+ // Logger shared with the Express class, which imports the members of this object.
+ val LOG: Logger = LogUtil.getLogger(getClass)
+
+ /** Delegates to ExtensionId.get for the given actor system. */
+ override def get(system: ActorSystem): Express = super.get(system)
+
+ /** Returns this object as the extension id. */
+ override def lookup: ExtensionId[Express] = Express
+
+ /** Creates the Express instance for the given actor system. */
+ override def createExtension(system: ExtendedActorSystem): Express = new Express(system)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/HostPort.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/HostPort.scala b/core/src/main/scala/org/apache/gearpump/transport/HostPort.scala
new file mode 100644
index 0000000..d6dcd08
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/HostPort.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport
+
+/** Network endpoint made of a host name and a port number. */
+case class HostPort(host: String, port: Int) {
+  /** The endpoint as a (host, port) pair. */
+  def toTuple: (String, Int) = host -> port
+}
+
+object HostPort {
+  /**
+   * Parses an address of the form "host:port".
+   *
+   * The text is split on ':'; the first piece is the host and the second piece is
+   * parsed as the port. Any further pieces are ignored.
+   */
+  def apply(address: String): HostPort = {
+    val pieces = address.split(":")
+    HostPort(pieces(0), pieces(1).toInt)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/Client.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/Client.scala
new file mode 100644
index 0000000..506e11c
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/netty/Client.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty
+
+import java.net.{ConnectException, InetSocketAddress}
+import java.nio.channels.ClosedChannelException
+import java.util
+import java.util.Random
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.FiniteDuration
+import scala.language.implicitConversions
+
+import akka.actor.Actor
+import org.jboss.netty.bootstrap.ClientBootstrap
+import org.jboss.netty.channel._
+import org.slf4j.Logger
+
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Netty Client implemented as an actor, on the other side, there is a netty server Actor.
+ * All messages sent to this actor will be forwarded to remote machine.
+ *
+ * Incoming [[TaskMessage]]s are buffered in `batch`; they are written to the channel when a
+ * `Flush` message for the current channel is processed.
+ *
+ * @param conf netty settings (buffer size, retry and back-off limits, batch size, ...)
+ * @param factory channel factory used to build the client bootstrap
+ * @param hostPort remote endpoint to connect to
+ */
+class Client(conf: NettyConfig, factory: ChannelFactory, hostPort: HostPort) extends Actor {
+  import org.apache.gearpump.transport.netty.Client._
+
+  val name = s"netty-client-$hostPort"
+
+  private final var bootstrap: ClientBootstrap = null
+  private final val random: Random = new Random
+  private val serializer = conf.newTransportSerializer
+  private var channel: Channel = null
+
+  var batch = new util.ArrayList[TaskMessage]
+
+  // Evaluated once during construction: builds the bootstrap and triggers the first connect.
+  private val init = {
+    bootstrap = NettyUtil.createClientBootStrap(factory,
+      new ClientPipelineFactory(name, conf), conf.buffer_size)
+    self ! Connect(0)
+  }
+
+  def receive: Receive = messageHandler orElse connectionHandler
+
+  /** Buffers task messages and writes the buffer out when a `Flush` is received. */
+  def messageHandler: Receive = {
+    case msg: TaskMessage =>
+      batch.add(msg)
+    case flush@Flush(flushChannel) =>
+      if (channel != flushChannel) {
+        () // Drop, as it belong to old channel flush message
+      } else if (batch.size > 0 && flushChannel.isWritable) {
+        send(flushChannel, batch.iterator)
+        batch.clear()
+        self ! flush
+      } else {
+        // Nothing to send, or the channel is not writable yet: check again later.
+        import context.dispatcher
+        context.system.scheduler.scheduleOnce(
+          new FiniteDuration(conf.flushCheckInterval, TimeUnit.MILLISECONDS), self, flush)
+      }
+  }
+
+  /** Handles the connection life cycle: connect, channel ready, reconnect and close. */
+  def connectionHandler: Receive = {
+    case ChannelReady(channel) =>
+      this.channel = channel
+      self ! Flush(channel)
+    case Connect(tries) =>
+      if (null == channel) {
+        connect(tries)
+      } else {
+        LOG.error("there already exist a channel, will not establish a new one...")
+      }
+    case CompareAndReconnectIfEqual(oldChannel) =>
+      // Only reconnect when the failed channel is still the current one.
+      if (oldChannel == channel) {
+        channel = null
+        self ! Connect(0)
+      }
+    case Close =>
+      close()
+      context.become(closed)
+  }
+
+  /** Behavior after `Close`: every message is logged and dropped. */
+  def closed: Receive = {
+    case msg: AnyRef =>
+      LOG.error(s"This client $name is closed, drop any message ${msg.getClass.getSimpleName}...")
+  }
+
+  /**
+   * Tries to connect to the remote endpoint.
+   *
+   * On success a `ChannelReady` is sent to this actor. On failure a new `Connect` is scheduled
+   * after a back-off delay. Once `tries` exceeds `conf.max_retries` the client closes itself.
+   */
+  private def connect(tries: Int): Unit = {
+    LOG.info(s"netty client try to connect to $name, tries: $tries")
+    if (tries <= conf.max_retries) {
+      // The listeners registered below are invoked by Netty, not by the actor, and the
+      // ActorContext must only be used from within the actor. Resolve the scheduler and the
+      // dispatcher here, while still processing a message.
+      val scheduler = context.system.scheduler
+      implicit val executor: scala.concurrent.ExecutionContext = context.dispatcher
+
+      val remote_addr = new InetSocketAddress(hostPort.host, hostPort.port)
+      val future = bootstrap.connect(remote_addr)
+      future.success { current =>
+        LOG.info(s"netty client successfully connectted to $name, tries: $tries")
+        self ! ChannelReady(current)
+      }.fail { (current, ex) =>
+        LOG.error(s"failed to connect to $name, reason: ${ex.getMessage}, class: ${ex.getClass}")
+        current.close()
+        scheduler.scheduleOnce(
+          new FiniteDuration(
+            getSleepTimeMs(tries), TimeUnit.MILLISECONDS), self, Connect(tries + 1))
+      }
+    } else {
+      LOG.error(s"fail to connect to a remote host $name after retied $tries ...")
+      self ! Close
+    }
+  }
+
+  /**
+   * Packs the messages into [[MessageBatch]]es and writes each batch to `flushChannel`.
+   * A batch is written as soon as it reports full; a trailing non-empty batch is written last.
+   */
+  private def send(flushChannel: Channel, msgs: util.Iterator[TaskMessage]): Unit = {
+    var messageBatch: MessageBatch = null
+
+    while (msgs.hasNext) {
+      val message: TaskMessage = msgs.next()
+      if (null == messageBatch) {
+        messageBatch = new MessageBatch(conf.messageBatchSize, serializer)
+      }
+      messageBatch.add(message)
+      if (messageBatch.isFull) {
+        val toBeFlushed: MessageBatch = messageBatch
+        flushRequest(flushChannel, toBeFlushed)
+        messageBatch = null
+      }
+    }
+    if (null != messageBatch && !messageBatch.isEmpty) {
+      flushRequest(flushChannel, messageBatch)
+    }
+  }
+
+  /** Closes the current channel, if any, and discards the message buffer. */
+  private def close(): Unit = {
+    LOG.info(s"closing netty client $name...")
+    if (null != channel) {
+      channel.close()
+      channel = null
+    }
+    batch = null
+  }
+
+  override def postStop(): Unit = {
+    close()
+  }
+
+  /**
+   * Writes one batch to the channel. If the write fails the channel is closed and this actor
+   * is asked to reconnect, provided the failed channel is still the current one.
+   */
+  private def flushRequest(channel: Channel, requests: MessageBatch): Unit = {
+    val future: ChannelFuture = channel.write(requests)
+    future.fail { (failedChannel, ex) =>
+      if (failedChannel.isOpen) {
+        failedChannel.close()
+      }
+      LOG.error(s"failed to send requests " +
+        s"to ${failedChannel.getRemoteAddress} ${ex.getClass.getSimpleName}")
+      if (!ex.isInstanceOf[ClosedChannelException]) {
+        LOG.error(ex.getMessage, ex)
+      }
+      self ! CompareAndReconnectIfEqual(failedChannel)
+    }
+  }
+
+  /**
+   * Computes the delay before the next connect attempt: `conf.base_sleep_ms` multiplied by a
+   * random factor in `[1, 2^retries)`, capped at `conf.max_sleep_ms`.
+   */
+  private def getSleepTimeMs(retries: Int): Long = {
+    if (retries > 30) {
+      // 1 << retries would no longer be a positive Int.
+      conf.max_sleep_ms
+    } else {
+      val backoff = 1 << retries
+      // Multiply as Long: with an Int product a large factor overflows to a negative delay,
+      // which would pass the "< max_sleep_ms" check below.
+      val sleepMs = conf.base_sleep_ms.toLong * Math.max(1, random.nextInt(backoff))
+      if (sleepMs < conf.max_sleep_ms) sleepMs else conf.max_sleep_ms
+    }
+  }
+
+  private def isChannelWritable = (null != channel) && channel.isWritable
+}
+
+/** Messages, Netty handlers and helpers used by the [[Client]] actor. */
+object Client {
+  val LOG: Logger = LogUtil.getLogger(getClass)
+
+  // Reconnect if current channel equals channel
+  case class CompareAndReconnectIfEqual(channel: Channel)
+
+  /** Asks the client to connect; `tries` is the number of attempts made so far. */
+  case class Connect(tries: Int)
+
+  /** Tells the client that the given channel is connected. */
+  case class ChannelReady(chanel: Channel)
+
+  /** Asks the client to close its channel and stop accepting messages. */
+  case object Close
+
+  /** Asks the client to write its buffered messages to the given channel. */
+  case class Flush(channel: Channel)
+
+  /** Last handler of the client pipeline, it only logs the exceptions raised upstream. */
+  class ClientErrorHandler(name: String) extends SimpleChannelUpstreamHandler {
+
+    override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent): Unit = {
+      event.getCause match {
+        case ex: ConnectException => () // deliberately not logged here
+        case ex: ClosedChannelException =>
+          // The exception message is put into the log text itself: passed as a separate
+          // argument without a "{}" placeholder it would be dropped by slf4j.
+          LOG.warn(
+            s"exception found when trying to close netty connection, ${ex.getMessage}")
+        case ex => LOG.error("Connection failed " + name, ex)
+      }
+    }
+  }
+
+  /** Builds the client pipeline: message decoder, message encoder, then the error handler. */
+  class ClientPipelineFactory(name: String, conf: NettyConfig) extends ChannelPipelineFactory {
+    def getPipeline: ChannelPipeline = {
+      val pipeline: ChannelPipeline = Channels.pipeline
+      pipeline.addLast("decoder", new MessageDecoder(conf.newTransportSerializer))
+      pipeline.addLast("encoder", new MessageEncoder)
+      pipeline.addLast("handler", new ClientErrorHandler(name))
+      pipeline
+    }
+  }
+
+  implicit def channelFutureToChannelFutureOps(channel: ChannelFuture): ChannelFutureOps = {
+    new ChannelFutureOps(channel)
+  }
+
+  /** Adds `success` / `fail` callback registration to a [[ChannelFuture]]. */
+  class ChannelFutureOps(channelFuture: ChannelFuture) {
+
+    /**
+     * Registers a handler invoked with the channel when the future completes successfully.
+     *
+     * @return the wrapped future, so that calls can be chained
+     */
+    def success(handler: (Channel => Unit)): ChannelFuture = {
+      channelFuture.addListener(new ChannelFutureListener {
+        def operationComplete(future: ChannelFuture): Unit = {
+          if (future.isSuccess) {
+            handler(future.getChannel)
+          }
+        }
+      })
+      channelFuture
+    }
+
+    /**
+     * Registers a handler invoked with the channel and the cause when the future completes
+     * without success.
+     *
+     * @return the wrapped future, so that calls can be chained
+     */
+    def fail(handler: ((Channel, Throwable) => Unit)): ChannelFuture = {
+      channelFuture.addListener(new ChannelFutureListener {
+        def operationComplete(future: ChannelFuture): Unit = {
+          if (!future.isSuccess) {
+            handler(future.getChannel, future.getCause)
+          }
+        }
+      })
+      channelFuture
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/Context.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/Context.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/Context.scala
new file mode 100644
index 0000000..bc19960
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/netty/Context.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty
+
+import java.io.Closeable
+import java.util.concurrent._
+
+import scala.collection.JavaConverters._
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import com.typesafe.config.Config
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.slf4j.Logger
+
+import org.apache.gearpump.transport.netty.Server.ServerPipelineFactory
+import org.apache.gearpump.transport.{ActorLookupById, HostPort}
+import org.apache.gearpump.util.{Constants, LogUtil}
+
+object Context {
+  private final val LOG: Logger = LogUtil.getLogger(getClass)
+}
+
+/**
+ * Netty Context
+ *
+ * Creates netty server and client actors and remembers, for each of them, how to release
+ * the resources it holds; `close()` releases them all.
+ *
+ * @param system actor system hosting the server and client actors
+ * @param conf netty settings
+ */
+class Context(system: ActorSystem, conf: NettyConfig) extends IContext {
+  import org.apache.gearpump.transport.netty.Context._
+
+  def this(system: ActorSystem, conf: Config) {
+    this(system, new NettyConfig(conf))
+  }
+
+  // Clean-up actions, in registration order.
+  private val closeHandler = new ConcurrentLinkedQueue[Closeable]()
+  private val nettyDispatcher = system.settings.config.getString(Constants.NETTY_DISPATCHER)
+  val maxWorkers: Int = 1
+
+  // Shared by all clients, created (and registered for clean-up) on first use.
+  private lazy val clientChannelFactory: NioClientSocketChannelFactory = {
+    val bossFactory: ThreadFactory = new NettyRenameThreadFactory("client" + "-boss")
+    val workerFactory: ThreadFactory = new NettyRenameThreadFactory("client" + "-worker")
+    val channelFactory =
+      new NioClientSocketChannelFactory(
+        Executors.newCachedThreadPool(bossFactory),
+        Executors.newCachedThreadPool(workerFactory), maxWorkers)
+
+    closeHandler.add(new Closeable {
+      override def close(): Unit = {
+        LOG.info("Closing all client resources....")
+        channelFactory.releaseExternalResources
+      }
+    })
+    channelFactory
+  }
+
+  /**
+   * Starts a [[Server]] actor and a netty server that forwards to it.
+   *
+   * @param inputPort port to bind, passed as is to the socket address (0 by default)
+   * @return the port the server is bound to
+   */
+  def bind(
+      name: String, lookupActor : ActorLookupById, deserializeFlag : Boolean = true,
+      inputPort: Int = 0): Int = {
+    // TODO: whether we should expose it as application config?
+    val server = system.actorOf(Props(classOf[Server], name, conf, lookupActor,
+      deserializeFlag).withDispatcher(nettyDispatcher), name)
+    val (port, channel) = NettyUtil.newNettyServer(name,
+      new ServerPipelineFactory(server, conf), 5242880, inputPort)
+    val factory = channel.getFactory
+    closeHandler.add(new Closeable {
+      override def close(): Unit = {
+        system.stop(server)
+        channel.close()
+        LOG.info("Closing all server resources....")
+        factory.releaseExternalResources
+      }
+    })
+    port
+  }
+
+  /** Starts a [[Client]] actor connecting to `hostPort` and returns its reference. */
+  def connect(hostPort: HostPort): ActorRef = {
+    val client = system.actorOf(Props(classOf[Client], conf, clientChannelFactory, hostPort)
+      .withDispatcher(nettyDispatcher))
+    closeHandler.add(new Closeable {
+      override def close(): Unit = {
+        LOG.info("closing Client actor....")
+        system.stop(client)
+      }
+    })
+
+    client
+  }
+
+  /**
+   * terminate this context
+   *
+   * Every registered resource is closed even if closing another one fails. Closed resources
+   * are forgotten, so calling this method again does not release them a second time.
+   */
+  def close(): Unit = {
+    import scala.util.control.NonFatal
+
+    LOG.info(s"Context.term, cleanup resources...., " +
+      s"we have ${closeHandler.size()} items to close...")
+
+    // Take the pending items out of the queue, poll() returns null once it is empty.
+    val pending = Iterator.continually(closeHandler.poll()).takeWhile(_ != null).toList
+
+    // Cleans up resource in reverse order so that client actor can be cleaned
+    // before clientChannelFactory
+    pending.reverse.foreach { resource =>
+      try {
+        resource.close()
+      } catch {
+        case NonFatal(ex) =>
+          LOG.error("Failed to close a resource, continue with the remaining ones", ex)
+      }
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/IContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/IContext.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/IContext.scala
new file mode 100644
index 0000000..dae7b3a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/netty/IContext.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty
+
+import akka.actor.ActorRef
+
+import org.apache.gearpump.transport.{ActorLookupById, HostPort}
+
+/** Entry point to the netty transport: creates servers and clients, and releases them. */
+trait IContext {
+
+  /**
+   * Create a Netty server connection.
+   *
+   * @param name name of the server
+   * @param lookupActor used to find the local actor a received message is addressed to
+   * @param deserializeFlag handed over to the server (its effect is not visible from here)
+   * @param port port to bind
+   * @return the port the server is bound to
+   */
+  def bind(name: String, lookupActor: ActorLookupById, deserializeFlag: Boolean, port: Int): Int
+
+  /**
+   * Create a Netty client actor
+   *
+   * @param hostPort remote endpoint the client connects to
+   * @return reference to the client actor
+   */
+  def connect(hostPort: HostPort): ActorRef
+
+  /**
+   * Close resource for this context
+   */
+  def close(): Unit
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/NettyConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/NettyConfig.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/NettyConfig.scala
new file mode 100644
index 0000000..03f759e
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/netty/NettyConfig.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty
+
+import com.typesafe.config.Config
+
+import org.apache.gearpump.util.Constants
+
+/**
+ * Typed view over the netty related entries of a [[Config]].
+ * All values are read once, when the instance is created.
+ */
+class NettyConfig(conf: Config) {
+
+  val buffer_size: Int = conf.getInt(Constants.NETTY_BUFFER_SIZE)
+  val max_retries: Int = conf.getInt(Constants.NETTY_MAX_RETRIES)
+  val base_sleep_ms: Int = conf.getInt(Constants.NETTY_BASE_SLEEP_MS)
+  val max_sleep_ms: Int = conf.getInt(Constants.NETTY_MAX_SLEEP_MS)
+  val messageBatchSize: Int = conf.getInt(Constants.NETTY_MESSAGE_BATCH_SIZE)
+  val flushCheckInterval: Int = conf.getInt(Constants.NETTY_FLUSH_CHECK_INTERVAL)
+
+  /**
+   * Creates a new serializer on every call. The class name is read from the configuration and
+   * the class is instantiated through its no-argument constructor.
+   */
+  def newTransportSerializer: ITransportMessageSerializer = {
+    val serializerClassName = conf.getString(Constants.GEARPUMP_TRANSPORT_SERIALIZER)
+    val serializerClass = Class.forName(serializerClassName)
+    serializerClass.newInstance().asInstanceOf[ITransportMessageSerializer]
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/NettyUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/NettyUtil.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/NettyUtil.scala
new file mode 100644
index 0000000..ddb0afe
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/netty/NettyUtil.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty
+
+import java.net.InetSocketAddress
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import org.jboss.netty.bootstrap.{ClientBootstrap, ServerBootstrap}
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.channel.{Channel, ChannelFactory, ChannelPipelineFactory}
+
+/** Helpers to create netty servers and bootstraps. */
+object NettyUtil {
+  import scala.util.control.NonFatal
+
+  /**
+   * Creates a netty server and binds it.
+   *
+   * @param name prefix for the names of the boss and worker threads
+   * @param pipelineFactory pipeline factory installed on the server bootstrap
+   * @param buffer_size receive buffer size set on accepted connections
+   * @param inputPort port to bind, passed as is to the socket address (0 by default)
+   * @return the local port the server is bound to, and the bound channel
+   */
+  def newNettyServer(
+      name: String,
+      pipelineFactory: ChannelPipelineFactory,
+      buffer_size: Int,
+      inputPort: Int = 0): (Int, Channel) = {
+    val bossFactory: ThreadFactory = new NettyRenameThreadFactory(name + "-boss")
+    val workerFactory: ThreadFactory = new NettyRenameThreadFactory(name + "-worker")
+    val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
+      Executors.newCachedThreadPool(workerFactory), 1)
+
+    val bootstrap = createServerBootStrap(factory, pipelineFactory, buffer_size)
+    val channel: Channel =
+      try {
+        bootstrap.bind(new InetSocketAddress(inputPort))
+      } catch {
+        case NonFatal(ex) =>
+          // Nobody else holds a reference to the factory yet, so release its thread pools
+          // here, otherwise they would be leaked when the bind fails.
+          factory.releaseExternalResources()
+          throw ex
+      }
+    val port = channel.getLocalAddress().asInstanceOf[InetSocketAddress].getPort()
+    (port, channel)
+  }
+
+  /**
+   * Creates a server bootstrap with tcpNoDelay and keepAlive enabled and the given receive
+   * buffer size on child channels.
+   */
+  def createServerBootStrap(
+      factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int)
+    : ServerBootstrap = {
+    val bootstrap = new ServerBootstrap(factory)
+    bootstrap.setOption("child.tcpNoDelay", true)
+    bootstrap.setOption("child.receiveBufferSize", buffer_size)
+    bootstrap.setOption("child.keepAlive", true)
+    bootstrap.setPipelineFactory(pipelineFactory)
+    bootstrap
+  }
+
+  /**
+   * Creates a client bootstrap with tcpNoDelay and keepAlive enabled and the given send
+   * buffer size.
+   */
+  def createClientBootStrap(
+      factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int)
+    : ClientBootstrap = {
+    val bootstrap = new ClientBootstrap(factory)
+    bootstrap.setOption("tcpNoDelay", true)
+    bootstrap.setOption("sendBufferSize", buffer_size)
+    bootstrap.setOption("keepAlive", true)
+    bootstrap.setPipelineFactory(pipelineFactory)
+    bootstrap
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/Server.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/Server.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/Server.scala
new file mode 100644
index 0000000..8d39795
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/netty/Server.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable.IntMap
+import scala.concurrent.Future
+
+import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem}
+import org.jboss.netty.channel._
+import org.jboss.netty.channel.group.{ChannelGroup, DefaultChannelGroup}
+import org.slf4j.Logger
+
+import org.apache.gearpump.transport.ActorLookupById
+import org.apache.gearpump.util.{AkkaHelper, LogUtil}
+
+/**
+ * Netty server actor. Each received message is forwarded to the local actor registered for
+ * the target task found in the message.
+ */
+class Server(
+    name: String, conf: NettyConfig, lookupActor: ActorLookupById, deserializeFlag: Boolean)
+  extends Actor {
+
+  private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = name)
+  import org.apache.gearpump.transport.netty.Server._
+
+  val allChannels: ChannelGroup = new DefaultChannelGroup("gearpump-server")
+
+  val system = context.system.asInstanceOf[ExtendedActorSystem]
+
+  def receive: Receive = msgHandler orElse channelManager
+
+  // Only the TaskId travels on the wire, this helper maps the session id of a message
+  // to the ActorRef used as its sender.
+  private val taskIdActorRefTranslation = new TaskIdActorRefTranslation(context)
+
+  /** Keeps track of the connected channels and closes the ones it is asked to close. */
+  def channelManager: Receive = {
+    case AddChannel(newChannel) =>
+      allChannels.add(newChannel)
+    case CloseChannel(staleChannel) =>
+      import context.dispatcher
+      // The close is awaited inside a Future, not inside the actor.
+      Future {
+        staleChannel.close.awaitUninterruptibly
+        allChannels.remove(staleChannel)
+      }
+  }
+
+  /** Delivers every message of a batch to the local actor of its target task. */
+  def msgHandler: Receive = {
+    case MsgBatch(msgs) =>
+      for ((taskId, taskMessages) <- msgs.asScala.groupBy(_.targetTask())) {
+        lookupActor.lookupLocalActor(taskId) match {
+          case Some(target) =>
+            taskMessages.foreach { taskMessage =>
+              target.tell(taskMessage.message(),
+                taskIdActorRefTranslation.translateToActorRef(taskMessage.sessionId()))
+            }
+          case None =>
+            LOG.error(s"Cannot find actor for id: $taskId...")
+        }
+      }
+  }
+
+  override def postStop(): Unit = {
+    allChannels.close.awaitUninterruptibly
+  }
+}
+
+/** Messages, Netty handlers and helpers used by the [[Server]] actor. */
+object Server {
+
+  /** Builds the server pipeline: message decoder, message encoder, then the server handler. */
+  class ServerPipelineFactory(server: ActorRef, conf: NettyConfig) extends ChannelPipelineFactory {
+    def getPipeline: ChannelPipeline = {
+      val serverPipeline: ChannelPipeline = Channels.pipeline
+      serverPipeline.addLast("decoder", new MessageDecoder(conf.newTransportSerializer))
+      serverPipeline.addLast("encoder", new MessageEncoder)
+      serverPipeline.addLast("handler", new ServerHandler(server))
+      serverPipeline
+    }
+  }
+
+  /** Translates netty channel events into messages for the server actor. */
+  class ServerHandler(server: ActorRef) extends SimpleChannelUpstreamHandler {
+    private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = server.path.name)
+
+    override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
+      server ! AddChannel(e.getChannel)
+    }
+
+    override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
+      val received = e.getMessage.asInstanceOf[util.List[TaskMessage]]
+      // A null payload is ignored.
+      Option(received).foreach { batch =>
+        server ! MsgBatch(batch)
+      }
+    }
+
+    override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
+      LOG.error("server errors in handling the request", e.getCause)
+      server ! CloseChannel(e.getChannel)
+    }
+  }
+
+  /** Caches one ActorRef per session id. */
+  class TaskIdActorRefTranslation(context: ActorContext) {
+    private var taskIdtoActorRef = IntMap.empty[ActorRef]
+
+    /** 1-1 mapping from session id to fake ActorRef */
+    def translateToActorRef(sessionId: Int): ActorRef = {
+      taskIdtoActorRef.get(sessionId) match {
+        case Some(knownRef) =>
+          knownRef
+        case None =>
+          // A fake ActorRef for performance optimization.
+          val createdRef = AkkaHelper.actorFor(context.system, s"/session#$sessionId")
+          taskIdtoActorRef += sessionId -> createdRef
+          createdRef
+      }
+    }
+  }
+
+  /** Registers a newly connected channel. */
+  case class AddChannel(channel: Channel)
+
+  /** Asks the server to close the given channel. */
+  case class CloseChannel(channel: Channel)
+
+  /** A batch of messages received from the network. */
+  case class MsgBatch(messages: java.lang.Iterable[TaskMessage])
+
+}
\ No newline at end of file