You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/27 11:38:49 UTC
[2/4] incubator-gearpump git commit: fix GEARPUMP-150 correct the
integration test file structure
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
deleted file mode 100644
index 73413da..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import scala.sys.process._
-
-import org.apache.gearpump.integrationtest.Docker
-
-/**
- * A helper to instantiate the base image for different usage.
- */
-class BaseContainer(val host: String, command: String,
- masterAddrs: List[(String, Int)],
- tunnelPorts: Set[Int] = Set.empty) {
-
- private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher"
- private val DOCKER_IMAGE_GEARPUMP_HOME = "/opt/gearpump"
- private val DOCKER_IMAGE_LOG_HOME = "/var/log/gearpump"
- private val HOST_GEARPUMP_HOME = "pwd".!!.trim + "/output/target/pack"
- private val HOST_LOG_HOME = {
- val dir = "/tmp/gearpump"
- s"mkdir -p $dir".!!
- s"mktemp -p $dir -d".!!.trim
- }
-
- private val CLUSTER_OPTS = {
- masterAddrs.zipWithIndex.map { case (hostPort, index) =>
- s"-Dgearpump.cluster.masters.$index=${hostPort._1}:${hostPort._2}"
- }.mkString(" ")
- }
-
- def createAndStart(): String = {
- Docker.createAndStartContainer(host, IMAGE_NAME, command,
- environ = Map("JAVA_OPTS" -> CLUSTER_OPTS),
- volumes = Map(
- HOST_GEARPUMP_HOME -> DOCKER_IMAGE_GEARPUMP_HOME,
- HOST_LOG_HOME -> DOCKER_IMAGE_LOG_HOME),
- knownHosts = masterAddrs.map(_._1).filter(_ != host).toSet,
- tunnelPorts = tunnelPorts)
- }
-
- def killAndRemove(): Unit = {
- Docker.killAndRemoveContainer(host)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
deleted file mode 100644
index 884a8d1..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.integrationtest.Docker
-
-/**
- * A command-line client to operate a Gearpump cluster
- */
-class CommandLineClient(host: String) {
-
- private val LOG = Logger.getLogger(getClass)
-
- def listApps(): Array[String] = {
- gearCommand(host, "gear info").split("\n").filter(
- _.startsWith("application: ")
- )
- }
-
- def listRunningApps(): Array[String] =
- listApps().filter(
- _.contains(s", status: ${MasterToAppMaster.AppMasterActive}")
- )
-
- def queryApp(appId: Int): String = try {
- listApps().filter(
- _.startsWith(s"application: $appId")
- ).head
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- ""
- }
-
- def submitAppAndCaptureOutput(jar: String, executorNum: Int, args: String = ""): String = {
- gearCommand(host, s"gear app -verbose true -jar $jar -executors $executorNum $args")
- }
-
- def submitApp(jar: String, args: String = ""): Int = {
- LOG.debug(s"|=> Submit Application $jar...")
- submitAppUse("gear app", jar, args)
- }
-
- private def submitAppUse(launcher: String, jar: String, args: String = ""): Int = try {
- gearCommand(host, s"$launcher -jar $jar $args").split("\n")
- .filter(_.contains("The application id is ")).head.split(" ").last.toInt
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- -1
- }
-
- def killApp(appId: Int): Boolean = {
- tryGearCommand(host, s"gear kill -appid $appId")
- }
-
- private def gearCommand(container: String, command: String): String = {
- LOG.debug(s"|=> Gear command $command in container $container...")
- Docker.execute(container, s"/opt/start $command")
- }
-
- private def tryGearCommand(container: String, command: String): Boolean = {
- LOG.debug(s"|=> Gear command $command in container $container...")
- Docker.executeSilently(container, s"/opt/start $command")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
deleted file mode 100644
index 4d439e8..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import java.io.IOException
-import scala.collection.mutable.ListBuffer
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.{Docker, Util}
-
-/**
- * This class is a test driver for end-to-end integration test.
- */
-class MiniCluster {
-
- private val LOG = Logger.getLogger(getClass)
- private val SUT_HOME = "/opt/gearpump"
-
- private val REST_SERVICE_PORT = 8090
- private val MASTER_PORT = 3000
- private val MASTER_ADDRS: List[(String, Int)] = {
- (0 to 0).map(index =>
- ("master" + index, MASTER_PORT)
- ).toList
- }
-
- lazy val commandLineClient: CommandLineClient = new CommandLineClient(getMasterHosts.head)
-
- lazy val restClient: RestClient = {
- val client = new RestClient(getMasterHosts.head, REST_SERVICE_PORT)
- client
- }
-
- private var workers: ListBuffer[String] = ListBuffer.empty
-
- def start(workerNum: Int = 2): Unit = {
-
- // Kill master
- MASTER_ADDRS.foreach { case (host, _) =>
- if (Docker.containerExists(host)) {
- Docker.killAndRemoveContainer(host)
- }
- }
-
- // Kill existing workers
- workers ++= (0 until workerNum).map("worker" + _)
- workers.foreach { worker =>
- if (Docker.containerExists(worker)) {
- Docker.killAndRemoveContainer(worker)
- }
- }
-
- // Start Masters
- MASTER_ADDRS.foreach({ case (host, port) =>
- addMasterNode(host, port)
- })
-
- // Start Workers
- workers.foreach { worker =>
- val container = new BaseContainer(worker, "worker", MASTER_ADDRS)
- container.createAndStart()
- }
-
- // Check cluster status
- expectRestClientAuthenticated()
- expectClusterAvailable()
- }
-
- private def addMasterNode(host: String, port: Int): Unit = {
- val container = new BaseContainer(host, s"master -ip $host -port $port", MASTER_ADDRS)
- container.createAndStart()
- }
-
- def addWorkerNode(host: String): Unit = {
- if (workers.find(_ == host).isEmpty) {
- val container = new BaseContainer(host, "worker", MASTER_ADDRS)
- container.createAndStart()
- workers += host
- } else {
- throw new IOException(s"Cannot add new worker $host, " +
- s"as worker with same hostname already exists")
- }
- }
-
- /**
- * @throws RuntimeException if rest client is not authenticated after N attempts
- */
- private def expectRestClientAuthenticated(): Unit = {
- Util.retryUntil(() => {
- restClient.login()
- LOG.info("rest client has been authenticated")
- true
- }, "login successfully")
- }
-
- /**
- * @throws RuntimeException if service is not available after N attempts
- */
- private def expectClusterAvailable(): Unit = {
- Util.retryUntil(() => {
- val response = restClient.queryMaster()
- LOG.info(s"cluster is now available with response: $response.")
- response.aliveFor > 0
- }, "cluster running")
- }
-
- def isAlive: Boolean = {
- getMasterHosts.exists(nodeIsOnline)
- }
-
- def getNetworkGateway: String = {
- Docker.getNetworkGateway(MASTER_ADDRS.head._1)
- }
-
- def shutDown(): Unit = {
- val removalHosts = (getMasterHosts ++ getWorkerHosts).toSet
- .filter(nodeIsOnline).toArray
- if (removalHosts.length > 0) {
- Docker.killAndRemoveContainer(removalHosts)
- }
- workers.clear()
- }
-
- def removeMasterNode(host: String): Unit = {
- Docker.killAndRemoveContainer(host)
- }
-
- def removeWorkerNode(host: String): Unit = {
- workers -= host
- Docker.killAndRemoveContainer(host)
- }
-
- def restart(): Unit = {
- shutDown()
- Util.retryUntil(() => {
- !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists)
- }, "all docker containers killed")
- LOG.info("all docker containers have been killed. restarting...")
- start()
- }
-
- def getMastersAddresses: List[(String, Int)] = {
- MASTER_ADDRS
- }
-
- def getMasterHosts: List[String] = {
- MASTER_ADDRS.map({ case (host, port) => host })
- }
-
- def getWorkerHosts: List[String] = {
- workers.toList
- }
-
- def nodeIsOnline(host: String): Boolean = {
- Docker.containerIsRunning(host)
- }
-
- private def builtInJarsUnder(folder: String): Array[String] = {
- Docker.findJars(getMasterHosts.head, s"$SUT_HOME/$folder")
- }
-
- private def queryBuiltInJars(folder: String, subtext: String): Seq[String] = {
- builtInJarsUnder(folder).filter(_.contains(subtext))
- }
-
- def queryBuiltInExampleJars(subtext: String): Seq[String] = {
- queryBuiltInJars("examples", subtext)
- }
-
- def queryBuiltInITJars(subtext: String): Seq[String] = {
- queryBuiltInJars("integrationtest", subtext)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
deleted file mode 100644
index 1b143af..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import scala.reflect.ClassTag
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.log4j.Logger
-import upickle.Js
-import upickle.default._
-
-import org.apache.gearpump.cluster.AppMasterToMaster.MasterData
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData}
-import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics
-import org.apache.gearpump.cluster.master.MasterSummary
-import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import org.apache.gearpump.cluster.{AppJar, MasterToAppMaster}
-import org.apache.gearpump.integrationtest.{Docker, Util}
-import org.apache.gearpump.services.AppMasterService.Status
-import org.apache.gearpump.services.MasterService.{AppSubmissionResult, BuiltinPartitioners}
-// NOTE: This cannot be removed!!!
-import org.apache.gearpump.services.util.UpickleUtil._
-import org.apache.gearpump.streaming.ProcessorDescription
-import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
-import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationResult, ReplaceProcessor}
-import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary
-import org.apache.gearpump.streaming.executor.Executor.ExecutorSummary
-import org.apache.gearpump.util.{Constants, Graph}
-
-/**
- * A REST client to operate a Gearpump cluster
- */
-class RestClient(host: String, port: Int) {
-
- private val LOG = Logger.getLogger(getClass)
-
- private val cookieFile: String = "cookie.txt"
-
- implicit val graphReader: upickle.default.Reader[Graph[Int, String]] =
- upickle.default.Reader[Graph[Int, String]] {
- case Js.Obj(verties, edges) =>
- val vertexList = upickle.default.readJs[List[Int]](verties._2)
- val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2)
- Graph(vertexList, edgeList)
- }
-
- private def decodeAs[T](
- expr: String)(implicit reader: upickle.default.Reader[T], classTag: ClassTag[T]): T = try {
- read[T](expr)
- } catch {
- case ex: Throwable =>
- LOG.error(s"Failed to decode Rest response to ${classTag.runtimeClass.getSimpleName}")
- throw ex
- }
-
- def queryVersion(): String = {
- curl("version")
- }
-
- def listWorkers(): Array[WorkerSummary] = {
- val resp = callApi("master/workerlist")
- decodeAs[List[WorkerSummary]](resp).toArray
- }
-
- def listRunningWorkers(): Array[WorkerSummary] = {
- listWorkers().filter(_.state == MasterToAppMaster.AppMasterActive)
- }
-
- def listApps(): Array[AppMasterData] = {
- val resp = callApi("master/applist")
- decodeAs[AppMastersData](resp).appMasters.toArray
- }
-
- def listRunningApps(): Array[AppMasterData] = {
- listApps().filter(_.status == MasterToAppMaster.AppMasterActive)
- }
-
- def getNextAvailableAppId(): Int = {
- listApps().length + 1
- }
-
- def submitApp(jar: String, executorNum: Int, args: String = "", config: String = "")
- : Boolean = try {
- var endpoint = "master/submitapp"
-
- var options = Seq(s"jar=@$jar")
- if (config.length > 0) {
- options :+= s"conf=@$config"
- }
-
- options :+= s"executorcount=$executorNum"
-
- if (args != null && !args.isEmpty) {
- options :+= "args=\"" + args + "\""
- }
-
- val resp = callApi(endpoint, options.map("-F " + _).mkString(" "))
- val result = decodeAs[AppSubmissionResult](resp)
- assert(result.success)
- true
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- false
- }
-
- def queryApp(appId: Int): AppMasterData = {
- val resp = callApi(s"appmaster/$appId")
- decodeAs[AppMasterData](resp)
- }
-
- def queryAppMasterConfig(appId: Int): Config = {
- val resp = callApi(s"appmaster/$appId/config")
- ConfigFactory.parseString(resp)
- }
-
- def queryStreamingAppDetail(appId: Int): StreamAppMasterSummary = {
- val resp = callApi(s"appmaster/$appId?detail=true")
- decodeAs[StreamAppMasterSummary](resp)
- }
-
- def queryStreamingAppMetrics(appId: Int, current: Boolean, path: String = "processor*")
- : HistoryMetrics = {
- val args = if (current) "?readLatest=true" else ""
- val resp = callApi(s"appmaster/$appId/metrics/app$appId.$path$args")
- decodeAs[HistoryMetrics](resp)
- }
-
- def queryExecutorSummary(appId: Int, executorId: Int): ExecutorSummary = {
- val resp = callApi(s"appmaster/$appId/executor/$executorId")
- decodeAs[ExecutorSummary](resp)
- }
-
- def queryExecutorBrief(appId: Int): Array[ExecutorBrief] = {
- queryStreamingAppDetail(appId).executors.toArray
- }
-
- def queryExecutorMetrics(appId: Int, current: Boolean): HistoryMetrics = {
- val args = if (current) "?readLatest=true" else ""
- val resp = callApi(s"appmaster/$appId/metrics/app$appId.executor*$args")
- decodeAs[HistoryMetrics](resp)
- }
-
- def queryExecutorConfig(appId: Int, executorId: Int): Config = {
- val resp = callApi(s"appmaster/$appId/executor/$executorId/config")
- ConfigFactory.parseString(resp)
- }
-
- def queryMaster(): MasterSummary = {
- val resp = callApi("master")
- decodeAs[MasterData](resp).masterDescription
- }
-
- def queryMasterMetrics(current: Boolean): HistoryMetrics = {
- val args = if (current) "?readLatest=true" else ""
- val resp = callApi(s"master/metrics/master?$args")
- decodeAs[HistoryMetrics](resp)
- }
-
- def queryMasterConfig(): Config = {
- val resp = callApi("master/config")
- ConfigFactory.parseString(resp)
- }
-
- def queryWorkerMetrics(workerId: WorkerId, current: Boolean): HistoryMetrics = {
- val args = if (current) "?readLatest=true" else ""
- val workerIdStr = WorkerId.render(workerId)
- val resp = callApi(s"worker/$workerIdStr/metrics/worker$workerIdStr?$args")
- decodeAs[HistoryMetrics](resp)
- }
-
- def queryWorkerConfig(workerId: WorkerId): Config = {
- val resp = callApi(s"worker/${WorkerId.render(workerId)}/config")
- ConfigFactory.parseString(resp)
- }
-
- def queryBuiltInPartitioners(): Array[String] = {
- val resp = callApi("master/partitioners")
- decodeAs[BuiltinPartitioners](resp).partitioners
- }
-
- def uploadJar(localFilePath: String): AppJar = {
- val resp = callApi(s"master/uploadjar -F jar=@$localFilePath", CRUD_POST)
- decodeAs[AppJar](resp)
- }
-
- def replaceStreamingAppProcessor(appId: Int, replaceMe: ProcessorDescription): Boolean = {
- replaceStreamingAppProcessor(appId, replaceMe, false)
- }
-
- def replaceStreamingAppProcessor(
- appId: Int, replaceMe: ProcessorDescription, inheritConf: Boolean): Boolean = try {
- val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe, inheritConf)
- val args = upickle.default.write(replaceOperation)
- val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + Util.encodeUriComponent(args),
- CRUD_POST)
- decodeAs[DAGOperationResult](resp)
- true
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- false
- }
-
- def killAppMaster(appId: Int): Boolean = {
- killExecutor(appId, Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
- }
-
- def killExecutor(appId: Int, executorId: Int): Boolean = try {
- val jvmInfo = queryExecutorSummary(appId, executorId).jvmName.split("@")
- val pid = jvmInfo(0).toInt
- val hostname = jvmInfo(1)
- Docker.killProcess(hostname, pid)
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- false
- }
-
- def killApp(appId: Int): Boolean = try {
- val resp = callApi(s"appmaster/$appId", CRUD_DELETE)
- resp.contains("\"status\":\"success\"")
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- false
- }
-
- def restartApp(appId: Int): Boolean = try {
- val resp = callApi(s"appmaster/$appId/restart", CRUD_POST)
- decodeAs[Status](resp).success
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- false
- }
-
- private val CRUD_POST = "-X POST"
- private val CRUD_DELETE = "-X DELETE"
-
- private def callApi(endpoint: String, option: String = ""): String = {
- curl(s"api/v1.0/$endpoint", Array(option, s"--cookie $cookieFile"))
- }
-
- private def curl(endpoint: String, options: Array[String] = Array.empty[String]): String = {
- Docker.curl(host, s"http://$host:$port/$endpoint", options)
- }
-
- def login(): Unit = {
- curl("login", Array(CRUD_POST, s"--cookie-jar $cookieFile",
- "--data username=admin", "--data password=admin"))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
deleted file mode 100644
index 79adfc4..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.integrationtest.storm
-
-import scala.util.Random
-
-import backtype.storm.utils.{DRPCClient, Utils}
-
-import org.apache.gearpump.integrationtest.minicluster.{BaseContainer, MiniCluster, RestClient}
-import org.apache.gearpump.integrationtest.{Docker, Util}
-
-class StormClient(cluster: MiniCluster, restClient: RestClient) {
-
- private val masterAddrs: List[(String, Int)] = cluster.getMastersAddresses
- private val CONFIG_FILE = s"/opt/gearpump/storm${new Random().nextInt()}.yaml"
- private val DRPC_HOST = "storm0"
- private val DRPC_PORT = 3772
- private val DRPC_INVOCATIONS_PORT = 3773
- private val STORM_DRPC = "storm-drpc"
- private val NIMBUS_HOST = "storm1"
- private val STORM_NIMBUS = "storm nimbus"
- private val STORM_APP = "/opt/start storm app"
-
- private val drpcContainer = new BaseContainer(DRPC_HOST, STORM_DRPC, masterAddrs,
- tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT))
-
- private val nimbusContainer =
- new BaseContainer(NIMBUS_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE", masterAddrs)
-
- def start(): Unit = {
- nimbusContainer.createAndStart()
- ensureNimbusRunning()
-
- drpcContainer.createAndStart()
- ensureDrpcServerRunning()
- }
-
- private def ensureNimbusRunning(): Unit = {
- Util.retryUntil(() => {
- val response = Docker.execute(NIMBUS_HOST, "grep \"port\" " + CONFIG_FILE)
- // Parse format nimbus.thrift.port: '39322'
- val thriftPort = response.split(" ")(1).replace("'", "").toInt
-
- Docker.executeSilently(NIMBUS_HOST, s"""sh -c "netstat -na | grep $thriftPort" """)
- }, "Nimbus running")
- }
-
- private def ensureDrpcServerRunning(): Unit = {
- Util.retryUntil(() => {
- Docker.executeSilently(DRPC_HOST, s"""sh -c "netstat -na | grep $DRPC_PORT " """)
- }, "DRPC running")
- }
-
- def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = {
- Util.retryUntil(() => {
- Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " +
- s"-jar $jar $mainClass $args")
- restClient.listRunningApps().exists(_.appName == appName)
- }, "app running")
- restClient.listRunningApps().filter(_.appName == appName).head.appId
- }
-
- def getDRPCClient(drpcServerIp: String): DRPCClient = {
- val config = Utils.readDefaultConfig()
- new DRPCClient(config, drpcServerIp, DRPC_PORT)
- }
-
- def shutDown(): Unit = {
-
- // Cleans up the storm.yaml config file
- Docker.executeSilently(NIMBUS_HOST, s"rm $CONFIG_FILE ")
- drpcContainer.killAndRemove()
- nimbusContainer.killAndRemove()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala
new file mode 100644
index 0000000..f315ad3
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import org.apache.log4j.Logger
+
+/**
+ * The class is used to execute Docker commands.
+ */
+object Docker {
+
+ private val LOG = Logger.getLogger(getClass)
+
+ /**
+ * @throws RuntimeException in case retval != 0
+ */
+ private def doExecute(container: String, command: String): String = {
+ ShellExec.execAndCaptureOutput(s"docker exec $container $command", s"EXEC $container")
+ }
+
+ private def doExecuteSilently(container: String, command: String): Boolean = {
+ ShellExec.exec(s"docker exec $container $command", s"EXEC $container")
+ }
+
+ /**
+ * @throws RuntimeException in case retval != 0
+ */
+ final def execute(container: String, command: String): String = {
+ trace(container, s"Execute $command") {
+ doExecute(container, command)
+ }
+ }
+
+ final def executeSilently(container: String, command: String): Boolean = {
+ trace(container, s"Execute silently $command") {
+ doExecuteSilently(container, command)
+ }
+ }
+
+ final def listContainers(): Seq[String] = {
+ trace("", s"Listing how many containers...") {
+ ShellExec.execAndCaptureOutput("docker ps -q -a", "LIST")
+ .split("\n").filter(_.nonEmpty)
+ }
+ }
+
+ final def containerIsRunning(name: String): Boolean = {
+ trace(name, s"Check container running or not...") {
+ ShellExec.execAndCaptureOutput(s"docker ps -q --filter name=$name", s"FIND $name").nonEmpty
+ }
+ }
+
+ final def getContainerIPAddr(name: String): String = {
+ trace(name, s"Get Ip Address") {
+ Docker.inspect(name, "--format={{.NetworkSettings.IPAddress}}")
+ }
+ }
+
+ final def containerExists(name: String): Boolean = {
+ trace(name, s"Check container existing or not...") {
+ ShellExec.execAndCaptureOutput(s"docker ps -q -a --filter name=$name", s"FIND $name").nonEmpty
+ }
+ }
+
+ /**
+ * @throws RuntimeException in case particular container is created already
+ */
+ final def createAndStartContainer(name: String, image: String, command: String,
+ environ: Map[String, String] = Map.empty, // key, value
+ volumes: Map[String, String] = Map.empty, // from, to
+ knownHosts: Set[String] = Set.empty,
+ tunnelPorts: Set[Int] = Set.empty): String = {
+
+ if (containerExists(name)) {
+ killAndRemoveContainer(name)
+ }
+
+ trace(name, s"Create and start $name ($image)...") {
+
+ val optsBuilder = new StringBuilder
+ optsBuilder.append("-d") // run in background
+ optsBuilder.append(" -h " + name) // use container name as hostname
+ optsBuilder.append(" -v /etc/localtime:/etc/localtime:ro") // synchronize timezone settings
+
+ environ.foreach { case (key, value) =>
+ optsBuilder.append(s" -e $key=$value")
+ }
+ volumes.foreach { case (from, to) =>
+ optsBuilder.append(s" -v $from:$to")
+ }
+ knownHosts.foreach(host =>
+ optsBuilder.append(" --link " + host)
+ )
+ tunnelPorts.foreach(port =>
+ optsBuilder.append(s" -p $port:$port")
+ )
+ createAndStartContainer(name, optsBuilder.toString(), command, image)
+ }
+ }
+
+ /**
+ * @throws RuntimeException in case particular container is created already
+ */
+ private def createAndStartContainer(
+ name: String, options: String, command: String, image: String): String = {
+ ShellExec.execAndCaptureOutput(s"docker run $options " +
+ s"--name $name $image $command", s"MAKE $name")
+ }
+
+ final def killAndRemoveContainer(name: String): Boolean = {
+ trace(name, s"kill and remove container") {
+ ShellExec.exec(s"docker rm -f $name", s"STOP $name")
+ }
+ }
+
+ final def killAndRemoveContainer(names: Array[String]): Boolean = {
+ assert(names.length > 0)
+ val args = names.mkString(" ")
+ trace(names.mkString(","), s"kill and remove containers") {
+ ShellExec.exec(s"docker rm -f $args", s"STOP $args.")
+ }
+ }
+
+ private def inspect(container: String, option: String): String = {
+ ShellExec.execAndCaptureOutput(s"docker inspect $option $container", s"EXEC $container")
+ }
+
+ final def curl(container: String, url: String, options: Array[String] = Array.empty[String])
+ : String = {
+ trace(container, s"curl $url") {
+ doExecute(container, s"curl -s ${options.mkString(" ")} $url")
+ }
+ }
+
+ final def getHostName(container: String): String = {
+ trace(container, s"Get hostname of container...") {
+ doExecute(container, "hostname")
+ }
+ }
+
+ final def getNetworkGateway(container: String): String = {
+ trace(container, s"Get gateway of container...") {
+ doExecute(container, "ip route").split("\\s+")(2)
+ }
+ }
+ final def killProcess(container: String, pid: Int, signal: String = "SIGKILL"): Boolean = {
+ trace(container, s"Kill process pid: $pid") {
+ doExecuteSilently(container, s"kill -$signal $pid")
+ }
+ }
+
+ final def findJars(container: String, folder: String): Array[String] = {
+ trace(container, s"Find jars under $folder") {
+ doExecute(container, s"find $folder")
+ .split("\n").filter(_.endsWith(".jar"))
+ }
+ }
+
+ private def trace[T](container: String, msg: String)(fun: => T): T = {
+ // scalastyle:off println
+ Console.println() // A empty line to let the output looks better.
+ // scalastyle:on println
+ LOG.debug(s"Container $container====>> $msg")
+ LOG.debug("INPUT==>>")
+ val response = fun
+ LOG.debug("<<==OUTPUT")
+
+ LOG.debug(brief(response))
+
+ LOG.debug(s"<<====Command END. Container $container, $msg \n")
+ response
+ }
+
+ private val PREVIEW_MAX_LENGTH = 1024
+
+ private def brief[T](input: T): String = {
+ val output = input match {
+ case true =>
+ "Success|True"
+ case false =>
+ "Failure|False"
+ case x: Array[Any] =>
+ "Success: [" + x.mkString(",") + "]"
+ case x =>
+ x.toString
+ }
+
+ val preview = if (output.length > PREVIEW_MAX_LENGTH) {
+ output.substring(0, PREVIEW_MAX_LENGTH) + "..."
+ }
+ else {
+ output
+ }
+ preview
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala
new file mode 100644
index 0000000..25d7ee3
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.sys.process._
+
+import org.apache.log4j.Logger
+import org.apache.storm.shade.org.eclipse.jetty.util.QuotedStringTokenizer
+
+/**
+ * This class is used to execute commands in a shell
+ */
+object ShellExec {
+
+ private val LOG = Logger.getLogger(getClass)
+ private val PROCESS_TIMEOUT = 2.minutes
+
+ /**
+ * The built-in command-line parser of ProcessBuilder (implicit sys.process) doesn't
+ * respect the quote chars (' and ")
+ */
+ private def splitQuotedString(str: String): List[String] = {
+ val splitter = new QuotedStringTokenizer(str, " \t\n\r")
+ splitter.asInstanceOf[java.util.Enumeration[String]].asScala.toList
+ }
+
+ def exec(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT): Boolean = {
+ LOG.debug(s"$sender => `$command`")
+
+ val p = splitQuotedString(command).run()
+ val f = Future(blocking(p.exitValue())) // wrap in Future
+ val retval = {
+ try {
+ Await.result(f, timeout)
+ } catch {
+ case _: TimeoutException =>
+ LOG.error(s"timeout to execute command `$command`")
+ p.destroy()
+ p.exitValue()
+ }
+ }
+ LOG.debug(s"$sender <= exit $retval")
+ retval == 0
+ }
+
+ def execAndCaptureOutput(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT)
+ : String = {
+ LOG.debug(s"$sender => `$command`")
+
+ val buf = new StringBuilder
+ val processLogger = ProcessLogger((o: String) => buf.append(o).append("\n"),
+ (e: String) => buf.append(e).append("\n"))
+ val p = splitQuotedString(command).run(processLogger)
+ val f = Future(blocking(p.exitValue())) // wrap in Future
+ val retval = {
+ try {
+ Await.result(f, timeout)
+ } catch {
+ case _: TimeoutException =>
+ p.destroy()
+ p.exitValue()
+ }
+ }
+ val output = buf.toString().trim
+ val PREVIEW_MAX_LENGTH = 200
+ val preview = if (output.length > PREVIEW_MAX_LENGTH) {
+ output.substring(0, PREVIEW_MAX_LENGTH) + "..."
+ } else {
+ output
+ }
+
+ LOG.debug(s"$sender <= `$preview` exit $retval")
+ if (retval != 0) {
+ throw new RuntimeException(
+ s"exited ($retval) by executing `$command`")
+ }
+ output
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala
new file mode 100644
index 0000000..7e7085d
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import scala.concurrent.duration._
+import scala.util.{Failure, Success, Try}
+
+import org.apache.log4j.Logger
+
+object Util {
+
+ private val LOG = Logger.getLogger(getClass)
+
+ def encodeUriComponent(s: String): String = {
+ try {
+ java.net.URLEncoder.encode(s, "UTF-8")
+ .replaceAll("\\+", "%20")
+ .replaceAll("\\%21", "!")
+ .replaceAll("\\%27", "'")
+ .replaceAll("\\%28", "(")
+ .replaceAll("\\%29", ")")
+ .replaceAll("\\%7E", "~")
+ } catch {
+ case ex: Throwable => s
+ }
+ }
+
+ def retryUntil(
+ condition: () => Boolean, conditionDescription: String, maxTries: Int = 15,
+ interval: Duration = 10.seconds): Unit = {
+ var met = false
+ var tries = 0
+
+ while (!met && tries < maxTries) {
+
+ met = Try(condition()) match {
+ case Success(true) => true
+ case Success(false) => false
+ case Failure(ex) => false
+ }
+
+ tries += 1
+
+ if (!met) {
+ LOG.error(s"Failed due to (false == $conditionDescription), " +
+ s"retrying for the ${tries} times...")
+ Thread.sleep(interval.toMillis)
+ } else {
+ LOG.info(s"Success ($conditionDescription) after ${tries} retries")
+ }
+ }
+
+ if (!met) {
+ throw new Exception(s"Failed after ${tries} retries, ($conditionDescription) == false")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala
new file mode 100644
index 0000000..f836abd
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.integrationtest.hadoop
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+object HadoopCluster {
+
+ /** Starts a Hadoop cluster */
+ def withHadoopCluster(testCode: HadoopCluster => Unit): Unit = {
+ val hadoopCluster = new HadoopCluster
+ try {
+ hadoopCluster.start()
+ testCode(hadoopCluster)
+ } finally {
+ hadoopCluster.shutDown()
+ }
+ }
+}
+/**
+ * This class maintains a single node Hadoop cluster
+ */
+class HadoopCluster {
+
+ private val LOG = Logger.getLogger(getClass)
+ private val HADOOP_DOCKER_IMAGE = "sequenceiq/hadoop-docker:2.6.0"
+ private val HADOOP_HOST = "hadoop0"
+
+ def start(): Unit = {
+ Docker.createAndStartContainer(HADOOP_HOST, HADOOP_DOCKER_IMAGE, "")
+
+ Util.retryUntil(() => isAlive, "Hadoop cluster is alive")
+ LOG.info("Hadoop cluster is started.")
+ }
+
+ // Checks whether the cluster is alive by listing "/"
+ private def isAlive: Boolean = {
+ Docker.executeSilently(HADOOP_HOST, "/usr/local/hadoop/bin/hadoop fs -ls /")
+ }
+
+ def getDefaultFS: String = {
+ val hostIPAddr = Docker.getContainerIPAddr(HADOOP_HOST)
+ s"hdfs://$hostIPAddr:9000"
+ }
+
+ def shutDown(): Unit = {
+ Docker.killAndRemoveContainer(HADOOP_HOST)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala
new file mode 100644
index 0000000..15ba084
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.minicluster.MiniCluster
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+object KafkaCluster {
+
+ /** Starts a Kafka cluster */
+ def withKafkaCluster(cluster: MiniCluster)(testCode: KafkaCluster => Unit): Unit = {
+ val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka")
+ try {
+ kafkaCluster.start()
+ testCode(kafkaCluster)
+ } finally {
+ kafkaCluster.shutDown()
+ }
+ }
+
+ def withDataProducer(topic: String, brokerList: String)
+ (testCode: NumericalDataProducer => Unit): Unit = {
+ val producer = new NumericalDataProducer(topic, brokerList)
+ try {
+ producer.start()
+ testCode(producer)
+ } finally {
+ producer.stop()
+ }
+ }
+}
+
+/**
+ * This class maintains a single node Kafka cluster with integrated Zookeeper.
+ */
+class KafkaCluster(val advertisedHost: String, zkChroot: String = "") {
+
+ private val LOG = Logger.getLogger(getClass)
+ private val KAFKA_DOCKER_IMAGE = "spotify/kafka"
+ private val KAFKA_HOST = "kafka0"
+ private val KAFKA_HOME = "/opt/kafka_2.11-0.8.2.1/"
+ private val ZOOKEEPER_PORT = 2181
+ private val BROKER_PORT = 9092
+ val advertisedPort = BROKER_PORT
+
+ def start(): Unit = {
+ Docker.createAndStartContainer(KAFKA_HOST, KAFKA_DOCKER_IMAGE, "",
+ environ = Map(
+ "ADVERTISED_HOST" -> advertisedHost,
+ "ADVERTISED_PORT" -> BROKER_PORT.toString,
+ "ZK_CHROOT" -> zkChroot),
+ tunnelPorts = Set(ZOOKEEPER_PORT, BROKER_PORT)
+ )
+ Util.retryUntil(() => isAlive, "kafka cluster is alive")
+ LOG.debug("kafka cluster is started.")
+ }
+
+ def isAlive: Boolean = {
+ !listTopics().contains("Connection refused")
+ }
+
+ def shutDown(): Unit = {
+ Docker.killAndRemoveContainer(KAFKA_HOST)
+ }
+
+ private lazy val hostIPAddr = Docker.getContainerIPAddr(KAFKA_HOST)
+
+ def listTopics(): String = {
+ kafkaListTopics(KAFKA_HOST, KAFKA_HOME, getZookeeperConnectString)
+ }
+
+ def getZookeeperConnectString: String = {
+ s"$hostIPAddr:$ZOOKEEPER_PORT/$zkChroot"
+ }
+
+ def getBrokerListConnectString: String = {
+ s"$hostIPAddr:$BROKER_PORT"
+ }
+
+ def createTopic(topic: String, partitions: Int = 1): Unit = {
+ LOG.debug(s"|=> Create kafka topic $topic with $partitions partitions")
+
+ Docker.executeSilently(KAFKA_HOST,
+ s"$KAFKA_HOME/bin/kafka-topics.sh" +
+ s" --zookeeper $getZookeeperConnectString" +
+ s" --create --topic $topic --partitions $partitions --replication-factor 1")
+ }
+
+ def produceDataToKafka(topic: String, messageNum: Int): Unit = {
+ Docker.executeSilently(KAFKA_HOST,
+ s"$KAFKA_HOME/bin/kafka-topics.sh" +
+ s" --zookeeper $getZookeeperConnectString" +
+ s" --create --topic $topic --partitions 1 --replication-factor 1")
+
+ Docker.executeSilently(KAFKA_HOST,
+ s"$KAFKA_HOME/bin/kafka-producer-perf-test.sh" +
+ s" --broker-list $getBrokerListConnectString" +
+ s" --topic $topic --messages $messageNum")
+ }
+
+ def getLatestOffset(topic: String): Int = {
+ kafkaFetchLatestOffset(KAFKA_HOST, topic, KAFKA_HOME, getBrokerListConnectString)
+ }
+
+ private def kafkaListTopics(
+ container: String, kafkaHome: String, zookeeperConnectionString: String): String = {
+
+ LOG.debug(s"|=> Kafka list topics...")
+ Docker.execute(container,
+ s"$kafkaHome/bin/kafka-topics.sh" +
+ s" --zookeeper $zookeeperConnectionString -list")
+ }
+
+ private def kafkaFetchLatestOffset(
+ container: String, topic: String, kafkaHome: String, brokersList: String): Int = {
+ LOG.debug(s"|=> Get latest offset of topic $topic...")
+ val output = Docker.execute(container,
+ s"$kafkaHome/bin/kafka-run-class.sh kafka.tools.GetOffsetShell" +
+ s" --broker-list $brokersList " +
+ s" --topic $topic --time -1")
+ output.split(":")(2).toInt
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala
new file mode 100644
index 0000000..1cf3125
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import java.util.Properties
+
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.streaming.serializer.ChillSerializer
+
+class NumericalDataProducer(topic: String, bootstrapServers: String) {
+
+ private val LOG = Logger.getLogger(getClass)
+ private val producer = createProducer
+ private val WRITE_SLEEP_NANOS = 10
+ private val serializer = new ChillSerializer[Int]
+ var lastWriteNum = 0
+
+ def start(): Unit = {
+ produceThread.start()
+ }
+
+ def stop(): Unit = {
+ if (produceThread.isAlive) {
+ produceThread.interrupt()
+ produceThread.join()
+ }
+ producer.close()
+ }
+
+ /** How many messages we have written in total */
+ def producedNumbers: Range = {
+ Range(1, lastWriteNum + 1)
+ }
+
+ private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
+ val properties = new Properties()
+ properties.setProperty("bootstrap.servers", bootstrapServers)
+ new KafkaProducer[Array[Byte], Array[Byte]](properties,
+ new ByteArraySerializer, new ByteArraySerializer)
+ }
+
+ private val produceThread = new Thread(new Runnable {
+ override def run(): Unit = {
+ try {
+ while (!Thread.currentThread.isInterrupted) {
+ lastWriteNum += 1
+ val msg = serializer.serialize(lastWriteNum)
+ val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, msg)
+ producer.send(record)
+ Thread.sleep(0, WRITE_SLEEP_NANOS)
+ }
+ } catch {
+ case ex: InterruptedException =>
+ LOG.error("message producing is stopped by an interrupt")
+ }
+ }
+ })
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala
new file mode 100644
index 0000000..1f773d3
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import scala.collection.mutable
+
+trait ResultVerifier {
+ def onNext(num: Int): Unit
+}
+
+class MessageLossDetector(totalNum: Int) extends ResultVerifier {
+ private val bitSets = new mutable.BitSet(totalNum)
+ var result = List.empty[Int]
+
+ override def onNext(num: Int): Unit = {
+ bitSets.add(num)
+ result :+= num
+ }
+
+ def allReceived: Boolean = {
+ 1.to(totalNum).forall(bitSets)
+ }
+
+ def received(num: Int): Boolean = {
+ bitSets(num)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
new file mode 100644
index 0000000..392ca86
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import scala.util.{Failure, Success}
+
+import kafka.api.FetchRequestBuilder
+import kafka.consumer.SimpleConsumer
+import kafka.utils.Utils
+
+import org.apache.gearpump.streaming.serializer.ChillSerializer
+
+class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: Int = 0,
+ host: String, port: Int) {
+
+ private val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "")
+ private val serializer = new ChillSerializer[Int]
+ private var offset = 0L
+
+ def read(): Unit = {
+ val messageSet = consumer.fetch(
+ new FetchRequestBuilder().addFetch(topic, partition, offset, Int.MaxValue).build()
+ ).messageSet(topic, partition)
+
+ for (messageAndOffset <- messageSet) {
+ serializer.deserialize(Utils.readBytes(messageAndOffset.message.payload)) match {
+ case Success(msg) =>
+ offset = messageAndOffset.nextOffset
+ verifier.onNext(msg)
+ case Failure(e) => throw e
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala
new file mode 100644
index 0000000..73413da
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import scala.sys.process._
+
+import org.apache.gearpump.integrationtest.Docker
+
+/**
+ * A helper to instantiate the base image for different usage.
+ */
+class BaseContainer(val host: String, command: String,
+ masterAddrs: List[(String, Int)],
+ tunnelPorts: Set[Int] = Set.empty) {
+
+ private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher"
+ private val DOCKER_IMAGE_GEARPUMP_HOME = "/opt/gearpump"
+ private val DOCKER_IMAGE_LOG_HOME = "/var/log/gearpump"
+ private val HOST_GEARPUMP_HOME = "pwd".!!.trim + "/output/target/pack"
+ private val HOST_LOG_HOME = {
+ val dir = "/tmp/gearpump"
+ s"mkdir -p $dir".!!
+ s"mktemp -p $dir -d".!!.trim
+ }
+
+ private val CLUSTER_OPTS = {
+ masterAddrs.zipWithIndex.map { case (hostPort, index) =>
+ s"-Dgearpump.cluster.masters.$index=${hostPort._1}:${hostPort._2}"
+ }.mkString(" ")
+ }
+
+ def createAndStart(): String = {
+ Docker.createAndStartContainer(host, IMAGE_NAME, command,
+ environ = Map("JAVA_OPTS" -> CLUSTER_OPTS),
+ volumes = Map(
+ HOST_GEARPUMP_HOME -> DOCKER_IMAGE_GEARPUMP_HOME,
+ HOST_LOG_HOME -> DOCKER_IMAGE_LOG_HOME),
+ knownHosts = masterAddrs.map(_._1).filter(_ != host).toSet,
+ tunnelPorts = tunnelPorts)
+ }
+
+ def killAndRemove(): Unit = {
+ Docker.killAndRemoveContainer(host)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
new file mode 100644
index 0000000..884a8d1
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.integrationtest.Docker
+
+/**
+ * A command-line client to operate a Gearpump cluster
+ */
+class CommandLineClient(host: String) {
+
+ private val LOG = Logger.getLogger(getClass)
+
+ def listApps(): Array[String] = {
+ gearCommand(host, "gear info").split("\n").filter(
+ _.startsWith("application: ")
+ )
+ }
+
+ def listRunningApps(): Array[String] =
+ listApps().filter(
+ _.contains(s", status: ${MasterToAppMaster.AppMasterActive}")
+ )
+
+ def queryApp(appId: Int): String = try {
+ listApps().filter(
+ _.startsWith(s"application: $appId")
+ ).head
+ } catch {
+ case ex: Throwable =>
+ LOG.warn(s"swallowed an exception: $ex")
+ ""
+ }
+
+ def submitAppAndCaptureOutput(jar: String, executorNum: Int, args: String = ""): String = {
+ gearCommand(host, s"gear app -verbose true -jar $jar -executors $executorNum $args")
+ }
+
+ def submitApp(jar: String, args: String = ""): Int = {
+ LOG.debug(s"|=> Submit Application $jar...")
+ submitAppUse("gear app", jar, args)
+ }
+
+ private def submitAppUse(launcher: String, jar: String, args: String = ""): Int = try {
+ gearCommand(host, s"$launcher -jar $jar $args").split("\n")
+ .filter(_.contains("The application id is ")).head.split(" ").last.toInt
+ } catch {
+ case ex: Throwable =>
+ LOG.warn(s"swallowed an exception: $ex")
+ -1
+ }
+
+ def killApp(appId: Int): Boolean = {
+ tryGearCommand(host, s"gear kill -appid $appId")
+ }
+
+ private def gearCommand(container: String, command: String): String = {
+ LOG.debug(s"|=> Gear command $command in container $container...")
+ Docker.execute(container, s"/opt/start $command")
+ }
+
+ private def tryGearCommand(container: String, command: String): Boolean = {
+ LOG.debug(s"|=> Gear command $command in container $container...")
+ Docker.executeSilently(container, s"/opt/start $command")
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala
new file mode 100644
index 0000000..4d439e8
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import java.io.IOException
+import scala.collection.mutable.ListBuffer
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+/**
+ * This class is a test driver for end-to-end integration test.
+ */
+class MiniCluster {
+
+ private val LOG = Logger.getLogger(getClass)
+ private val SUT_HOME = "/opt/gearpump"
+
+ private val REST_SERVICE_PORT = 8090
+ private val MASTER_PORT = 3000
+ private val MASTER_ADDRS: List[(String, Int)] = {
+ (0 to 0).map(index =>
+ ("master" + index, MASTER_PORT)
+ ).toList
+ }
+
+ lazy val commandLineClient: CommandLineClient = new CommandLineClient(getMasterHosts.head)
+
+ lazy val restClient: RestClient = {
+ val client = new RestClient(getMasterHosts.head, REST_SERVICE_PORT)
+ client
+ }
+
+ private var workers: ListBuffer[String] = ListBuffer.empty
+
+ def start(workerNum: Int = 2): Unit = {
+
+ // Kill master
+ MASTER_ADDRS.foreach { case (host, _) =>
+ if (Docker.containerExists(host)) {
+ Docker.killAndRemoveContainer(host)
+ }
+ }
+
+ // Kill existing workers
+ workers ++= (0 until workerNum).map("worker" + _)
+ workers.foreach { worker =>
+ if (Docker.containerExists(worker)) {
+ Docker.killAndRemoveContainer(worker)
+ }
+ }
+
+ // Start Masters
+ MASTER_ADDRS.foreach({ case (host, port) =>
+ addMasterNode(host, port)
+ })
+
+ // Start Workers
+ workers.foreach { worker =>
+ val container = new BaseContainer(worker, "worker", MASTER_ADDRS)
+ container.createAndStart()
+ }
+
+ // Check cluster status
+ expectRestClientAuthenticated()
+ expectClusterAvailable()
+ }
+
+ private def addMasterNode(host: String, port: Int): Unit = {
+ val container = new BaseContainer(host, s"master -ip $host -port $port", MASTER_ADDRS)
+ container.createAndStart()
+ }
+
+ def addWorkerNode(host: String): Unit = {
+ if (workers.find(_ == host).isEmpty) {
+ val container = new BaseContainer(host, "worker", MASTER_ADDRS)
+ container.createAndStart()
+ workers += host
+ } else {
+ throw new IOException(s"Cannot add new worker $host, " +
+ s"as worker with same hostname already exists")
+ }
+ }
+
+ /**
+ * @throws RuntimeException if rest client is not authenticated after N attempts
+ */
+ private def expectRestClientAuthenticated(): Unit = {
+ Util.retryUntil(() => {
+ restClient.login()
+ LOG.info("rest client has been authenticated")
+ true
+ }, "login successfully")
+ }
+
+ /**
+ * @throws RuntimeException if service is not available after N attempts
+ */
+ private def expectClusterAvailable(): Unit = {
+ Util.retryUntil(() => {
+ val response = restClient.queryMaster()
+ LOG.info(s"cluster is now available with response: $response.")
+ response.aliveFor > 0
+ }, "cluster running")
+ }
+
+ def isAlive: Boolean = {
+ getMasterHosts.exists(nodeIsOnline)
+ }
+
+ def getNetworkGateway: String = {
+ Docker.getNetworkGateway(MASTER_ADDRS.head._1)
+ }
+
+ def shutDown(): Unit = {
+ val removalHosts = (getMasterHosts ++ getWorkerHosts).toSet
+ .filter(nodeIsOnline).toArray
+ if (removalHosts.length > 0) {
+ Docker.killAndRemoveContainer(removalHosts)
+ }
+ workers.clear()
+ }
+
+ def removeMasterNode(host: String): Unit = {
+ Docker.killAndRemoveContainer(host)
+ }
+
+ def removeWorkerNode(host: String): Unit = {
+ workers -= host
+ Docker.killAndRemoveContainer(host)
+ }
+
+ def restart(): Unit = {
+ shutDown()
+ Util.retryUntil(() => {
+ !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists)
+ }, "all docker containers killed")
+ LOG.info("all docker containers have been killed. restarting...")
+ start()
+ }
+
+ def getMastersAddresses: List[(String, Int)] = {
+ MASTER_ADDRS
+ }
+
+ def getMasterHosts: List[String] = {
+ MASTER_ADDRS.map({ case (host, port) => host })
+ }
+
+ def getWorkerHosts: List[String] = {
+ workers.toList
+ }
+
+ def nodeIsOnline(host: String): Boolean = {
+ Docker.containerIsRunning(host)
+ }
+
+ private def builtInJarsUnder(folder: String): Array[String] = {
+ Docker.findJars(getMasterHosts.head, s"$SUT_HOME/$folder")
+ }
+
+ private def queryBuiltInJars(folder: String, subtext: String): Seq[String] = {
+ builtInJarsUnder(folder).filter(_.contains(subtext))
+ }
+
+ def queryBuiltInExampleJars(subtext: String): Seq[String] = {
+ queryBuiltInJars("examples", subtext)
+ }
+
+ def queryBuiltInITJars(subtext: String): Seq[String] = {
+ queryBuiltInJars("integrationtest", subtext)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
new file mode 100644
index 0000000..1b143af
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import scala.reflect.ClassTag
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.log4j.Logger
+import upickle.Js
+import upickle.default._
+
+import org.apache.gearpump.cluster.AppMasterToMaster.MasterData
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData}
+import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics
+import org.apache.gearpump.cluster.master.MasterSummary
+import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
+import org.apache.gearpump.cluster.{AppJar, MasterToAppMaster}
+import org.apache.gearpump.integrationtest.{Docker, Util}
+import org.apache.gearpump.services.AppMasterService.Status
+import org.apache.gearpump.services.MasterService.{AppSubmissionResult, BuiltinPartitioners}
+// NOTE: Do not remove this import: it brings the implicit upickle readers/writers
+// required by decodeAs into scope, even though no name from it appears explicitly.
+import org.apache.gearpump.services.util.UpickleUtil._
+import org.apache.gearpump.streaming.ProcessorDescription
+import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
+import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationResult, ReplaceProcessor}
+import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary
+import org.apache.gearpump.streaming.executor.Executor.ExecutorSummary
+import org.apache.gearpump.util.{Constants, Graph}
+
+/**
+ * A REST client to operate a Gearpump cluster
+ */
/**
 * A REST client to operate a Gearpump cluster.
 *
 * Every call shells out to curl inside the docker container `host`, so the
 * REST endpoint is reached on the cluster's internal network. Query methods
 * decode the JSON response with upickle; command methods return a Boolean and
 * swallow (but log) recoverable failures.
 */
class RestClient(host: String, port: Int) {

  // Local import: the catch blocks below must not swallow fatal errors
  // (OutOfMemoryError, InterruptedException, ...), only recoverable ones.
  import scala.util.control.NonFatal

  private val LOG = Logger.getLogger(getClass)

  // Cookie jar written by login() and replayed on every API call so the
  // authenticated session survives across separate curl processes.
  private val cookieFile: String = "cookie.txt"

  private val CRUD_POST = "-X POST"
  private val CRUD_DELETE = "-X DELETE"

  /** Reads the REST DAG representation {vertices, edges} into a [[Graph]]. */
  implicit val graphReader: upickle.default.Reader[Graph[Int, String]] =
    upickle.default.Reader[Graph[Int, String]] {
      case Js.Obj(verties, edges) =>
        val vertexList = upickle.default.readJs[List[Int]](verties._2)
        val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2)
        Graph(vertexList, edgeList)
    }

  /**
   * Decodes a JSON REST response into `T`; logs the target type before
   * rethrowing so that malformed responses are easy to diagnose.
   */
  private def decodeAs[T](
      expr: String)(implicit reader: upickle.default.Reader[T], classTag: ClassTag[T]): T = try {
    read[T](expr)
  } catch {
    case NonFatal(ex) =>
      LOG.error(s"Failed to decode Rest response to ${classTag.runtimeClass.getSimpleName}")
      throw ex
  }

  /** Query-string suffix selecting only the latest metrics sample. */
  private def readLatestParam(current: Boolean): String = {
    if (current) "?readLatest=true" else ""
  }

  /** Returns the version string reported by the service. */
  def queryVersion(): String = {
    curl("version")
  }

  /** Lists every worker known to the master, in any state. */
  def listWorkers(): Array[WorkerSummary] = {
    val resp = callApi("master/workerlist")
    decodeAs[List[WorkerSummary]](resp).toArray
  }

  /** Lists only workers currently in active state. */
  def listRunningWorkers(): Array[WorkerSummary] = {
    listWorkers().filter(_.state == MasterToAppMaster.AppMasterActive)
  }

  /** Lists every application ever submitted, including finished ones. */
  def listApps(): Array[AppMasterData] = {
    val resp = callApi("master/applist")
    decodeAs[AppMastersData](resp).appMasters.toArray
  }

  /** Lists only applications currently running. */
  def listRunningApps(): Array[AppMasterData] = {
    listApps().filter(_.status == MasterToAppMaster.AppMasterActive)
  }

  /**
   * Predicts the id the next submitted application will receive.
   * NOTE(review): assumes ids are allocated sequentially from 1 and that the
   * app list never shrinks — confirm against the master's id allocation.
   */
  def getNextAvailableAppId(): Int = {
    listApps().length + 1
  }

  /**
   * Submits an application jar through the REST API.
   *
   * @param jar path (inside the container) of the application jar
   * @param executorNum number of executors to request
   * @param args optional application arguments, passed through verbatim
   * @param config optional path of an application config file; ignored if empty
   * @return true when the submission was accepted, false on any failure
   */
  def submitApp(jar: String, executorNum: Int, args: String = "", config: String = "")
    : Boolean = try {
    // Build the curl form options immutably instead of a var + appends.
    val options = Seq(s"jar=@$jar") ++
      (if (config.length > 0) Seq(s"conf=@$config") else Seq.empty) ++
      Seq(s"executorcount=$executorNum") ++
      (if (args != null && !args.isEmpty) Seq("args=\"" + args + "\"") else Seq.empty)

    val resp = callApi("master/submitapp", options.map("-F " + _).mkString(" "))
    val result = decodeAs[AppSubmissionResult](resp)
    assert(result.success)
    true
  } catch {
    case NonFatal(ex) =>
      LOG.warn(s"swallowed an exception: $ex")
      false
  }

  /** Queries the summary record of one application. */
  def queryApp(appId: Int): AppMasterData = {
    val resp = callApi(s"appmaster/$appId")
    decodeAs[AppMasterData](resp)
  }

  /** Queries the effective configuration of one AppMaster. */
  def queryAppMasterConfig(appId: Int): Config = {
    val resp = callApi(s"appmaster/$appId/config")
    ConfigFactory.parseString(resp)
  }

  /** Queries the detailed summary (DAG, clock, executors) of a streaming app. */
  def queryStreamingAppDetail(appId: Int): StreamAppMasterSummary = {
    val resp = callApi(s"appmaster/$appId?detail=true")
    decodeAs[StreamAppMasterSummary](resp)
  }

  /**
   * Queries processor metrics of a streaming app.
   *
   * @param current when true only the latest sample is returned
   * @param path metrics sub-path pattern, defaults to all processors
   */
  def queryStreamingAppMetrics(appId: Int, current: Boolean, path: String = "processor*")
    : HistoryMetrics = {
    val args = readLatestParam(current)
    val resp = callApi(s"appmaster/$appId/metrics/app$appId.$path$args")
    decodeAs[HistoryMetrics](resp)
  }

  /** Queries the summary of a single executor. */
  def queryExecutorSummary(appId: Int, executorId: Int): ExecutorSummary = {
    val resp = callApi(s"appmaster/$appId/executor/$executorId")
    decodeAs[ExecutorSummary](resp)
  }

  /** Queries brief records of all executors of an application. */
  def queryExecutorBrief(appId: Int): Array[ExecutorBrief] = {
    queryStreamingAppDetail(appId).executors.toArray
  }

  /** Queries executor metrics of an application. */
  def queryExecutorMetrics(appId: Int, current: Boolean): HistoryMetrics = {
    val args = readLatestParam(current)
    val resp = callApi(s"appmaster/$appId/metrics/app$appId.executor*$args")
    decodeAs[HistoryMetrics](resp)
  }

  /** Queries the effective configuration of one executor. */
  def queryExecutorConfig(appId: Int, executorId: Int): Config = {
    val resp = callApi(s"appmaster/$appId/executor/$executorId/config")
    ConfigFactory.parseString(resp)
  }

  /** Queries the summary of the active master. */
  def queryMaster(): MasterSummary = {
    val resp = callApi("master")
    decodeAs[MasterData](resp).masterDescription
  }

  /** Queries master metrics; `current` selects only the latest sample. */
  def queryMasterMetrics(current: Boolean): HistoryMetrics = {
    // Bug fix: the path previously interpolated "?$args" although args already
    // begins with '?', producing "master??readLatest=true" which the server
    // does not interpret as a readLatest parameter. Siblings such as
    // queryExecutorMetrics already interpolate the args without an extra '?'.
    val resp = callApi(s"master/metrics/master${readLatestParam(current)}")
    decodeAs[HistoryMetrics](resp)
  }

  /** Queries the master's effective configuration. */
  def queryMasterConfig(): Config = {
    val resp = callApi("master/config")
    ConfigFactory.parseString(resp)
  }

  /** Queries metrics of one worker; `current` selects only the latest sample. */
  def queryWorkerMetrics(workerId: WorkerId, current: Boolean): HistoryMetrics = {
    // Bug fix: same double-'?' problem as queryMasterMetrics — args already
    // carries the leading '?', so no literal '?' belongs in the path.
    val workerIdStr = WorkerId.render(workerId)
    val resp =
      callApi(s"worker/$workerIdStr/metrics/worker$workerIdStr${readLatestParam(current)}")
    decodeAs[HistoryMetrics](resp)
  }

  /** Queries the effective configuration of one worker. */
  def queryWorkerConfig(workerId: WorkerId): Config = {
    val resp = callApi(s"worker/${WorkerId.render(workerId)}/config")
    ConfigFactory.parseString(resp)
  }

  /** Lists the names of the partitioners shipped with the service. */
  def queryBuiltInPartitioners(): Array[String] = {
    val resp = callApi("master/partitioners")
    decodeAs[BuiltinPartitioners](resp).partitioners
  }

  /** Uploads a local jar file and returns its server-side handle. */
  def uploadJar(localFilePath: String): AppJar = {
    val resp = callApi(s"master/uploadjar -F jar=@$localFilePath", CRUD_POST)
    decodeAs[AppJar](resp)
  }

  /** Replaces a processor of a running app without inheriting its config. */
  def replaceStreamingAppProcessor(appId: Int, replaceMe: ProcessorDescription): Boolean = {
    replaceStreamingAppProcessor(appId, replaceMe, false)
  }

  /**
   * Replaces a processor of a running app via the dynamic DAG endpoint.
   *
   * @param inheritConf when true the new processor inherits the old one's config
   * @return true when the server acknowledged the DAG operation
   */
  def replaceStreamingAppProcessor(
      appId: Int, replaceMe: ProcessorDescription, inheritConf: Boolean): Boolean = try {
    val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe, inheritConf)
    val args = upickle.default.write(replaceOperation)
    val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + Util.encodeUriComponent(args),
      CRUD_POST)
    // Decoding throws (caught below) when the server rejected the operation.
    decodeAs[DAGOperationResult](resp)
    true
  } catch {
    case NonFatal(ex) =>
      LOG.warn(s"swallowed an exception: $ex")
      false
  }

  /** Kills the AppMaster process of an application (its default executor). */
  def killAppMaster(appId: Int): Boolean = {
    killExecutor(appId, Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
  }

  /**
   * Kills the JVM process of one executor on its host container.
   * Relies on jvmName following the JVM's "pid@hostname" format.
   */
  def killExecutor(appId: Int, executorId: Int): Boolean = try {
    val jvmInfo = queryExecutorSummary(appId, executorId).jvmName.split("@")
    val pid = jvmInfo(0).toInt
    val hostname = jvmInfo(1)
    Docker.killProcess(hostname, pid)
  } catch {
    case NonFatal(ex) =>
      LOG.warn(s"swallowed an exception: $ex")
      false
  }

  /** Asks the master to kill an application. */
  def killApp(appId: Int): Boolean = try {
    val resp = callApi(s"appmaster/$appId", CRUD_DELETE)
    resp.contains("\"status\":\"success\"")
  } catch {
    case NonFatal(ex) =>
      LOG.warn(s"swallowed an exception: $ex")
      false
  }

  /** Asks the master to restart an application. */
  def restartApp(appId: Int): Boolean = try {
    val resp = callApi(s"appmaster/$appId/restart", CRUD_POST)
    decodeAs[Status](resp).success
  } catch {
    case NonFatal(ex) =>
      LOG.warn(s"swallowed an exception: $ex")
      false
  }

  /** Invokes a v1.0 API endpoint, replaying the session cookie. */
  private def callApi(endpoint: String, option: String = ""): String = {
    curl(s"api/v1.0/$endpoint", Array(option, s"--cookie $cookieFile"))
  }

  /** Runs curl against the service from inside the container. */
  private def curl(endpoint: String, options: Array[String] = Array.empty[String]): String = {
    Docker.curl(host, s"http://$host:$port/$endpoint", options)
  }

  /** Logs in with the default admin credentials and stores the session cookie. */
  def login(): Unit = {
    curl("login", Array(CRUD_POST, s"--cookie-jar $cookieFile",
      "--data username=admin", "--data password=admin"))
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
new file mode 100644
index 0000000..79adfc4
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.integrationtest.storm
+
+import scala.util.Random
+
+import backtype.storm.utils.{DRPCClient, Utils}
+
+import org.apache.gearpump.integrationtest.minicluster.{BaseContainer, MiniCluster, RestClient}
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
/**
 * Drives Storm-on-Gearpump integration tests: boots a Nimbus container and a
 * DRPC server container against the mini cluster, and submits Storm
 * topologies through the Nimbus host.
 */
class StormClient(cluster: MiniCluster, restClient: RestClient) {

  private val masterAddrs: List[(String, Int)] = cluster.getMastersAddresses
  // Randomized per client instance so concurrent runs don't clobber each other.
  private val CONFIG_FILE = s"/opt/gearpump/storm${new Random().nextInt()}.yaml"
  private val DRPC_HOST = "storm0"
  private val DRPC_PORT = 3772
  private val DRPC_INVOCATIONS_PORT = 3773
  private val STORM_DRPC = "storm-drpc"
  private val NIMBUS_HOST = "storm1"
  private val STORM_NIMBUS = "storm nimbus"
  private val STORM_APP = "/opt/start storm app"

  // DRPC server container; its service ports are tunneled out of docker.
  private val drpcContainer = new BaseContainer(DRPC_HOST, STORM_DRPC, masterAddrs,
    tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT))

  // Nimbus container; it writes its generated config to CONFIG_FILE.
  private val nimbusContainer =
    new BaseContainer(NIMBUS_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE", masterAddrs)

  /** Boots Nimbus first (DRPC work depends on it), then the DRPC server. */
  def start(): Unit = {
    nimbusContainer.createAndStart()
    ensureNimbusRunning()

    drpcContainer.createAndStart()
    ensureDrpcServerRunning()
  }

  /** Blocks until the Nimbus thrift port (read from its config) is listening. */
  private def ensureNimbusRunning(): Unit = {
    Util.retryUntil(() => {
      val response = Docker.execute(NIMBUS_HOST, s"""grep "port" $CONFIG_FILE""")
      // Parse format nimbus.thrift.port: '39322'
      val thriftPort = response.split(" ")(1).replace("'", "").toInt

      Docker.executeSilently(NIMBUS_HOST, s"""sh -c "netstat -na | grep $thriftPort" """)
    }, "Nimbus running")
  }

  /** Blocks until the DRPC server accepts connections on its service port. */
  private def ensureDrpcServerRunning(): Unit = {
    Util.retryUntil(() => {
      Docker.executeSilently(DRPC_HOST, s"""sh -c "netstat -na | grep $DRPC_PORT " """)
    }, "DRPC running")
  }

  /**
   * Submits a Storm topology jar and waits until the resulting Gearpump app
   * shows up as running; returns its application id.
   */
  def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = {
    // The submission command itself is retried together with the check, so a
    // failed submission attempt is simply issued again.
    Util.retryUntil(() => {
      Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " +
        s"-jar $jar $mainClass $args")
      restClient.listRunningApps().exists(_.appName == appName)
    }, "app running")
    restClient.listRunningApps().filter(_.appName == appName).head.appId
  }

  /** Builds a DRPC client with default Storm config against the given server. */
  def getDRPCClient(drpcServerIp: String): DRPCClient = {
    val config = Utils.readDefaultConfig()
    new DRPCClient(config, drpcServerIp, DRPC_PORT)
  }

  /** Removes the generated config file and tears both containers down. */
  def shutDown(): Unit = {

    // Cleans up the storm.yaml config file
    Docker.executeSilently(NIMBUS_HOST, s"rm $CONFIG_FILE ")
    drpcContainer.killAndRemove()
    nimbusContainer.killAndRemove()
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
deleted file mode 100644
index 67a2491..0000000
--- a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.storm
-
-import backtype.storm.topology.base.BaseBasicBolt
-import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
-import backtype.storm.tuple.{Fields, Tuple, Values}
-import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper
-
/**
 * A Storm bolt that tags each incoming binary tuple with a monotonically
 * increasing sequence number and re-emits it as a (key, message) pair for
 * the downstream Kafka bolt.
 */
class Adaptor extends BaseBasicBolt {
  // Per-bolt-instance sequence number, used as the Kafka message key.
  private var id = 0L

  override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = {
    val payload = tuple.getBinary(0)
    collector.emit(new Values(id.toString.getBytes, payload))
    id += 1
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    val fields = new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY,
      FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE)
    declarer.declare(fields)
  }
}