You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2017/01/20 08:47:57 UTC
[17/19] incubator-gearpump git commit: merge master into akka-streams
branch
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala b/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala
index d4b7871..55aa73f 100644
--- a/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala
+++ b/core/src/main/scala/org/apache/gearpump/metrics/Meter.scala
@@ -18,7 +18,7 @@
package org.apache.gearpump.metrics
-import org.apache.gearpump.codahale.metrics.{Meter => CodaHaleMeter}
+import com.codahale.metrics.{Meter => CodaHaleMeter}
/** See org.apache.gearpump.codahale.metrics.Meter */
class Meter(val name: String, meter: CodaHaleMeter, sampleRate: Int = 1) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala b/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala
index 1ee3798..3737361 100644
--- a/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala
+++ b/core/src/main/scala/org/apache/gearpump/metrics/Metrics.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import akka.actor._
import org.slf4j.Logger
-import org.apache.gearpump.codahale.metrics._
+import com.codahale.metrics._
import org.apache.gearpump.metrics
import org.apache.gearpump.util.LogUtil
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala b/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala
index 94aa114..620dc61 100644
--- a/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala
+++ b/core/src/main/scala/org/apache/gearpump/metrics/MetricsReporterService.scala
@@ -20,12 +20,11 @@ package org.apache.gearpump.metrics
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
-import scala.concurrent.duration._
+import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef}
-
-import org.apache.gearpump.codahale.metrics.graphite.{Graphite, GraphiteReporter}
-import org.apache.gearpump.codahale.metrics.{MetricFilter, Slf4jReporter}
+import com.codahale.metrics.{MetricFilter, Slf4jReporter}
+import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}
import org.apache.gearpump.metrics.Metrics.{DemandMoreMetrics, ReportMetrics}
import org.apache.gearpump.metrics.MetricsReporterService.ReportTo
import org.apache.gearpump.util.Constants._
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
deleted file mode 100644
index 99cbcb6..0000000
--- a/core/src/main/scala/org/apache/gearpump/partitioner/BroadcastPartitioner.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import org.apache.gearpump.Message
-
-/** Used by storm module to broadcast message to all downstream tasks */
-class BroadcastPartitioner extends MulticastPartitioner {
- private var lastPartitionNum = -1
- private var partitions = Array.empty[Int]
-
- override def getPartitions(
- msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
- if (partitionNum != lastPartitionNum) {
- partitions = (0 until partitionNum).toArray
- lastPartitionNum = partitionNum
- }
- partitions
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
deleted file mode 100644
index 5a3eec4..0000000
--- a/core/src/main/scala/org/apache/gearpump/partitioner/CoLocationPartitioner.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import org.apache.gearpump.Message
-
-/**
- * Will have the same parallelism with last processor
- * And each task in current processor will co-locate with task of last processor
- */
-class CoLocationPartitioner extends UnicastPartitioner {
- override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
- currentPartitionId
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
deleted file mode 100644
index ee684a9..0000000
--- a/core/src/main/scala/org/apache/gearpump/partitioner/HashPartitioner.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import org.apache.gearpump.Message
-
-/**
- * Only make sense when the message has implemented the hashCode()
- * Otherwise, it will use Object.hashCode(), which will not return
- * same hash code after serialization and deserialization.
- */
-class HashPartitioner extends UnicastPartitioner {
- override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
- (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
deleted file mode 100644
index d68fa65..0000000
--- a/core/src/main/scala/org/apache/gearpump/partitioner/Partitioner.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import scala.reflect.ClassTag
-
-import org.apache.commons.lang.SerializationUtils
-
-import org.apache.gearpump.Message
-
-/**
- * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), partitioner decide how ONE task
- * of upstream processor A send to several tasks of downstream processor B.
- */
-sealed trait Partitioner extends Serializable
-
-/**
- * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does
- * ONE-task {@literal ->} ONE-task mapping.
- */
-trait UnicastPartitioner extends Partitioner {
-
- /**
- * Gets the SINGLE downstream processor task index to send message to.
- *
- * @param msg Message you want to send
- * @param partitionNum How many tasks does the downstream processor have.
- * @param upstreamTaskIndex Upstream task's task index who trigger the getPartition() call.
- *
- * @return ONE task index of downstream processor.
- */
- def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int
-
- def getPartition(msg: Message, partitionNum: Int): Int = {
- getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
- }
-}
-
-trait MulticastPartitioner extends Partitioner {
-
- /**
- * Gets a list of downstream processor task indexes to send message to.
- *
- * @param upstreamTaskIndex Current sender task's task index.
- *
- */
- def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int]
-
- def getPartitions(msg: Message, partitionNum: Int): Array[Int] = {
- getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
- }
-}
-
-sealed trait PartitionerFactory {
-
- def name: String
-
- def partitioner: Partitioner
-}
-
-/** Stores the Partitioner in an object. To use it, user need to deserialize the object */
-class PartitionerObject(private[this] val _partitioner: Partitioner)
- extends PartitionerFactory with Serializable {
-
- override def name: String = partitioner.getClass.getName
-
- override def partitioner: Partitioner = {
- SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner]
- }
-}
-
-/** Store the partitioner in class Name, the user need to instantiate a new class */
-class PartitionerByClassName(partitionerClass: String)
- extends PartitionerFactory with Serializable {
-
- override def name: String = partitionerClass
- override def partitioner: Partitioner = {
- Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner]
- }
-}
-
-/**
- * @param partitionerFactory How we construct a Partitioner.
- */
-case class PartitionerDescription(partitionerFactory: PartitionerFactory)
-
-object Partitioner {
- val UNKNOWN_PARTITION_ID = -1
-
- def apply[T <: Partitioner](implicit clazz: ClassTag[T]): PartitionerDescription = {
- PartitionerDescription(new PartitionerByClassName(clazz.runtimeClass.getName))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
deleted file mode 100644
index 55ef614..0000000
--- a/core/src/main/scala/org/apache/gearpump/partitioner/ShuffleGroupingPartitioner.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import scala.util.Random
-
-import org.apache.gearpump.Message
-
-/**
- * The idea of ShuffleGroupingPartitioner is derived from Storm.
- * Messages are randomly distributed across the downstream's tasks in a way such that
- * each task is guaranteed to get an equal number of messages.
- */
-class ShuffleGroupingPartitioner extends UnicastPartitioner {
- private val random = new Random
- private var index = -1
- private var partitions = List.empty[Int]
- override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
- index += 1
- if (partitions.isEmpty) {
- partitions = 0.until(partitionNum).toList
- partitions = random.shuffle(partitions)
- } else if (index >= partitionNum) {
- index = 0
- partitions = random.shuffle(partitions)
- }
- partitions(index)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala b/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
deleted file mode 100644
index 5c66d66..0000000
--- a/core/src/main/scala/org/apache/gearpump/partitioner/ShufflePartitioner.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import java.util.Random
-
-import org.apache.gearpump.Message
-
-/**
- * Round Robin partition the data to downstream processor tasks.
- */
-class ShufflePartitioner extends UnicastPartitioner {
- private var seed = 0
- private var count = 0
-
- override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-
- if (seed == 0) {
- seed = newSeed()
- }
-
- val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum
- count = count + 1
- result
- }
-
- private def newSeed(): Int = new Random().nextInt()
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala
index f258c0f..cb3d2fd 100644
--- a/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala
+++ b/core/src/main/scala/org/apache/gearpump/serializer/FastKryoSerializer.scala
@@ -19,19 +19,18 @@
package org.apache.gearpump.serializer
import akka.actor.ExtendedActorSystem
-
-import org.apache.gearpump.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy
-import org.apache.gearpump.objenesis.strategy.StdInstantiatorStrategy
-import org.apache.gearpump.romix.serialization.kryo.KryoSerializerWrapper
+import com.esotericsoftware.kryo.Kryo.DefaultInstantiatorStrategy
+import com.romix.akka.serialization.kryo.{KryoBasedSerializer, KryoSerializer}
import org.apache.gearpump.serializer.FastKryoSerializer.KryoSerializationException
import org.apache.gearpump.util.LogUtil
+import org.objenesis.strategy.StdInstantiatorStrategy
class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer {
private val LOG = LogUtil.getLogger(getClass)
private val config = system.settings.config
- private val kryoSerializer = new KryoSerializerWrapper(system)
+ private val kryoSerializer: KryoBasedSerializer = new KryoSerializer(system).serializer
private val kryo = kryoSerializer.kryo
val strategy = new DefaultInstantiatorStrategy
strategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy)
@@ -40,7 +39,7 @@ class FastKryoSerializer(system: ExtendedActorSystem) extends Serializer {
override def serialize(message: Any): Array[Byte] = {
try {
- kryoSerializer.toBinary(message)
+ kryoSerializer.toBinary(message.asInstanceOf[AnyRef])
} catch {
case ex: java.lang.IllegalArgumentException =>
val clazz = message.getClass
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala b/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala
index 524089d..45a5481 100644
--- a/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala
+++ b/core/src/main/scala/org/apache/gearpump/serializer/GearpumpSerialization.scala
@@ -18,10 +18,9 @@
package org.apache.gearpump.serializer
+import com.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
import com.typesafe.config.Config
import org.slf4j.Logger
-
-import org.apache.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
import org.apache.gearpump.util.{Constants, LogUtil}
class GearpumpSerialization(config: Config) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
index 09f2969..82c7fe2 100644
--- a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
@@ -21,13 +21,12 @@ package org.apache.gearpump.util
import org.apache.gearpump.cluster.AppMasterContext
import org.apache.gearpump.cluster.worker.WorkerId
-import scala.concurrent.{ExecutionContext, Future}
-
+import scala.concurrent.{Await, ExecutionContext, Future}
import akka.actor.Actor.Receive
import akka.actor._
import akka.pattern.ask
import org.slf4j.Logger
-
+import akka.util.Timeout
import org.apache.gearpump.cluster.AppMasterToMaster.{ActivateAppMaster, GetAllWorkers}
import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId}
import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
@@ -36,6 +35,8 @@ import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSy
import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
import org.apache.gearpump.transport.HostPort
+import scala.concurrent.duration.Duration
+
object ActorUtil {
private val LOG: Logger = LogUtil.getLogger(getClass)
@@ -136,4 +137,13 @@ object ActorUtil {
implicit val timeout = Constants.FUTURE_TIMEOUT
(actor ? msg).asInstanceOf[Future[T]]
}
+
+ def askActor[T](actor: ActorRef, msg: Any, timeout: Timeout)(implicit ex: ExecutionContext): T = {
+ askActor(actor, msg, timeout, ActorRef.noSender)
+ }
+
+ def askActor[T](actor: ActorRef, msg: Any, timeout: Timeout, sender: ActorRef)
+ (implicit ex: ExecutionContext): T = {
+ Await.result(actor.ask(msg)(timeout, sender).asInstanceOf[Future[T]], Duration.Inf)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/util/Constants.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Constants.scala b/core/src/main/scala/org/apache/gearpump/util/Constants.scala
index dba5a1f..c98726e 100644
--- a/core/src/main/scala/org/apache/gearpump/util/Constants.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/Constants.scala
@@ -20,8 +20,6 @@ package org.apache.gearpump.util
import java.util.concurrent.TimeUnit
-import org.apache.gearpump.partitioner._
-
object Constants {
val MASTER_WATCHER = "masterwatcher"
val SINGLETON_MANAGER = "singleton"
@@ -140,14 +138,6 @@ object Constants {
val GEARPUMP_SERVICE_SUPERVISOR_PATH = "gearpump.services.supervisor-actor-path"
val GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE = "gearpump.services.config-render-option-concise"
- // The partitioners provided by Gearpump
- val BUILTIN_PARTITIONERS = Array(
- classOf[BroadcastPartitioner],
- classOf[CoLocationPartitioner],
- classOf[HashPartitioner],
- classOf[ShuffleGroupingPartitioner],
- classOf[ShufflePartitioner])
-
// Security related
val GEARPUMP_KEYTAB_FILE = "gearpump.keytab.file"
val GEARPUMP_KERBEROS_PRINCIPAL = "gearpump.kerberos.principal"
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala b/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala
index e3df37b..283a64a 100644
--- a/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala
@@ -21,7 +21,7 @@ package org.apache.gearpump.util
import java.io.{File, IOException}
import java.nio.charset.Charset
-import org.apache.gearpump.google.common.io.Files
+import com.google.common.io.Files
object FileUtils {
private val UTF8 = Charset.forName("UTF-8")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/util/Util.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Util.scala b/core/src/main/scala/org/apache/gearpump/util/Util.scala
index 0faa46a..8ee0e26 100644
--- a/core/src/main/scala/org/apache/gearpump/util/Util.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/Util.scala
@@ -66,8 +66,14 @@ object Util {
def startProcess(options: Array[String], classPath: Array[String], mainClass: String,
arguments: Array[String]): RichProcess = {
val java = System.getProperty("java.home") + "/bin/java"
- val command = List(java) ++ options ++
- List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++ arguments
+
+ val command = List(java) ++
+ // java.lang.VerifyError will be caused without "-noverify"
+ // TODO: investigate the cause and remove this
+ Array("-noverify") ++
+ options ++
+ List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++
+ arguments
LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")} " +
s"\n ${options.mkString(" ")}")
val logger = new ProcessLogRedirector()
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
----------------------------------------------------------------------
diff --git a/core/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/core/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
new file mode 100644
index 0000000..c64d444
--- /dev/null
+++ b/core/src/test/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.gearpump.jarstore.local.LocalJarStore
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
new file mode 100644
index 0000000..0a22245
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import akka.pattern.ask
+import akka.testkit.TestActorRef
+import com.typesafe.config.ConfigValueFactory
+import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers
+import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
+import org.apache.gearpump.cluster.master.Master
+import org.apache.gearpump.cluster.worker.Worker
+import org.apache.gearpump.util.Constants
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+class MiniCluster {
+ private val mockMasterIP = "127.0.0.1"
+
+ implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG.
+ withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP)))
+
+ val (mockMaster, worker) = {
+ val master = system.actorOf(Props(classOf[Master]), "master")
+ val worker = system.actorOf(Props(classOf[Worker], master), "worker")
+
+ // Wait until worker register itself to master
+ waitUtilWorkerIsRegistered(master)
+ (master, worker)
+ }
+
+ def launchActor(props: Props): TestActorRef[Actor] = {
+ TestActorRef(props)
+ }
+
+ private def waitUtilWorkerIsRegistered(master: ActorRef): Unit = {
+ while (!isWorkerRegistered(master)) {}
+ }
+
+ private def isWorkerRegistered(master: ActorRef): Boolean = {
+ import scala.concurrent.duration._
+ implicit val dispatcher = system.dispatcher
+
+ implicit val futureTimeout = Constants.FUTURE_TIMEOUT
+
+ val workerListFuture = (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]]
+
+ // Waits until the worker is registered.
+ val workers = Await.result[WorkerList](workerListFuture, 15.seconds)
+ workers.workers.size > 0
+ }
+
+ def shutDown(): Unit = {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
new file mode 100644
index 0000000..f9b0762
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.{Actor, ActorRef, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _}
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
+import org.apache.gearpump.cluster.master.{AppMasterLauncherFactory, AppManager}
+import org.apache.gearpump.cluster.master.AppManager._
+import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess}
+import org.apache.gearpump.cluster.{TestUtil, _}
+import org.apache.gearpump.util.LogUtil
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import scala.util.Success
+
+class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+ var kvService: TestProbe = null
+ var haService: TestProbe = null
+ var appLauncher: TestProbe = null
+ var appManager: ActorRef = null
+ private val LOG = LogUtil.getLogger(getClass)
+
+ override def config: Config = TestUtil.DEFAULT_CONFIG
+
+ override def beforeEach(): Unit = {
+ startActorSystem()
+ kvService = TestProbe()(getActorSystem)
+ appLauncher = TestProbe()(getActorSystem)
+
+ appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref,
+ new DummyAppMasterLauncherFactory(appLauncher))))
+ kvService.expectMsgType[GetKV]
+ kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty)))
+ }
+
+ override def afterEach(): Unit = {
+ shutdownActorSystem()
+ }
+
+ "AppManager" should "handle AppMaster message correctly" in {
+ val appMaster = TestProbe()(getActorSystem)
+ val appId = 1
+
+ val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName"))
+ appMaster.send(appManager, register)
+ appMaster.expectMsgType[AppMasterRegistered]
+
+ appMaster.send(appManager, ActivateAppMaster(appId))
+ appMaster.expectMsgType[AppMasterActivated]
+ }
+
+ "DataStoreService" should "support Put and Get" in {
+ val appMaster = TestProbe()(getActorSystem)
+ appMaster.send(appManager, SaveAppData(0, "key", 1))
+ kvService.expectMsgType[PutKV]
+ kvService.reply(PutKVSuccess)
+ appMaster.expectMsg(AppDataSaved)
+
+ appMaster.send(appManager, GetAppData(0, "key"))
+ kvService.expectMsgType[GetKV]
+ kvService.reply(GetKVSuccess("key", 1))
+ appMaster.expectMsg(GetAppDataResult("key", 1))
+ }
+
+ "AppManager" should "support application submission and shutdown" in {
+ testClientSubmission(withRecover = false)
+ }
+
+ "AppManager" should "support application submission and recover if appmaster dies" in {
+ LOG.info("=================testing recover==============")
+ testClientSubmission(withRecover = true)
+ }
+
+ "AppManager" should "handle client message correctly" in {
+ val mockClient = TestProbe()(getActorSystem)
+ mockClient.send(appManager, ShutdownApplication(1))
+ assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure)
+
+ mockClient.send(appManager, ResolveAppId(1))
+ assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure)
+
+ mockClient.send(appManager, AppMasterDataRequest(1))
+ mockClient.expectMsg(AppMasterData(AppMasterNonExist))
+ }
+
+ "AppManager" should "reject the application submission if the app name already existed" in {
+ val app = TestUtil.dummyApp
+ val submit = SubmitApplication(app, None, "username")
+ val client = TestProbe()(getActorSystem)
+ val appMaster = TestProbe()(getActorSystem)
+ val worker = TestProbe()(getActorSystem)
+ val appId = 1
+
+ client.send(appManager, submit)
+
+ kvService.expectMsgType[PutKV]
+ appLauncher.expectMsg(LauncherStarted(appId))
+ appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+ AppMasterRuntimeInfo(appId, app.name)))
+ appMaster.expectMsgType[AppMasterRegistered]
+
+ client.send(appManager, submit)
+ assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure)
+ }
+
+ def testClientSubmission(withRecover: Boolean): Unit = {
+ val app = TestUtil.dummyApp
+ val submit = SubmitApplication(app, None, "username")
+ val client = TestProbe()(getActorSystem)
+ val appMaster = TestProbe()(getActorSystem)
+ val worker = TestProbe()(getActorSystem)
+ val appId = 1
+
+ client.send(appManager, submit)
+
+ kvService.expectMsgType[PutKV]
+ appLauncher.expectMsg(LauncherStarted(appId))
+ appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+ AppMasterRuntimeInfo(appId, app.name)))
+ kvService.expectMsgType[PutKV]
+ appMaster.expectMsgType[AppMasterRegistered]
+
+ client.send(appManager, ResolveAppId(appId))
+ client.expectMsg(ResolveAppIdResult(Success(appMaster.ref)))
+
+ client.send(appManager, AppMastersDataRequest)
+ client.expectMsgType[AppMastersData]
+
+ client.send(appManager, AppMasterDataRequest(appId, false))
+ client.expectMsgType[AppMasterData]
+
+ if (!withRecover) {
+ client.send(appManager, ShutdownApplication(appId))
+ client.expectMsg(ShutdownApplicationResult(Success(appId)))
+ } else {
+ // Do recovery
+ getActorSystem.stop(appMaster.ref)
+ kvService.expectMsgType[GetKV]
+ val appState = ApplicationState(appId, "application1", 1, app, None, "username", null)
+ kvService.reply(GetKVSuccess(APP_STATE, appState))
+ appLauncher.expectMsg(LauncherStarted(appId))
+ }
+ }
+}
+
+class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory {
+ override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+ username: String, master: ActorRef, client: Option[ActorRef]): Props = {
+ Props(new DummyAppMasterLauncher(test, appId))
+ }
+}
+
+class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor {
+ test.ref ! LauncherStarted(appId)
+
+ override def receive: Receive = {
+ case any: Any => test.ref forward any
+ }
+}
+
+case class LauncherStarted(appId: Int)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
new file mode 100644
index 0000000..d3e739f
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.Props
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.master.InMemoryKVService
+import org.apache.gearpump.cluster.master.InMemoryKVService._
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import scala.concurrent.duration._
+
+class InMemoryKVServiceSpec
+ extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+
+ override def beforeEach(): Unit = {
+ startActorSystem()
+ }
+
+ override def afterEach(): Unit = {
+ shutdownActorSystem()
+ }
+
+ override def config: Config = TestUtil.MASTER_CONFIG
+
+ "KVService" should "get, put, delete correctly" in {
+ val system = getActorSystem
+ val kvService = system.actorOf(Props(new InMemoryKVService()))
+ val group = "group"
+
+ val client = TestProbe()(system)
+
+ client.send(kvService, PutKV(group, "key", 1))
+ client.expectMsg(PutKVSuccess)
+
+ client.send(kvService, PutKV(group, "key", 2))
+ client.expectMsg(PutKVSuccess)
+
+ client.send(kvService, GetKV(group, "key"))
+ client.expectMsg(GetKVSuccess("key", 2))
+
+ client.send(kvService, DeleteKVGroup(group))
+
+ // After DeleteGroup, it no longer accept Get and Put message for this group.
+ client.send(kvService, GetKV(group, "key"))
+ client.expectNoMsg(3.seconds)
+
+ client.send(kvService, PutKV(group, "key", 3))
+ client.expectNoMsg(3.seconds)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala
new file mode 100644
index 0000000..5f0d5e4
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.client
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import akka.util.Timeout
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.client.RunningApplicationSpec.{MockAskAppMasterRequest, MockAskAppMasterResponse}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success}
+import scala.concurrent.ExecutionContext.Implicits.global
+
+class RunningApplicationSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+ implicit var system: ActorSystem = _
+
+ override def beforeAll(): Unit = {
+ system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+ }
+
+ override def afterAll(): Unit = {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+
+ "RunningApplication" should "be able to shutdown application" in {
+ val errorMsg = "mock exception"
+ val master = TestProbe()
+ val timeout = Timeout(90, TimeUnit.SECONDS)
+ val application = new RunningApplication(1, master.ref, timeout)
+ Future {
+ application.shutDown()
+ }
+ master.expectMsg(ShutdownApplication(1))
+ master.reply(ShutdownApplicationResult(Success(1)))
+
+ val result = Future {
+ intercept[Exception] {
+ application.shutDown()
+ }
+ }
+ master.expectMsg(ShutdownApplication(1))
+ master.reply(ShutdownApplicationResult(Failure(new Exception(errorMsg))))
+ val exception = Await.result(result, Duration.Inf)
+ assert(exception.getMessage.equals(errorMsg))
+ }
+
+ "RunningApplication" should "be able to ask appmaster" in {
+ val master = TestProbe()
+ val appMaster = TestProbe()
+ val appId = 1
+ val timeout = Timeout(90, TimeUnit.SECONDS)
+ val request = MockAskAppMasterRequest("request")
+ val application = new RunningApplication(appId, master.ref, timeout)
+ val future = application.askAppMaster[MockAskAppMasterResponse](request)
+ master.expectMsg(ResolveAppId(appId))
+ master.reply(ResolveAppIdResult(Success(appMaster.ref)))
+ appMaster.expectMsg(MockAskAppMasterRequest("request"))
+ appMaster.reply(MockAskAppMasterResponse("response"))
+ val result = Await.result(future, Duration.Inf)
+ assert(result.res.equals("response"))
+
+ // ResolveAppId should not be called multiple times
+ val future2 = application.askAppMaster[MockAskAppMasterResponse](request)
+ appMaster.expectMsg(MockAskAppMasterRequest("request"))
+ appMaster.reply(MockAskAppMasterResponse("response"))
+ val result2 = Await.result(future2, Duration.Inf)
+ assert(result2.res.equals("response"))
+ }
+}
+
+object RunningApplicationSpec {
+ case class MockAskAppMasterRequest(req: String)
+
+ case class MockAskAppMasterResponse(res: String)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
new file mode 100644
index 0000000..2166976
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import java.util.Properties
+
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _}
+import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult}
+import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered
+import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{Constants, LogUtil, Util}
+import org.scalatest._
+
+import scala.concurrent.Future
+import scala.util.{Success, Try}
+
+class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+
+ private val LOG = LogUtil.getLogger(getClass)
+
+ override def config: Config = TestUtil.DEFAULT_CONFIG
+
+ override def beforeEach(): Unit = {
+ startActorSystem()
+ }
+
+ override def afterEach(): Unit = {
+ shutdownActorSystem()
+ }
+
+ "Worker" should "register worker address to master when started." in {
+
+ val masterReceiver = createMockMaster()
+
+ val tempTestConf = convertTestConf(getHost, getPort)
+
+ val options = Array(
+ s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}",
+ s"-D${PREFER_IPV4}=true"
+ ) ++ getMasterListOption()
+
+ val worker = Util.startProcess(options,
+ getContextClassPath,
+ getMainClassName(Worker),
+ Array.empty)
+
+ try {
+ masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker)
+
+ tempTestConf.delete()
+ } finally {
+ worker.destroy()
+ }
+ }
+
+ "Master" should "accept worker RegisterNewWorker when started" in {
+ val worker = TestProbe()(getActorSystem)
+
+ val host = "127.0.0.1"
+ val port = Util.findFreePort().get
+
+ val properties = new Properties()
+ properties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$host:$port")
+ properties.put(s"${GEARPUMP_HOSTNAME}", s"$host")
+ val masterConfig = ConfigFactory.parseProperties(properties)
+ .withFallback(TestUtil.MASTER_CONFIG)
+ Future {
+ Master.main(masterConfig, Array("-ip", "127.0.0.1", "-port", port.toString))
+ }
+
+ val masterProxy = getActorSystem.actorOf(
+ MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec")
+
+ worker.send(masterProxy, RegisterNewWorker)
+ worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME)
+ }
+
+ "Info" should "be started without exception" in {
+
+ val masterReceiver = createMockMaster()
+
+ Future {
+ org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty)
+ }
+
+ masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest)
+ masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName"))))
+ }
+
+ "Kill" should "be started without exception" in {
+
+ val masterReceiver = createMockMaster()
+
+ Future {
+ Kill.main(masterConfig, Array("-appid", "0"))
+ }
+
+ masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0))
+ masterReceiver.reply(ShutdownApplicationResult(Success(0)))
+ }
+
+ "Replay" should "be started without exception" in {
+
+ val masterReceiver = createMockMaster()
+
+ Future {
+ Replay.main(masterConfig, Array("-appid", "0"))
+ }
+
+ masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME)
+ masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref)))
+ masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME)
+ masterReceiver.reply(ReplayApplicationResult(Success(0)))
+ }
+
+ "Local" should "be started without exception" in {
+ val port = Util.findFreePort().get
+ val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port",
+ s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost",
+ s"-D${PREFER_IPV4}=true")
+
+ val local = Util.startProcess(options,
+ getContextClassPath,
+ getMainClassName(Local),
+ Array.empty)
+
+ def retry(times: Int)(fn: => Boolean): Boolean = {
+
+ LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..")
+
+ val result = fn
+ if (result || times <= 0) {
+ result
+ } else {
+ Thread.sleep(1000)
+ retry(times - 1)(fn)
+ }
+ }
+
+ try {
+ assert(retry(10)(isPortUsed("127.0.0.1", port)),
+ "local is not started successfully, as port is not used " + port)
+ } finally {
+ local.destroy()
+ }
+ }
+
+ "Gear" should "support app|info|kill|shell|replay" in {
+
+ val commands = Array("app", "info", "kill", "shell", "replay")
+
+ assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw")
+
+ for (command <- commands) {
+ assert(Try(Gear.main(Array("-noexist"))).isFailure,
+ "pass unknown option, throw, command: " + command)
+ }
+
+ assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ")
+
+ val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist")))
+ assert(tryThis.isFailure, "unknown command, throw")
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
new file mode 100644
index 0000000..b48fc2a
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.TestUtil
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class MasterWatcherSpec extends FlatSpec with Matchers {
+ def config: Config = TestUtil.MASTER_CONFIG
+
+ "MasterWatcher" should "kill itself when can not get a quorum" in {
+ val system = ActorSystem("ForMasterWatcher", config)
+
+ val actorWatcher = TestProbe()(system)
+
+ val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher"))
+ actorWatcher watch masterWatcher
+ actorWatcher.expectTerminated(masterWatcher, 5.seconds)
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
new file mode 100644
index 0000000..8a3d7d1
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL}
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.concurrent.duration._
+
+class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
+ with WordSpecLike with Matchers with BeforeAndAfterAll{
+
+ def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG))
+ val appId = 0
+ val workerId1: WorkerId = WorkerId(1, 0L)
+ val workerId2: WorkerId = WorkerId(2, 0L)
+ val mockAppMaster = TestProbe()
+ val mockWorker1 = TestProbe()
+ val mockWorker2 = TestProbe()
+
+ override def afterAll {
+ TestKit.shutdownActorSystem(system)
+ }
+
+ "The scheduler" should {
+ "update resource only when the worker is registered" in {
+ val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+ scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100))
+ expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " +
+ s"registered into master"))
+ }
+
+ "drop application's resource requests when the application is removed" in {
+ val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+ val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
+ val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
+ scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+ scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+ scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref)
+ scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+ scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+ mockAppMaster.expectNoMsg(5.seconds)
+ }
+ }
+
+ def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = {
+ left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId))
+ }
+
+ "The resource request with higher priority" should {
+ "be handled first" in {
+ val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+ val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY)
+ val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY)
+ val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY)
+
+ scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+ scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+ scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+ scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+ scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+ scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+ var expect = ResourceAllocated(
+ Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1)))
+ mockAppMaster.expectMsgPF(5.seconds) {
+ case request: ResourceAllocated if sameElement(request, expect) => Unit
+ }
+
+ expect = ResourceAllocated(
+ Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+ mockAppMaster.expectMsgPF(5.seconds) {
+ case request: ResourceAllocated if sameElement(request, expect) => Unit
+ }
+
+ expect = ResourceAllocated(
+ Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
+ mockAppMaster.expectMsgPF(5.seconds) {
+ case request: ResourceAllocated if sameElement(request, expect) => Unit
+ }
+
+ scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+ scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref)
+ scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+ expect = ResourceAllocated(
+ Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
+ mockAppMaster.expectMsgPF(5.seconds) {
+ case request: ResourceAllocated if sameElement(request, expect) => Unit
+ }
+ }
+ }
+
+ "The resource request which delivered earlier" should {
+ "be handled first if the priorities are the same" in {
+ val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+ val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
+ val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
+ scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+ scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+ scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+ scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+ var expect = ResourceAllocated(
+ Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
+ mockAppMaster.expectMsgPF(5.seconds) {
+ case request: ResourceAllocated if sameElement(request, expect) => Unit
+ }
+ expect = ResourceAllocated(
+ Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+ mockAppMaster.expectMsgPF(5.seconds) {
+ case request: ResourceAllocated if sameElement(request, expect) => Unit
+ }
+ }
+ }
+
+ "The PriorityScheduler" should {
+ "handle the resource request with different relaxation" in {
+ val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+ val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER)
+ val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER)
+
+ scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+ scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+ scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+ scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+ var expect = ResourceAllocated(
+ Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+ mockAppMaster.expectMsgPF(5.seconds) {
+ case request: ResourceAllocated if sameElement(request, expect) => Unit
+ }
+
+ scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+ scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+ expect = ResourceAllocated(
+ Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
+ mockAppMaster.expectMsgPF(5.seconds) {
+ case request: ResourceAllocated if sameElement(request, expect) => Unit
+ }
+
+ val request3 = ResourceRequest(
+ Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2)
+ scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+
+ expect = ResourceAllocated(Array(
+ ResourceAllocation(Resource(15), mockWorker1.ref, workerId1),
+ ResourceAllocation(Resource(15), mockWorker2.ref, workerId2)))
+ mockAppMaster.expectMsgPF(5.seconds) {
+ case request: ResourceAllocated if sameElement(request, expect) => Unit
+ }
+
+ // We have to manually update the resource on each worker
+ scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref)
+ scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref)
+ val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER)
+ scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+
+ expect = ResourceAllocated(
+ Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1)))
+ mockAppMaster.expectMsgPF(5.seconds) {
+ case request: ResourceAllocated if sameElement(request, expect) => Unit
+ }
+ }
+ }
+
+ "The PriorityScheduler" should {
+ "handle the resource request with different executor number" in {
+ val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+ scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+ scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+ scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+ scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+ // By default, the request requires only one executor
+ val request2 = ResourceRequest(Resource(20), WorkerId.unspecified)
+ scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+ val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+ assert(allocations2.allocations.length == 1)
+ assert(allocations2.allocations.head.resource == Resource(20))
+
+ val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3)
+ scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+ val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+ assert(allocations3.allocations.length == 3)
+ assert(allocations3.allocations.forall(_.resource == Resource(8)))
+
+ // The total available resource can not satisfy the requirements with executor number
+ scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref)
+ scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref)
+ val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3)
+ scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+ val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+ assert(allocations4.allocations.length == 2)
+ assert(allocations4.allocations.forall(_.resource == Resource(20)))
+
+ // When new resources are available, the remaining request will be satisfied
+ scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref)
+ val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+ assert(allocations5.allocations.length == 1)
+ assert(allocations4.allocations.forall(_.resource == Resource(20)))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
new file mode 100644
index 0000000..e0233f8
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import akka.actor.{ActorSystem, PoisonPill, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor}
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed}
+import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate}
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil}
+import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
+import org.scalatest._
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+ override def config: Config = TestUtil.DEFAULT_CONFIG
+
+ val appId = 1
+ val workerId: WorkerId = WorkerId(1, 0L)
+ val executorId = 1
+ var masterProxy: TestProbe = null
+ var mockMaster: TestProbe = null
+ var client: TestProbe = null
+ val workerSlots = 50
+
+ override def beforeEach(): Unit = {
+ startActorSystem()
+ mockMaster = TestProbe()(getActorSystem)
+ masterProxy = TestProbe()(getActorSystem)
+ client = TestProbe()(getActorSystem)
+ }
+
+ override def afterEach(): Unit = {
+ shutdownActorSystem()
+ }
+
+ "The new started worker" should {
+ "kill itself if no response from Master after registering" in {
+ val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
+ mockMaster watch worker
+ mockMaster.expectMsg(RegisterNewWorker)
+ mockMaster.expectTerminated(worker, 60.seconds)
+ }
+ }
+
+ "Worker" should {
+ "init its resource from the gearpump config" in {
+ val config = ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots").
+ withFallback(TestUtil.DEFAULT_CONFIG)
+ val workerSystem = ActorSystem("WorkerSystem", config)
+ val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
+ mockMaster watch worker
+ mockMaster.expectMsg(RegisterNewWorker)
+
+ worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
+ mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots)))
+
+ worker.tell(
+ UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref)
+ mockMaster.expectTerminated(worker, 5.seconds)
+ workerSystem.terminate()
+ Await.result(workerSystem.whenTerminated, Duration.Inf)
+ }
+ }
+
+ "Worker" should {
+ "update its remaining resource when launching and shutting down executors" in {
+ val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref))
+ masterProxy.expectMsg(RegisterNewWorker)
+
+ worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
+ mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
+
+ val executorName = ActorUtil.actorNameForExecutor(appId, executorId)
+ // This is an actor path which the ActorSystemBooter will report back to,
+ // not needed in this test
+ val reportBack = "dummy"
+ val executionContext = ExecutorJVMConfig(Array.empty[String],
+ getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "),
+ classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None,
+ username = "user")
+
+ // Test LaunchExecutor
+ worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext),
+ mockMaster.ref)
+ mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine"))
+
+ worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref)
+ mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95)))
+
+ worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref)
+ mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98)))
+
+ // Test terminationWatch
+ worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref)
+ mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
+ client.expectMsg(ShutdownExecutorSucceed(1, 1))
+
+ worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref)
+ client.expectMsg(ShutdownExecutorFailed(
+ s"Can not find executor ${executorId + 1} for app $appId"))
+
+ mockMaster.ref ! PoisonPill
+ masterProxy.expectMsg(RegisterWorker(workerId))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala b/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala
index c99a031..39b6261 100644
--- a/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/jarstore/FileServerSpec.scala
@@ -22,9 +22,9 @@ import java.io.File
import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
-import com.typesafe.config.{ConfigValueFactory, ConfigValue}
+import com.google.common.io.Files
+import com.typesafe.config.ConfigValueFactory
import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.google.common.io.Files
import org.apache.gearpump.jarstore.local.LocalJarStore
import org.apache.gearpump.util.{FileUtils, LogUtil}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -39,7 +39,7 @@ class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
val host = "localhost"
private val LOG = LogUtil.getLogger(getClass)
- var system: ActorSystem = null
+ var system: ActorSystem = _
override def afterAll {
if (null != system) {
@@ -75,7 +75,6 @@ class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
"The file server" should {
"serve the data previously stored" in {
-
val rootDir = Files.createTempDir()
val localJarStore: JarStore = new LocalJarStore
val conf = TestUtil.DEFAULT_CONFIG.withValue("gearpump.jarstore.rootpath",
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala b/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala
index 5881640..0855553 100644
--- a/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala
@@ -18,12 +18,13 @@
package org.apache.gearpump.metrics
+import com.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter}
+
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{FlatSpec, Matchers}
-import org.apache.gearpump.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter}
class MetricsSpec extends FlatSpec with Matchers with MockitoSugar {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala b/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
deleted file mode 100644
index fcf819b..0000000
--- a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.partitioner
-
-import org.scalatest.{FlatSpec, Matchers}
-
-import org.apache.gearpump.Message
-
-class PartitionerSpec extends FlatSpec with Matchers {
- val NUM = 10
-
- "HashPartitioner" should "hash same key to same slots" in {
- val partitioner = new HashPartitioner
-
- val data = new Array[Byte](1000)
- (new java.util.Random()).nextBytes(data)
- val msg = Message(data)
-
- val partition = partitioner.getPartition(msg, NUM)
- assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
-
- assert(partition == partitioner.getPartition(msg, NUM), "multiple run should return" +
- "consistent result")
- }
-
- "ShufflePartitioner" should "hash same key randomly" in {
- val partitioner = new ShufflePartitioner
-
- val data = new Array[Byte](1000)
- (new java.util.Random()).nextBytes(data)
- val msg = Message(data)
-
- val partition = partitioner.getPartition(msg, NUM)
- assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
-
- assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return" +
- "consistent result")
- }
-
- "BroadcastPartitioner" should "return all partitions" in {
- val partitioner = new BroadcastPartitioner
-
- val data = new Array[Byte](1000)
- (new java.util.Random()).nextBytes(data)
- val msg = Message(data)
- val partitions = partitioner.getPartitions(msg, NUM)
-
- partitions should contain theSameElementsAs 0.until(NUM)
- }
-
-
- "ShuffleGroupingPartitioner" should "hash same key randomly" in {
- val partitioner = new ShuffleGroupingPartitioner
-
- val data = new Array[Byte](1000)
- (new java.util.Random()).nextBytes(data)
- val msg = Message(data)
-
- val partition = partitioner.getPartition(msg, NUM)
- assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
-
- assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return" +
- "consistent result")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala b/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala
index f4bd114..0772a5e 100644
--- a/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala
@@ -18,20 +18,23 @@
package org.apache.gearpump.serializer
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
import akka.actor.{ActorSystem, ExtendedActorSystem}
+
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.esotericsoftware.kryo.io.{Input, Output}
-import org.apache.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
import org.apache.gearpump.serializer.SerializerSpec._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+
class SerializerSpec extends FlatSpec with Matchers with MockitoSugar {
val config = ConfigFactory.empty.withValue("gearpump.serializers",
ConfigValueFactory.fromAnyRef(Map(classOf[ClassA].getName -> classOf[ClassASerializer].getName,
@@ -70,7 +73,7 @@ object SerializerSpec {
class ClassASerializer extends KryoSerializer[ClassA] {
override def write(kryo: Kryo, output: Output, `object`: ClassA): Unit = {
- output.writeString(classOf[ClassA].getName.toString)
+ output.writeString(classOf[ClassA].getName)
}
override def read(kryo: Kryo, input: Input, `type`: Class[ClassA]): ClassA = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala b/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala
index 66abc36..97b35ad 100644
--- a/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala
@@ -18,13 +18,13 @@
package org.apache.gearpump.util
+import com.google.common.io.Files
+
import java.io.File
import java.util
import org.scalatest.FlatSpec
-import org.apache.gearpump.google.common.io.Files
-
class FileUtilsSpec extends FlatSpec {
val TXT =
"""
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
----------------------------------------------------------------------
diff --git a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
deleted file mode 100644
index e173a8a..0000000
--- a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStore
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.gearpump.jarstore.local.LocalJarStore
-org.apache.gearpump.jarstore.dfs.DFSJarStore
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
deleted file mode 100644
index 9e55be6..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster
-
-import akka.actor.ActorRef
-
-import org.apache.gearpump.cluster.master.Master.MasterInfo
-import org.apache.gearpump.cluster.scheduler.Resource
-import org.apache.gearpump.cluster.worker.WorkerId
-
-/**
- * Cluster Bootup Flow
- */
-object WorkerToMaster {
-
- /** When an worker is started, it sends RegisterNewWorker */
- case object RegisterNewWorker
-
- /** When worker lose connection with master, it tries to register itself again with old Id. */
- case class RegisterWorker(workerId: WorkerId)
-
- /** Worker is responsible to broadcast its current status to master */
- case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
-}
-
-object MasterToWorker {
-
- /** Master confirm the reception of RegisterNewWorker or RegisterWorker */
- case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo)
-
- /** Worker have not received reply from master */
- case class UpdateResourceFailed(reason: String = null, ex: Throwable = null)
-
- /** Master is synced with worker on resource slots managed by current worker */
- case object UpdateResourceSucceed
-}
\ No newline at end of file