Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:53 UTC

[43/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala b/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala
new file mode 100644
index 0000000..87db442
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import java.util
+import scala.collection.JavaConverters._
+
+import io.gearpump.codahale.metrics.jvm.{MemoryUsageGaugeSet, ThreadStatesGaugeSet}
+import io.gearpump.codahale.metrics.{Metric, MetricSet}
+
+class JvmMetricsSet(name: String) extends MetricSet {
+
+  override def getMetrics: util.Map[String, Metric] = {
+    val memoryMetrics = new MemoryUsageGaugeSet().getMetrics.asScala
+    val threadMetrics = new ThreadStatesGaugeSet().getMetrics.asScala
+    Map(
+      s"$name:memory.total.used" -> memoryMetrics("total.used"),
+      s"$name:memory.total.committed" -> memoryMetrics("total.committed"),
+      s"$name:memory.total.max" -> memoryMetrics("total.max"),
+      s"$name:memory.heap.used" -> memoryMetrics("heap.used"),
+      s"$name:memory.heap.committed" -> memoryMetrics("heap.committed"),
+      s"$name:memory.heap.max" -> memoryMetrics("heap.max"),
+      s"$name:thread.count" -> threadMetrics("count"),
+      s"$name:thread.daemon.count" -> threadMetrics("daemon.count")
+    ).asJava
+  }
+}
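
As a quick illustration of the naming scheme above (a sketch that is not part of this diff, and which uses the Metrics registry class introduced later in this patch), registering the set prefixes every JVM gauge with the supplied name:

    import scala.collection.JavaConverters._
    import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics}

    object JvmMetricsSetExample {
      def main(args: Array[String]): Unit = {
        val metrics = new Metrics(1) // sampleRate = 1, illustrative only
        metrics.register(new JvmMetricsSet("worker0"))
        // Prints keys such as "worker0:memory.heap.used" and "worker0:thread.count"
        metrics.registry.getNames.asScala.foreach(println)
      }
    }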

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala b/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala
new file mode 100644
index 0000000..6d89456
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import io.gearpump.codahale.metrics.{Meter => CodaHaleMeter}
+
+/** See io.gearpump.codahale.metrics.Meter */
+class Meter(val name: String, meter: CodaHaleMeter, sampleRate: Int = 1) {
+  private var sampleCount = 0L
+  private var toBeMarked = 0L
+
+  def mark() {
+    meter.mark(1)
+  }
+
+  def mark(n: Long) {
+    toBeMarked += n
+    sampleCount += 1
+    if (null != meter && sampleCount % sampleRate == 0) {
+      meter.mark(toBeMarked)
+      toBeMarked = 0
+    }
+  }
+
+  def getOneMinuteRate(): Double = {
+    meter.getOneMinuteRate
+  }
+}
\ No newline at end of file
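
The sampleRate parameter only affects mark(n): marks are accumulated locally and flushed to the underlying codahale meter on every sampleRate-th call, so nothing is lost, only batched. A minimal sketch, assuming nothing beyond the class above:

    import io.gearpump.codahale.metrics.{Meter => CodaHaleMeter}
    import org.apache.gearpump.metrics.Meter

    object MeterSampleRateExample {
      def main(args: Array[String]): Unit = {
        val underlying = new CodaHaleMeter()
        val meter = new Meter("task0:sendThroughput", underlying, sampleRate = 10)
        (1 to 100).foreach(_ => meter.mark(1))
        // All 100 marks are counted; they were just flushed in batches of 10.
        println(underlying.getCount)
      }
    }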

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala b/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala
new file mode 100644
index 0000000..54d8c6a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import scala.collection.JavaConverters._
+
+import akka.actor._
+import org.slf4j.Logger
+
+import io.gearpump.codahale.metrics._
+import org.apache.gearpump.metrics
+import org.apache.gearpump.util.LogUtil
+
+/** Metric objects registry */
+class Metrics(sampleRate: Int) extends Extension {
+
+  val registry = new MetricRegistry()
+
+  def meter(name: String): metrics.Meter = {
+    new metrics.Meter(name, registry.meter(name), sampleRate)
+  }
+
+  def histogram(name: String): Histogram = {
+    new Histogram(name, registry.histogram(name), sampleRate)
+  }
+
+  def histogram(name: String, sampleRate: Int): Histogram = {
+    new Histogram(name, registry.histogram(name), sampleRate)
+  }
+
+  def counter(name: String): Counter = {
+    new Counter(name, registry.counter(name), sampleRate)
+  }
+
+  def register(set: MetricSet): Unit = {
+    val names = registry.getNames
+    val metrics = set.getMetrics.asScala.filterKeys { key => !names.contains(key) }
+    metrics.foreach { kv =>
+      registry.register(kv._1, kv._2)
+    }
+  }
+}
+
+object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider {
+
+  val LOG: Logger = LogUtil.getLogger(getClass)
+  import org.apache.gearpump.util.Constants._
+
+  sealed trait MetricType {
+    def name: String
+  }
+
+  object MetricType {
+    def unapply(obj: MetricType): Option[(Histogram, Counter, Meter, Timer, Gauge)] = {
+      obj match {
+        case x: Histogram => Some((x, null, null, null, null))
+        case x: Counter => Some((null, x, null, null, null))
+        case x: Meter => Some((null, null, x, null, null))
+        case x: Timer => Some((null, null, null, x, null))
+        case g: Gauge => Some((null, null, null, null, g))
+      }
+    }
+
+    def apply(h: Histogram, c: Counter, m: Meter, t: Timer, g: Gauge): MetricType = {
+      val result =
+        if (h != null) h
+        else if (c != null) c
+        else if (m != null) m
+        else if (t != null) t
+        else if (g != null) g
+        else null
+      result
+    }
+  }
+
+  case class Histogram (
+      name: String, mean: Double,
+      stddev: Double, median: Double,
+      p95: Double, p99: Double, p999: Double)
+    extends MetricType
+
+  case class Counter(name: String, value: Long) extends MetricType
+
+  case class Meter(
+      name: String, count: Long, meanRate: Double,
+      m1: Double, rateUnit: String)
+    extends MetricType
+
+  case class Timer(
+      name: String, count: Long, min: Double, max: Double,
+      mean: Double, stddev: Double, median: Double,
+      p75: Double, p95: Double, p98: Double,
+      p99: Double, p999: Double, meanRate: Double,
+      m1: Double, m5: Double, m15: Double,
+      rateUnit: String, durationUnit: String)
+    extends MetricType
+
+  case class Gauge(name: String, value: Long) extends MetricType
+
+  case object ReportMetrics
+
+  case class DemandMoreMetrics(subscriber: ActorRef)
+
+  override def get(system: ActorSystem): Metrics = super.get(system)
+
+  override def lookup: ExtensionId[Metrics] = Metrics
+
+  override def createExtension(system: ExtendedActorSystem): Metrics = {
+    val metricsEnabled = system.settings.config.getBoolean(GEARPUMP_METRIC_ENABLED)
+    LOG.info(s"Metrics is enabled...,  $metricsEnabled")
+    val sampleRate = system.settings.config.getInt(GEARPUMP_METRIC_SAMPLE_RATE)
+    if (metricsEnabled) {
+      val meters = new Metrics(sampleRate)
+      meters
+    } else {
+      new DummyMetrics
+    }
+  }
+
+  class DummyMetrics extends Metrics(1) {
+    override def register(set: MetricSet): Unit = Unit
+
+    private val meter = new metrics.Meter("", null) {
+      override def mark(): Unit = Unit
+      override def mark(n: Long): Unit = Unit
+      override def getOneMinuteRate(): Double = 0
+    }
+
+    private val histogram = new metrics.Histogram("", null) {
+      override def update(value: Long): Unit = Unit
+      override def getMean(): Double = 0
+      override def getStdDev(): Double = 0
+    }
+
+    private val counter = new metrics.Counter("", null) {
+      override def inc(): Unit = Unit
+      override def inc(n: Long): Unit = Unit
+    }
+
+    override def meter(name: String): metrics.Meter = meter
+    override def histogram(name: String): metrics.Histogram = histogram
+    override def counter(name: String): metrics.Counter = counter
+  }
+}
\ No newline at end of file
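
Since Metrics is an Akka extension, callers obtain it from the ActorSystem; whether they get a real registry or the DummyMetrics stub depends on the metric-enabled flag read in createExtension. A hedged sketch, assuming a Gearpump configuration (with the metric keys above) is on the classpath:

    import akka.actor.ActorSystem
    import org.apache.gearpump.metrics.Metrics

    object MetricsExtensionExample {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("metrics-demo")
        // Returns the real registry or the DummyMetrics stub, depending on config.
        val throughput = Metrics(system).meter("app0.processor0.task0:receiveThroughput")
        throughput.mark()
      }
    }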

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/MetricsAggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/MetricsAggregator.scala b/core/src/main/scala/org/apache/gearpump/metrics/MetricsAggregator.scala
new file mode 100644
index 0000000..08dad57
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/MetricsAggregator.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
+
+/**
+ * Aggregates a larger set of metrics into a smaller set
+ *
+ * Subclasses must implement a constructor with this signature:
+ * MetricsAggregator(config: Config)
+ */
+trait MetricsAggregator {
+  def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
+  : List[HistoryMetricsItem]
+}
\ No newline at end of file
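
A minimal sketch of the shape this trait asks for: a single-Config constructor plus some aggregation policy. The policy and the "limit" option name below are made up purely for illustration:

    import com.typesafe.config.Config
    import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
    import org.apache.gearpump.metrics.MetricsAggregator

    class TakeLatestAggregator(config: Config) extends MetricsAggregator {
      override def aggregate(options: Map[String, String],
          inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] = {
        // "limit" is a hypothetical option name used only in this sketch.
        val limit = options.get("limit").map(_.toInt).getOrElse(100)
        inputs.toList.takeRight(limit)
      }
    }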

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala b/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala
new file mode 100644
index 0000000..9b506af
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import java.net.InetSocketAddress
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration._
+
+import akka.actor.{Actor, ActorRef}
+
+import io.gearpump.codahale.metrics.graphite.{Graphite, GraphiteReporter}
+import io.gearpump.codahale.metrics.{MetricFilter, Slf4jReporter}
+import org.apache.gearpump.metrics.Metrics.{DemandMoreMetrics, ReportMetrics}
+import org.apache.gearpump.metrics.MetricsReporterService.ReportTo
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Reports the metrics data to a target such as Graphite, a remote Akka actor, or log files.
+ *
+ * @param metrics The metrics registry whose data is reported.
+ */
+class MetricsReporterService(metrics: Metrics) extends Actor {
+
+  private val LOG = LogUtil.getLogger(getClass)
+  private implicit val system = context.system
+
+  private val reportInterval = system.settings.config.getInt(GEARPUMP_METRIC_REPORT_INTERVAL)
+  private val reporter = getReporter
+  implicit val dispatcher = context.dispatcher
+
+  def receive: Receive = {
+    // The subscriber is demanding more messages.
+    case DemandMoreMetrics(subscriber) => {
+      reporter.report(subscriber)
+      context.system.scheduler.scheduleOnce(reportInterval.milliseconds,
+        subscriber, ReportMetrics)
+    }
+  }
+
+  def startGraphiteReporter(): ReportTo = {
+    val graphiteHost = system.settings.config.getString(GEARPUMP_METRIC_GRAPHITE_HOST)
+    val graphitePort = system.settings.config.getInt(GEARPUMP_METRIC_GRAPHITE_PORT)
+
+    val graphite = new Graphite(new InetSocketAddress(graphiteHost, graphitePort))
+    LOG.info(s"reporting to $graphiteHost, $graphitePort")
+    new ReportTo {
+      private val reporter = GraphiteReporter.forRegistry(metrics.registry)
+        .convertRatesTo(TimeUnit.SECONDS)
+        .convertDurationsTo(TimeUnit.MILLISECONDS)
+        .filter(MetricFilter.ALL)
+        .build(graphite)
+
+      override def report(to: ActorRef): Unit = reporter.report()
+    }
+  }
+
+  def startSlf4jReporter(): ReportTo = {
+    new ReportTo {
+      val reporter = Slf4jReporter.forRegistry(metrics.registry)
+        .convertRatesTo(TimeUnit.SECONDS)
+        .convertDurationsTo(TimeUnit.MILLISECONDS)
+        .filter(MetricFilter.ALL)
+        .outputTo(LOG)
+        .build()
+
+      override def report(to: ActorRef): Unit = reporter.report()
+    }
+  }
+
+  def startAkkaReporter(): ReportTo = {
+    new AkkaReporter(system, metrics.registry)
+  }
+
+  def getReporter: ReportTo = {
+    val reporterType = system.settings.config.getString(GEARPUMP_METRIC_REPORTER)
+    LOG.info(s"Metrics reporter is enabled, using $reporterType reporter")
+    val reporter = reporterType match {
+      case "graphite" => startGraphiteReporter()
+      case "logfile" => startSlf4jReporter()
+      case "akka" => startAkkaReporter()
+    }
+    reporter
+  }
+}
+
+object MetricsReporterService {
+
+  /** Target to which the user wants to report the metrics data */
+  trait ReportTo {
+    def report(to: ActorRef): Unit
+  }
+}
\ No newline at end of file
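
Wiring-wise the service is an ordinary actor that takes the Metrics extension; a hedged sketch, assuming a Gearpump configuration that defines the reporter and interval keys referenced above:

    import akka.actor.{ActorSystem, Props}
    import org.apache.gearpump.metrics.{Metrics, MetricsReporterService}

    object ReporterWiringExample {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("reporter-demo")
        // The backend ("graphite", "logfile" or "akka") is picked from configuration.
        system.actorOf(Props(new MetricsReporterService(Metrics(system))), "metrics-reporter")
      }
    }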

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/package.scala b/core/src/main/scala/org/apache/gearpump/package.scala
new file mode 100644
index 0000000..b1118d3
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache
+
+package object gearpump {
+  type TimeStamp = Long
+  val LatestTime = -1
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
new file mode 100644
index 0000000..99cbcb6
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.partitioner
+
+import org.apache.gearpump.Message
+
+/** Used by the storm module to broadcast a message to all downstream tasks */
+class BroadcastPartitioner extends MulticastPartitioner {
+  private var lastPartitionNum = -1
+  private var partitions = Array.empty[Int]
+
+  override def getPartitions(
+      msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
+    if (partitionNum != lastPartitionNum) {
+      partitions = (0 until partitionNum).toArray
+      lastPartitionNum = partitionNum
+    }
+    partitions
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
new file mode 100644
index 0000000..5a3eec4
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.partitioner
+
+import org.apache.gearpump.Message
+
+/**
+ * Has the same parallelism as the last processor, and each task in the
+ * current processor is co-located with the corresponding task of the last processor.
+ */
+class CoLocationPartitioner extends UnicastPartitioner {
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    currentPartitionId
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
new file mode 100644
index 0000000..ee684a9
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.partitioner
+
+import org.apache.gearpump.Message
+
+/**
+ * Only makes sense when the message payload implements hashCode().
+ * Otherwise Object.hashCode() is used, which will not return the
+ * same hash code after serialization and deserialization.
+ */
+class HashPartitioner extends UnicastPartitioner {
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum
+  }
+}
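
A tiny example of that contract, assuming a Message can be constructed from just its payload: equal payloads land on the same downstream task index no matter which upstream task asks.

    import org.apache.gearpump.Message
    import org.apache.gearpump.partitioner.HashPartitioner

    object HashPartitionerExample {
      def main(args: Array[String]): Unit = {
        val partitioner = new HashPartitioner
        // Same payload, same partition, regardless of which upstream task asks.
        val p1 = partitioner.getPartition(Message("user-42"), 4, 0)
        val p2 = partitioner.getPartition(Message("user-42"), 4, 1)
        assert(p1 == p2)
      }
    }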

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
new file mode 100644
index 0000000..d68fa65
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.partitioner
+
+import scala.reflect.ClassTag
+
+import org.apache.commons.lang.SerializationUtils
+
+import org.apache.gearpump.Message
+
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), the partitioner decides how ONE task
+ * of upstream processor A sends to the tasks of downstream processor B.
+ */
+sealed trait Partitioner extends Serializable
+
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does
+ * ONE-task {@literal ->} ONE-task mapping.
+ */
+trait UnicastPartitioner extends Partitioner {
+
+  /**
+   * Gets the SINGLE downstream processor task index to send the message to.
+   *
+   * @param msg Message you want to send
+   * @param partitionNum How many tasks the downstream processor has.
+   * @param upstreamTaskIndex The upstream task's index that triggers the getPartition() call.
+   *
+   * @return ONE task index of the downstream processor.
+   */
+  def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int
+
+  def getPartition(msg: Message, partitionNum: Int): Int = {
+    getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
+  }
+}
+
+trait MulticastPartitioner extends Partitioner {
+
+  /**
+   * Gets the list of downstream processor task indexes to send the message to.
+   *
+   * @param upstreamTaskIndex The current sender task's index.
+   *
+   */
+  def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int]
+
+  def getPartitions(msg: Message, partitionNum: Int): Array[Int] = {
+    getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
+  }
+}
+
+sealed trait PartitionerFactory {
+
+  def name: String
+
+  def partitioner: Partitioner
+}
+
+/** Stores the Partitioner in an object. To use it, the user needs to deserialize the object. */
+class PartitionerObject(private[this] val _partitioner: Partitioner)
+  extends PartitionerFactory with Serializable {
+
+  override def name: String = partitioner.getClass.getName
+
+  override def partitioner: Partitioner = {
+    SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner]
+  }
+}
+
+/** Stores the partitioner by class name; the user needs to instantiate a new instance of that class. */
+class PartitionerByClassName(partitionerClass: String)
+  extends PartitionerFactory with Serializable {
+
+  override def name: String = partitionerClass
+  override def partitioner: Partitioner = {
+    Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner]
+  }
+}
+
+/**
+ * @param partitionerFactory How we construct a Partitioner.
+ */
+case class PartitionerDescription(partitionerFactory: PartitionerFactory)
+
+object Partitioner {
+  val UNKNOWN_PARTITION_ID = -1
+
+  def apply[T <: Partitioner](implicit clazz: ClassTag[T]): PartitionerDescription = {
+    PartitionerDescription(new PartitionerByClassName(clazz.runtimeClass.getName))
+  }
+}
\ No newline at end of file
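
The two factory flavours side by side, using only what is defined in this file:

    import org.apache.gearpump.partitioner.{
      HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject}

    object PartitionerFactoryExample {
      // By class name: the partitioner is instantiated later via reflection.
      val byClassName: PartitionerDescription = Partitioner[HashPartitioner]

      // By object: the given instance is cloned on every `partitioner` access.
      val byObject = PartitionerDescription(new PartitionerObject(new HashPartitioner))
    }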

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
new file mode 100644
index 0000000..55ef614
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.partitioner
+
+import scala.util.Random
+
+import org.apache.gearpump.Message
+
+/**
+ * The idea of ShuffleGroupingPartitioner is derived from Storm.
+ * Messages are randomly distributed across the downstream tasks such that
+ * each task is guaranteed to get an equal number of messages.
+ */
+class ShuffleGroupingPartitioner extends UnicastPartitioner {
+  private val random = new Random
+  private var index = -1
+  private var partitions = List.empty[Int]
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    index += 1
+    if (partitions.isEmpty) {
+      partitions = 0.until(partitionNum).toList
+      partitions = random.shuffle(partitions)
+    } else if (index >= partitionNum) {
+      index = 0
+      partitions = random.shuffle(partitions)
+    }
+    partitions(index)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
new file mode 100644
index 0000000..5c66d66
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.partitioner
+
+import java.util.Random
+
+import org.apache.gearpump.Message
+
+/**
+ * Round-robin partitions the data across downstream processor tasks.
+ */
+class ShufflePartitioner extends UnicastPartitioner {
+  private var seed = 0
+  private var count = 0
+
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+
+    if (seed == 0) {
+      seed = newSeed()
+    }
+
+    val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum
+    count = count + 1
+    result
+  }
+
+  private def newSeed(): Int = new Random().nextInt()
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/security/Authenticator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/security/Authenticator.scala b/core/src/main/scala/org/apache/gearpump/security/Authenticator.scala
new file mode 100644
index 0000000..497506b
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/security/Authenticator.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.security
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.gearpump.security.Authenticator.AuthenticationResult
+
+/**
+ * Authenticator for UI dashboard.
+ *
+ * Subclasses must implement a constructor with this signature:
+ * this(config: Config)
+ */
+trait Authenticator {
+
+  // TODO: Change the signature to return more attributes of user credentials...
+  def authenticate(
+      user: String, password: String, ec: ExecutionContext): Future[AuthenticationResult]
+}
+
+object Authenticator {
+
+  trait AuthenticationResult {
+
+    def authenticated: Boolean
+
+    def permissionLevel: Int
+  }
+
+  val UnAuthenticated = new AuthenticationResult {
+    override val authenticated = false
+    override val permissionLevel = -1
+  }
+
+  /** Guests can view, but have no permission to submit applications or write */
+  val Guest = new AuthenticationResult {
+    override val authenticated = true
+    override val permissionLevel = 1000
+  }
+
+  /** Users can submit and kill applications, but have no permission to add or remove machines */
+  val User = new AuthenticationResult {
+    override val authenticated = true
+    override val permissionLevel = 1000 + Guest.permissionLevel
+  }
+
+  /** Super user */
+  val Admin = new AuthenticationResult {
+    override val authenticated = true
+    override val permissionLevel = 1000 + User.permissionLevel
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticator.scala b/core/src/main/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticator.scala
new file mode 100644
index 0000000..110fd8c
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticator.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.security
+
+import scala.concurrent.{ExecutionContext, Future}
+
+import com.typesafe.config.Config
+
+import org.apache.gearpump.security.Authenticator.AuthenticationResult
+import org.apache.gearpump.security.ConfigFileBasedAuthenticator._
+
+object ConfigFileBasedAuthenticator {
+
+  private val ROOT = "gearpump.ui-security.config-file-based-authenticator"
+  private val ADMINS = ROOT + "." + "admins"
+  private val USERS = ROOT + "." + "users"
+  private val GUESTS = ROOT + "." + "guests"
+
+  private case class Credentials(
+      admins: Map[String, String], users: Map[String, String], guests: Map[String, String]) {
+
+    def verify(user: String, password: String): AuthenticationResult = {
+      if (admins.contains(user)) {
+        if (verify(user, password, admins)) {
+          Authenticator.Admin
+        } else {
+          Authenticator.UnAuthenticated
+        }
+      } else if (users.contains(user)) {
+        if (verify(user, password, users)) {
+          Authenticator.User
+        } else {
+          Authenticator.UnAuthenticated
+        }
+      } else if (guests.contains(user)) {
+        if (verify(user, password, guests)) {
+          Authenticator.Guest
+        } else {
+          Authenticator.UnAuthenticated
+        }
+      } else {
+        Authenticator.UnAuthenticated
+      }
+    }
+
+    private def verify(user: String, password: String, map: Map[String, String]): Boolean = {
+      val storedPass = map(user)
+      PasswordUtil.verify(password, storedPass)
+    }
+  }
+}
+
+/**
+ * UI dashboard authenticator based on configuration file.
+ *
+ * It has three categories of users: admins, users, and guests.
+ * Admins have unlimited permission, such as shutting down the cluster and adding/removing machines.
+ * Users have limited permission, such as submitting an application.
+ * Guests cannot submit/kill applications, but can view the application status.
+ *
+ * See the gearpump.ui-security.config-file-based-authenticator section of conf/gear.conf for
+ * information about how to configure this authenticator.
+ *
+ * [Security consideration]
+ * It keeps a one-way SHA-1 digest of the password instead of the password itself. The original
+ * password is NOT kept in any form, so generally it is safe.
+ *
+ *
+ * digesting flow (from original password to digest):
+ * {{{
+ * random salt byte array of length 8 -> byte array of (salt + sha1(salt, password)) ->
+ * base64Encode.
+ * }}}
+ *
+ * Verification of a user input password against the stored digest:
+ * {{{
+ * base64Decode -> extract salt -> do sha1(salt, password) -> generate digest:
+ * salt + sha1 -> compare the generated digest with the stored digest.
+ * }}}
+ */
+class ConfigFileBasedAuthenticator(config: Config) extends Authenticator {
+
+  private val credentials = loadCredentials(config)
+
+  override def authenticate(user: String, password: String, ec: ExecutionContext)
+    : Future[AuthenticationResult] = {
+    implicit val ctx = ec
+    Future {
+      credentials.verify(user, password)
+    }
+  }
+
+  private def loadCredentials(config: Config): Credentials = {
+    val admins = configToMap(config, ADMINS)
+    val users = configToMap(config, USERS)
+    val guests = configToMap(config, GUESTS)
+    new Credentials(admins, users, guests)
+  }
+
+  private def configToMap(config: Config, path: String) = {
+    import scala.collection.JavaConverters._
+    config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString }
+  }
+}
\ No newline at end of file
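
A hedged end-to-end sketch, building the authenticator from an in-memory config instead of conf/gear.conf (the config path mirrors the ROOT constant above, and the digest is produced with PasswordUtil from this same patch):

    import scala.concurrent.Await
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import com.typesafe.config.ConfigFactory
    import org.apache.gearpump.security.{Authenticator, ConfigFileBasedAuthenticator, PasswordUtil}

    object ConfigFileBasedAuthenticatorExample {
      def main(args: Array[String]): Unit = {
        val digest = PasswordUtil.hash("admin-pass")
        val config = ConfigFactory.parseString(
          s"""gearpump.ui-security.config-file-based-authenticator {
             |  admins { "admin" = "$digest" }
             |  users  { }
             |  guests { }
             |}""".stripMargin)
        val authenticator = new ConfigFileBasedAuthenticator(config)
        val result = Await.result(
          authenticator.authenticate("admin", "admin-pass", global), 5.seconds)
        // Resolves to Authenticator.Admin for a correct password.
        println(result == Authenticator.Admin)
      }
    }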

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/security/PasswordUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/security/PasswordUtil.scala b/core/src/main/scala/org/apache/gearpump/security/PasswordUtil.scala
new file mode 100644
index 0000000..25b68c6
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/security/PasswordUtil.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.security
+
+import java.security.MessageDigest
+import scala.util.Try
+
+import sun.misc.{BASE64Decoder, BASE64Encoder}
+
+/**
+ * Utility to verify whether a user input password is valid.
+ * It uses SHA-1 for the digesting.
+ */
+object PasswordUtil {
+  private val SALT_LENGTH = 8
+
+  /**
+   * Verifies a user input password against the stored digest:
+   * {{{
+   * base64Decode -> extract salt -> do sha1(salt, password) ->
+   * generate digest: salt + sha1 -> compare the generated digest with the stored digest.
+   * }}}
+   */
+  def verify(password: String, stored: String): Boolean = {
+    Try {
+      val decoded = base64Decode(stored)
+      val salt = new Array[Byte](SALT_LENGTH)
+      Array.copy(decoded, 0, salt, 0, SALT_LENGTH)
+
+      hash(password, salt) == stored
+    }.getOrElse(false)
+  }
+  /**
+   * digesting flow (from original password to digest):
+   * {{{
+   * random salt byte array of length 8 ->
+   * byte array of (salt + sha1(salt, password)) -> base64Encode
+   * }}}
+   */
+  def hash(password: String): String = {
+    // Salt generation 64 bits long
+    val salt = new Array[Byte](SALT_LENGTH)
+    new java.util.Random().nextBytes(salt)
+    hash(password, salt)
+  }
+
+  private def hash(password: String, salt: Array[Byte]): String = {
+    val digest = MessageDigest.getInstance("SHA-1")
+    digest.reset()
+    digest.update(salt)
+    var input = digest.digest(password.getBytes("UTF-8"))
+    digest.reset()
+    input = digest.digest(input)
+    val withSalt = salt ++ input
+    base64Encode(withSalt)
+  }
+
+  private def base64Encode(data: Array[Byte]): String = {
+    val endecoder = new BASE64Encoder()
+    endecoder.encode(data)
+  }
+
+  private def base64Decode(data: String): Array[Byte] = {
+    val decoder = new BASE64Decoder()
+    decoder.decodeBuffer(data)
+  }
+
+  // scalastyle:off println
+  private def help() = {
+    Console.println("usage: gear org.apache.gearpump.security.PasswordUtil -password " +
+      "<your password>")
+  }
+
+  def main(args: Array[String]): Unit = {
+    if (args.length != 2 || args(0) != "-password") {
+      help()
+    } else {
+      val pass = args(1)
+      val result = hash(pass)
+      Console.println("Here is the hashed password")
+      Console.println("==============================")
+      Console.println(result)
+    }
+  }
+  // scalastyle:on println
+}
\ No newline at end of file
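
A round trip of the digest flow documented above: hash a password once, then verify candidate passwords against the stored digest.

    import org.apache.gearpump.security.PasswordUtil

    object PasswordUtilExample {
      def main(args: Array[String]): Unit = {
        val stored = PasswordUtil.hash("secret")
        println(PasswordUtil.verify("secret", stored)) // true
        println(PasswordUtil.verify("wrong-guess", stored)) // false
      }
    }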

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializationFramework.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializationFramework.scala b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializationFramework.scala
new file mode 100644
index 0000000..6295faf
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializationFramework.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.serializer
+
+import akka.actor.ExtendedActorSystem
+
+import org.apache.gearpump.cluster.UserConfig
+
+/**
+ * A built-in serialization framework using Kryo.
+ *
+ * NOTE: The Kryo used here is a version shaded by Gearpump.
+ */
+class FastKryoSerializationFramework extends SerializationFramework {
+  private var system: ExtendedActorSystem = null
+
+  private lazy val pool = new ThreadLocal[Serializer]() {
+    override def initialValue(): Serializer = {
+      new FastKryoSerializer(system)
+    }
+  }
+
+  override def init(system: ExtendedActorSystem, config: UserConfig): Unit = {
+    this.system = system
+  }
+
+  override def get(): Serializer = {
+    pool.get()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala
new file mode 100644
index 0000000..ed1e347
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.serializer
+
+import akka.actor.ExtendedActorSystem
+
+import io.gearpump.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy
+import io.gearpump.objenesis.strategy.StdInstantiatorStrategy
+import io.gearpump.romix.serialization.kryo.KryoSerializerWrapper
+import org.apache.gearpump.serializer.FastKryoSerializer.KryoSerializationException
+import org.apache.gearpump.util.LogUtil
+
+class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer {
+
+  private val LOG = LogUtil.getLogger(getClass)
+  private val config = system.settings.config
+
+  private val kryoSerializer = new KryoSerializerWrapper(system)
+  private val kryo = kryoSerializer.kryo
+  val strategy = new DefaultInstantiatorStrategy
+  strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy)
+  kryo.setInstantiatorStrategy(strategy)
+  private val kryoClazz = new GearpumpSerialization(config).customize(kryo)
+
+  override def serialize(message: Any): Array[Byte] = {
+    try {
+      kryoSerializer.toBinary(message)
+    } catch {
+      case ex: java.lang.IllegalArgumentException =>
+        val clazz = message.getClass
+        val error = s"""
+          | ${ex.getMessage}
+          |You can also register the class by providing a configuration with the serializer
+          |defined, for example:
+          |
+          |gearpump{
+          |  serializers {
+          |    ## Follow this format when adding new serializer for new message types
+          |    #    "yourpackage.YourClass" = "yourpackage.YourSerializerForThisClass"
+          |
+          |    ## If you intend to use the default serializer for this class, then you can write this
+          |    #    "yourpackage.YourClass" = ""
+          |  }
+          |}
+          |
+          |If you want to register the serializer globally, you need to change
+          |gear.conf on every worker in the cluster; if you only want to register
+          |the serializer for a single streaming application, you need to create
+          |a file under conf/ named application.conf, and add the above configuration
+          |into application.conf. To verify whether the configuration is effective,
+          |you can browse your UI at http://{UI Server Host}:8090/api/v1.0/app/{appId}/config,
+          |and check whether your custom serializer is added.
+        """.stripMargin
+
+        LOG.error(error, ex)
+        throw new KryoSerializationException(error, ex)
+    }
+  }
+
+  override def deserialize(msg: Array[Byte]): Any = {
+    kryoSerializer.fromBinary(msg)
+  }
+}
+
+object FastKryoSerializer {
+  class KryoSerializationException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala b/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala
new file mode 100644
index 0000000..f9c6299
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.serializer
+
+import com.typesafe.config.Config
+import org.slf4j.Logger
+
+import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
+import org.apache.gearpump.util.{Constants, LogUtil}
+
+class GearpumpSerialization(config: Config) {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  def customize(kryo: Kryo): Unit = {
+
+    val serializationMap = configToMap(config, Constants.GEARPUMP_SERIALIZERS)
+
+    serializationMap.foreach { kv =>
+      val (key, value) = kv
+      val keyClass = Class.forName(key)
+
+      if (value == null || value.isEmpty) {
+
+        // Use default serializer for this class type
+        kryo.register(keyClass)
+      } else {
+        val valueClass = Class.forName(value)
+        val register = kryo.register(keyClass,
+          valueClass.newInstance().asInstanceOf[KryoSerializer[_]])
+        LOG.debug(s"Registering ${keyClass}, id: ${register.getId}")
+      }
+    }
+    kryo.setReferences(false)
+
+    // Requires the user to register the class first before using
+    kryo.setRegistrationRequired(true)
+  }
+
+  private final def configToMap(config: Config, path: String) = {
+    import scala.collection.JavaConverters._
+    config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString }
+  }
+}
\ No newline at end of file
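
A sketch of the registration step in isolation, assuming Constants.GEARPUMP_SERIALIZERS resolves to the gearpump.serializers path shown in the error message earlier in this patch, and that the shaded Kryo exposes its usual no-arg constructor:

    import com.typesafe.config.ConfigFactory
    import io.gearpump.esotericsoftware.kryo.Kryo
    import org.apache.gearpump.serializer.GearpumpSerialization

    object GearpumpSerializationExample {
      def main(args: Array[String]): Unit = {
        // Maps a message class to the default Kryo serializer (empty value).
        val config = ConfigFactory.parseString(
          """gearpump.serializers {
            |  "java.lang.String" = ""
            |}""".stripMargin)
        val kryo = new Kryo()
        new GearpumpSerialization(config).customize(kryo)
        // After customize(), registration is required, so unregistered classes fail fast.
      }
    }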

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/SerializationFramework.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/serializer/SerializationFramework.scala b/core/src/main/scala/org/apache/gearpump/serializer/SerializationFramework.scala
new file mode 100644
index 0000000..995ba1f
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/serializer/SerializationFramework.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.serializer
+
+import akka.actor.ExtendedActorSystem
+
+import org.apache.gearpump.cluster.UserConfig
+
+/**
+ * Users are allowed to use a customized serialization framework by extending this
+ * interface.
+ */
+trait SerializationFramework {
+  def init(system: ExtendedActorSystem, config: UserConfig)
+
+  /**
+   * Needs to be thread safe.
+   *
+   * Gets a serializer to use.
+   * Note: this method can be called in a multi-threaded environment. It is the
+   * responsibility of the SerializationFramework developer to ensure this method
+   * is thread-safe.
+   *
+   * To be thread-safe, one recommendation is to use a thread-local pool
+   * that maintains a reference to a Serializer per thread.
+   */
+  def get(): Serializer
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/serializer/Serializer.scala b/core/src/main/scala/org/apache/gearpump/serializer/Serializer.scala
new file mode 100644
index 0000000..7c0f4bf
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/serializer/Serializer.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.serializer
+
+/**
+ * User defined message serializer
+ */
+trait Serializer {
+  def serialize(message: Any): Array[Byte]
+
+  def deserialize(msg: Array[Byte]): Any
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/Express.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/Express.scala b/core/src/main/scala/org/apache/gearpump/transport/Express.scala
new file mode 100644
index 0000000..ef1e8d9
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/Express.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport
+
+import scala.collection.immutable.LongMap
+import scala.concurrent._
+
+import akka.actor._
+import akka.agent.Agent
+import org.slf4j.Logger
+
+import org.apache.gearpump.transport.netty.Client.Close
+import org.apache.gearpump.transport.netty.{TaskMessage, Context}
+import org.apache.gearpump.util.LogUtil
+
+trait ActorLookupById {
+
+  /** Looks up the ActorRef of a local task actor by its TaskId (encoded as TaskId.toLong) */
+  def lookupLocalActor(id: Long): Option[ActorRef]
+}
+
+/**
+ * Custom networking layer.
+ *
+ * It translates long sender/receiver addresses into shorter ones to reduce
+ * network overhead.
+ */
+class Express(val system: ExtendedActorSystem) extends Extension with ActorLookupById {
+
+  import system.dispatcher
+
+  import org.apache.gearpump.transport.Express._
+  val localActorMap = Agent(LongMap.empty[ActorRef])
+  val remoteAddressMap = Agent(Map.empty[Long, HostPort])
+
+  val remoteClientMap = Agent(Map.empty[HostPort, ActorRef])
+
+  val conf = system.settings.config
+
+  lazy val (context, serverPort, localHost) = init
+
+  lazy val init = {
+    LOG.info(s"Start Express init ...${system.name}")
+    val context = new Context(system, conf)
+    val serverPort = context.bind("netty-server", this)
+    val localHost = HostPort(system.provider.getDefaultAddress.host.get, serverPort)
+    LOG.info(s"binding to netty server $localHost")
+
+    system.registerOnTermination(new Runnable {
+      override def run(): Unit = context.close()
+    })
+    (context, serverPort, localHost)
+  }
+
+  def unregisterLocalActor(id: Long): Unit = {
+    localActorMap.sendOff(_ - id)
+  }
+
+  /** Start Netty client actors to connect to remote machines */
+  def startClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
+    val clientsToClose = remoteClientMap.get().filterKeys(!hostPorts.contains(_)).keySet
+    closeClients(clientsToClose)
+    hostPorts.toList.foldLeft(Future(Map.empty[HostPort, ActorRef])) { (future, hostPort) =>
+      remoteClientMap.alter { map =>
+        if (!map.contains(hostPort)) {
+          val actor = context.connect(hostPort)
+          map + (hostPort -> actor)
+        } else {
+          map
+        }
+      }
+    }
+  }
+
+  def closeClients(hostPorts: Set[HostPort]): Future[Map[HostPort, ActorRef]] = {
+    remoteClientMap.alter { map =>
+      map.filterKeys(hostPorts.contains).foreach { hostAndClient =>
+        val (_, client) = hostAndClient
+        client ! Close
+      }
+      map -- hostPorts
+    }
+  }
+
+  def registerLocalActor(id: Long, actor: ActorRef): Unit = {
+    LOG.info(s"RegisterLocalActor: $id, actor: ${actor.path.name}")
+    init
+    localActorMap.sendOff(_ + (id -> actor))
+  }
+
+  def lookupLocalActor(id: Long): Option[ActorRef] = localActorMap.get().get(id)
+
+  def lookupRemoteAddress(id: Long): Option[HostPort] = remoteAddressMap.get().get(id)
+
+  /** Send message to remote task */
+  def transport(taskMessage: TaskMessage, remote: HostPort): Unit = {
+
+    val remoteClient = remoteClientMap.get.get(remote)
+    if (remoteClient.isDefined) {
+      remoteClient.get.tell(taskMessage, Actor.noSender)
+    } else {
+      val errorMsg = s"Client has not been launched properly before transporting messages, " +
+        s"the destination is $remote"
+      LOG.error(errorMsg)
+      throw new Exception(errorMsg)
+    }
+  }
+}
+
+/** A customized transport layer implemented as an Akka extension */
+object Express extends ExtensionId[Express] with ExtensionIdProvider {
+  val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override def get(system: ActorSystem): Express = super.get(system)
+
+  override def lookup: ExtensionId[Express] = Express
+
+  override def createExtension(system: ExtendedActorSystem): Express = new Express(system)
+}
\ No newline at end of file
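
A usage sketch (not part of this commit) of the extension above; it assumes the Gearpump default
configuration is on the classpath, and the task id, actor, and remote address are placeholders:

  import akka.actor.{ActorSystem, Props}
  import org.apache.gearpump.transport.{Express, HostPort}

  object ExpressExample extends App {
    val system = ActorSystem("example")
    val express = Express(system)            // obtain the extension for this actor system

    // Register a local task actor under its long id so incoming messages can be routed to it.
    val task = system.actorOf(Props.empty, "task")
    express.registerLocalActor(1L, task)

    // Open Netty client connections to the remote transport endpoints we know about.
    express.startClients(Set(HostPort("remote-host:3000")))
  }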

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/HostPort.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/HostPort.scala b/core/src/main/scala/org/apache/gearpump/transport/HostPort.scala
new file mode 100644
index 0000000..d6dcd08
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/HostPort.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport
+
+case class HostPort(host: String, port: Int) {
+  def toTuple: (String, Int) = {
+    (host, port)
+  }
+}
+
+object HostPort {
+  def apply(address: String): HostPort = {
+    val hostAndPort = address.split(":")
+    new HostPort(hostAndPort(0), hostAndPort(1).toInt)
+  }
+}
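
For illustration, the "host:port" string form accepted by HostPort.apply (not part of this commit):

  import org.apache.gearpump.transport.HostPort

  object HostPortExample extends App {
    val hp = HostPort("127.0.0.1:3000")   // parsed into HostPort("127.0.0.1", 3000)
    println(hp.toTuple)                   // prints (127.0.0.1,3000)
  }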

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/Client.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/Client.scala
new file mode 100644
index 0000000..506e11c
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/netty/Client.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty
+
+import java.net.{ConnectException, InetSocketAddress}
+import java.nio.channels.ClosedChannelException
+import java.util
+import java.util.Random
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.FiniteDuration
+import scala.language.implicitConversions
+
+import akka.actor.Actor
+import org.jboss.netty.bootstrap.ClientBootstrap
+import org.jboss.netty.channel._
+import org.slf4j.Logger
+
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Netty client implemented as an actor; on the other side there is a Netty server actor.
+ * All messages sent to this actor will be forwarded to the remote machine.
+ */
+class Client(conf: NettyConfig, factory: ChannelFactory, hostPort: HostPort) extends Actor {
+  import org.apache.gearpump.transport.netty.Client._
+
+  val name = s"netty-client-$hostPort"
+
+  private final var bootstrap: ClientBootstrap = null
+  private final val random: Random = new Random
+  private val serializer = conf.newTransportSerializer
+  private var channel: Channel = null
+
+  var batch = new util.ArrayList[TaskMessage]
+
+  private val init = {
+    bootstrap = NettyUtil.createClientBootStrap(factory,
+      new ClientPipelineFactory(name, conf), conf.buffer_size)
+    self ! Connect(0)
+  }
+
+  def receive: Receive = messageHandler orElse connectionHandler
+
+  def messageHandler: Receive = {
+    case msg: TaskMessage =>
+      batch.add(msg)
+    case flush@Flush(flushChannel) =>
+      if (channel != flushChannel) {
+        Unit // Drop it, as the flush message belongs to an old channel
+      } else if (batch.size > 0 && flushChannel.isWritable) {
+        send(flushChannel, batch.iterator)
+        batch.clear()
+        self ! flush
+      } else {
+        import context.dispatcher
+        context.system.scheduler.scheduleOnce(
+          new FiniteDuration(conf.flushCheckInterval, TimeUnit.MILLISECONDS), self, flush)
+      }
+  }
+
+  def connectionHandler: Receive = {
+    case ChannelReady(channel) =>
+      this.channel = channel
+      self ! Flush(channel)
+    case Connect(tries) =>
+      if (null == channel) {
+        connect(tries)
+      } else {
+        LOG.error("there already exist a channel, will not establish a new one...")
+      }
+    case CompareAndReconnectIfEqual(oldChannel) =>
+      if (oldChannel == channel) {
+        channel = null
+        self ! Connect(0)
+      }
+    case Close =>
+      close()
+      context.become(closed)
+  }
+
+  def closed: Receive = {
+    case msg: AnyRef =>
+      LOG.error(s"This client $name is closed, drop any message ${msg.getClass.getSimpleName}...")
+  }
+
+  private def connect(tries: Int): Unit = {
+    LOG.info(s"netty client try to connect to $name, tries: $tries")
+    if (tries <= conf.max_retries) {
+      val remote_addr = new InetSocketAddress(hostPort.host, hostPort.port)
+      val future = bootstrap.connect(remote_addr)
+      future success { current =>
+        LOG.info(s"netty client successfully connectted to $name, tries: $tries")
+        self ! ChannelReady(current)
+      } fail { (current, ex) =>
+        LOG.error(s"failed to connect to $name, reason: ${ex.getMessage}, class: ${ex.getClass}")
+        current.close()
+        import context.dispatcher
+        context.system.scheduler.scheduleOnce(
+          new FiniteDuration(
+            getSleepTimeMs(tries), TimeUnit.MILLISECONDS), self, Connect(tries + 1))
+      }
+    } else {
+      LOG.error(s"fail to connect to a remote host $name after retied $tries ...")
+      self ! Close
+    }
+  }
+
+  private def send(flushChannel: Channel, msgs: util.Iterator[TaskMessage]) {
+    var messageBatch: MessageBatch = null
+
+    while (msgs.hasNext) {
+      val message: TaskMessage = msgs.next()
+      if (null == messageBatch) {
+        messageBatch = new MessageBatch(conf.messageBatchSize, serializer)
+      }
+      messageBatch.add(message)
+      if (messageBatch.isFull) {
+        val toBeFlushed: MessageBatch = messageBatch
+        flushRequest(flushChannel, toBeFlushed)
+        messageBatch = null
+      }
+    }
+    if (null != messageBatch && !messageBatch.isEmpty) {
+      flushRequest(flushChannel, messageBatch)
+    }
+  }
+
+  private def close() {
+    LOG.info(s"closing netty client $name...")
+    if (null != channel) {
+      channel.close()
+      channel = null
+    }
+    batch = null
+  }
+
+  override def postStop(): Unit = {
+    close()
+  }
+
+  private def flushRequest(channel: Channel, requests: MessageBatch) {
+    val future: ChannelFuture = channel.write(requests)
+    future.fail { (channel, ex) =>
+      if (channel.isOpen) {
+        channel.close
+      }
+      LOG.error(s"failed to send requests " +
+        s"to ${channel.getRemoteAddress} ${ex.getClass.getSimpleName}")
+      if (!ex.isInstanceOf[ClosedChannelException]) {
+        LOG.error(ex.getMessage, ex)
+      }
+      self ! CompareAndReconnectIfEqual(channel)
+    }
+  }
+
+  private def getSleepTimeMs(retries: Int): Long = {
+    if (retries > 30) {
+      conf.max_sleep_ms
+    } else {
+      val backoff = 1 << retries
+      val sleepMs = conf.base_sleep_ms * Math.max(1, random.nextInt(backoff))
+      if (sleepMs < conf.max_sleep_ms) sleepMs else conf.max_sleep_ms
+    }
+  }
+
+  private def isChannelWritable = (null != channel) && channel.isWritable
+}
+
+object Client {
+  val LOG: Logger = LogUtil.getLogger(getClass)
+
+  // Reconnect only if the current channel is still the given (failed) channel
+  case class CompareAndReconnectIfEqual(channel: Channel)
+
+  case class Connect(tries: Int)
+  case class ChannelReady(channel: Channel)
+  case object Close
+
+  case class Flush(channel: Channel)
+
+  class ClientErrorHandler(name: String) extends SimpleChannelUpstreamHandler {
+
+    override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) {
+      event.getCause match {
+        case ex: ConnectException => Unit // ignored; the Client actor handles reconnection
+        case ex: ClosedChannelException =>
+          LOG.warn("exception found when trying to close netty connection", ex.getMessage)
+        case ex => LOG.error("Connection failed " + name, ex)
+      }
+    }
+  }
+
+  class ClientPipelineFactory(name: String, conf: NettyConfig) extends ChannelPipelineFactory {
+    def getPipeline: ChannelPipeline = {
+      val pipeline: ChannelPipeline = Channels.pipeline
+      pipeline.addLast("decoder", new MessageDecoder(conf.newTransportSerializer))
+      pipeline.addLast("encoder", new MessageEncoder)
+      pipeline.addLast("handler", new ClientErrorHandler(name))
+      pipeline
+    }
+  }
+
+  implicit def channelFutureToChannelFutureOps(channel: ChannelFuture): ChannelFutureOps = {
+    new ChannelFutureOps(channel)
+  }
+
+  class ChannelFutureOps(channelFuture: ChannelFuture) {
+
+    def success(handler: (Channel => Unit)): ChannelFuture = {
+      channelFuture.addListener(new ChannelFutureListener {
+        def operationComplete(future: ChannelFuture) {
+          if (future.isSuccess) {
+            handler(future.getChannel)
+          }
+        }
+      })
+      channelFuture
+    }
+
+    def fail(handler: ((Channel, Throwable) => Unit)): ChannelFuture = {
+      channelFuture.addListener(new ChannelFutureListener {
+        def operationComplete(future: ChannelFuture) {
+          if (!future.isSuccess) {
+            handler(future.getChannel, future.getCause)
+          }
+        }
+      })
+      channelFuture
+    }
+  }
+}
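
For reference, a standalone sketch (not part of this commit) of the reconnect backoff used by
getSleepTimeMs above, with illustrative values in place of the NettyConfig settings:

  import java.util.Random

  object BackoffExample extends App {
    val baseSleepMs = 100   // illustrative; the real value comes from NettyConfig.base_sleep_ms
    val maxSleepMs = 1000   // illustrative; the real value comes from NettyConfig.max_sleep_ms
    val random = new Random

    def sleepTimeMs(retries: Int): Long = {
      if (retries > 30) {
        maxSleepMs
      } else {
        val backoff = 1 << retries                               // exponential window: 1, 2, 4, 8, ...
        val sleepMs = baseSleepMs * Math.max(1, random.nextInt(backoff))
        Math.min(sleepMs, maxSleepMs)                            // never sleep longer than the cap
      }
    }

    (0 to 5).foreach(i => println(s"retry $i -> ${sleepTimeMs(i)} ms"))
  }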

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/Context.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/Context.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/Context.scala
new file mode 100644
index 0000000..bc19960
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/netty/Context.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty
+
+import java.io.Closeable
+import java.util.concurrent._
+
+import scala.collection.JavaConverters._
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import com.typesafe.config.Config
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.slf4j.Logger
+
+import org.apache.gearpump.transport.netty.Server.ServerPipelineFactory
+import org.apache.gearpump.transport.{ActorLookupById, HostPort}
+import org.apache.gearpump.util.{Constants, LogUtil}
+
+object Context {
+  private final val LOG: Logger = LogUtil.getLogger(getClass)
+}
+
+/** Netty Context */
+class Context(system: ActorSystem, conf: NettyConfig) extends IContext {
+  import org.apache.gearpump.transport.netty.Context._
+
+  def this(system: ActorSystem, conf: Config) {
+    this(system, new NettyConfig(conf))
+  }
+
+  private val closeHandler = new ConcurrentLinkedQueue[Closeable]()
+  private val nettyDispatcher = system.settings.config.getString(Constants.NETTY_DISPATCHER)
+  val maxWorkers: Int = 1
+
+  private lazy val clientChannelFactory: NioClientSocketChannelFactory = {
+    val bossFactory: ThreadFactory = new NettyRenameThreadFactory("client" + "-boss")
+    val workerFactory: ThreadFactory = new NettyRenameThreadFactory("client" + "-worker")
+    val channelFactory =
+      new NioClientSocketChannelFactory(
+        Executors.newCachedThreadPool(bossFactory),
+        Executors.newCachedThreadPool(workerFactory), maxWorkers)
+
+    closeHandler.add(new Closeable {
+      override def close(): Unit = {
+        LOG.info("Closing all client resources....")
+        channelFactory.releaseExternalResources
+      }
+    })
+    channelFactory
+  }
+
+  def bind(
+      name: String, lookupActor : ActorLookupById, deserializeFlag : Boolean = true,
+      inputPort: Int = 0): Int = {
+    // TODO: should this be exposed as application config?
+    val server = system.actorOf(Props(classOf[Server], name, conf, lookupActor,
+      deserializeFlag).withDispatcher(nettyDispatcher), name)
+    val (port, channel) = NettyUtil.newNettyServer(name,
+      new ServerPipelineFactory(server, conf), 5242880, inputPort) // 5 MB receive buffer
+    val factory = channel.getFactory
+    closeHandler.add(new Closeable {
+      override def close(): Unit = {
+        system.stop(server)
+        channel.close()
+        LOG.info("Closing all server resources....")
+        factory.releaseExternalResources
+      }
+    })
+    port
+  }
+
+  def connect(hostPort: HostPort): ActorRef = {
+    val client = system.actorOf(Props(classOf[Client], conf, clientChannelFactory, hostPort)
+      .withDispatcher(nettyDispatcher))
+    closeHandler.add(new Closeable {
+      override def close(): Unit = {
+        LOG.info("closing Client actor....")
+        system.stop(client)
+      }
+    })
+
+    client
+  }
+
+  /** Terminates this context and releases all registered resources. */
+  def close(): Unit = {
+
+    LOG.info(s"Context.term, cleanup resources...., " +
+      s"we have ${closeHandler.size()} items to close...")
+
+    // Clean up resources in reverse order so that client actors are stopped
+    // before clientChannelFactory is released
+    closeHandler.iterator().asScala.toList.reverse.foreach(_.close())
+  }
+}
+
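
A usage sketch (not part of this commit) tying bind and connect together; it assumes the Gearpump
default configuration (which provides the Netty settings and dispatcher) is on the classpath, and
the no-op ActorLookupById is a placeholder:

  import akka.actor.{ActorRef, ActorSystem}
  import org.apache.gearpump.transport.{ActorLookupById, HostPort}
  import org.apache.gearpump.transport.netty.Context

  object ContextExample extends App {
    val system = ActorSystem("example")

    // A trivial lookup that knows no local tasks; a real one would consult the task registry.
    val lookup = new ActorLookupById {
      override def lookupLocalActor(id: Long): Option[ActorRef] = None
    }

    val context = new Context(system, system.settings.config)
    val port = context.bind("netty-server", lookup)              // starts a Netty server, returns the bound port
    val client = context.connect(HostPort(s"127.0.0.1:$port"))   // Netty client actor pointed at that server
    context.close()                                              // stops clients/servers and releases resources
  }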

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/IContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/IContext.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/IContext.scala
new file mode 100644
index 0000000..dae7b3a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/netty/IContext.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty
+
+import akka.actor.ActorRef
+
+import org.apache.gearpump.transport.{ActorLookupById, HostPort}
+
+trait IContext {
+
+  /**
+   * Starts a Netty server and returns the port it is bound to.
+   */
+  def bind(name: String, lookupActor: ActorLookupById, deserializeFlag: Boolean, port: Int): Int
+
+  /**
+   * Creates a Netty client actor connected to the given host and port.
+   */
+  def connect(hostPort: HostPort): ActorRef
+
+  /**
+   * Closes all resources owned by this context.
+   */
+  def close()
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/NettyConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/NettyConfig.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/NettyConfig.scala
new file mode 100644
index 0000000..03f759e
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/netty/NettyConfig.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty
+
+import com.typesafe.config.Config
+
+import org.apache.gearpump.util.Constants
+
+class NettyConfig(conf: Config) {
+
+  val buffer_size = conf.getInt(Constants.NETTY_BUFFER_SIZE)
+  val max_retries = conf.getInt(Constants.NETTY_MAX_RETRIES)
+  val base_sleep_ms = conf.getInt(Constants.NETTY_BASE_SLEEP_MS)
+  val max_sleep_ms = conf.getInt(Constants.NETTY_MAX_SLEEP_MS)
+  val messageBatchSize = conf.getInt(Constants.NETTY_MESSAGE_BATCH_SIZE)
+  val flushCheckInterval = conf.getInt(Constants.NETTY_FLUSH_CHECK_INTERVAL)
+
+  def newTransportSerializer: ITransportMessageSerializer = {
+    Class.forName(
+      conf.getString(Constants.GEARPUMP_TRANSPORT_SERIALIZER))
+      .newInstance().asInstanceOf[ITransportMessageSerializer]
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/NettyUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/NettyUtil.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/NettyUtil.scala
new file mode 100644
index 0000000..ddb0afe
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/netty/NettyUtil.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty
+
+import java.net.InetSocketAddress
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import org.jboss.netty.bootstrap.{ClientBootstrap, ServerBootstrap}
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.channel.{Channel, ChannelFactory, ChannelPipelineFactory}
+
+object NettyUtil {
+
+  def newNettyServer(
+      name: String,
+      pipelineFactory: ChannelPipelineFactory,
+      buffer_size: Int,
+      inputPort: Int = 0): (Int, Channel) = {
+    val bossFactory: ThreadFactory = new NettyRenameThreadFactory(name + "-boss")
+    val workerFactory: ThreadFactory = new NettyRenameThreadFactory(name + "-worker")
+    val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
+      Executors.newCachedThreadPool(workerFactory), 1)
+
+    val bootstrap = createServerBootStrap(factory, pipelineFactory, buffer_size)
+    val channel: Channel = bootstrap.bind(new InetSocketAddress(inputPort))
+    val port = channel.getLocalAddress().asInstanceOf[InetSocketAddress].getPort()
+    (port, channel)
+  }
+
+  def createServerBootStrap(
+      factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int)
+    : ServerBootstrap = {
+    val bootstrap = new ServerBootstrap(factory)
+    bootstrap.setOption("child.tcpNoDelay", true)
+    bootstrap.setOption("child.receiveBufferSize", buffer_size)
+    bootstrap.setOption("child.keepAlive", true)
+    bootstrap.setPipelineFactory(pipelineFactory)
+    bootstrap
+  }
+
+  def createClientBootStrap(
+      factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, buffer_size: Int)
+    : ClientBootstrap = {
+    val bootstrap = new ClientBootstrap(factory)
+    bootstrap.setOption("tcpNoDelay", true)
+    bootstrap.setOption("sendBufferSize", buffer_size)
+    bootstrap.setOption("keepAlive", true)
+    bootstrap.setPipelineFactory(pipelineFactory)
+    bootstrap
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/transport/netty/Server.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/transport/netty/Server.scala b/core/src/main/scala/org/apache/gearpump/transport/netty/Server.scala
new file mode 100644
index 0000000..8d39795
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/transport/netty/Server.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport.netty
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable.IntMap
+import scala.concurrent.Future
+
+import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem}
+import org.jboss.netty.channel._
+import org.jboss.netty.channel.group.{ChannelGroup, DefaultChannelGroup}
+import org.slf4j.Logger
+
+import org.apache.gearpump.transport.ActorLookupById
+import org.apache.gearpump.util.{AkkaHelper, LogUtil}
+
+/** Netty server actor; messages received will be forwarded to the target on the address line. */
+class Server(
+    name: String, conf: NettyConfig, lookupActor: ActorLookupById, deserializeFlag: Boolean)
+  extends Actor {
+
+  private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = name)
+  import org.apache.gearpump.transport.netty.Server._
+
+  val allChannels: ChannelGroup = new DefaultChannelGroup("gearpump-server")
+
+  val system = context.system.asInstanceOf[ExtendedActorSystem]
+
+  def receive: Receive = msgHandler orElse channelManager
+  // Since only the TaskId is transferred on the wire,
+  // this object translates a taskId to or from an ActorRef
+  private val taskIdActorRefTranslation = new TaskIdActorRefTranslation(context)
+
+  def channelManager: Receive = {
+    case AddChannel(channel) => allChannels.add(channel)
+    case CloseChannel(channel) =>
+      import context.dispatcher
+      Future {
+        channel.close.awaitUninterruptibly
+        allChannels.remove(channel)
+      }
+  }
+
+  def msgHandler: Receive = {
+    case MsgBatch(msgs) =>
+      msgs.asScala.groupBy(_.targetTask()).foreach { taskBatch =>
+        val (taskId, taskMessages) = taskBatch
+        val actor = lookupActor.lookupLocalActor(taskId)
+
+        if (actor.isEmpty) {
+          LOG.error(s"Cannot find actor for id: $taskId...")
+        } else taskMessages.foreach { taskMessage =>
+          actor.get.tell(taskMessage.message(),
+            taskIdActorRefTranslation.translateToActorRef(taskMessage.sessionId()))
+        }
+      }
+  }
+
+  override def postStop(): Unit = {
+    allChannels.close.awaitUninterruptibly
+  }
+}
+
+object Server {
+
+  class ServerPipelineFactory(server: ActorRef, conf: NettyConfig) extends ChannelPipelineFactory {
+    def getPipeline: ChannelPipeline = {
+      val pipeline: ChannelPipeline = Channels.pipeline
+      pipeline.addLast("decoder", new MessageDecoder(conf.newTransportSerializer))
+      pipeline.addLast("encoder", new MessageEncoder)
+      pipeline.addLast("handler", new ServerHandler(server))
+      pipeline
+    }
+  }
+
+  class ServerHandler(server: ActorRef) extends SimpleChannelUpstreamHandler {
+    private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = server.path.name)
+
+    override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
+      server ! AddChannel(e.getChannel)
+    }
+
+    override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
+      val msgs: util.List[TaskMessage] = e.getMessage.asInstanceOf[util.List[TaskMessage]]
+      if (msgs != null) {
+        server ! MsgBatch(msgs)
+      }
+    }
+
+    override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+      LOG.error("server errors in handling the request", e.getCause)
+      server ! CloseChannel(e.getChannel)
+    }
+  }
+
+  class TaskIdActorRefTranslation(context: ActorContext) {
+    private var taskIdtoActorRef = IntMap.empty[ActorRef]
+
+    /** 1-1 mapping from session id to fake ActorRef */
+    def translateToActorRef(sessionId: Int): ActorRef = {
+      if (!taskIdtoActorRef.contains(sessionId)) {
+
+        // A fake ActorRef for performance optimization.
+        val actorRef = AkkaHelper.actorFor(context.system, s"/session#$sessionId")
+        taskIdtoActorRef += sessionId -> actorRef
+      }
+      taskIdtoActorRef.get(sessionId).get
+    }
+  }
+
+  case class AddChannel(channel: Channel)
+
+  case class CloseChannel(channel: Channel)
+
+  case class MsgBatch(messages: java.lang.Iterable[TaskMessage])
+
+}
\ No newline at end of file