You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:19 UTC
[09/49] incubator-gearpump git commit: fix #1988, upgrade akka to akka 2.4.2
fix #1988, upgrade akka to akka 2.4.2
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/21d59216
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/21d59216
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/21d59216
Branch: refs/heads/master
Commit: 21d59216b181186e322738cef751e3bc7abc6a81
Parents: bcdbfc9
Author: Sean Zhong <cl...@gmail.com>
Authored: Fri Mar 25 00:33:23 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Apr 26 14:23:48 2016 +0800
----------------------------------------------------------------------
.travis.yml | 4 +-
conf/gear.conf | 2 +-
.../main/java/io/gearpump/util/AkkaHelper.java | 35 +++++++
.../io/gearpump/util/HadoopFSLogAppender.java | 18 ++++
core/src/main/resources/geardefault.conf | 9 ++
.../io/gearpump/transport/netty/Server.scala | 6 +-
.../main/scala/io/gearpump/util/ActorUtil.scala | 1 +
.../io/gearpump/cluster/DaemonMessage.scala | 2 -
.../scala/io/gearpump/cluster/main/Master.scala | 35 ++++---
.../io/gearpump/cluster/master/AppManager.scala | 2 +-
.../cluster/master/ClusterReplication.scala | 64 -----------
.../cluster/master/InMemoryKVService.scala | 105 +++++++++++--------
.../io/gearpump/cluster/master/Master.scala | 6 +-
.../scala/io/gearpump/util/FileDirective.scala | 12 ++-
.../scala/io/gearpump/util/FileServer.scala | 15 ++-
.../cluster/main/MasterWatcherSpec.scala | 3 +-
.../cluster/master/InMemoryKVServiceSpec.scala | 7 +-
.../yarn/appmaster/YarnAppMaster.scala | 4 +-
.../yarn/client/AppMasterResolver.scala | 4 +-
.../checklist/MessageDeliverySpec.scala | 2 +-
project/Build.scala | 65 +++++++-----
project/Pack.scala | 4 +-
project/plugins.sbt | 6 +-
.../gearpump/services/MasterServiceSpec.scala | 6 +-
.../gearpump/services/SecurityServiceSpec.scala | 16 +--
...CloudFoundryUAAOAuth2AuthenticatorSpec.scala | 14 +--
.../oauth2/GoogleOAuth2AuthenticatorSpec.scala | 14 +--
.../streaming/task/ExpressTransport.scala | 4 +-
.../streaming/appmaster/DagManagerSpec.scala | 2 +-
.../gearpump/streaming/dsl/StreamAppSpec.scala | 1 +
.../2.10/akka-actor_2.10-2.3.12-fix-1816.jar | Bin 2815968 -> 0 bytes
.../2.11/akka-actor_2.11-2.3.12-fix-1816.jar | Bin 2790862 -> 0 bytes
32 files changed, 248 insertions(+), 220 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 771f652..5b6b80f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,9 +19,9 @@ script:
then sbt -jvm-opts project/travis/jvmopts clean +assembly +packArchiveZip | grep -v -E "$skipLogs";
fi
jdk:
-- oraclejdk7
+- oraclejdk8
scala:
-- 2.11.5
+- 2.11.8
cache:
directories:
- $HOME/.m2
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/conf/gear.conf
----------------------------------------------------------------------
diff --git a/conf/gear.conf b/conf/gear.conf
index 1ab4bb6..d8e8b4c 100644
--- a/conf/gear.conf
+++ b/conf/gear.conf
@@ -320,7 +320,7 @@ gearpump {
### Configuration only visible to master nodes..
gearpump-master {
extensions = [
- "akka.contrib.datareplication.DataReplication$"
+ "akka.cluster.ddata.DistributedData$"
]
akka {
#########################################
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/java/io/gearpump/util/AkkaHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/io/gearpump/util/AkkaHelper.java b/core/src/main/java/io/gearpump/util/AkkaHelper.java
new file mode 100644
index 0000000..f4772c1
--- /dev/null
+++ b/core/src/main/java/io/gearpump/util/AkkaHelper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.util;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+
+public class AkkaHelper {
+
+ /**
+ * Helper util to access the private[akka] system.actorFor method
+ *
+ * This is used for performance optimization, we encode the session Id
+ * in the ActorRef path. Session Id is used to identify the sender Task.
+ */
+ public static ActorRef actorFor(ActorSystem system, String path) {
+ return system.actorFor(path);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java b/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java
index a736f9c..ab7ee5e 100644
--- a/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java
+++ b/core/src/main/java/io/gearpump/util/HadoopFSLogAppender.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package io.gearpump.util;
import org.apache.log4j.RollingFileAppender;
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/core/src/main/resources/geardefault.conf b/core/src/main/resources/geardefault.conf
index 17626a3..394a928 100644
--- a/core/src/main/resources/geardefault.conf
+++ b/core/src/main/resources/geardefault.conf
@@ -96,6 +96,7 @@ gearpump {
### If you want to use metrics, please change
###########################
+
### Flag to enable metrics
metrics {
enabled = false
@@ -572,6 +573,14 @@ akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
+ ## Doesn't warn on Java serializer usage
+ ##
+ ## Most of our streaming messages use custom serializers, with a few
+ ## exceptions for system control messages. The volume of system control
+ ## messages should be small. So, turn this flag off until further benchmarks
+ ## show a different result.
+ warn-about-java-serializer-usage = false
+
## TODO: in integration test, may need to enable this
##creation-timeout=100s
default-mailbox {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/scala/io/gearpump/transport/netty/Server.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/Server.scala b/core/src/main/scala/io/gearpump/transport/netty/Server.scala
index c592b31..dde0861 100644
--- a/core/src/main/scala/io/gearpump/transport/netty/Server.scala
+++ b/core/src/main/scala/io/gearpump/transport/netty/Server.scala
@@ -22,7 +22,7 @@ import java.util
import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem}
import io.gearpump.transport.ActorLookupById
-import io.gearpump.util.LogUtil
+import io.gearpump.util.{LogUtil, AkkaHelper}
import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{ChannelGroup, DefaultChannelGroup}
import org.slf4j.Logger
@@ -116,7 +116,9 @@ object Server {
def translateToActorRef(sessionId : Int): ActorRef = {
if(!taskIdtoActorRef.contains(sessionId)){
- val actorRef = context.system.actorFor(s"/session#$sessionId")
+
+ // A fake ActorRef for performance optimization.
+ val actorRef = AkkaHelper.actorFor(context.system, s"/session#$sessionId")
taskIdtoActorRef += sessionId -> actorRef
}
taskIdtoActorRef.get(sessionId).get
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/core/src/main/scala/io/gearpump/util/ActorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ActorUtil.scala b/core/src/main/scala/io/gearpump/util/ActorUtil.scala
index b63733d..8233b28 100644
--- a/core/src/main/scala/io/gearpump/util/ActorUtil.scala
+++ b/core/src/main/scala/io/gearpump/util/ActorUtil.scala
@@ -87,6 +87,7 @@ object ActorUtil {
def launchExecutorOnEachWorker(master: ActorRef, executorJvmConfig: ExecutorSystemJvmConfig,
sender: ActorRef)(implicit executor : scala.concurrent.ExecutionContext) = {
implicit val timeout = Constants.FUTURE_TIMEOUT
+
(master ? GetAllWorkers).asInstanceOf[Future[WorkerList]].map { list =>
val resources = list.workers.map {
workerId => ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
index 36d84c0..19ac620 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
@@ -20,8 +20,6 @@ package io.gearpump.cluster
import akka.actor.ActorRef
import io.gearpump.cluster.master.Master.MasterInfo
import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.master.Master.MasterInfo
-import io.gearpump.cluster.scheduler.Resource
/**
* Cluster Bootup Flow
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
index c89ab1e..8d4515a 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala
@@ -22,9 +22,9 @@ import java.util.concurrent.TimeUnit
import akka.actor._
import akka.cluster.ClusterEvent._
+import akka.cluster.ddata.DistributedData
import akka.cluster.{Cluster, Member, MemberStatus}
-import akka.contrib.datareplication.DataReplication
-import akka.contrib.pattern.{ClusterSingletonManager, ClusterSingletonProxy}
+import akka.cluster.singleton.{ClusterSingletonManagerSettings, ClusterSingletonProxySettings, ClusterSingletonManager, ClusterSingletonProxy}
import com.typesafe.config.ConfigValueFactory
import io.gearpump.cluster.ClusterConfig
import io.gearpump.cluster.master.{Master => MasterActor}
@@ -85,26 +85,28 @@ object Master extends AkkaApp with ArgumentsParser {
LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}")
val system = ActorSystem(MASTER, masterConfig)
- val replicator = DataReplication(system).replicator
+ val replicator = DistributedData(system).replicator
LOG.info(s"Replicator path: ${replicator.path}")
- //start master proxy
- val masterProxy = system.actorOf(ClusterSingletonProxy.props(
- singletonPath = s"/user/${SINGLETON_MANAGER}/${MASTER_WATCHER}/${MASTER}",
- role = Some(MASTER)),
- name = MASTER)
-
//start singleton manager
val singletonManager = system.actorOf(ClusterSingletonManager.props(
- singletonProps = Props(classOf[MasterWatcher], MASTER, masterProxy),
- singletonName = MASTER_WATCHER,
+ singletonProps = Props(classOf[MasterWatcher], MASTER),
terminationMessage = PoisonPill,
- role = Some(MASTER)),
+ settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER).withRole(MASTER)),
name = SINGLETON_MANAGER)
+ //start master proxy
+ val masterProxy = system.actorOf(ClusterSingletonProxy.props(
+ singletonManagerPath = s"/user/${SINGLETON_MANAGER}",
+ // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}".
+ // Master will only be created when there is a majority of machines started.
+ settings = ClusterSingletonProxySettings(system).withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)),
+ name = MASTER
+ )
+
LOG.info(s"master proxy is started at ${masterProxy.path}")
- val mainThread = Thread.currentThread();
+ val mainThread = Thread.currentThread()
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() : Unit = {
if (!system.isTerminated) {
@@ -120,16 +122,16 @@ object Master extends AkkaApp with ArgumentsParser {
case ex : Exception => //ignore
}
system.shutdown()
- mainThread.join();
+ mainThread.join()
}
}
- });
+ })
system.awaitTermination()
}
}
-class MasterWatcher(role: String, masterProxy : ActorRef) extends Actor with ActorLogging {
+class MasterWatcher(role: String) extends Actor with ActorLogging {
import context.dispatcher
val cluster = Cluster(context.system)
@@ -192,7 +194,6 @@ class MasterWatcher(role: String, masterProxy : ActorRef) extends Actor with Ac
def waitForShutdown : Receive = {
case MasterWatcher.Shutdown => {
- context.system.stop(masterProxy)
cluster.unsubscribe(self)
cluster.leave(cluster.selfAddress)
context.stop(self)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala b/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
index 06ab8bb..e6bd1db 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala
@@ -241,7 +241,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
(kvService ? PutKV(appId.toString, key, value)).asInstanceOf[Future[PutKVResult]].map {
case PutKVSuccess =>
client ! AppDataSaved
- case PutKVFailed(k, v, ex) =>
+ case PutKVFailed(k, ex) =>
client ! SaveAppDataFailed
}
case GetAppData(appId, key) =>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/master/ClusterReplication.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/ClusterReplication.scala b/daemon/src/main/scala/io/gearpump/cluster/master/ClusterReplication.scala
deleted file mode 100644
index faeccb3..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/master/ClusterReplication.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import java.util.concurrent.TimeUnit
-
-import akka.actor._
-import akka.cluster.Cluster
-import akka.contrib.datareplication.Replicator._
-import akka.contrib.datareplication.{DataReplication, GSet}
-import io.gearpump.util.Constants
-import io.gearpump.util.Constants._
-import io.gearpump.util._
-import org.slf4j.Logger
-
-import scala.concurrent.duration.Duration
-
-/**
- * ClusterReplication use [[DataReplication]] to store replicated state.
- */
-trait ClusterReplication extends Actor with Stash {
-
- val LOG: Logger = LogUtil.getLogger(getClass)
- val systemconfig = context.system.settings.config
-
- implicit val executionContext = context.dispatcher
- implicit val cluster = Cluster(context.system)
-
- val TIMEOUT = Duration(5, TimeUnit.SECONDS)
- val STATE = "masterstate"
- val KVService = "kvService"
- implicit val timeout = Constants.FUTURE_TIMEOUT
-
- val replicator = DataReplication(context.system).replicator
-
- val masterClusterSize = Math.max(1, systemconfig.getStringList(GEARPUMP_CLUSTER_MASTERS).size())
-
- //optimize write path, we can tolerate one master down for recovery.
- val writeQuorum = Math.min(2, masterClusterSize / 2 + 1)
- val readQuorum = masterClusterSize + 1 - writeQuorum
-
- def stateChangeListener : Receive = {
- case update: UpdateResponse =>
- LOG.debug(s"we get update $update")
- case Changed(STATE, data: GSet) =>
- LOG.info("master state updated ")
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala b/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
index e31ed89..fb66a0c 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
@@ -18,65 +18,78 @@
package io.gearpump.cluster.master
-import akka.actor._
-import akka.contrib.datareplication.LWWMap
-import akka.contrib.datareplication.Replicator._
-import akka.pattern.ask
+import java.util.concurrent.TimeUnit
-import scala.concurrent.Future
+import akka.actor._
+import akka.cluster.Cluster
+import akka.cluster.ddata.{DistributedData, LWWMap, Key, LWWMapKey}
+import akka.cluster.ddata.Replicator._
+import io.gearpump.util.{LogUtil}
+import org.slf4j.Logger
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration.Duration
/**
* A replicated simple in-memory KV service.
*/
-class InMemoryKVService extends Actor with Stash with ClusterReplication {
+class InMemoryKVService extends Actor with Stash {
import InMemoryKVService._
- def receive : Receive = kvService orElse stateChangeListener
+ private val KV_SERVICE = "gearpump_kvservice"
- override def preStart(): Unit = {
- replicator ! Subscribe(STATE, self)
- }
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+ private val replicator = DistributedData(context.system).replicator
+ private implicit val cluster = Cluster(context.system)
+
+ //optimize write path, we can tolerate one master down for recovery.
+ private val timeout = Duration(15, TimeUnit.SECONDS)
+ private val readMajority = ReadMajority(timeout)
+ private val writeMajority = WriteMajority(timeout)
- override def postStop(): Unit = {
- replicator ! Unsubscribe(STATE, self)
+ private def groupKey(group: String): LWWMapKey[Any] = {
+ LWWMapKey[Any](KV_SERVICE + "_" + group)
}
+
+ def receive : Receive = kvService
+
def kvService : Receive = {
+
case GetKV(group: String, key : String) =>
- val client = sender
- (replicator ? new Get(KVService + group, ReadFrom(readQuorum), TIMEOUT, None)).asInstanceOf[Future[GetResponse]].map {
- case GetSuccess(_, appData: LWWMap, _) =>
- LOG.info(s"Successfully retrived group: $group")
- client ! GetKVSuccess(key, appData.get(key).orNull)
- case x: NotFound =>
- LOG.info(s"We cannot find group $group")
- client ! GetKVSuccess(key, null)
- case x : GetFailure =>
- LOG.error(s"Failed to get application $key data, the request key is $key")
- client ! GetKVFailed(new Exception(x.getClass.getName))
- case GetSuccess(_, x, _) =>
- LOG.error(s"Got unexpected response when get key $key, the response is $x")
- client ! GetKVFailed(new Exception(x.getClass.getName))
- }
+ val request = Request(sender(), key)
+ replicator ! Get(groupKey(group), readMajority, Some(request))
+ case success@ GetSuccess(group: LWWMapKey[Any], Some(request: Request)) =>
+ val appData = success.get(group)
+ LOG.info(s"Successfully retrived group: ${group.id}")
+ request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull)
+ case NotFound(group: LWWMapKey[Any], Some(request: Request)) =>
+ LOG.info(s"We cannot find group $group")
+ request.client ! GetKVSuccess(request.key, null)
+ case GetFailure(group: LWWMapKey[Any], Some(request: Request)) =>
+ val error = s"Failed to get application data, the request key is ${request.key}"
+ LOG.error(error)
+ request.client ! GetKVFailed(new Exception(error))
case PutKV(group: String, key: String, value: Any) =>
- val client = sender
-
- val update = Update(KVService + group, LWWMap(),
- WriteTo(writeQuorum), TIMEOUT) {map =>
+ val request = Request(sender(), key)
+ val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) {map =>
map + (key -> value)
}
-
- val putFuture = (replicator ? update).asInstanceOf[Future[UpdateResponse]]
-
- putFuture.map {
- case UpdateSuccess(key, _) =>
- client ! PutKVSuccess
- case fail: UpdateFailure =>
- client ! PutKVFailed(key, value, new Exception(fail.getClass.getName))
- }
- case DeleteKVGroup(group: String) =>
- val client = sender
- replicator ? Update(KVService + group, LWWMap(), WriteTo(writeQuorum), TIMEOUT)( _ => LWWMap())
+ replicator ! update
+ case UpdateSuccess(group: LWWMapKey[Any], Some(request: Request)) =>
+ request.client ! PutKVSuccess
+ case ModifyFailure(group: LWWMapKey[Any], error, cause, Some(request: Request)) =>
+ request.client ! PutKVFailed(request.key, new Exception(error, cause))
+ case UpdateTimeout(group: LWWMapKey[Any], Some(request: Request)) =>
+ request.client ! PutKVFailed(request.key, new TimeoutException())
+
+ case delete@ DeleteKVGroup(group: String) =>
+ replicator ! Delete(groupKey(group), writeMajority)
+ case DeleteSuccess(group) =>
+ LOG.info(s"KV Group ${group.id} is deleted")
+ case ReplicationDeleteFailure(group) =>
+ LOG.error(s"Failed to delete KV Group ${group.id}...")
+ case DataDeleted(group) =>
+ LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...")
}
}
@@ -96,9 +109,13 @@ object InMemoryKVService {
case class DeleteKVGroup(group: String)
+ case class GroupDeleted(group: String) extends GetKVResult with PutKVResult
+
trait PutKVResult
case object PutKVSuccess extends PutKVResult
- case class PutKVFailed(key: String, value: Any, ex: Throwable) extends PutKVResult
+ case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult
+
+ case class Request(client: ActorRef, key: String)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
index a4d9b54..f22a300 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala
@@ -156,8 +156,8 @@ private[cluster] class Master extends Actor with Stash {
def kvServiceMsgHandler: Receive = {
case PutKVSuccess =>
//Skip
- case PutKVFailed(key, value, exception) =>
- LOG.error(s"Put value $value with key $key to InMemoryKVService failed.\n" + ExceptionUtils.getStackTrace(exception))
+ case PutKVFailed(key, exception) =>
+ LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" + ExceptionUtils.getStackTrace(exception))
}
def metricsService : Receive = {
@@ -286,7 +286,7 @@ private[cluster] class Master extends Actor with Stash {
}
object Master {
- final val MASTER_GROUP = "-1"
+ final val MASTER_GROUP = "master_group"
final val WORKER_ID = "next_worker_id"
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
index b4ff5b3..f4f82fb 100644
--- a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
+++ b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
@@ -25,7 +25,7 @@ import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.stream.Materializer
-import akka.stream.io.{SynchronousFileSink, SynchronousFileSource}
+import akka.stream.scaladsl.FileIO
import scala.concurrent.{ExecutionContext, Future}
@@ -75,6 +75,8 @@ object FileDirective {
ctx => {
filesFuture.map(map => inner(Tuple1(map))).flatMap(route => route(ctx))
}
+
+
}
}
}
@@ -88,7 +90,7 @@ object FileDirective {
val responseEntity = HttpEntity(
MediaTypes.`application/octet-stream`,
file.length,
- SynchronousFileSource(file, CHUNK_SIZE))
+ FileIO.fromFile(file, CHUNK_SIZE))
complete(responseEntity)
}
@@ -100,10 +102,10 @@ object FileDirective {
//reserve the suffix
val targetPath = File.createTempFile(s"userfile_${p.name}_", s"${p.filename.getOrElse("")}", rootDirectory)
- val written = p.entity.dataBytes.runWith(SynchronousFileSink(targetPath))
+ val written = p.entity.dataBytes.runWith(FileIO.toFile(targetPath))
written.map(written =>
- if (written > 0) {
- Map(p.name -> FileInfo(p.filename.get, targetPath, written))
+ if (written.count > 0) {
+ Map(p.name -> FileInfo(p.filename.get, targetPath, written.count))
} else {
Map.empty[Name, FileInfo]
})
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/main/scala/io/gearpump/util/FileServer.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/util/FileServer.scala b/daemon/src/main/scala/io/gearpump/util/FileServer.scala
index f208465..361c01d 100644
--- a/daemon/src/main/scala/io/gearpump/util/FileServer.scala
+++ b/daemon/src/main/scala/io/gearpump/util/FileServer.scala
@@ -24,15 +24,14 @@ import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.marshalling.Marshal
-import akka.http.scaladsl.model.Uri.Path
+import akka.http.scaladsl.model.Uri.{Query, Path}
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, Multipart, _}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
-import akka.stream.io.{SynchronousFileSink, SynchronousFileSource}
import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet
-import akka.stream.scaladsl.{Sink, Source}
+import akka.stream.scaladsl.{Sink, Source, FileIO}
import io.gearpump.jarstore.FilePath
import io.gearpump.util.FileDirective._
import io.gearpump.util.FileServer.Port
@@ -65,7 +64,7 @@ class FileServer(system: ActorSystem, host: String, port: Int = 0, rootDirectory
pathEndOrSingleSlash {
extractUri { uri =>
val upload = uri.withPath(Uri.Path("/upload")).toString()
- val entity = HttpEntity(MediaTypes.`text/html`,
+ val entity = HttpEntity(ContentTypes.`text/html(UTF-8)`,
s"""
|
|<h2>Please specify a file to upload:</h2>
@@ -123,7 +122,7 @@ object FileServer {
HttpRequest(HttpMethods.POST, uri = target, entity = entity)
}
- val response = Source(request).via(httpClient).runWith(Sink.head)
+ val response = Source.fromFuture(request).via(httpClient).runWith(Sink.head)
response.flatMap{some =>
Unmarshal(some).to[String]
}.map{path =>
@@ -132,17 +131,17 @@ object FileServer {
}
def download(remoteFile: FilePath, saveAs: File): Future[Unit] = {
- val downoad = server.withPath(Path("/download")).withQuery("file" -> remoteFile.path)
+ val downoad = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path))
//download file to local
val response = Source.single(HttpRequest(uri = downoad)).via(httpClient).runWith(Sink.head)
val downloaded = response.flatMap { response =>
- response.entity.dataBytes.runWith(SynchronousFileSink(saveAs))
+ response.entity.dataBytes.runWith(FileIO.toFile(saveAs))
}
downloaded.map(written => Unit)
}
private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
- val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), SynchronousFileSource(file, chunkSize = 100000))
+ val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromFile(file, chunkSize = 100000))
val body = Source.single(
Multipart.FormData.BodyPart(
"uploadfile",
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
index 863d70c..3927993 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
@@ -34,9 +34,8 @@ class MasterWatcherSpec extends FlatSpec with Matchers {
val system = ActorSystem("ForMasterWatcher", config)
val actorWatcher = TestProbe()(system)
- val mockMaster = TestProbe()(system)
- val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher", mockMaster.ref))
+ val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher"))
actorWatcher watch masterWatcher
actorWatcher.expectTerminated(masterWatcher, 5 seconds)
system.shutdown()
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
index 76bfd54..8f60d34 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
@@ -23,6 +23,7 @@ import akka.testkit.TestProbe
import io.gearpump.cluster.master.InMemoryKVService._
import io.gearpump.cluster.{MasterHarness, TestUtil}
import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+import scala.concurrent.duration._
class InMemoryKVServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
@@ -54,7 +55,11 @@ class InMemoryKVServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEa
client.send(kvService, DeleteKVGroup(group))
+ // after DeleteKVGroup, the service no longer accepts GetKV and PutKV
client.send(kvService, GetKV(group, "key"))
- client.expectMsg(GetKVSuccess("key", null))
+ client.expectNoMsg(3 seconds)
+
+ client.send(kvService, PutKV(group, "key", 3))
+ client.expectNoMsg(3 seconds)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
index 6cc0e33..f8982ce 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
@@ -31,7 +31,7 @@ import io.gearpump.experiments.yarn.Constants._
import io.gearpump.experiments.yarn.glue.Records._
import io.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig}
import io.gearpump.transport.HostPort
-import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
+import io.gearpump.util._
import org.apache.commons.httpclient.HttpClient
import org.apache.commons.httpclient.methods.GetMethod
import org.slf4j.Logger
@@ -359,7 +359,7 @@ object YarnAppMaster extends AkkaApp with ArgumentsParser {
}
if (status == 200) {
- system.actorFor(get.getResponseBodyAsString)
+ AkkaHelper.actorFor(system, get.getResponseBodyAsString)
} else {
throw new IOException("Fail to resolve AppMaster address, please make sure " +
s"${report.getOriginalTrackingUrl} is accessible...")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
index adf1716..9f30570 100644
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
+++ b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/AppMasterResolver.scala
@@ -23,7 +23,7 @@ import java.io.IOException
import akka.actor.{ActorRef, ActorSystem}
import io.gearpump.experiments.yarn.glue.Records.ApplicationId
import io.gearpump.experiments.yarn.glue.YarnClient
-import io.gearpump.util.LogUtil
+import io.gearpump.util.{AkkaHelper, LogUtil}
import org.apache.commons.httpclient.HttpClient
import org.apache.commons.httpclient.methods.GetMethod
import org.slf4j.Logger
@@ -52,7 +52,7 @@ class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
if (status == 200) {
val response = get.getResponseBodyAsString
LOG.info("Successfully resolved AppMaster address: " + response)
- system.actorFor(response)
+ AkkaHelper.actorFor(system, response)
} else {
throw new IOException("Fail to resolve AppMaster address, please make sure " +
s"${report.getOriginalTrackingUrl} is accessible...")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
index 3f029ef..a9fdee5 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
@@ -44,7 +44,7 @@ class MessageDeliverySpec extends TestSpecBase {
val appId = restClient.getNextAvailableAppId()
val stateJar = cluster.queryBuiltInExampleJars("state-").head
- val success = restClient.submitApp(stateJar, args)
+ val success = restClient.submitApp(stateJar, executorNum = 1, args = args)
success shouldBe true
// verify #1
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 0d08718..761c3b9 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -19,7 +19,7 @@ object Build extends sbt.Build {
val copySharedSourceFiles = TaskKey[Unit]("copied shared services source code")
- val akkaVersion = "2.3.12"
+ val akkaVersion = "2.4.2"
val kryoVersion = "0.3.2"
val clouderaVersion = "2.6.0-cdh5.4.2"
val clouderaHBaseVersion = "1.0.0-cdh5.4.2"
@@ -37,7 +37,7 @@ object Build extends sbt.Build {
val slf4jVersion = "1.7.7"
val gsCollectionsVersion = "6.2.0"
- val crossScalaVersionNumbers = Seq("2.10.5", "2.11.5")
+ val crossScalaVersionNumbers = Seq("2.11.8")
val scalaVersionNumber = crossScalaVersionNumbers.last
val sprayVersion = "1.3.2"
val sprayJsonVersion = "1.3.1"
@@ -48,7 +48,6 @@ object Build extends sbt.Build {
val scalazVersion = "7.1.1"
val algebirdVersion = "0.9.0"
val chillVersion = "0.6.0"
-
val distDirectory = "output"
val projectName = "gearpump"
@@ -56,7 +55,7 @@ object Build extends sbt.Build {
++ Pack.projects.toList).toSeq
- val commonSettings = Seq(jacoco.settings:_*) ++ sonatypeSettings ++ net.virtualvoid.sbt.graph.Plugin.graphSettings ++
+ val commonSettings = Seq(jacoco.settings:_*) ++ sonatypeSettings ++
Seq(
resolvers ++= Seq(
"patriknw at bintray" at "http://dl.bintray.com/patriknw/maven",
@@ -69,8 +68,8 @@ object Build extends sbt.Build {
"clockfly" at "http://dl.bintray.com/clockfly/maven",
"vincent" at "http://dl.bintray.com/fvunicorn/maven",
"clojars" at "http://clojars.org/repo"
- ),
- addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0-M5" cross CrossVersion.full)
+ )
+ // ,addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0-M5" cross CrossVersion.full)
) ++
Seq(
scalaVersion := scalaVersionNumber,
@@ -133,23 +132,19 @@ object Build extends sbt.Build {
val daemonDependencies = Seq(
libraryDependencies ++= Seq(
- "com.typesafe.akka" %% "akka-contrib" % akkaVersion
- exclude("com.typesafe.akka", "akka-persistence-experimental_2.11"),
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
- "com.typesafe.akka" %% "akka-http-experimental" % "1.0",
- "com.typesafe.akka" %% "akka-http-core-experimental" % "1.0",
- "com.typesafe.akka" %% "akka-stream-experimental" % "1.0",
- "com.typesafe.akka" %% "akka-http-spray-json-experimental"% "1.0",
+ "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
+ "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion,
+ "com.typesafe.akka" %% "akka-http-spray-json-experimental"% akkaVersion,
"commons-logging" % "commons-logging" % commonsLoggingVersion,
- "com.github.patriknw" %% "akka-data-replication" % dataReplicationVersion,
+ "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion,
"org.apache.hadoop" % "hadoop-common" % clouderaVersion % "provided"
)
)
val streamingDependencies = Seq(
libraryDependencies ++= Seq(
- "com.github.intel-hadoop" % "gearpump-shaded-gs-collections" % gsCollectionsVersion,
- "com.typesafe.akka" %% "akka-stream-experimental" % "1.0"
+ "com.github.intel-hadoop" % "gearpump-shaded-gs-collections" % gsCollectionsVersion
)
)
@@ -160,9 +155,24 @@ object Build extends sbt.Build {
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"com.github.intel-hadoop" % "gearpump-shaded-guava" % guavaVersion,
"commons-lang" % "commons-lang" % commonsLangVersion,
- "com.typesafe.akka" %% "akka-actor" % akkaVersion,
- "com.typesafe.akka" %% "akka-remote" % akkaVersion,
- "com.typesafe.akka" %% "akka-agent" % akkaVersion,
+
+ /**
+ * Override Netty version 3.10.3.Final used by Akka 2.4.2 to work-around netty hang issue
+ * (https://github.com/gearpump/gearpump/issues/2020)
+ *
+ * Akka 2.4.2 by default uses Netty 3.10.3.Final, which has a serious issue that can hang the
+ * network. The same issue also happens in the version range (3.10.0.Final, 3.10.5.Final).
+ * Netty 3.10.6.Final has this issue fixed; however, we find there is a 20% performance drop.
+ * So we decided to downgrade netty to 3.8.0.Final (the same version used in akka 2.3.12).
+ *
+ * @see https://github.com/gearpump/gearpump/pull/2017 for more discussions.
+ */
+ "io.netty" % "netty" % "3.8.0.Final",
+ "com.typesafe.akka" %% "akka-remote" % akkaVersion
+ exclude("io.netty", "netty"),
+
+ "com.typesafe.akka" %% "akka-actor" % akkaVersion,
+ "com.typesafe.akka" %% "akka-agent" % akkaVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-kernel" % akkaVersion,
"com.github.intel-hadoop" %% "gearpump-shaded-akka-kryo" % kryoVersion,
@@ -172,16 +182,12 @@ object Build extends sbt.Build {
"org.mockito" % "mockito-core" % mockitoVersion % "test",
"junit" % "junit" % junitVersion % "test"
),
- libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-reflect" % _),
- libraryDependencies ++= (
- if (scalaVersion.value.startsWith("2.10")) List("org.scalamacros" %% "quasiquotes" % "2.1.0-M5")
- else List("org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.3")
- )
+ libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-reflect" % _)
)
lazy val javadocSettings = Seq(
- addCompilerPlugin("org.spark-project" %% "genjavadoc-plugin" %
- "0.9-spark0" cross CrossVersion.full),
+ addCompilerPlugin("com.typesafe.genjavadoc" %% "genjavadoc-plugin" %
+ "0.9" cross CrossVersion.full),
scalacOptions += s"-P:genjavadoc:out=${target.value}/java"
)
@@ -217,7 +223,7 @@ object Build extends sbt.Build {
base = file("."),
settings = commonSettings ++ noPublish ++ gearpumpUnidocSetting
).aggregate(core, daemon, streaming, services, external_kafka, external_monoid, external_serializer,
- examples, storm, akkastream, yarn, external_hbase, packProject, external_hadoopfs,
+ examples, storm, yarn, external_hbase, packProject, external_hadoopfs,
integration_test).settings(Defaults.itSettings : _*)
lazy val core = Project(
@@ -274,13 +280,14 @@ object Build extends sbt.Build {
lazy val serviceJvmSettings = commonSettings ++ noPublish ++ Seq(
libraryDependencies ++= Seq(
- "com.typesafe.akka" %% "akka-http-testkit-experimental"% "1.0" % "test",
+ "com.typesafe.akka" %% "akka-http-testkit"% akkaVersion % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"com.lihaoyi" %% "upickle" % upickleVersion,
"com.softwaremill" %% "akka-http-session" % "0.1.4",
- "com.typesafe.akka" %% "akka-http-spray-json-experimental"% "1.0",
+ "com.typesafe.akka" %% "akka-http-spray-json-experimental"% akkaVersion,
"com.github.scribejava" % "scribejava-apis" % "2.4.0",
- "com.ning" % "async-http-client" % "1.9.33",
+ "com.ning" % "async-http-client" % "1.9.33"
+ exclude("io.netty", "netty"),
"org.webjars" % "angularjs" % "1.4.9",
"org.webjars.npm" % "angular-touch" % "1.5.0", // angular 1.5 breaks ui-select, but we need ng-touch 1.5
"org.webjars" % "angular-ui-router" % "0.2.15",
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/project/Pack.scala
----------------------------------------------------------------------
diff --git a/project/Pack.scala b/project/Pack.scala
index a3852c4..8a5ca8d 100644
--- a/project/Pack.scala
+++ b/project/Pack.scala
@@ -69,9 +69,7 @@ object Pack extends sbt.Build {
"lib/storm" -> new ProjectsToPack(storm.id).exclude(streaming.id)
),
packExclude := Seq(thisProjectRef.value.project),
- //This is a work-around for https://github.com/gearpump/gearpump/issues/1816
- //Will be removed in the future when Akka release a new version which includes the fix.
- packExcludeJars := Seq(s"akka-actor_${scalaBinaryVersion.value}-$akkaVersion.jar"),
+
packResourceDir += (baseDirectory.value / ".." / "conf" -> "conf"),
packResourceDir += (baseDirectory.value / ".." / "yarnconf" -> "conf/yarnconf"),
packResourceDir += (baseDirectory.value / ".." / "unmanagedlibs" / scalaBinaryVersion.value -> "lib"),
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/project/plugins.sbt
----------------------------------------------------------------------
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 58b0aa6..0477712 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -2,7 +2,7 @@ resolvers += Resolver.url("fvunicorn", url("http://dl.bintray.com/fvunicorn/sbt-
resolvers += Classpaths.sbtPluginReleases
-addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.4")
+addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.8")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
@@ -10,8 +10,6 @@ addSbtPlugin("io.gearpump.sbt" % "sbt-pack" % "0.7.6")
addSbtPlugin("de.johoop" % "jacoco4sbt" % "2.1.6")
-addSbtPlugin("com.gilt" % "sbt-dependency-graph-sugar" % "0.7.4")
-
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "0.2.1")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
@@ -22,4 +20,4 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.1.0")
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "3.0.0")
-addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3")
\ No newline at end of file
+addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
index ee70d6b..3a1c4fe 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
@@ -24,7 +24,6 @@ import akka.actor.ActorRef
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{`Cache-Control`, `Set-Cookie`}
-import akka.stream.io.SynchronousFileSource
import akka.stream.scaladsl.Source
import akka.testkit.TestActor.{AutoPilot, KeepRunning}
import akka.testkit.TestProbe
@@ -44,6 +43,7 @@ import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.duration._
import scala.util.{Success, Try}
+import akka.stream.scaladsl.FileIO
class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
Matchers with BeforeAndAfterAll {
@@ -168,7 +168,9 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
}
private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
- val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), SynchronousFileSource(file, chunkSize = 100000))
+ val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
+ FileIO.fromFile(file, chunkSize = 100000))
+
val body = Source.single(
Multipart.FormData.BodyPart(
"file",
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
index e3504ad..c276286 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
@@ -61,21 +61,21 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher
assert(header[`Set-Cookie`].isDefined)
val httpCookie = header[`Set-Cookie`].get.cookie
assert(httpCookie.name == "gearpump_token")
- cookie = new HttpCookiePair(httpCookie.name, httpCookie.value)
+ cookie = HttpCookiePair.apply(httpCookie.name, httpCookie.value)
}
// after authentication, everything is fine.
- Get("/resource") ~> addHeader(Cookie(cookie)) ~> security.route ~> check {
+ Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
responseAs[String] shouldEqual "OK"
}
// however, guest cannot access high-permission operations, like POST.
- Post("/resource") ~> addHeader(Cookie(cookie)) ~> security.route ~> check {
+ Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
assert(rejection == AuthorizationFailedRejection)
}
// logout, should clear the session
- Post(s"/logout") ~> addHeader(Cookie(cookie)) ~> security.route ~> check{
+ Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check{
assert("{\"user\":\"guest\"}" == responseAs[String])
assert(status.intValue() == 200)
assert(header[`Set-Cookie`].isDefined)
@@ -106,21 +106,21 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher
assert(header[`Set-Cookie`].isDefined)
val httpCookie = header[`Set-Cookie`].get.cookie
assert(httpCookie.name == "gearpump_token")
- cookie = new HttpCookiePair(httpCookie.name, httpCookie.value)
+ cookie = HttpCookiePair(httpCookie.name, httpCookie.value)
}
// after authentication, everything is fine.
- Get("/resource") ~> addHeader(Cookie(cookie)) ~> security.route ~> check {
+ Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
responseAs[String] shouldEqual "OK"
}
// Unlike guest, admin can also access POST
- Post("/resource") ~> addHeader(Cookie(cookie)) ~> security.route ~> check {
+ Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
responseAs[String] shouldEqual "OK"
}
// logout, should clear the session
- Post(s"/logout") ~> addHeader(Cookie(cookie)) ~> security.route ~> check{
+ Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check{
assert("{\"user\":\"admin\"}" == responseAs[String])
assert(status.intValue() == 200)
assert(header[`Set-Cookie`].isDefined)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
index 3e3e0b0..df996e7 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
@@ -19,9 +19,9 @@
package io.gearpump.services.security.oauth2
import akka.actor.ActorSystem
-import akka.http.javadsl.model.HttpEntityStrict
+import akka.http.scaladsl.model.HttpEntity.Strict
import akka.http.scaladsl.model.MediaTypes._
-import akka.http.scaladsl.model.Uri.Path
+import akka.http.scaladsl.model.Uri.{Query, Path}
import akka.http.scaladsl.model._
import akka.http.scaladsl.testkit.ScalatestRouteTest
import com.typesafe.config.ConfigFactory
@@ -54,7 +54,7 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout
uaa.init(configString)
it should "generate the correct authorization request" in {
- val parameters = Uri(uaa.getAuthorizationUrl()).query.toMap
+ val parameters = Uri(uaa.getAuthorizationUrl()).query().toMap
assert(parameters("response_type") == "code")
assert(parameters("client_id") == configMap("clientid"))
assert(parameters("redirect_uri") == configMap("callback"))
@@ -69,10 +69,10 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout
def accessTokenEndpoint(request: HttpRequest) = {
assert(request.getHeader("Authorization").get.value() == "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=")
- assert(request.entity.contentType().mediaType.value == "application/x-www-form-urlencoded")
+ assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")
- val body = request.entity.asInstanceOf[HttpEntityStrict].data().decodeString("UTF-8")
- val form = Uri./.withQuery(body).query.toMap
+ val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8")
+ val form = Uri./.withQuery(Query(body)).query().toMap
assert(form("grant_type") == "authorization_code")
assert(form("code") == "QGGVeA")
@@ -94,7 +94,7 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout
}
def protectedResourceEndpoint(request: HttpRequest) = {
- assert(request.getUri().parameter("access_token").get == accessToken)
+ assert(request.getUri().query().get("access_token").get == accessToken)
val response =
s"""
|{
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
index 8fbe43f..ff57baf 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
@@ -19,9 +19,9 @@
package io.gearpump.services.security.oauth2
import akka.actor.ActorSystem
-import akka.http.javadsl.model.HttpEntityStrict
+import akka.http.scaladsl.model.HttpEntity.Strict
import akka.http.scaladsl.model.MediaTypes._
-import akka.http.scaladsl.model.Uri.Path
+import akka.http.scaladsl.model.Uri.{Query, Path}
import akka.http.scaladsl.model._
import akka.http.scaladsl.testkit.ScalatestRouteTest
import com.typesafe.config.ConfigFactory
@@ -55,7 +55,7 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
google.init(configString)
it should "generate the correct authorization request" in {
- val parameters = Uri(google.getAuthorizationUrl()).query.toMap
+ val parameters = Uri(google.getAuthorizationUrl()).query().toMap
assert(parameters("response_type") == "code")
assert(parameters("client_id") == configMap("clientid"))
assert(parameters("redirect_uri") == configMap("callback"))
@@ -70,10 +70,10 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
def accessTokenEndpoint(request: HttpRequest) = {
- assert(request.entity.contentType().mediaType.value == "application/x-www-form-urlencoded")
+ assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")
- val body = request.entity.asInstanceOf[HttpEntityStrict].data().decodeString("UTF-8")
- val form = Uri./.withQuery(body).query.toMap
+ val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8")
+ val form = Uri./.withQuery(Query(body)).query().toMap
assert(form("client_id") == configMap("clientid"))
assert(form("client_secret") == configMap("clientsecret"))
@@ -95,7 +95,7 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
}
def protectedResourceEndpoint(request: HttpRequest) = {
- assert(request.getUri().parameter("access_token").get == accessToken)
+ assert(request.getUri().query().get("access_token").get == accessToken)
val response =s"""
|{
| "kind": "plus#person",
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala b/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
index 88e58a8..2dbdd14 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
@@ -25,7 +25,7 @@ import io.gearpump.transport.{Express, HostPort}
import io.gearpump.Message
import scala.collection.mutable
-
+import io.gearpump.util.AkkaHelper
/**
* ExpressTransport wire the networking function from default akka
* networking to customized implementation [[Express]].
@@ -43,7 +43,7 @@ trait ExpressTransport {
lazy val sourceId = TaskId.toLong(taskId)
lazy val sessionRef: ActorRef = {
- system.actorFor(s"/session#$sessionId")
+ AkkaHelper.actorFor(system, s"/session#$sessionId")
}
def transport(msg : AnyRef, remotes : TaskId *): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
index cc2f52b..2750cc5 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/DagManagerSpec.scala
@@ -38,7 +38,7 @@ class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
val dag = DAG(graph)
implicit var system: ActorSystem = null
val appId = 0
- val userConfig = UserConfig.empty.withValue(StreamApplication.DAG, graph)
+ lazy val userConfig = UserConfig.empty.withValue(StreamApplication.DAG, graph)
"DagManager" should {
import io.gearpump.streaming.appmaster.ClockServiceSpec.Store
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
index 70ce643..89d2d64 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
@@ -65,6 +65,7 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with
it should "produce 3 messages" in {
val context: ClientContext = mock[ClientContext]
+ when(context.system).thenReturn(system)
val app = StreamApp("dsl", context)
val list = List[String](
"0",
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/unmanagedlibs/2.10/akka-actor_2.10-2.3.12-fix-1816.jar
----------------------------------------------------------------------
diff --git a/unmanagedlibs/2.10/akka-actor_2.10-2.3.12-fix-1816.jar b/unmanagedlibs/2.10/akka-actor_2.10-2.3.12-fix-1816.jar
deleted file mode 100644
index a04c476..0000000
Binary files a/unmanagedlibs/2.10/akka-actor_2.10-2.3.12-fix-1816.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/21d59216/unmanagedlibs/2.11/akka-actor_2.11-2.3.12-fix-1816.jar
----------------------------------------------------------------------
diff --git a/unmanagedlibs/2.11/akka-actor_2.11-2.3.12-fix-1816.jar b/unmanagedlibs/2.11/akka-actor_2.11-2.3.12-fix-1816.jar
deleted file mode 100644
index 5cfb4a6..0000000
Binary files a/unmanagedlibs/2.11/akka-actor_2.11-2.3.12-fix-1816.jar and /dev/null differ