You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:19 UTC

[09/49] incubator-gearpump git commit: fix #1988, upgrade akka to akka 2.4.2

fix #1988, upgrade akka to akka 2.4.2


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/21d59216
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/21d59216
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/21d59216

Branch: refs/heads/master
Commit: 21d59216b181186e322738cef751e3bc7abc6a81
Parents: bcdbfc9
Author: Sean Zhong <cl...@gmail.com>
Authored: Fri Mar 25 00:33:23 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Apr 26 14:23:48 2016 +0800

----------------------------------------------------------------------
 .travis.yml                                     |   4 +-
 conf/gear.conf                                  |   2 +-
 .../main/java/io/gearpump/util/AkkaHelper.java  |  35 +++++++
 .../io/gearpump/util/HadoopFSLogAppender.java   |  18 ++++
 core/src/main/resources/geardefault.conf        |   9 ++
 .../io/gearpump/transport/netty/Server.scala    |   6 +-
 .../main/scala/io/gearpump/util/ActorUtil.scala |   1 +
 .../io/gearpump/cluster/DaemonMessage.scala     |   2 -
 .../scala/io/gearpump/cluster/main/Master.scala |  35 ++++---
 .../io/gearpump/cluster/master/AppManager.scala |   2 +-
 .../cluster/master/ClusterReplication.scala     |  64 -----------
 .../cluster/master/InMemoryKVService.scala      | 105 +++++++++++--------
 .../io/gearpump/cluster/master/Master.scala     |   6 +-
 .../scala/io/gearpump/util/FileDirective.scala  |  12 ++-
 .../scala/io/gearpump/util/FileServer.scala     |  15 ++-
 .../cluster/main/MasterWatcherSpec.scala        |   3 +-
 .../cluster/master/InMemoryKVServiceSpec.scala  |   7 +-
 .../yarn/appmaster/YarnAppMaster.scala          |   4 +-
 .../yarn/client/AppMasterResolver.scala         |   4 +-
 .../checklist/MessageDeliverySpec.scala         |   2 +-
 project/Build.scala                             |  65 +++++++-----
 project/Pack.scala                              |   4 +-
 project/plugins.sbt                             |   6 +-
 .../gearpump/services/MasterServiceSpec.scala   |   6 +-
 .../gearpump/services/SecurityServiceSpec.scala |  16 +--
 ...CloudFoundryUAAOAuth2AuthenticatorSpec.scala |  14 +--
 .../oauth2/GoogleOAuth2AuthenticatorSpec.scala  |  14 +--
 .../streaming/task/ExpressTransport.scala       |   4 +-
 .../streaming/appmaster/DagManagerSpec.scala    |   2 +-
 .../gearpump/streaming/dsl/StreamAppSpec.scala  |   1 +
 .../2.10/akka-actor_2.10-2.3.12-fix-1816.jar    | Bin 2815968 -> 0 bytes
 .../2.11/akka-actor_2.11-2.3.12-fix-1816.jar    | Bin 2790862 -> 0 bytes
 32 files changed, 248 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 771f652..5b6b80f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,9 +19,9 @@ script:
     then sbt -jvm-opts project/travis/jvmopts clean +assembly +packArchiveZip | grep -v -E "$skipLogs";
   fi  
 jdk:
-- oraclejdk7
+- oraclejdk8
 scala:
-- 2.11.5
+- 2.11.8
 cache:
   directories:
   - $HOME/.m2

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/conf/gear.conf
----------------------------------------------------------------------
diff --git a/conf/gear.conf b/conf/gear.conf
index 1ab4bb6..d8e8b4c 100644
--- a/conf/gear.conf
+++ b/conf/gear.conf
@@ -320,7 +320,7 @@ gearpump {
 ### Configuration only visible to master nodes..
 gearpump-master {
   extensions = [
-    "akka.contrib.datareplication.DataReplication$"
+    "akka.cluster.ddata.DistributedData$"
   ]
   akka {
     #########################################

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/java/io/gearpump/util/AkkaHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/io/gearpump/util/AkkaHelper.java b/core/src/main/java/io/gearpump/util/AkkaHelper.java
new file mode 100644
index 0000000..f4772c1
--- /dev/null
+++ b/core/src/main/java/io/gearpump/util/AkkaHelper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.util;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+
+public class AkkaHelper {
+
+  /**
+   * Helper util to access the private[akka] system.actorFor method
+   *
+   * This is used for performance optimization, we encode the session Id
+   * in the ActorRef path. Session Id is used to identify the sender Task.
+   */
+  public static ActorRef actorFor(ActorSystem system, String path) {
+    return system.actorFor(path);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java b/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java
index a736f9c..ab7ee5e 100644
--- a/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java
+++ b/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package io.gearpump.util;
 
 import org.apache.log4j.RollingFileAppender;

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/core/src/main/resources/geardefault.conf b/core/src/main/resources/geardefault.conf
index 17626a3..394a928 100644
--- a/core/src/main/resources/geardefault.conf
+++ b/core/src/main/resources/geardefault.conf
@@ -96,6 +96,7 @@ gearpump {
   ### If you want to use metrics, please change
   ###########################
 
+
   ### Flag to enable metrics
   metrics {
     enabled = false
@@ -572,6 +573,14 @@ akka {
   actor {
     provider = "akka.remote.RemoteActorRefProvider"
 
+    ## Don't warn on Java serializer usage.
+    ##
+    ## Most of our streaming messages use a custom serializer, with a few
+    ## exceptions for system control messages. The volume of system control
+    ## messages should be small, so turn this flag off until further
+    ## benchmarking shows a different result.
+    warn-about-java-serializer-usage = false
+
     ## TODO: in integration test, may need to enable this
     ##creation-timeout=100s
     default-mailbox {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/scala/io/gearpump/transport/netty/Server.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/Server.scala b/core/src/main/scala/io/gearpump/transport/netty/Server.scala
index c592b31..dde0861 100644
--- a/core/src/main/scala/io/gearpump/transport/netty/Server.scala
+++ b/core/src/main/scala/io/gearpump/transport/netty/Server.scala
@@ -22,7 +22,7 @@ import java.util
 
 import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem}
 import io.gearpump.transport.ActorLookupById
-import io.gearpump.util.LogUtil
+import io.gearpump.util.{LogUtil, AkkaHelper}
 import org.jboss.netty.channel._
 import org.jboss.netty.channel.group.{ChannelGroup, DefaultChannelGroup}
 import org.slf4j.Logger
@@ -116,7 +116,9 @@ object Server {
 
     def translateToActorRef(sessionId : Int): ActorRef = {
       if(!taskIdtoActorRef.contains(sessionId)){
-        val actorRef = context.system.actorFor(s"/session#$sessionId")
+
+        // A fake ActorRef for performance optimization.
+        val actorRef = AkkaHelper.actorFor(context.system, s"/session#$sessionId")
         taskIdtoActorRef += sessionId -> actorRef
       }
       taskIdtoActorRef.get(sessionId).get

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/scala/io/gearpump/util/ActorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ActorUtil.scala b/core/src/main/scala/io/gearpump/util/ActorUtil.scala
index b63733d..8233b28 100644
--- a/core/src/main/scala/io/gearpump/util/ActorUtil.scala
+++ b/core/src/main/scala/io/gearpump/util/ActorUtil.scala
@@ -87,6 +87,7 @@ object ActorUtil {
   def launchExecutorOnEachWorker(master: ActorRef, executorJvmConfig: ExecutorSystemJvmConfig,
     sender: ActorRef)(implicit executor : scala.concurrent.ExecutionContext) = {
     implicit val timeout = Constants.FUTURE_TIMEOUT
+
     (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]].map { list =>
       val resources = list.workers.map {
         workerId => ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
index 36d84c0..19ac620 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
@@ -20,8 +20,6 @@ package io.gearpump.cluster
 import akka.actor.ActorRef
 import io.gearpump.cluster.master.Master.MasterInfo
 import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.master.Master.MasterInfo
-import io.gearpump.cluster.scheduler.Resource
 
 /**
  * Cluster Bootup Flow

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
index c89ab1e..8d4515a 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
@@ -22,9 +22,9 @@ import java.util.concurrent.TimeUnit
 
 import akka.actor._
 import akka.cluster.ClusterEvent._
+import akka.cluster.ddata.DistributedData
 import akka.cluster.{Cluster, Member, MemberStatus}
-import akka.contrib.datareplication.DataReplication
-import akka.contrib.pattern.{ClusterSingletonManager, ClusterSingletonProxy}
+import akka.cluster.singleton.{ClusterSingletonManagerSettings, ClusterSingletonProxySettings, ClusterSingletonManager, ClusterSingletonProxy}
 import com.typesafe.config.ConfigValueFactory
 import io.gearpump.cluster.ClusterConfig
 import io.gearpump.cluster.master.{Master => MasterActor}
@@ -85,26 +85,28 @@ object Master extends AkkaApp with ArgumentsParser {
     LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}")
     val system = ActorSystem(MASTER, masterConfig)
 
-    val replicator = DataReplication(system).replicator
+    val replicator = DistributedData(system).replicator
     LOG.info(s"Replicator path: ${replicator.path}")
 
-    //start master proxy
-    val masterProxy = system.actorOf(ClusterSingletonProxy.props(
-      singletonPath = s"/user/${SINGLETON_MANAGER}/${MASTER_WATCHER}/${MASTER}",
-      role = Some(MASTER)),
-      name = MASTER)
-
     //start singleton manager
     val singletonManager = system.actorOf(ClusterSingletonManager.props(
-      singletonProps = Props(classOf[MasterWatcher], MASTER, masterProxy),
-      singletonName = MASTER_WATCHER,
+      singletonProps = Props(classOf[MasterWatcher], MASTER),
       terminationMessage = PoisonPill,
-      role = Some(MASTER)),
+      settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER).withRole(MASTER)),
       name = SINGLETON_MANAGER)
 
+    //start master proxy
+    val masterProxy = system.actorOf(ClusterSingletonProxy.props(
+      singletonManagerPath = s"/user/${SINGLETON_MANAGER}",
+      // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}".
+      // Master will only be created when there is a majority of machines started.
+      settings = ClusterSingletonProxySettings(system).withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)),
+      name = MASTER
+    )
+
     LOG.info(s"master proxy is started at ${masterProxy.path}")
 
-    val mainThread = Thread.currentThread();
+    val mainThread = Thread.currentThread()
     Runtime.getRuntime().addShutdownHook(new Thread() {
       override def run() : Unit = {
         if (!system.isTerminated) {
@@ -120,16 +122,16 @@ object Master extends AkkaApp with ArgumentsParser {
             case ex : Exception => //ignore
           }
           system.shutdown()
-          mainThread.join();
+          mainThread.join()
         }
       }
-    });
+    })
 
     system.awaitTermination()
   }
 }
 
-class MasterWatcher(role: String, masterProxy : ActorRef) extends Actor  with ActorLogging {
+class MasterWatcher(role: String) extends Actor  with ActorLogging {
   import context.dispatcher
 
   val cluster = Cluster(context.system)
@@ -192,7 +194,6 @@ class MasterWatcher(role: String, masterProxy : ActorRef) extends Actor  with Ac
 
   def waitForShutdown : Receive = {
     case MasterWatcher.Shutdown => {
-      context.system.stop(masterProxy)
       cluster.unsubscribe(self)
       cluster.leave(cluster.selfAddress)
       context.stop(self)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala b/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
index 06ab8bb..e6bd1db 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
@@ -241,7 +241,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
       (kvService ? PutKV(appId.toString, key, value)).asInstanceOf[Future[PutKVResult]].map {
         case PutKVSuccess =>
           client ! AppDataSaved
-        case PutKVFailed(k, v, ex) =>
+        case PutKVFailed(k, ex) =>
           client ! SaveAppDataFailed
       }
     case GetAppData(appId, key) =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/master/ClusterReplication.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/ClusterReplication.scala b/daemon/src/main/scala/io/gearpump/cluster/master/ClusterReplication.scala
deleted file mode 100644
index faeccb3..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/master/ClusterReplication.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import java.util.concurrent.TimeUnit
-
-import akka.actor._
-import akka.cluster.Cluster
-import akka.contrib.datareplication.Replicator._
-import akka.contrib.datareplication.{DataReplication, GSet}
-import io.gearpump.util.Constants
-import io.gearpump.util.Constants._
-import io.gearpump.util._
-import org.slf4j.Logger
-
-import scala.concurrent.duration.Duration
-
-/**
- * ClusterReplication use [[DataReplication]] to store replicated state.
- */
-trait ClusterReplication extends Actor with Stash {
-
-  val LOG: Logger = LogUtil.getLogger(getClass)
-  val systemconfig = context.system.settings.config
-
-  implicit val executionContext = context.dispatcher
-  implicit val cluster = Cluster(context.system)
-
-  val TIMEOUT = Duration(5, TimeUnit.SECONDS)
-  val STATE = "masterstate"
-  val KVService = "kvService"
-  implicit val timeout = Constants.FUTURE_TIMEOUT
-
-  val replicator = DataReplication(context.system).replicator
-
-  val masterClusterSize = Math.max(1, systemconfig.getStringList(GEARPUMP_CLUSTER_MASTERS).size())
-
-  //optimize write path, we can tolerate one master down for recovery.
-  val writeQuorum = Math.min(2, masterClusterSize / 2 + 1)
-  val readQuorum = masterClusterSize + 1 - writeQuorum
-
-  def stateChangeListener : Receive = {
-    case update: UpdateResponse =>
-      LOG.debug(s"we get update $update")
-    case Changed(STATE, data: GSet) =>
-      LOG.info("master state updated ")
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala b/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
index e31ed89..fb66a0c 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
@@ -18,65 +18,78 @@
 
 package io.gearpump.cluster.master
 
-import akka.actor._
-import akka.contrib.datareplication.LWWMap
-import akka.contrib.datareplication.Replicator._
-import akka.pattern.ask
+import java.util.concurrent.TimeUnit
 
-import scala.concurrent.Future
+import akka.actor._
+import akka.cluster.Cluster
+import akka.cluster.ddata.{DistributedData, LWWMap, Key, LWWMapKey}
+import akka.cluster.ddata.Replicator._
+import io.gearpump.util.{LogUtil}
+import org.slf4j.Logger
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration.Duration
 
 /**
  * A replicated simple in-memory KV service.
  */
-class InMemoryKVService extends Actor with Stash with ClusterReplication {
+class InMemoryKVService extends Actor with Stash {
   import InMemoryKVService._
 
-  def receive : Receive = kvService orElse stateChangeListener
+  private val KV_SERVICE = "gearpump_kvservice"
 
-  override def preStart(): Unit = {
-    replicator ! Subscribe(STATE, self)
-  }
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private val replicator = DistributedData(context.system).replicator
+  private implicit val cluster = Cluster(context.system)
+
+  //optimize write path, we can tolerate one master down for recovery.
+  private val timeout = Duration(15, TimeUnit.SECONDS)
+  private val readMajority = ReadMajority(timeout)
+  private val writeMajority = WriteMajority(timeout)
 
-  override def postStop(): Unit = {
-    replicator ! Unsubscribe(STATE, self)
+  private def groupKey(group: String): LWWMapKey[Any] = {
+    LWWMapKey[Any](KV_SERVICE + "_" + group)
   }
+
+  def receive : Receive = kvService
+
   def kvService : Receive = {
+
     case GetKV(group: String, key : String) =>
-      val client = sender
-      (replicator ? new Get(KVService + group, ReadFrom(readQuorum), TIMEOUT, None)).asInstanceOf[Future[GetResponse]].map {
-        case GetSuccess(_, appData: LWWMap, _) =>
-          LOG.info(s"Successfully retrived group: $group")
-          client ! GetKVSuccess(key, appData.get(key).orNull)
-        case x: NotFound =>
-          LOG.info(s"We cannot find group $group")
-          client ! GetKVSuccess(key, null)
-        case x : GetFailure =>
-          LOG.error(s"Failed to get application $key data, the request key is $key")
-          client ! GetKVFailed(new Exception(x.getClass.getName))
-        case GetSuccess(_, x, _) =>
-          LOG.error(s"Got unexpected response when get key $key, the response is $x")
-          client ! GetKVFailed(new Exception(x.getClass.getName))
-      }
+      val request = Request(sender(), key)
+      replicator ! Get(groupKey(group), readMajority, Some(request))
+    case success@ GetSuccess(group: LWWMapKey[Any], Some(request: Request)) =>
+      val appData = success.get(group)
+      LOG.info(s"Successfully retrived group: ${group.id}")
+      request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull)
+    case NotFound(group: LWWMapKey[Any], Some(request: Request)) =>
+      LOG.info(s"We cannot find group $group")
+      request.client ! GetKVSuccess(request.key, null)
+    case GetFailure(group: LWWMapKey[Any], Some(request: Request)) =>
+      val error = s"Failed to get application data, the request key is ${request.key}"
+      LOG.error(error)
+      request.client ! GetKVFailed(new Exception(error))
 
     case PutKV(group: String, key: String, value: Any) =>
-      val client = sender
-
-      val update = Update(KVService + group, LWWMap(),
-        WriteTo(writeQuorum), TIMEOUT) {map =>
+      val request = Request(sender(), key)
+      val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) {map =>
         map + (key -> value)
       }
-
-      val putFuture = (replicator ? update).asInstanceOf[Future[UpdateResponse]]
-
-      putFuture.map {
-        case UpdateSuccess(key, _) =>
-          client ! PutKVSuccess
-        case fail: UpdateFailure =>
-          client ! PutKVFailed(key, value, new Exception(fail.getClass.getName))
-      }
-    case DeleteKVGroup(group: String) =>
-      val client = sender
-      replicator ? Update(KVService + group, LWWMap(), WriteTo(writeQuorum), TIMEOUT)( _ => LWWMap())
+      replicator ! update
+    case UpdateSuccess(group: LWWMapKey[Any], Some(request: Request)) =>
+        request.client ! PutKVSuccess
+    case ModifyFailure(group: LWWMapKey[Any], error, cause, Some(request: Request)) =>
+      request.client ! PutKVFailed(request.key, new Exception(error, cause))
+    case UpdateTimeout(group: LWWMapKey[Any], Some(request: Request)) =>
+      request.client ! PutKVFailed(request.key, new TimeoutException())
+
+    case delete@ DeleteKVGroup(group: String) =>
+      replicator ! Delete(groupKey(group), writeMajority)
+    case DeleteSuccess(group) =>
+      LOG.info(s"KV Group ${group.id} is deleted")
+    case ReplicationDeleteFailure(group) =>
+      LOG.error(s"Failed to delete KV Group ${group.id}...")
+    case DataDeleted(group) =>
+      LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...")
   }
 }
 
@@ -96,9 +109,13 @@ object InMemoryKVService {
 
   case class DeleteKVGroup(group: String)
 
+  case class GroupDeleted(group: String) extends GetKVResult with PutKVResult
+
   trait PutKVResult
 
   case object PutKVSuccess extends PutKVResult
 
-  case class PutKVFailed(key: String, value: Any, ex: Throwable) extends PutKVResult
+  case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult
+
+  case class Request(client: ActorRef, key: String)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
index a4d9b54..f22a300 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
@@ -156,8 +156,8 @@ private[cluster] class Master extends Actor with Stash {
   def kvServiceMsgHandler: Receive = {
     case PutKVSuccess =>
       //Skip
-    case PutKVFailed(key, value, exception) =>
-      LOG.error(s"Put value $value with key $key to InMemoryKVService failed.\n" + ExceptionUtils.getStackTrace(exception))
+    case PutKVFailed(key, exception) =>
+      LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" + ExceptionUtils.getStackTrace(exception))
   }
 
   def metricsService : Receive = {
@@ -286,7 +286,7 @@ private[cluster] class Master extends Actor with Stash {
 }
 
 object Master {
-  final val MASTER_GROUP = "-1"
+  final val MASTER_GROUP = "master_group"
 
   final val WORKER_ID = "next_worker_id"
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
index b4ff5b3..f4f82fb 100644
--- a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
+++ b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
@@ -25,7 +25,7 @@ import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart}
 import akka.http.scaladsl.server.Directives._
 import akka.http.scaladsl.server._
 import akka.stream.Materializer
-import akka.stream.io.{SynchronousFileSink, SynchronousFileSource}
+import akka.stream.scaladsl.FileIO
 
 import scala.concurrent.{ExecutionContext, Future}
 
@@ -75,6 +75,8 @@ object FileDirective {
             ctx => {
               filesFuture.map(map => inner(Tuple1(map))).flatMap(route => route(ctx))
             }
+
+
           }
         }
       }
@@ -88,7 +90,7 @@ object FileDirective {
     val responseEntity = HttpEntity(
       MediaTypes.`application/octet-stream`,
       file.length,
-      SynchronousFileSource(file, CHUNK_SIZE))
+      FileIO.fromFile(file, CHUNK_SIZE))
     complete(responseEntity)
   }
 
@@ -100,10 +102,10 @@ object FileDirective {
 
             //reserve the suffix
             val targetPath = File.createTempFile(s"userfile_${p.name}_", s"${p.filename.getOrElse("")}", rootDirectory)
-            val written = p.entity.dataBytes.runWith(SynchronousFileSink(targetPath))
+            val written = p.entity.dataBytes.runWith(FileIO.toFile(targetPath))
             written.map(written =>
-              if (written > 0) {
-                Map(p.name -> FileInfo(p.filename.get, targetPath, written))
+              if (written.count > 0) {
+                Map(p.name -> FileInfo(p.filename.get, targetPath, written.count))
               } else {
                 Map.empty[Name, FileInfo]
               })

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/util/FileServer.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/util/FileServer.scala b/daemon/src/main/scala/io/gearpump/util/FileServer.scala
index f208465..361c01d 100644
--- a/daemon/src/main/scala/io/gearpump/util/FileServer.scala
+++ b/daemon/src/main/scala/io/gearpump/util/FileServer.scala
@@ -24,15 +24,14 @@ import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.Http.ServerBinding
 import akka.http.scaladsl.marshalling.Marshal
-import akka.http.scaladsl.model.Uri.Path
+import akka.http.scaladsl.model.Uri.{Query, Path}
 import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, Multipart, _}
 import akka.http.scaladsl.server.Directives._
 import akka.http.scaladsl.server._
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.stream.ActorMaterializer
-import akka.stream.io.{SynchronousFileSink, SynchronousFileSource}
 import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet
-import akka.stream.scaladsl.{Sink, Source}
+import akka.stream.scaladsl.{Sink, Source, FileIO}
 import io.gearpump.jarstore.FilePath
 import io.gearpump.util.FileDirective._
 import io.gearpump.util.FileServer.Port
@@ -65,7 +64,7 @@ class FileServer(system: ActorSystem, host: String, port: Int = 0, rootDirectory
       pathEndOrSingleSlash {
         extractUri { uri =>
           val upload = uri.withPath(Uri.Path("/upload")).toString()
-          val entity = HttpEntity(MediaTypes.`text/html`,
+          val entity = HttpEntity(ContentTypes.`text/html(UTF-8)`,
             s"""
             |
             |<h2>Please specify a file to upload:</h2>
@@ -123,7 +122,7 @@ object FileServer {
         HttpRequest(HttpMethods.POST, uri = target, entity = entity)
       }
 
-      val response = Source(request).via(httpClient).runWith(Sink.head)
+      val response = Source.fromFuture(request).via(httpClient).runWith(Sink.head)
       response.flatMap{some =>
         Unmarshal(some).to[String]
       }.map{path =>
@@ -132,17 +131,17 @@ object FileServer {
     }
 
     def download(remoteFile: FilePath, saveAs: File): Future[Unit] = {
-      val downoad = server.withPath(Path("/download")).withQuery("file" -> remoteFile.path)
+      val downoad = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path))
       //download file to local
       val response = Source.single(HttpRequest(uri = downoad)).via(httpClient).runWith(Sink.head)
       val downloaded = response.flatMap { response =>
-        response.entity.dataBytes.runWith(SynchronousFileSink(saveAs))
+        response.entity.dataBytes.runWith(FileIO.toFile(saveAs))
       }
       downloaded.map(written => Unit)
     }
 
     private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
-      val entity =  HttpEntity(MediaTypes.`application/octet-stream`, file.length(), SynchronousFileSource(file, chunkSize = 100000))
+      val entity =  HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromFile(file, chunkSize = 100000))
       val body = Source.single(
         Multipart.FormData.BodyPart(
           "uploadfile",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
index 863d70c..3927993 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
@@ -34,9 +34,8 @@ class MasterWatcherSpec extends FlatSpec with Matchers {
     val system = ActorSystem("ForMasterWatcher", config)
 
     val actorWatcher = TestProbe()(system)
-    val mockMaster = TestProbe()(system)
 
-    val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher", mockMaster.ref))
+    val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher"))
     actorWatcher watch masterWatcher
     actorWatcher.expectTerminated(masterWatcher, 5 seconds)
     system.shutdown()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
index 76bfd54..8f60d34 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
@@ -23,6 +23,7 @@ import akka.testkit.TestProbe
 import io.gearpump.cluster.master.InMemoryKVService._
 import io.gearpump.cluster.{MasterHarness, TestUtil}
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+import scala.concurrent.duration._
 
 class InMemoryKVServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
 
@@ -54,7 +55,11 @@ class InMemoryKVServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEa
 
     client.send(kvService, DeleteKVGroup(group))
 
+    // after DeleteKVGroup, the service no longer accepts GetKV or PutKV requests
     client.send(kvService, GetKV(group, "key"))
-    client.expectMsg(GetKVSuccess("key", null))
+    client.expectNoMsg(3 seconds)
+
+    client.send(kvService, PutKV(group, "key", 3))
+    client.expectNoMsg(3 seconds)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
index 6cc0e33..f8982ce 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
@@ -31,7 +31,7 @@ import io.gearpump.experiments.yarn.Constants._
 import io.gearpump.experiments.yarn.glue.Records._
 import io.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig}
 import io.gearpump.transport.HostPort
-import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
+import io.gearpump.util._
 import org.apache.commons.httpclient.HttpClient
 import org.apache.commons.httpclient.methods.GetMethod
 import org.slf4j.Logger
@@ -359,7 +359,7 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser {
     }
 
     if (status == 200) {
-      system.actorFor(get.getResponseBodyAsString)
+      AkkaHelper.actorFor(system, get.getResponseBodyAsString)
     } else {
       throw new IOException("Fail to resolve AppMaster address, please make sure " +
         s"${report.getOriginalTrackingUrl} is accessible...")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
index adf1716..9f30570 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
@@ -23,7 +23,7 @@ import java.io.IOException
 import akka.actor.{ActorRef, ActorSystem}
 import io.gearpump.experiments.yarn.glue.Records.ApplicationId
 import io.gearpump.experiments.yarn.glue.YarnClient
-import io.gearpump.util.LogUtil
+import io.gearpump.util.{AkkaHelper, LogUtil}
 import org.apache.commons.httpclient.HttpClient
 import org.apache.commons.httpclient.methods.GetMethod
 import org.slf4j.Logger
@@ -52,7 +52,7 @@ class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
     if (status == 200) {
       val response = get.getResponseBodyAsString
       LOG.info("Successfully resolved AppMaster address: " + response)
-      system.actorFor(response)
+      AkkaHelper.actorFor(system, response)
     } else {
       throw new IOException("Fail to resolve AppMaster address, please make sure " +
         s"${report.getOriginalTrackingUrl} is accessible...")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
index 3f029ef..a9fdee5 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
@@ -44,7 +44,7 @@ class MessageDeliverySpec extends TestSpecBase {
             val appId = restClient.getNextAvailableAppId()
 
             val stateJar = cluster.queryBuiltInExampleJars("state-").head
-            val success = restClient.submitApp(stateJar, args)
+            val success = restClient.submitApp(stateJar, executorNum = 1, args = args)
             success shouldBe true
 
             // verify #1

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 0d08718..761c3b9 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -19,7 +19,7 @@ object Build extends sbt.Build {
 
   val copySharedSourceFiles = TaskKey[Unit]("copied shared services source code")
 
-  val akkaVersion = "2.3.12"
+  val akkaVersion = "2.4.2"
   val kryoVersion = "0.3.2"
   val clouderaVersion = "2.6.0-cdh5.4.2"
   val clouderaHBaseVersion = "1.0.0-cdh5.4.2"
@@ -37,7 +37,7 @@ object Build extends sbt.Build {
   val slf4jVersion = "1.7.7"
   val gsCollectionsVersion = "6.2.0"
 
-  val crossScalaVersionNumbers = Seq("2.10.5", "2.11.5")
+  val crossScalaVersionNumbers = Seq("2.11.8")
   val scalaVersionNumber = crossScalaVersionNumbers.last
   val sprayVersion = "1.3.2"
   val sprayJsonVersion = "1.3.1"
@@ -48,7 +48,6 @@ object Build extends sbt.Build {
   val scalazVersion = "7.1.1"
   val algebirdVersion = "0.9.0"
   val chillVersion = "0.6.0"
-
   val distDirectory = "output"
   val projectName = "gearpump"
 
@@ -56,7 +55,7 @@ object Build extends sbt.Build {
       ++ Pack.projects.toList).toSeq
 
 
-  val commonSettings = Seq(jacoco.settings:_*) ++ sonatypeSettings  ++ net.virtualvoid.sbt.graph.Plugin.graphSettings ++
+  val commonSettings = Seq(jacoco.settings:_*) ++ sonatypeSettings ++
     Seq(
         resolvers ++= Seq(
           "patriknw at bintray" at "http://dl.bintray.com/patriknw/maven",
@@ -69,8 +68,8 @@ object Build extends sbt.Build {
           "clockfly" at "http://dl.bintray.com/clockfly/maven",
           "vincent" at "http://dl.bintray.com/fvunicorn/maven",
           "clojars" at "http://clojars.org/repo"
-        ),
-        addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0-M5" cross CrossVersion.full)
+        )
+        // ,addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0-M5" cross CrossVersion.full)
     ) ++
     Seq(
       scalaVersion := scalaVersionNumber,
@@ -133,23 +132,19 @@ object Build extends sbt.Build {
 
   val daemonDependencies = Seq(
     libraryDependencies ++= Seq(
-      "com.typesafe.akka" %% "akka-contrib" % akkaVersion
-        exclude("com.typesafe.akka", "akka-persistence-experimental_2.11"),
       "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
-      "com.typesafe.akka" %% "akka-http-experimental" % "1.0",
-      "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0",
-      "com.typesafe.akka" %% "akka-stream-experimental" % "1.0",
-      "com.typesafe.akka" %% "akka-http-spray-json-experimental"% "1.0",
+      "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
+      "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion,
+      "com.typesafe.akka" %% "akka-http-spray-json-experimental"% akkaVersion,
       "commons-logging" % "commons-logging" % commonsLoggingVersion,
-      "com.github.patriknw" %% "akka-data-replication" % dataReplicationVersion,
+      "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion,
       "org.apache.hadoop" % "hadoop-common" % clouderaVersion  % "provided"
     )
   )
 
   val streamingDependencies = Seq(
     libraryDependencies ++= Seq(
-      "com.github.intel-hadoop" % "gearpump-shaded-gs-collections" % gsCollectionsVersion,
-      "com.typesafe.akka" %% "akka-stream-experimental" % "1.0"
+      "com.github.intel-hadoop" % "gearpump-shaded-gs-collections" % gsCollectionsVersion
     )
   )
 
@@ -160,9 +155,24 @@ object Build extends sbt.Build {
         "org.slf4j" % "slf4j-log4j12" % slf4jVersion,
         "com.github.intel-hadoop" % "gearpump-shaded-guava" % guavaVersion,
         "commons-lang" % "commons-lang" % commonsLangVersion,
-        "com.typesafe.akka" %% "akka-actor" % akkaVersion,
-        "com.typesafe.akka" %% "akka-remote" % akkaVersion,
-        "com.typesafe.akka" %% "akka-agent" % akkaVersion,
+
+        /**
+         * Override Netty version 3.10.3.Final used by Akka 2.4.2 to work around a Netty hang issue
+         * (https://github.com/gearpump/gearpump/issues/2020).
+         *
+         * Akka 2.4.2 uses Netty 3.10.3.Final by default, which has a serious issue that can hang the
+         * network. The same issue also occurs in the version range (3.10.0.Final, 3.10.5.Final).
+         * Netty 3.10.6.Final fixes this issue; however, we found a 20% performance drop with it,
+         * so we decided to downgrade Netty to 3.8.0.Final (the same version used by Akka 2.3.12).
+         *
+         * @see https://github.com/gearpump/gearpump/pull/2017 for more discussions.
+         */
+        "io.netty" % "netty" % "3.8.0.Final",
+        "com.typesafe.akka" %% "akka-remote" % akkaVersion
+          exclude("io.netty", "netty"),
+
+          "com.typesafe.akka" %% "akka-actor" % akkaVersion,
+          "com.typesafe.akka" %% "akka-agent" % akkaVersion,
         "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
         "com.typesafe.akka" %% "akka-kernel" % akkaVersion,
         "com.github.intel-hadoop" %% "gearpump-shaded-akka-kryo" % kryoVersion,
@@ -172,16 +182,12 @@ object Build extends sbt.Build {
         "org.mockito" % "mockito-core" % mockitoVersion % "test",
         "junit" % "junit" % junitVersion % "test"
       ),
-     libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-reflect" % _),
-     libraryDependencies ++= (
-        if (scalaVersion.value.startsWith("2.10")) List("org.scalamacros" %% "quasiquotes" % "2.1.0-M5")
-        else List("org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.3")
-      )
+     libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-reflect" % _)
   )
 
   lazy val javadocSettings = Seq(
-    addCompilerPlugin("org.spark-project" %% "genjavadoc-plugin" %
-      "0.9-spark0" cross CrossVersion.full),
+    addCompilerPlugin("com.typesafe.genjavadoc" %% "genjavadoc-plugin" %
+      "0.9" cross CrossVersion.full),
     scalacOptions += s"-P:genjavadoc:out=${target.value}/java"
   )
 
@@ -217,7 +223,7 @@ object Build extends sbt.Build {
     base = file("."),
     settings = commonSettings ++ noPublish ++ gearpumpUnidocSetting
   ).aggregate(core, daemon, streaming,  services, external_kafka, external_monoid, external_serializer,
-      examples, storm, akkastream, yarn, external_hbase, packProject, external_hadoopfs, 
+      examples, storm, yarn, external_hbase, packProject, external_hadoopfs,
       integration_test).settings(Defaults.itSettings : _*)
 
   lazy val core = Project(
@@ -274,13 +280,14 @@ object Build extends sbt.Build {
 
   lazy val serviceJvmSettings = commonSettings ++ noPublish ++ Seq(
     libraryDependencies ++= Seq(
-      "com.typesafe.akka" %% "akka-http-testkit-experimental"% "1.0" % "test",
+      "com.typesafe.akka" %% "akka-http-testkit"% akkaVersion % "test",
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
       "com.lihaoyi" %% "upickle" % upickleVersion,
       "com.softwaremill" %% "akka-http-session" % "0.1.4",
-      "com.typesafe.akka" %% "akka-http-spray-json-experimental"% "1.0",
+      "com.typesafe.akka" %% "akka-http-spray-json-experimental"% akkaVersion,
       "com.github.scribejava" % "scribejava-apis" % "2.4.0",
-      "com.ning" % "async-http-client" % "1.9.33",
+      "com.ning" % "async-http-client" % "1.9.33"
+        exclude("io.netty", "netty"),
       "org.webjars" % "angularjs" % "1.4.9",
       "org.webjars.npm" % "angular-touch" % "1.5.0", // angular 1.5 breaks ui-select, but we need ng-touch 1.5
       "org.webjars" % "angular-ui-router" % "0.2.15",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/project/Pack.scala
----------------------------------------------------------------------
diff --git a/project/Pack.scala b/project/Pack.scala
index a3852c4..8a5ca8d 100644
--- a/project/Pack.scala
+++ b/project/Pack.scala
@@ -69,9 +69,7 @@ object Pack extends sbt.Build {
             "lib/storm" -> new ProjectsToPack(storm.id).exclude(streaming.id)
           ),
           packExclude := Seq(thisProjectRef.value.project),
-          //This is a work-around for https://github.com/gearpump/gearpump/issues/1816
-          //Will be removed in the future when Akka release a new version which includes the fix.
-          packExcludeJars := Seq(s"akka-actor_${scalaBinaryVersion.value}-$akkaVersion.jar"),
+
           packResourceDir += (baseDirectory.value / ".." / "conf" -> "conf"),
           packResourceDir += (baseDirectory.value / ".." / "yarnconf" -> "conf/yarnconf"),
           packResourceDir += (baseDirectory.value / ".." / "unmanagedlibs" / scalaBinaryVersion.value -> "lib"),

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/project/plugins.sbt
----------------------------------------------------------------------
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 58b0aa6..0477712 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -2,7 +2,7 @@ resolvers += Resolver.url("fvunicorn", url("http://dl.bintray.com/fvunicorn/sbt-
 
 resolvers += Classpaths.sbtPluginReleases
 
-addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.4")
+addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.8")
 
 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
 
@@ -10,8 +10,6 @@ addSbtPlugin("io.gearpump.sbt" % "sbt-pack" % "0.7.6")
 
 addSbtPlugin("de.johoop" % "jacoco4sbt" % "2.1.6")
 
-addSbtPlugin("com.gilt" % "sbt-dependency-graph-sugar" % "0.7.4")
-
 addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "0.2.1")
 
 addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
@@ -22,4 +20,4 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.1.0")
 
 addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "3.0.0")
 
-addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3")
\ No newline at end of file
+addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
index ee70d6b..3a1c4fe 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
@@ -24,7 +24,6 @@ import akka.actor.ActorRef
 import akka.http.scaladsl.marshalling.Marshal
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.model.headers.{`Cache-Control`, `Set-Cookie`}
-import akka.stream.io.SynchronousFileSource
 import akka.stream.scaladsl.Source
 import akka.testkit.TestActor.{AutoPilot, KeepRunning}
 import akka.testkit.TestProbe
@@ -44,6 +43,7 @@ import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
 import scala.concurrent.{Future, ExecutionContext}
 import scala.concurrent.duration._
 import scala.util.{Success, Try}
+import akka.stream.scaladsl.FileIO
 
 class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
   Matchers with BeforeAndAfterAll {
@@ -168,7 +168,9 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
   }
 
   private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
-    val entity =  HttpEntity(MediaTypes.`application/octet-stream`, file.length(), SynchronousFileSource(file, chunkSize = 100000))
+    val entity =  HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
+      FileIO.fromFile(file, chunkSize = 100000))
+
     val body = Source.single(
       Multipart.FormData.BodyPart(
         "file",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
index e3504ad..c276286 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
@@ -61,21 +61,21 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest  with Matcher
       assert(header[`Set-Cookie`].isDefined)
       val httpCookie = header[`Set-Cookie`].get.cookie
       assert(httpCookie.name == "gearpump_token")
-      cookie = new HttpCookiePair(httpCookie.name, httpCookie.value)
+      cookie = HttpCookiePair.apply(httpCookie.name, httpCookie.value)
     }
 
     // after authentication, everything is fine.
-    Get("/resource") ~>  addHeader(Cookie(cookie)) ~> security.route ~> check {
+    Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
       responseAs[String] shouldEqual "OK"
     }
 
     // however, guest cannot access high-permission operations, like POST.
-    Post("/resource") ~> addHeader(Cookie(cookie)) ~> security.route ~> check {
+      Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
       assert(rejection == AuthorizationFailedRejection)
     }
 
     // logout, should clear the session
-    Post(s"/logout") ~> addHeader(Cookie(cookie)) ~> security.route ~> check{
+    Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check{
       assert("{\"user\":\"guest\"}" == responseAs[String])
       assert(status.intValue() == 200)
       assert(header[`Set-Cookie`].isDefined)
@@ -106,21 +106,21 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest  with Matcher
       assert(header[`Set-Cookie`].isDefined)
       val httpCookie = header[`Set-Cookie`].get.cookie
       assert(httpCookie.name == "gearpump_token")
-      cookie = new HttpCookiePair(httpCookie.name, httpCookie.value)
+      cookie = HttpCookiePair(httpCookie.name, httpCookie.value)
     }
 
     // after authentication, everything is fine.
-    Get("/resource") ~>  addHeader(Cookie(cookie)) ~> security.route ~> check {
+    Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
       responseAs[String] shouldEqual "OK"
     }
 
     // Not like guest, admimn can also access POST
-    Post("/resource") ~> addHeader(Cookie(cookie)) ~> security.route ~> check {
+    Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
       responseAs[String] shouldEqual "OK"
     }
 
     // logout, should clear the session
-    Post(s"/logout") ~> addHeader(Cookie(cookie)) ~> security.route ~> check{
+    Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check{
       assert("{\"user\":\"admin\"}" == responseAs[String])
       assert(status.intValue() == 200)
       assert(header[`Set-Cookie`].isDefined)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
index 3e3e0b0..df996e7 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
@@ -19,9 +19,9 @@
 package io.gearpump.services.security.oauth2
 
 import akka.actor.ActorSystem
-import akka.http.javadsl.model.HttpEntityStrict
+import akka.http.scaladsl.model.HttpEntity.Strict
 import akka.http.scaladsl.model.MediaTypes._
-import akka.http.scaladsl.model.Uri.Path
+import akka.http.scaladsl.model.Uri.{Query, Path}
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.testkit.ScalatestRouteTest
 import com.typesafe.config.ConfigFactory
@@ -54,7 +54,7 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout
   uaa.init(configString)
 
   it should "generate the correct authorization request" in {
-    val parameters = Uri(uaa.getAuthorizationUrl()).query.toMap
+    val parameters = Uri(uaa.getAuthorizationUrl()).query().toMap
     assert(parameters("response_type") == "code")
     assert(parameters("client_id") == configMap("clientid"))
     assert(parameters("redirect_uri") == configMap("callback"))
@@ -69,10 +69,10 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout
 
     def accessTokenEndpoint(request: HttpRequest) = {
       assert(request.getHeader("Authorization").get.value() == "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=")
-      assert(request.entity.contentType().mediaType.value == "application/x-www-form-urlencoded")
+      assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")
 
-      val body = request.entity.asInstanceOf[HttpEntityStrict].data().decodeString("UTF-8")
-      val form = Uri./.withQuery(body).query.toMap
+      val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8")
+      val form = Uri./.withQuery(Query(body)).query().toMap
 
       assert(form("grant_type") == "authorization_code")
       assert(form("code") == "QGGVeA")
@@ -94,7 +94,7 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout
     }
 
     def protectedResourceEndpoint(request: HttpRequest) = {
-      assert(request.getUri().parameter("access_token").get == accessToken)
+      assert(request.getUri().query().get("access_token").get == accessToken)
       val response =
         s"""
           |{

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
index 8fbe43f..ff57baf 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
@@ -19,9 +19,9 @@
 package io.gearpump.services.security.oauth2
 
 import akka.actor.ActorSystem
-import akka.http.javadsl.model.HttpEntityStrict
+import akka.http.scaladsl.model.HttpEntity.Strict
 import akka.http.scaladsl.model.MediaTypes._
-import akka.http.scaladsl.model.Uri.Path
+import akka.http.scaladsl.model.Uri.{Query, Path}
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.testkit.ScalatestRouteTest
 import com.typesafe.config.ConfigFactory
@@ -55,7 +55,7 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
   google.init(configString)
 
   it should "generate the correct authorization request" in {
-    val parameters = Uri(google.getAuthorizationUrl()).query.toMap
+    val parameters = Uri(google.getAuthorizationUrl()).query().toMap
     assert(parameters("response_type") == "code")
     assert(parameters("client_id") == configMap("clientid"))
     assert(parameters("redirect_uri") == configMap("callback"))
@@ -70,10 +70,10 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
 
     def accessTokenEndpoint(request: HttpRequest) = {
 
-      assert(request.entity.contentType().mediaType.value == "application/x-www-form-urlencoded")
+      assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")
 
-      val body = request.entity.asInstanceOf[HttpEntityStrict].data().decodeString("UTF-8")
-      val form = Uri./.withQuery(body).query.toMap
+      val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8")
+      val form = Uri./.withQuery(Query(body)).query().toMap
 
       assert(form("client_id") == configMap("clientid"))
       assert(form("client_secret") == configMap("clientsecret"))
@@ -95,7 +95,7 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
     }
 
     def protectedResourceEndpoint(request: HttpRequest) = {
-      assert(request.getUri().parameter("access_token").get == accessToken)
+      assert(request.getUri().query().get("access_token").get == accessToken)
       val response =s"""
            |{
            |   "kind": "plus#person",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala b/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
index 88e58a8..2dbdd14 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
@@ -25,7 +25,7 @@ import io.gearpump.transport.{Express, HostPort}
 import io.gearpump.Message
 
 import scala.collection.mutable
-
+import io.gearpump.util.AkkaHelper
 /**
  * ExpressTransport wire the networking function from default akka
  * networking to customized implementation [[Express]].
@@ -43,7 +43,7 @@ trait ExpressTransport {
   lazy val sourceId = TaskId.toLong(taskId)
 
   lazy val sessionRef: ActorRef = {
-    system.actorFor(s"/session#$sessionId")
+    AkkaHelper.actorFor(system, s"/session#$sessionId")
   }
 
   def transport(msg : AnyRef, remotes : TaskId *): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
index cc2f52b..2750cc5 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
@@ -38,7 +38,7 @@ class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
   val dag = DAG(graph)
   implicit var system: ActorSystem = null
   val appId = 0
-  val userConfig = UserConfig.empty.withValue(StreamApplication.DAG, graph)
+  lazy val userConfig = UserConfig.empty.withValue(StreamApplication.DAG, graph)
 
   "DagManager" should {
     import io.gearpump.streaming.appmaster.ClockServiceSpec.Store

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
index 70ce643..89d2d64 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
@@ -65,6 +65,7 @@ class StreamAppSpec  extends FlatSpec with Matchers with BeforeAndAfterAll  with
 
   it should "produce 3 messages" in {
     val context: ClientContext = mock[ClientContext]
+    when(context.system).thenReturn(system)
     val app = StreamApp("dsl", context)
     val list = List[String](
       "0",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/unmanagedlibs/2.10/akka-actor_2.10-2.3.12-fix-1816.jar
----------------------------------------------------------------------
diff --git a/unmanagedlibs/2.10/akka-actor_2.10-2.3.12-fix-1816.jar b/unmanagedlibs/2.10/akka-actor_2.10-2.3.12-fix-1816.jar
deleted file mode 100644
index a04c476..0000000
Binary files a/unmanagedlibs/2.10/akka-actor_2.10-2.3.12-fix-1816.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/unmanagedlibs/2.11/akka-actor_2.11-2.3.12-fix-1816.jar
----------------------------------------------------------------------
diff --git a/unmanagedlibs/2.11/akka-actor_2.11-2.3.12-fix-1816.jar b/unmanagedlibs/2.11/akka-actor_2.11-2.3.12-fix-1816.jar
deleted file mode 100644
index 5cfb4a6..0000000
Binary files a/unmanagedlibs/2.11/akka-actor_2.11-2.3.12-fix-1816.jar and /dev/null differ