You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2018/06/25 13:47:42 UTC
incubator-gearpump git commit: [GEARPUMP-326] Upgrade Akka to 2.5.13
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 2334c19ee -> 6bef69cd7
[GEARPUMP-326] Upgrade Akka to 2.5.13
1. Upgrade Akka
2. Remove akkastream module temporarily
Author: manuzhang <ow...@gmail.com>
Closes #250 from manuzhang/upgrade_akka_2.5.13.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/6bef69cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/6bef69cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/6bef69cd
Branch: refs/heads/master
Commit: 6bef69cd71febb5f06327f528704850f0134687e
Parents: 2334c19
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jun 25 21:45:39 2018 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jun 25 21:46:24 2018 +0800
----------------------------------------------------------------------
.../gearpump/cluster/main/AppSubmitter.scala | 1 -
.../org/apache/gearpump/cluster/main/Gear.scala | 5 +---
.../cluster/master/InMemoryKVService.scala | 28 +++++++++++---------
.../apache/gearpump/jarstore/FileServer.scala | 9 ++++---
project/BuildExperiments.scala | 2 +-
project/Dependencies.scala | 7 +++--
project/Pack.scala | 12 ++++++---
7 files changed, 34 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
index defd86e..4a43d2d 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
@@ -21,7 +21,6 @@ import java.io.File
import java.net.{URL, URLClassLoader}
import java.util.jar.JarFile
-import org.apache.gearpump.cluster.client.RuntimeEnvironment
import org.apache.gearpump.util.{Constants, LogUtil, MasterClientCommand, Util}
import scala.util.{Failure, Success, Try}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
index 7d6181f..a835350 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
@@ -17,10 +17,7 @@
*/
package org.apache.gearpump.cluster.main
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.client.{RemoteRuntimeEnvironment, RuntimeEnvironment}
-import org.apache.gearpump.util.LogUtil.ProcessType
-import org.apache.gearpump.util.{Constants, LogUtil}
+import org.apache.gearpump.util.Constants
object Gear {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
index fd19bad..9bcce6f 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
@@ -40,15 +40,15 @@ class InMemoryKVService extends Actor with Stash {
private val LOG: Logger = LogUtil.getLogger(getClass)
private val replicator = DistributedData(context.system).replicator
- private implicit val cluster = Cluster(context.system)
+ private implicit val cluster: Cluster = Cluster(context.system)
// Optimize write path, we can tolerate one master down for recovery.
private val timeout = Duration(15, TimeUnit.SECONDS)
private val readMajority = ReadMajority(timeout)
private val writeMajority = WriteMajority(timeout)
- private def groupKey(group: String): LWWMapKey[Any] = {
- LWWMapKey[Any](KV_SERVICE + "_" + group)
+ private def groupKey(group: String): LWWMapKey[Any, Any] = {
+ LWWMapKey[Any, Any](KV_SERVICE + "_" + group)
}
def receive: Receive = kvService
@@ -58,14 +58,15 @@ class InMemoryKVService extends Actor with Stash {
case GetKV(group: String, key: String) =>
val request = Request(sender(), key)
replicator ! Get(groupKey(group), readMajority, Some(request))
- case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+ case success@GetSuccess(group: LWWMapKey[Any @unchecked, Any @unchecked],
+ Some(request: Request)) =>
val appData = success.get(group)
LOG.info(s"Successfully retrived group: ${group.id}")
request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull)
- case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+ case NotFound(group: LWWMapKey[Any @unchecked, Any @unchecked], Some(request: Request)) =>
LOG.info(s"We cannot find group $group")
request.client ! GetKVSuccess(request.key, null)
- case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+ case GetFailure(_, Some(request: Request)) =>
val error = s"Failed to get application data, the request key is ${request.key}"
LOG.error(error)
request.client ! GetKVFailed(new Exception(error))
@@ -76,20 +77,21 @@ class InMemoryKVService extends Actor with Stash {
map + (key -> value)
}
replicator ! update
- case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+ case UpdateSuccess(_, Some(request: Request)) =>
request.client ! PutKVSuccess
- case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) =>
+ case ModifyFailure(_, error, cause,
+ Some(request: Request)) =>
request.client ! PutKVFailed(request.key, new Exception(error, cause))
- case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+ case UpdateTimeout(_, Some(request: Request)) =>
request.client ! PutKVFailed(request.key, new TimeoutException())
- case delete@DeleteKVGroup(group: String) =>
+ case DeleteKVGroup(group: String) =>
replicator ! Delete(groupKey(group), writeMajority)
- case DeleteSuccess(group) =>
+ case DeleteSuccess(group, _) =>
LOG.info(s"KV Group ${group.id} is deleted")
- case ReplicationDeleteFailure(group) =>
+ case ReplicationDeleteFailure(group, _) =>
LOG.error(s"Failed to delete KV Group ${group.id}...")
- case DataDeleted(group) =>
+ case DataDeleted(group, _) =>
LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
index 8c1d19a..4bcc3a5 100644
--- a/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/FileServer.scala
@@ -18,8 +18,10 @@
package org.apache.gearpump.jarstore
import java.io.File
-import scala.concurrent.{ExecutionContext, Future}
+import akka.Done
+
+import scala.concurrent.{ExecutionContext, Future}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
@@ -33,7 +35,6 @@ import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Sink, Source}
import spray.json.DefaultJsonProtocol._
import spray.json.JsonFormat
-
import org.apache.gearpump.jarstore.FileDirective._
import org.apache.gearpump.jarstore.FileServer.Port
@@ -84,14 +85,14 @@ class FileServer(system: ActorSystem, host: String, port: Int = 0, jarStore: Jar
}
}
- private var connection: Future[ServerBinding] = null
+ private var connection: Future[ServerBinding] = _
def start: Future[Port] = {
connection = Http().bindAndHandle(Route.handlerFlow(route), host, port)
connection.map(address => Port(address.localAddress.getPort))
}
- def stop: Future[Unit] = {
+ def stop: Future[Done] = {
connection.flatMap(_.unbind())
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/project/BuildExperiments.scala
----------------------------------------------------------------------
diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala
index 84c80f0..765bedd 100644
--- a/project/BuildExperiments.scala
+++ b/project/BuildExperiments.scala
@@ -25,7 +25,7 @@ import sbt.Keys._
object BuildExperiments extends sbt.Build {
lazy val experiments: Seq[ProjectReference] = Seq(
- akkastream,
+ // akkastream,
cgroup,
redis,
storm,
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/project/Dependencies.scala
----------------------------------------------------------------------
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 06d2781..627138d 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -23,8 +23,8 @@ object Dependencies {
val crossScalaVersionNumbers = Seq("2.11.8")
val scalaVersionNumber = crossScalaVersionNumbers.last
- val akkaVersion = "2.4.16"
- val akkaHttpVersion = "10.0.1"
+ val akkaVersion = "2.5.13"
+ val akkaHttpVersion = "10.1.3"
val hadoopVersion = "2.6.0"
val hbaseVersion = "1.0.0"
val commonsHttpVersion = "3.1"
@@ -89,11 +89,10 @@ object Dependencies {
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
"commons-logging" % "commons-logging" % commonsLoggingVersion,
- "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion,
+ "com.typesafe.akka" %% "akka-distributed-data" % akkaVersion,
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-agent" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
- "com.typesafe.akka" %% "akka-kernel" % akkaVersion,
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
"com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
"org.scala-lang" % "scala-reflect" % scalaVersionNumber,
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/6bef69cd/project/Pack.scala
----------------------------------------------------------------------
diff --git a/project/Pack.scala b/project/Pack.scala
index 5e546b2..568d7f3 100644
--- a/project/Pack.scala
+++ b/project/Pack.scala
@@ -128,8 +128,8 @@ object Pack extends sbt.Build {
"lib/yarn" -> new ProjectsToPack(gearpumpHadoop.id, yarn.id).
exclude(services.id, core.id),
"lib/services" -> new ProjectsToPack(services.id).exclude(core.id),
- "lib/storm" -> new ProjectsToPack(storm.id).exclude(streaming.id),
- "lib/akkastream" -> new ProjectsToPack(akkastream.id)
+ // "lib/akkastream" -> new ProjectsToPack(akkastream.id),
+ "lib/storm" -> new ProjectsToPack(storm.id).exclude(streaming.id)
),
packExclude := Seq(thisProjectRef.value.project),
@@ -164,6 +164,12 @@ object Pack extends sbt.Build {
packArchiveExcludes := Seq("integrationtest")
)
- ).dependsOn(core, streaming, services, yarn, storm, akkastream, cgroup).
+ ).dependsOn(core,
+ streaming,
+ services,
+ yarn,
+ storm,
+ // akkastream,
+ cgroup).
disablePlugins(sbtassembly.AssemblyPlugin)
}