You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2018/06/25 13:47:42 UTC

incubator-gearpump git commit: [GEARPUMP-326] Upgrade Akka to 2.5.13

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 2334c19ee -> 6bef69cd7


[GEARPUMP-326] Upgrade Akka to 2.5.13

1. Upgrade Akka
2. Remove akkastream module temporarily

Author: manuzhang <ow...@gmail.com>

Closes #250 from manuzhang/upgrade_akka_2.5.13.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/6bef69cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/6bef69cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/6bef69cd

Branch: refs/heads/master
Commit: 6bef69cd71febb5f06327f528704850f0134687e
Parents: 2334c19
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jun 25 21:45:39 2018 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jun 25 21:46:24 2018 +0800

----------------------------------------------------------------------
 .../gearpump/cluster/main/AppSubmitter.scala    |  1 -
 .../org/apache/gearpump/cluster/main/Gear.scala |  5 +---
 .../cluster/master/InMemoryKVService.scala      | 28 +++++++++++---------
 .../apache/gearpump/jarstore/FileServer.scala   |  9 ++++---
 project/BuildExperiments.scala                  |  2 +-
 project/Dependencies.scala                      |  7 +++--
 project/Pack.scala                              | 12 ++++++---
 7 files changed, 34 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
index defd86e..4a43d2d 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
@@ -21,7 +21,6 @@ import java.io.File
 import java.net.{URL, URLClassLoader}
 import java.util.jar.JarFile
 
-import org.apache.gearpump.cluster.client.RuntimeEnvironment
 import org.apache.gearpump.util.{Constants, LogUtil, MasterClientCommand, Util}
 
 import scala.util.{Failure, Success, Try}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
index 7d6181f..a835350 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
@@ -17,10 +17,7 @@
  */
 package org.apache.gearpump.cluster.main
 
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.client.{RemoteRuntimeEnvironment, RuntimeEnvironment}
-import org.apache.gearpump.util.LogUtil.ProcessType
-import org.apache.gearpump.util.{Constants, LogUtil}
+import org.apache.gearpump.util.Constants
 
 object Gear {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
index fd19bad..9bcce6f 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
@@ -40,15 +40,15 @@ class InMemoryKVService extends Actor with Stash {
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
   private val replicator = DistributedData(context.system).replicator
-  private implicit val cluster = Cluster(context.system)
+  private implicit val cluster: Cluster = Cluster(context.system)
 
   // Optimize write path, we can tolerate one master down for recovery.
   private val timeout = Duration(15, TimeUnit.SECONDS)
   private val readMajority = ReadMajority(timeout)
   private val writeMajority = WriteMajority(timeout)
 
-  private def groupKey(group: String): LWWMapKey[Any] = {
-    LWWMapKey[Any](KV_SERVICE + "_" + group)
+  private def groupKey(group: String): LWWMapKey[Any, Any] = {
+    LWWMapKey[Any, Any](KV_SERVICE + "_" + group)
   }
 
   def receive: Receive = kvService
@@ -58,14 +58,15 @@ class InMemoryKVService extends Actor with Stash {
     case GetKV(group: String, key: String) =>
       val request = Request(sender(), key)
       replicator ! Get(groupKey(group), readMajority, Some(request))
-    case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+    case success@GetSuccess(group: LWWMapKey[Any @unchecked, Any @unchecked],
+    Some(request: Request)) =>
       val appData = success.get(group)
       LOG.info(s"Successfully retrived group: ${group.id}")
       request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull)
-    case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+    case NotFound(group: LWWMapKey[Any @unchecked, Any @unchecked], Some(request: Request)) =>
       LOG.info(s"We cannot find group $group")
       request.client ! GetKVSuccess(request.key, null)
-    case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+    case GetFailure(_, Some(request: Request)) =>
       val error = s"Failed to get application data, the request key is ${request.key}"
       LOG.error(error)
       request.client ! GetKVFailed(new Exception(error))
@@ -76,20 +77,21 @@ class InMemoryKVService extends Actor with Stash {
         map + (key -> value)
       }
       replicator ! update
-    case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+    case UpdateSuccess(_, Some(request: Request)) =>
       request.client ! PutKVSuccess
-    case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) =>
+    case ModifyFailure(_, error, cause,
+    Some(request: Request)) =>
       request.client ! PutKVFailed(request.key, new Exception(error, cause))
-    case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+    case UpdateTimeout(_, Some(request: Request)) =>
       request.client ! PutKVFailed(request.key, new TimeoutException())
 
-    case delete@DeleteKVGroup(group: String) =>
+    case DeleteKVGroup(group: String) =>
       replicator ! Delete(groupKey(group), writeMajority)
-    case DeleteSuccess(group) =>
+    case DeleteSuccess(group, _) =>
       LOG.info(s"KV Group ${group.id} is deleted")
-    case ReplicationDeleteFailure(group) =>
+    case ReplicationDeleteFailure(group, _) =>
       LOG.error(s"Failed to delete KV Group ${group.id}...")
-    case DataDeleted(group) =>
+    case DataDeleted(group, _) =>
       LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
index 8c1d19a..4bcc3a5 100644
--- a/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
@@ -18,8 +18,10 @@
 package org.apache.gearpump.jarstore
 
 import java.io.File
-import scala.concurrent.{ExecutionContext, Future}
 
+import akka.Done
+
+import scala.concurrent.{ExecutionContext, Future}
 import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.Http.ServerBinding
@@ -33,7 +35,6 @@ import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.{FileIO, Sink, Source}
 import spray.json.DefaultJsonProtocol._
 import spray.json.JsonFormat
-
 import org.apache.gearpump.jarstore.FileDirective._
 import org.apache.gearpump.jarstore.FileServer.Port
 
@@ -84,14 +85,14 @@ class FileServer(system: ActorSystem, host: String, port: Int = 0, jarStore: Jar
     }
   }
 
-  private var connection: Future[ServerBinding] = null
+  private var connection: Future[ServerBinding] = _
 
   def start: Future[Port] = {
     connection = Http().bindAndHandle(Route.handlerFlow(route), host, port)
     connection.map(address => Port(address.localAddress.getPort))
   }
 
-  def stop: Future[Unit] = {
+  def stop: Future[Done] = {
     connection.flatMap(_.unbind())
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/project/BuildExperiments.scala
----------------------------------------------------------------------
diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala
index 84c80f0..765bedd 100644
--- a/project/BuildExperiments.scala
+++ b/project/BuildExperiments.scala
@@ -25,7 +25,7 @@ import sbt.Keys._
 object BuildExperiments extends sbt.Build {
 
   lazy val experiments: Seq[ProjectReference] = Seq(
-    akkastream,
+    // akkastream,
     cgroup,
     redis,
     storm,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/project/Dependencies.scala
----------------------------------------------------------------------
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 06d2781..627138d 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -23,8 +23,8 @@ object Dependencies {
 
   val crossScalaVersionNumbers = Seq("2.11.8")
   val scalaVersionNumber = crossScalaVersionNumbers.last
-  val akkaVersion = "2.4.16"
-  val akkaHttpVersion = "10.0.1"
+  val akkaVersion = "2.5.13"
+  val akkaHttpVersion = "10.1.3"
   val hadoopVersion = "2.6.0"
   val hbaseVersion = "1.0.0"
   val commonsHttpVersion = "3.1"
@@ -89,11 +89,10 @@ object Dependencies {
       "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
       "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
       "commons-logging" % "commons-logging" % commonsLoggingVersion,
-      "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion,
+      "com.typesafe.akka" %% "akka-distributed-data" % akkaVersion,
       "com.typesafe.akka" %% "akka-actor" % akkaVersion,
       "com.typesafe.akka" %% "akka-agent" % akkaVersion,
       "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
-      "com.typesafe.akka" %% "akka-kernel" % akkaVersion,
       "com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
       "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
       "org.scala-lang" % "scala-reflect" % scalaVersionNumber,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/project/Pack.scala
----------------------------------------------------------------------
diff --git a/project/Pack.scala b/project/Pack.scala
index 5e546b2..568d7f3 100644
--- a/project/Pack.scala
+++ b/project/Pack.scala
@@ -128,8 +128,8 @@ object Pack extends sbt.Build {
           "lib/yarn" -> new ProjectsToPack(gearpumpHadoop.id, yarn.id).
             exclude(services.id, core.id),
           "lib/services" -> new ProjectsToPack(services.id).exclude(core.id),
-          "lib/storm" -> new ProjectsToPack(storm.id).exclude(streaming.id),
-          "lib/akkastream" -> new ProjectsToPack(akkastream.id)
+          // "lib/akkastream" -> new ProjectsToPack(akkastream.id),
+          "lib/storm" -> new ProjectsToPack(storm.id).exclude(streaming.id)
         ),
         packExclude := Seq(thisProjectRef.value.project),
 
@@ -164,6 +164,12 @@ object Pack extends sbt.Build {
         packArchiveExcludes := Seq("integrationtest")
 
       )
-  ).dependsOn(core, streaming, services, yarn, storm, akkastream, cgroup).
+  ).dependsOn(core,
+    streaming,
+    services,
+    yarn,
+    storm,
+    // akkastream,
+    cgroup).
     disablePlugins(sbtassembly.AssemblyPlugin)
 }