You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/27 11:38:49 UTC

[2/4] incubator-gearpump git commit: fix GEARPUMP-150 correct the integration test file structure

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
deleted file mode 100644
index 73413da..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import scala.sys.process._
-
-import org.apache.gearpump.integrationtest.Docker
-
-/**
- * A helper to instantiate the base image for different usage.
- */
-class BaseContainer(val host: String, command: String,
-    masterAddrs: List[(String, Int)],
-    tunnelPorts: Set[Int] = Set.empty) {
-
-  private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher"
-  private val DOCKER_IMAGE_GEARPUMP_HOME = "/opt/gearpump"
-  private val DOCKER_IMAGE_LOG_HOME = "/var/log/gearpump"
-  private val HOST_GEARPUMP_HOME = "pwd".!!.trim + "/output/target/pack"
-  private val HOST_LOG_HOME = {
-    val dir = "/tmp/gearpump"
-    s"mkdir -p $dir".!!
-    s"mktemp -p $dir -d".!!.trim
-  }
-
-  private val CLUSTER_OPTS = {
-    masterAddrs.zipWithIndex.map { case (hostPort, index) =>
-      s"-Dgearpump.cluster.masters.$index=${hostPort._1}:${hostPort._2}"
-    }.mkString(" ")
-  }
-
-  def createAndStart(): String = {
-    Docker.createAndStartContainer(host, IMAGE_NAME, command,
-      environ = Map("JAVA_OPTS" -> CLUSTER_OPTS),
-      volumes = Map(
-        HOST_GEARPUMP_HOME -> DOCKER_IMAGE_GEARPUMP_HOME,
-        HOST_LOG_HOME -> DOCKER_IMAGE_LOG_HOME),
-      knownHosts = masterAddrs.map(_._1).filter(_ != host).toSet,
-      tunnelPorts = tunnelPorts)
-  }
-
-  def killAndRemove(): Unit = {
-    Docker.killAndRemoveContainer(host)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
deleted file mode 100644
index 884a8d1..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.integrationtest.Docker
-
-/**
- * A command-line client to operate a Gearpump cluster
- */
-class CommandLineClient(host: String) {
-
-  private val LOG = Logger.getLogger(getClass)
-
-  def listApps(): Array[String] = {
-    gearCommand(host, "gear info").split("\n").filter(
-      _.startsWith("application: ")
-    )
-  }
-
-  def listRunningApps(): Array[String] =
-    listApps().filter(
-      _.contains(s", status: ${MasterToAppMaster.AppMasterActive}")
-    )
-
-  def queryApp(appId: Int): String = try {
-    listApps().filter(
-      _.startsWith(s"application: $appId")
-    ).head
-  } catch {
-    case ex: Throwable =>
-      LOG.warn(s"swallowed an exception: $ex")
-      ""
-  }
-
-  def submitAppAndCaptureOutput(jar: String, executorNum: Int, args: String = ""): String = {
-    gearCommand(host, s"gear app -verbose true -jar $jar -executors $executorNum $args")
-  }
-
-  def submitApp(jar: String, args: String = ""): Int = {
-    LOG.debug(s"|=> Submit Application $jar...")
-    submitAppUse("gear app", jar, args)
-  }
-
-  private def submitAppUse(launcher: String, jar: String, args: String = ""): Int = try {
-    gearCommand(host, s"$launcher -jar $jar $args").split("\n")
-      .filter(_.contains("The application id is ")).head.split(" ").last.toInt
-  } catch {
-    case ex: Throwable =>
-      LOG.warn(s"swallowed an exception: $ex")
-      -1
-  }
-
-  def killApp(appId: Int): Boolean = {
-    tryGearCommand(host, s"gear kill -appid $appId")
-  }
-
-  private def gearCommand(container: String, command: String): String = {
-    LOG.debug(s"|=> Gear command $command in container $container...")
-    Docker.execute(container, s"/opt/start $command")
-  }
-
-  private def tryGearCommand(container: String, command: String): Boolean = {
-    LOG.debug(s"|=> Gear command $command in container $container...")
-    Docker.executeSilently(container, s"/opt/start $command")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
deleted file mode 100644
index 4d439e8..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import java.io.IOException
-import scala.collection.mutable.ListBuffer
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.{Docker, Util}
-
-/**
- * This class is a test driver for end-to-end integration test.
- */
-class MiniCluster {
-
-  private val LOG = Logger.getLogger(getClass)
-  private val SUT_HOME = "/opt/gearpump"
-
-  private val REST_SERVICE_PORT = 8090
-  private val MASTER_PORT = 3000
-  private val MASTER_ADDRS: List[(String, Int)] = {
-    (0 to 0).map(index =>
-      ("master" + index, MASTER_PORT)
-    ).toList
-  }
-
-  lazy val commandLineClient: CommandLineClient = new CommandLineClient(getMasterHosts.head)
-
-  lazy val restClient: RestClient = {
-    val client = new RestClient(getMasterHosts.head, REST_SERVICE_PORT)
-    client
-  }
-
-  private var workers: ListBuffer[String] = ListBuffer.empty
-
-  def start(workerNum: Int = 2): Unit = {
-
-    // Kill master
-    MASTER_ADDRS.foreach { case (host, _) =>
-      if (Docker.containerExists(host)) {
-        Docker.killAndRemoveContainer(host)
-      }
-    }
-
-    // Kill existing workers
-    workers ++= (0 until workerNum).map("worker" + _)
-    workers.foreach { worker =>
-      if (Docker.containerExists(worker)) {
-        Docker.killAndRemoveContainer(worker)
-      }
-    }
-
-    // Start Masters
-    MASTER_ADDRS.foreach({ case (host, port) =>
-      addMasterNode(host, port)
-    })
-
-    // Start Workers
-    workers.foreach { worker =>
-      val container = new BaseContainer(worker, "worker", MASTER_ADDRS)
-      container.createAndStart()
-    }
-
-    // Check cluster status
-    expectRestClientAuthenticated()
-    expectClusterAvailable()
-  }
-
-  private def addMasterNode(host: String, port: Int): Unit = {
-    val container = new BaseContainer(host, s"master -ip $host -port $port", MASTER_ADDRS)
-    container.createAndStart()
-  }
-
-  def addWorkerNode(host: String): Unit = {
-    if (workers.find(_ == host).isEmpty) {
-      val container = new BaseContainer(host, "worker", MASTER_ADDRS)
-      container.createAndStart()
-      workers += host
-    } else {
-      throw new IOException(s"Cannot add new worker $host, " +
-        s"as worker with same hostname already exists")
-    }
-  }
-
-  /**
-   * @throws RuntimeException if rest client is not authenticated after N attempts
-   */
-  private def expectRestClientAuthenticated(): Unit = {
-    Util.retryUntil(() => {
-      restClient.login()
-      LOG.info("rest client has been authenticated")
-      true
-    }, "login successfully")
-  }
-
-  /**
-   * @throws RuntimeException if service is not available after N attempts
-   */
-  private def expectClusterAvailable(): Unit = {
-    Util.retryUntil(() => {
-      val response = restClient.queryMaster()
-      LOG.info(s"cluster is now available with response: $response.")
-      response.aliveFor > 0
-    }, "cluster running")
-  }
-
-  def isAlive: Boolean = {
-    getMasterHosts.exists(nodeIsOnline)
-  }
-
-  def getNetworkGateway: String = {
-    Docker.getNetworkGateway(MASTER_ADDRS.head._1)
-  }
-
-  def shutDown(): Unit = {
-    val removalHosts = (getMasterHosts ++ getWorkerHosts).toSet
-      .filter(nodeIsOnline).toArray
-    if (removalHosts.length > 0) {
-      Docker.killAndRemoveContainer(removalHosts)
-    }
-    workers.clear()
-  }
-
-  def removeMasterNode(host: String): Unit = {
-    Docker.killAndRemoveContainer(host)
-  }
-
-  def removeWorkerNode(host: String): Unit = {
-    workers -= host
-    Docker.killAndRemoveContainer(host)
-  }
-
-  def restart(): Unit = {
-    shutDown()
-    Util.retryUntil(() => {
-      !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists)
-    }, "all docker containers killed")
-    LOG.info("all docker containers have been killed. restarting...")
-    start()
-  }
-
-  def getMastersAddresses: List[(String, Int)] = {
-    MASTER_ADDRS
-  }
-
-  def getMasterHosts: List[String] = {
-    MASTER_ADDRS.map({ case (host, port) => host })
-  }
-
-  def getWorkerHosts: List[String] = {
-    workers.toList
-  }
-
-  def nodeIsOnline(host: String): Boolean = {
-    Docker.containerIsRunning(host)
-  }
-
-  private def builtInJarsUnder(folder: String): Array[String] = {
-    Docker.findJars(getMasterHosts.head, s"$SUT_HOME/$folder")
-  }
-
-  private def queryBuiltInJars(folder: String, subtext: String): Seq[String] = {
-    builtInJarsUnder(folder).filter(_.contains(subtext))
-  }
-
-  def queryBuiltInExampleJars(subtext: String): Seq[String] = {
-    queryBuiltInJars("examples", subtext)
-  }
-
-  def queryBuiltInITJars(subtext: String): Seq[String] = {
-    queryBuiltInJars("integrationtest", subtext)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
deleted file mode 100644
index 1b143af..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import scala.reflect.ClassTag
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.log4j.Logger
-import upickle.Js
-import upickle.default._
-
-import org.apache.gearpump.cluster.AppMasterToMaster.MasterData
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData}
-import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics
-import org.apache.gearpump.cluster.master.MasterSummary
-import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import org.apache.gearpump.cluster.{AppJar, MasterToAppMaster}
-import org.apache.gearpump.integrationtest.{Docker, Util}
-import org.apache.gearpump.services.AppMasterService.Status
-import org.apache.gearpump.services.MasterService.{AppSubmissionResult, BuiltinPartitioners}
-// NOTE: This cannot be removed!!!
-import org.apache.gearpump.services.util.UpickleUtil._
-import org.apache.gearpump.streaming.ProcessorDescription
-import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
-import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationResult, ReplaceProcessor}
-import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary
-import org.apache.gearpump.streaming.executor.Executor.ExecutorSummary
-import org.apache.gearpump.util.{Constants, Graph}
-
-/**
- * A REST client to operate a Gearpump cluster
- */
-class RestClient(host: String, port: Int) {
-
-  private val LOG = Logger.getLogger(getClass)
-
-  private val cookieFile: String = "cookie.txt"
-
-  implicit val graphReader: upickle.default.Reader[Graph[Int, String]] =
-    upickle.default.Reader[Graph[Int, String]] {
-    case Js.Obj(verties, edges) =>
-      val vertexList = upickle.default.readJs[List[Int]](verties._2)
-      val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2)
-      Graph(vertexList, edgeList)
-  }
-
-  private def decodeAs[T](
-      expr: String)(implicit reader: upickle.default.Reader[T], classTag: ClassTag[T]): T = try {
-    read[T](expr)
-  } catch {
-    case ex: Throwable =>
-      LOG.error(s"Failed to decode Rest response to ${classTag.runtimeClass.getSimpleName}")
-      throw ex
-  }
-
-  def queryVersion(): String = {
-    curl("version")
-  }
-
-  def listWorkers(): Array[WorkerSummary] = {
-    val resp = callApi("master/workerlist")
-    decodeAs[List[WorkerSummary]](resp).toArray
-  }
-
-  def listRunningWorkers(): Array[WorkerSummary] = {
-    listWorkers().filter(_.state == MasterToAppMaster.AppMasterActive)
-  }
-
-  def listApps(): Array[AppMasterData] = {
-    val resp = callApi("master/applist")
-    decodeAs[AppMastersData](resp).appMasters.toArray
-  }
-
-  def listRunningApps(): Array[AppMasterData] = {
-    listApps().filter(_.status == MasterToAppMaster.AppMasterActive)
-  }
-
-  def getNextAvailableAppId(): Int = {
-    listApps().length + 1
-  }
-
-  def submitApp(jar: String, executorNum: Int, args: String = "", config: String = "")
-    : Boolean = try {
-    var endpoint = "master/submitapp"
-
-    var options = Seq(s"jar=@$jar")
-    if (config.length > 0) {
-      options :+= s"conf=@$config"
-    }
-
-    options :+= s"executorcount=$executorNum"
-
-    if (args != null && !args.isEmpty) {
-      options :+= "args=\"" + args + "\""
-    }
-
-    val resp = callApi(endpoint, options.map("-F " + _).mkString(" "))
-    val result = decodeAs[AppSubmissionResult](resp)
-    assert(result.success)
-    true
-  } catch {
-    case ex: Throwable =>
-      LOG.warn(s"swallowed an exception: $ex")
-      false
-  }
-
-  def queryApp(appId: Int): AppMasterData = {
-    val resp = callApi(s"appmaster/$appId")
-    decodeAs[AppMasterData](resp)
-  }
-
-  def queryAppMasterConfig(appId: Int): Config = {
-    val resp = callApi(s"appmaster/$appId/config")
-    ConfigFactory.parseString(resp)
-  }
-
-  def queryStreamingAppDetail(appId: Int): StreamAppMasterSummary = {
-    val resp = callApi(s"appmaster/$appId?detail=true")
-    decodeAs[StreamAppMasterSummary](resp)
-  }
-
-  def queryStreamingAppMetrics(appId: Int, current: Boolean, path: String = "processor*")
-    : HistoryMetrics = {
-    val args = if (current) "?readLatest=true" else ""
-    val resp = callApi(s"appmaster/$appId/metrics/app$appId.$path$args")
-    decodeAs[HistoryMetrics](resp)
-  }
-
-  def queryExecutorSummary(appId: Int, executorId: Int): ExecutorSummary = {
-    val resp = callApi(s"appmaster/$appId/executor/$executorId")
-    decodeAs[ExecutorSummary](resp)
-  }
-
-  def queryExecutorBrief(appId: Int): Array[ExecutorBrief] = {
-    queryStreamingAppDetail(appId).executors.toArray
-  }
-
-  def queryExecutorMetrics(appId: Int, current: Boolean): HistoryMetrics = {
-    val args = if (current) "?readLatest=true" else ""
-    val resp = callApi(s"appmaster/$appId/metrics/app$appId.executor*$args")
-    decodeAs[HistoryMetrics](resp)
-  }
-
-  def queryExecutorConfig(appId: Int, executorId: Int): Config = {
-    val resp = callApi(s"appmaster/$appId/executor/$executorId/config")
-    ConfigFactory.parseString(resp)
-  }
-
-  def queryMaster(): MasterSummary = {
-    val resp = callApi("master")
-    decodeAs[MasterData](resp).masterDescription
-  }
-
-  def queryMasterMetrics(current: Boolean): HistoryMetrics = {
-    val args = if (current) "?readLatest=true" else ""
-    val resp = callApi(s"master/metrics/master?$args")
-    decodeAs[HistoryMetrics](resp)
-  }
-
-  def queryMasterConfig(): Config = {
-    val resp = callApi("master/config")
-    ConfigFactory.parseString(resp)
-  }
-
-  def queryWorkerMetrics(workerId: WorkerId, current: Boolean): HistoryMetrics = {
-    val args = if (current) "?readLatest=true" else ""
-    val workerIdStr = WorkerId.render(workerId)
-    val resp = callApi(s"worker/$workerIdStr/metrics/worker$workerIdStr?$args")
-    decodeAs[HistoryMetrics](resp)
-  }
-
-  def queryWorkerConfig(workerId: WorkerId): Config = {
-    val resp = callApi(s"worker/${WorkerId.render(workerId)}/config")
-    ConfigFactory.parseString(resp)
-  }
-
-  def queryBuiltInPartitioners(): Array[String] = {
-    val resp = callApi("master/partitioners")
-    decodeAs[BuiltinPartitioners](resp).partitioners
-  }
-
-  def uploadJar(localFilePath: String): AppJar = {
-    val resp = callApi(s"master/uploadjar -F jar=@$localFilePath", CRUD_POST)
-    decodeAs[AppJar](resp)
-  }
-
-  def replaceStreamingAppProcessor(appId: Int, replaceMe: ProcessorDescription): Boolean = {
-    replaceStreamingAppProcessor(appId, replaceMe, false)
-  }
-
-  def replaceStreamingAppProcessor(
-      appId: Int, replaceMe: ProcessorDescription, inheritConf: Boolean): Boolean = try {
-    val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe, inheritConf)
-    val args = upickle.default.write(replaceOperation)
-    val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + Util.encodeUriComponent(args),
-      CRUD_POST)
-    decodeAs[DAGOperationResult](resp)
-    true
-  } catch {
-    case ex: Throwable =>
-      LOG.warn(s"swallowed an exception: $ex")
-      false
-  }
-
-  def killAppMaster(appId: Int): Boolean = {
-    killExecutor(appId, Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
-  }
-
-  def killExecutor(appId: Int, executorId: Int): Boolean = try {
-    val jvmInfo = queryExecutorSummary(appId, executorId).jvmName.split("@")
-    val pid = jvmInfo(0).toInt
-    val hostname = jvmInfo(1)
-    Docker.killProcess(hostname, pid)
-  } catch {
-    case ex: Throwable =>
-      LOG.warn(s"swallowed an exception: $ex")
-      false
-  }
-
-  def killApp(appId: Int): Boolean = try {
-    val resp = callApi(s"appmaster/$appId", CRUD_DELETE)
-    resp.contains("\"status\":\"success\"")
-  } catch {
-    case ex: Throwable =>
-      LOG.warn(s"swallowed an exception: $ex")
-      false
-  }
-
-  def restartApp(appId: Int): Boolean = try {
-    val resp = callApi(s"appmaster/$appId/restart", CRUD_POST)
-    decodeAs[Status](resp).success
-  } catch {
-    case ex: Throwable =>
-      LOG.warn(s"swallowed an exception: $ex")
-      false
-  }
-
-  private val CRUD_POST = "-X POST"
-  private val CRUD_DELETE = "-X DELETE"
-
-  private def callApi(endpoint: String, option: String = ""): String = {
-    curl(s"api/v1.0/$endpoint", Array(option, s"--cookie $cookieFile"))
-  }
-
-  private def curl(endpoint: String, options: Array[String] = Array.empty[String]): String = {
-    Docker.curl(host, s"http://$host:$port/$endpoint", options)
-  }
-
-  def login(): Unit = {
-    curl("login", Array(CRUD_POST, s"--cookie-jar $cookieFile",
-      "--data username=admin", "--data password=admin"))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
deleted file mode 100644
index 79adfc4..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.integrationtest.storm
-
-import scala.util.Random
-
-import backtype.storm.utils.{DRPCClient, Utils}
-
-import org.apache.gearpump.integrationtest.minicluster.{BaseContainer, MiniCluster, RestClient}
-import org.apache.gearpump.integrationtest.{Docker, Util}
-
-class StormClient(cluster: MiniCluster, restClient: RestClient) {
-
-  private val masterAddrs: List[(String, Int)] = cluster.getMastersAddresses
-  private val CONFIG_FILE = s"/opt/gearpump/storm${new Random().nextInt()}.yaml"
-  private val DRPC_HOST = "storm0"
-  private val DRPC_PORT = 3772
-  private val DRPC_INVOCATIONS_PORT = 3773
-  private val STORM_DRPC = "storm-drpc"
-  private val NIMBUS_HOST = "storm1"
-  private val STORM_NIMBUS = "storm nimbus"
-  private val STORM_APP = "/opt/start storm app"
-
-  private val drpcContainer = new BaseContainer(DRPC_HOST, STORM_DRPC, masterAddrs,
-    tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT))
-
-  private val nimbusContainer =
-    new BaseContainer(NIMBUS_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE", masterAddrs)
-
-  def start(): Unit = {
-    nimbusContainer.createAndStart()
-    ensureNimbusRunning()
-
-    drpcContainer.createAndStart()
-    ensureDrpcServerRunning()
-  }
-
-  private def ensureNimbusRunning(): Unit = {
-    Util.retryUntil(() => {
-      val response = Docker.execute(NIMBUS_HOST, "grep \"port\" " + CONFIG_FILE)
-      // Parse format nimbus.thrift.port: '39322'
-      val thriftPort = response.split(" ")(1).replace("'", "").toInt
-
-      Docker.executeSilently(NIMBUS_HOST, s"""sh -c "netstat -na | grep $thriftPort" """)
-    }, "Nimbus running")
-  }
-
-  private def ensureDrpcServerRunning(): Unit = {
-    Util.retryUntil(() => {
-      Docker.executeSilently(DRPC_HOST, s"""sh -c "netstat -na | grep $DRPC_PORT " """)
-    }, "DRPC running")
-  }
-
-  def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = {
-    Util.retryUntil(() => {
-      Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " +
-        s"-jar $jar $mainClass $args")
-      restClient.listRunningApps().exists(_.appName == appName)
-    }, "app running")
-    restClient.listRunningApps().filter(_.appName == appName).head.appId
-  }
-
-  def getDRPCClient(drpcServerIp: String): DRPCClient = {
-    val config = Utils.readDefaultConfig()
-    new DRPCClient(config, drpcServerIp, DRPC_PORT)
-  }
-
-  def shutDown(): Unit = {
-
-    // Cleans up the storm.yaml config file
-    Docker.executeSilently(NIMBUS_HOST, s"rm $CONFIG_FILE ")
-    drpcContainer.killAndRemove()
-    nimbusContainer.killAndRemove()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala
new file mode 100644
index 0000000..f315ad3
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import org.apache.log4j.Logger
+
+/**
+ * The class is used to execute Docker commands.
+ */
+object Docker {
+
+  private val LOG = Logger.getLogger(getClass)
+
+  /**
+   * @throws RuntimeException in case retval != 0
+   */
+  private def doExecute(container: String, command: String): String = {
+    ShellExec.execAndCaptureOutput(s"docker exec $container $command", s"EXEC $container")
+  }
+
+  private def doExecuteSilently(container: String, command: String): Boolean = {
+    ShellExec.exec(s"docker exec $container $command", s"EXEC $container")
+  }
+
+  /**
+   * @throws RuntimeException in case retval != 0
+   */
+  final def execute(container: String, command: String): String = {
+    trace(container, s"Execute $command") {
+      doExecute(container, command)
+    }
+  }
+
+  final def executeSilently(container: String, command: String): Boolean = {
+    trace(container, s"Execute silently $command") {
+      doExecuteSilently(container, command)
+    }
+  }
+
+  final def listContainers(): Seq[String] = {
+    trace("", s"Listing how many containers...") {
+      ShellExec.execAndCaptureOutput("docker ps -q -a", "LIST")
+        .split("\n").filter(_.nonEmpty)
+    }
+  }
+
+  final def containerIsRunning(name: String): Boolean = {
+    trace(name, s"Check container running or not...") {
+      ShellExec.execAndCaptureOutput(s"docker ps -q --filter name=$name", s"FIND $name").nonEmpty
+    }
+  }
+
+  final def getContainerIPAddr(name: String): String = {
+    trace(name, s"Get Ip Address") {
+      Docker.inspect(name, "--format={{.NetworkSettings.IPAddress}}")
+    }
+  }
+
+  final def containerExists(name: String): Boolean = {
+    trace(name, s"Check container existing or not...") {
+      ShellExec.execAndCaptureOutput(s"docker ps -q -a --filter name=$name", s"FIND $name").nonEmpty
+    }
+  }
+
+  /**
+   * @throws RuntimeException in case particular container is created already
+   */
+  final def createAndStartContainer(name: String, image: String, command: String,
+      environ: Map[String, String] = Map.empty, // key, value
+      volumes: Map[String, String] = Map.empty, // from, to
+      knownHosts: Set[String] = Set.empty,
+      tunnelPorts: Set[Int] = Set.empty): String = {
+
+    if (containerExists(name)) {
+      killAndRemoveContainer(name)
+    }
+
+    trace(name, s"Create and start $name ($image)...") {
+
+      val optsBuilder = new StringBuilder
+      optsBuilder.append("-d") // run in background
+      optsBuilder.append(" -h " + name) // use container name as hostname
+      optsBuilder.append(" -v /etc/localtime:/etc/localtime:ro") // synchronize timezone settings
+
+      environ.foreach { case (key, value) =>
+        optsBuilder.append(s" -e $key=$value")
+      }
+      volumes.foreach { case (from, to) =>
+        optsBuilder.append(s" -v $from:$to")
+      }
+      knownHosts.foreach(host =>
+        optsBuilder.append(" --link " + host)
+      )
+      tunnelPorts.foreach(port =>
+        optsBuilder.append(s" -p $port:$port")
+      )
+      createAndStartContainer(name, optsBuilder.toString(), command, image)
+    }
+  }
+
+  /**
+   * @throws RuntimeException in case particular container is created already
+   */
+  private def createAndStartContainer(
+      name: String, options: String, command: String, image: String): String = {
+    ShellExec.execAndCaptureOutput(s"docker run $options " +
+      s"--name $name $image $command", s"MAKE $name")
+  }
+
+  final def killAndRemoveContainer(name: String): Boolean = {
+    trace(name, s"kill and remove container") {
+      ShellExec.exec(s"docker rm -f $name", s"STOP $name")
+    }
+  }
+
+  final def killAndRemoveContainer(names: Array[String]): Boolean = {
+    assert(names.length > 0)
+    val args = names.mkString(" ")
+    trace(names.mkString(","), s"kill and remove containers") {
+      ShellExec.exec(s"docker rm -f $args", s"STOP $args.")
+    }
+  }
+
+  private def inspect(container: String, option: String): String = {
+    ShellExec.execAndCaptureOutput(s"docker inspect $option $container", s"EXEC $container")
+  }
+
+  final def curl(container: String, url: String, options: Array[String] = Array.empty[String])
+    : String = {
+    trace(container, s"curl $url") {
+      doExecute(container, s"curl -s ${options.mkString(" ")} $url")
+    }
+  }
+
+  final def getHostName(container: String): String = {
+    trace(container, s"Get hostname of container...") {
+      doExecute(container, "hostname")
+    }
+  }
+
+  final def getNetworkGateway(container: String): String = {
+    trace(container, s"Get gateway of container...") {
+      doExecute(container, "ip route").split("\\s+")(2)
+    }
+  }
+  final def killProcess(container: String, pid: Int, signal: String = "SIGKILL"): Boolean = {
+    trace(container, s"Kill process pid: $pid") {
+      doExecuteSilently(container, s"kill -$signal $pid")
+    }
+  }
+
+  final def findJars(container: String, folder: String): Array[String] = {
+    trace(container, s"Find jars under $folder") {
+      doExecute(container, s"find $folder")
+        .split("\n").filter(_.endsWith(".jar"))
+    }
+  }
+
+  private def trace[T](container: String, msg: String)(fun: => T): T = {
+    // scalastyle:off println
+    Console.println() // A empty line to let the output looks better.
+    // scalastyle:on println
+    LOG.debug(s"Container $container====>> $msg")
+    LOG.debug("INPUT==>>")
+    val response = fun
+    LOG.debug("<<==OUTPUT")
+
+    LOG.debug(brief(response))
+
+    LOG.debug(s"<<====Command END. Container $container, $msg \n")
+    response
+  }
+
+  private val PREVIEW_MAX_LENGTH = 1024
+
+  private def brief[T](input: T): String = {
+    val output = input match {
+      case true =>
+        "Success|True"
+      case false =>
+        "Failure|False"
+      case x: Array[Any] =>
+        "Success: [" + x.mkString(",") + "]"
+      case x =>
+        x.toString
+    }
+
+    val preview = if (output.length > PREVIEW_MAX_LENGTH) {
+      output.substring(0, PREVIEW_MAX_LENGTH) + "..."
+    }
+    else {
+      output
+    }
+    preview
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala
new file mode 100644
index 0000000..25d7ee3
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.sys.process._
+
+import org.apache.log4j.Logger
+import org.apache.storm.shade.org.eclipse.jetty.util.QuotedStringTokenizer
+
+/**
+ * The class is used to execute command in a shell
+ */
+object ShellExec {
+
+  private val LOG = Logger.getLogger(getClass)
+  private val PROCESS_TIMEOUT = 2.minutes
+
+  /**
+   * The builtin command line parser by ProcessBuilder (implicit sys.process) don't
+   * respect the quote chars (' and ")
+   */
+  private def splitQuotedString(str: String): List[String] = {
+    val splitter = new QuotedStringTokenizer(str, " \t\n\r")
+    splitter.asInstanceOf[java.util.Enumeration[String]].asScala.toList
+  }
+
+  def exec(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT): Boolean = {
+    LOG.debug(s"$sender => `$command`")
+
+    val p = splitQuotedString(command).run()
+    val f = Future(blocking(p.exitValue())) // wrap in Future
+    val retval = {
+      try {
+        Await.result(f, timeout)
+      } catch {
+        case _: TimeoutException =>
+          LOG.error(s"timeout to execute command `$command`")
+          p.destroy()
+          p.exitValue()
+      }
+    }
+    LOG.debug(s"$sender <= exit $retval")
+    retval == 0
+  }
+
+  def execAndCaptureOutput(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT)
+    : String = {
+    LOG.debug(s"$sender => `$command`")
+
+    val buf = new StringBuilder
+    val processLogger = ProcessLogger((o: String) => buf.append(o).append("\n"),
+      (e: String) => buf.append(e).append("\n"))
+    val p = splitQuotedString(command).run(processLogger)
+    val f = Future(blocking(p.exitValue())) // wrap in Future
+    val retval = {
+      try {
+        Await.result(f, timeout)
+      } catch {
+        case _: TimeoutException =>
+          p.destroy()
+          p.exitValue()
+      }
+    }
+    val output = buf.toString().trim
+    val PREVIEW_MAX_LENGTH = 200
+    val preview = if (output.length > PREVIEW_MAX_LENGTH) {
+      output.substring(0, PREVIEW_MAX_LENGTH) + "..."
+    } else {
+      output
+    }
+
+    LOG.debug(s"$sender <= `$preview` exit $retval")
+    if (retval != 0) {
+      throw new RuntimeException(
+        s"exited ($retval) by executing `$command`")
+    }
+    output
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala
new file mode 100644
index 0000000..7e7085d
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import scala.concurrent.duration._
+import scala.util.{Failure, Success, Try}
+
+import org.apache.log4j.Logger
+
+object Util {
+
+  private val LOG = Logger.getLogger(getClass)
+
+  def encodeUriComponent(s: String): String = {
+    try {
+      java.net.URLEncoder.encode(s, "UTF-8")
+        .replaceAll("\\+", "%20")
+        .replaceAll("\\%21", "!")
+        .replaceAll("\\%27", "'")
+        .replaceAll("\\%28", "(")
+        .replaceAll("\\%29", ")")
+        .replaceAll("\\%7E", "~")
+    } catch {
+      case ex: Throwable => s
+    }
+  }
+
+  def retryUntil(
+      condition: () => Boolean, conditionDescription: String, maxTries: Int = 15,
+      interval: Duration = 10.seconds): Unit = {
+    var met = false
+    var tries = 0
+
+    while (!met && tries < maxTries) {
+
+      met = Try(condition()) match {
+        case Success(true) => true
+        case Success(false) => false
+        case Failure(ex) => false
+      }
+
+      tries += 1
+
+      if (!met) {
+        LOG.error(s"Failed due to (false == $conditionDescription), " +
+          s"retrying for the ${tries} times...")
+        Thread.sleep(interval.toMillis)
+      } else {
+        LOG.info(s"Success ($conditionDescription) after ${tries} retries")
+      }
+    }
+
+    if (!met) {
+      throw new Exception(s"Failed after ${tries} retries, ($conditionDescription) == false")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala
new file mode 100644
index 0000000..f836abd
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.integrationtest.hadoop
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+object HadoopCluster {
+
+  /** Starts a Hadoop cluster */
+  def withHadoopCluster(testCode: HadoopCluster => Unit): Unit = {
+    val hadoopCluster = new HadoopCluster
+    try {
+      hadoopCluster.start()
+      testCode(hadoopCluster)
+    } finally {
+      hadoopCluster.shutDown()
+    }
+  }
+}
+/**
+ * This class maintains a single node Hadoop cluster
+ */
+class HadoopCluster {
+
+  private val LOG = Logger.getLogger(getClass)
+  private val HADOOP_DOCKER_IMAGE = "sequenceiq/hadoop-docker:2.6.0"
+  private val HADOOP_HOST = "hadoop0"
+
+  def start(): Unit = {
+    Docker.createAndStartContainer(HADOOP_HOST, HADOOP_DOCKER_IMAGE, "")
+
+    Util.retryUntil(() => isAlive, "Hadoop cluster is alive")
+    LOG.info("Hadoop cluster is started.")
+  }
+
+  // Checks whether the cluster is alive by listing "/"
+  private def isAlive: Boolean = {
+    Docker.executeSilently(HADOOP_HOST, "/usr/local/hadoop/bin/hadoop fs -ls /")
+  }
+
+  def getDefaultFS: String = {
+    val hostIPAddr = Docker.getContainerIPAddr(HADOOP_HOST)
+    s"hdfs://$hostIPAddr:9000"
+  }
+
+  def shutDown(): Unit = {
+    Docker.killAndRemoveContainer(HADOOP_HOST)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala
new file mode 100644
index 0000000..15ba084
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.minicluster.MiniCluster
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+object KafkaCluster {
+
+  /** Starts a Kafka cluster */
+  def withKafkaCluster(cluster: MiniCluster)(testCode: KafkaCluster => Unit): Unit = {
+    val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka")
+    try {
+      kafkaCluster.start()
+      testCode(kafkaCluster)
+    } finally {
+      kafkaCluster.shutDown()
+    }
+  }
+
+  def withDataProducer(topic: String, brokerList: String)
+    (testCode: NumericalDataProducer => Unit): Unit = {
+    val producer = new NumericalDataProducer(topic, brokerList)
+    try {
+      producer.start()
+      testCode(producer)
+    } finally {
+      producer.stop()
+    }
+  }
+}
+
+/**
+ * This class maintains a single node Kafka cluster with integrated Zookeeper.
+ */
+class KafkaCluster(val advertisedHost: String, zkChroot: String = "") {
+
+  private val LOG = Logger.getLogger(getClass)
+  private val KAFKA_DOCKER_IMAGE = "spotify/kafka"
+  private val KAFKA_HOST = "kafka0"
+  private val KAFKA_HOME = "/opt/kafka_2.11-0.8.2.1/"
+  private val ZOOKEEPER_PORT = 2181
+  private val BROKER_PORT = 9092
+  val advertisedPort = BROKER_PORT
+
+  def start(): Unit = {
+    Docker.createAndStartContainer(KAFKA_HOST, KAFKA_DOCKER_IMAGE, "",
+      environ = Map(
+        "ADVERTISED_HOST" -> advertisedHost,
+        "ADVERTISED_PORT" -> BROKER_PORT.toString,
+        "ZK_CHROOT" -> zkChroot),
+      tunnelPorts = Set(ZOOKEEPER_PORT, BROKER_PORT)
+    )
+    Util.retryUntil(() => isAlive, "kafka cluster is alive")
+    LOG.debug("kafka cluster is started.")
+  }
+
+  def isAlive: Boolean = {
+    !listTopics().contains("Connection refused")
+  }
+
+  def shutDown(): Unit = {
+    Docker.killAndRemoveContainer(KAFKA_HOST)
+  }
+
+  private lazy val hostIPAddr = Docker.getContainerIPAddr(KAFKA_HOST)
+
+  def listTopics(): String = {
+    kafkaListTopics(KAFKA_HOST, KAFKA_HOME, getZookeeperConnectString)
+  }
+
+  def getZookeeperConnectString: String = {
+    s"$hostIPAddr:$ZOOKEEPER_PORT/$zkChroot"
+  }
+
+  def getBrokerListConnectString: String = {
+    s"$hostIPAddr:$BROKER_PORT"
+  }
+
+  def createTopic(topic: String, partitions: Int = 1): Unit = {
+    LOG.debug(s"|=> Create kafka topic $topic with $partitions partitions")
+
+    Docker.executeSilently(KAFKA_HOST,
+      s"$KAFKA_HOME/bin/kafka-topics.sh" +
+        s" --zookeeper $getZookeeperConnectString" +
+        s" --create --topic $topic --partitions $partitions --replication-factor 1")
+  }
+
+  def produceDataToKafka(topic: String, messageNum: Int): Unit = {
+    Docker.executeSilently(KAFKA_HOST,
+      s"$KAFKA_HOME/bin/kafka-topics.sh" +
+        s" --zookeeper $getZookeeperConnectString" +
+        s" --create --topic $topic --partitions 1 --replication-factor 1")
+
+    Docker.executeSilently(KAFKA_HOST,
+      s"$KAFKA_HOME/bin/kafka-producer-perf-test.sh" +
+        s" --broker-list $getBrokerListConnectString" +
+        s" --topic $topic --messages $messageNum")
+  }
+
+  def getLatestOffset(topic: String): Int = {
+    kafkaFetchLatestOffset(KAFKA_HOST, topic, KAFKA_HOME, getBrokerListConnectString)
+  }
+
+  private def kafkaListTopics(
+      container: String, kafkaHome: String, zookeeperConnectionString: String): String = {
+
+    LOG.debug(s"|=> Kafka list topics...")
+    Docker.execute(container,
+      s"$kafkaHome/bin/kafka-topics.sh" +
+        s" --zookeeper $zookeeperConnectionString -list")
+  }
+
+  private def kafkaFetchLatestOffset(
+      container: String, topic: String, kafkaHome: String, brokersList: String): Int = {
+    LOG.debug(s"|=> Get latest offset of topic $topic...")
+    val output = Docker.execute(container,
+      s"$kafkaHome/bin/kafka-run-class.sh kafka.tools.GetOffsetShell" +
+        s" --broker-list $brokersList " +
+        s" --topic $topic --time -1")
+    output.split(":")(2).toInt
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala
new file mode 100644
index 0000000..1cf3125
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import java.util.Properties
+
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.streaming.serializer.ChillSerializer
+
+class NumericalDataProducer(topic: String, bootstrapServers: String) {
+
+  private val LOG = Logger.getLogger(getClass)
+  private val producer = createProducer
+  private val WRITE_SLEEP_NANOS = 10
+  private val serializer = new ChillSerializer[Int]
+  var lastWriteNum = 0
+
+  def start(): Unit = {
+    produceThread.start()
+  }
+
+  def stop(): Unit = {
+    if (produceThread.isAlive) {
+      produceThread.interrupt()
+      produceThread.join()
+    }
+    producer.close()
+  }
+
+  /** How many message we have written in total */
+  def producedNumbers: Range = {
+    Range(1, lastWriteNum + 1)
+  }
+
+  private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
+    val properties = new Properties()
+    properties.setProperty("bootstrap.servers", bootstrapServers)
+    new KafkaProducer[Array[Byte], Array[Byte]](properties,
+      new ByteArraySerializer, new ByteArraySerializer)
+  }
+
+  private val produceThread = new Thread(new Runnable {
+    override def run(): Unit = {
+      try {
+        while (!Thread.currentThread.isInterrupted) {
+          lastWriteNum += 1
+          val msg = serializer.serialize(lastWriteNum)
+          val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, msg)
+          producer.send(record)
+          Thread.sleep(0, WRITE_SLEEP_NANOS)
+        }
+      } catch {
+        case ex: InterruptedException =>
+          LOG.error("message producing is stopped by an interrupt")
+      }
+    }
+  })
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala
new file mode 100644
index 0000000..1f773d3
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import scala.collection.mutable
+
+trait ResultVerifier {
+  def onNext(num: Int): Unit
+}
+
+class MessageLossDetector(totalNum: Int) extends ResultVerifier {
+  private val bitSets = new mutable.BitSet(totalNum)
+  var result = List.empty[Int]
+
+  override def onNext(num: Int): Unit = {
+    bitSets.add(num)
+    result :+= num
+  }
+
+  def allReceived: Boolean = {
+    1.to(totalNum).forall(bitSets)
+  }
+
+  def received(num: Int): Boolean = {
+    bitSets(num)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
new file mode 100644
index 0000000..392ca86
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import scala.util.{Failure, Success}
+
+import kafka.api.FetchRequestBuilder
+import kafka.consumer.SimpleConsumer
+import kafka.utils.Utils
+
+import org.apache.gearpump.streaming.serializer.ChillSerializer
+
+class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: Int = 0,
+    host: String, port: Int) {
+
+  private val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "")
+  private val serializer = new ChillSerializer[Int]
+  private var offset = 0L
+
+  def read(): Unit = {
+    val messageSet = consumer.fetch(
+      new FetchRequestBuilder().addFetch(topic, partition, offset, Int.MaxValue).build()
+    ).messageSet(topic, partition)
+
+    for (messageAndOffset <- messageSet) {
+      serializer.deserialize(Utils.readBytes(messageAndOffset.message.payload)) match {
+        case Success(msg) =>
+          offset = messageAndOffset.nextOffset
+          verifier.onNext(msg)
+        case Failure(e) => throw e
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala
new file mode 100644
index 0000000..73413da
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import scala.sys.process._
+
+import org.apache.gearpump.integrationtest.Docker
+
+/**
+ * A helper to instantiate the base image for different usage.
+ */
+class BaseContainer(val host: String, command: String,
+    masterAddrs: List[(String, Int)],
+    tunnelPorts: Set[Int] = Set.empty) {
+
+  private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher"
+  private val DOCKER_IMAGE_GEARPUMP_HOME = "/opt/gearpump"
+  private val DOCKER_IMAGE_LOG_HOME = "/var/log/gearpump"
+  private val HOST_GEARPUMP_HOME = "pwd".!!.trim + "/output/target/pack"
+  private val HOST_LOG_HOME = {
+    val dir = "/tmp/gearpump"
+    s"mkdir -p $dir".!!
+    s"mktemp -p $dir -d".!!.trim
+  }
+
+  private val CLUSTER_OPTS = {
+    masterAddrs.zipWithIndex.map { case (hostPort, index) =>
+      s"-Dgearpump.cluster.masters.$index=${hostPort._1}:${hostPort._2}"
+    }.mkString(" ")
+  }
+
+  def createAndStart(): String = {
+    Docker.createAndStartContainer(host, IMAGE_NAME, command,
+      environ = Map("JAVA_OPTS" -> CLUSTER_OPTS),
+      volumes = Map(
+        HOST_GEARPUMP_HOME -> DOCKER_IMAGE_GEARPUMP_HOME,
+        HOST_LOG_HOME -> DOCKER_IMAGE_LOG_HOME),
+      knownHosts = masterAddrs.map(_._1).filter(_ != host).toSet,
+      tunnelPorts = tunnelPorts)
+  }
+
+  def killAndRemove(): Unit = {
+    Docker.killAndRemoveContainer(host)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
new file mode 100644
index 0000000..884a8d1
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.integrationtest.Docker
+
+/**
+ * A command-line client to operate a Gearpump cluster
+ */
+class CommandLineClient(host: String) {
+
+  private val LOG = Logger.getLogger(getClass)
+
+  def listApps(): Array[String] = {
+    gearCommand(host, "gear info").split("\n").filter(
+      _.startsWith("application: ")
+    )
+  }
+
+  def listRunningApps(): Array[String] =
+    listApps().filter(
+      _.contains(s", status: ${MasterToAppMaster.AppMasterActive}")
+    )
+
+  def queryApp(appId: Int): String = try {
+    listApps().filter(
+      _.startsWith(s"application: $appId")
+    ).head
+  } catch {
+    case ex: Throwable =>
+      LOG.warn(s"swallowed an exception: $ex")
+      ""
+  }
+
+  def submitAppAndCaptureOutput(jar: String, executorNum: Int, args: String = ""): String = {
+    gearCommand(host, s"gear app -verbose true -jar $jar -executors $executorNum $args")
+  }
+
+  def submitApp(jar: String, args: String = ""): Int = {
+    LOG.debug(s"|=> Submit Application $jar...")
+    submitAppUse("gear app", jar, args)
+  }
+
+  private def submitAppUse(launcher: String, jar: String, args: String = ""): Int = try {
+    gearCommand(host, s"$launcher -jar $jar $args").split("\n")
+      .filter(_.contains("The application id is ")).head.split(" ").last.toInt
+  } catch {
+    case ex: Throwable =>
+      LOG.warn(s"swallowed an exception: $ex")
+      -1
+  }
+
+  def killApp(appId: Int): Boolean = {
+    tryGearCommand(host, s"gear kill -appid $appId")
+  }
+
+  private def gearCommand(container: String, command: String): String = {
+    LOG.debug(s"|=> Gear command $command in container $container...")
+    Docker.execute(container, s"/opt/start $command")
+  }
+
+  private def tryGearCommand(container: String, command: String): Boolean = {
+    LOG.debug(s"|=> Gear command $command in container $container...")
+    Docker.executeSilently(container, s"/opt/start $command")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala
new file mode 100644
index 0000000..4d439e8
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import java.io.IOException
+import scala.collection.mutable.ListBuffer
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+/**
+ * This class is a test driver for end-to-end integration test.
+ */
+class MiniCluster {
+
+  private val LOG = Logger.getLogger(getClass)
+  private val SUT_HOME = "/opt/gearpump"
+
+  private val REST_SERVICE_PORT = 8090
+  private val MASTER_PORT = 3000
+  private val MASTER_ADDRS: List[(String, Int)] = {
+    (0 to 0).map(index =>
+      ("master" + index, MASTER_PORT)
+    ).toList
+  }
+
+  lazy val commandLineClient: CommandLineClient = new CommandLineClient(getMasterHosts.head)
+
+  lazy val restClient: RestClient = {
+    val client = new RestClient(getMasterHosts.head, REST_SERVICE_PORT)
+    client
+  }
+
+  private var workers: ListBuffer[String] = ListBuffer.empty
+
+  def start(workerNum: Int = 2): Unit = {
+
+    // Kill master
+    MASTER_ADDRS.foreach { case (host, _) =>
+      if (Docker.containerExists(host)) {
+        Docker.killAndRemoveContainer(host)
+      }
+    }
+
+    // Kill existing workers
+    workers ++= (0 until workerNum).map("worker" + _)
+    workers.foreach { worker =>
+      if (Docker.containerExists(worker)) {
+        Docker.killAndRemoveContainer(worker)
+      }
+    }
+
+    // Start Masters
+    MASTER_ADDRS.foreach({ case (host, port) =>
+      addMasterNode(host, port)
+    })
+
+    // Start Workers
+    workers.foreach { worker =>
+      val container = new BaseContainer(worker, "worker", MASTER_ADDRS)
+      container.createAndStart()
+    }
+
+    // Check cluster status
+    expectRestClientAuthenticated()
+    expectClusterAvailable()
+  }
+
+  private def addMasterNode(host: String, port: Int): Unit = {
+    val container = new BaseContainer(host, s"master -ip $host -port $port", MASTER_ADDRS)
+    container.createAndStart()
+  }
+
+  def addWorkerNode(host: String): Unit = {
+    if (workers.find(_ == host).isEmpty) {
+      val container = new BaseContainer(host, "worker", MASTER_ADDRS)
+      container.createAndStart()
+      workers += host
+    } else {
+      throw new IOException(s"Cannot add new worker $host, " +
+        s"as worker with same hostname already exists")
+    }
+  }
+
+  /**
+   * @throws RuntimeException if rest client is not authenticated after N attempts
+   */
+  private def expectRestClientAuthenticated(): Unit = {
+    Util.retryUntil(() => {
+      restClient.login()
+      LOG.info("rest client has been authenticated")
+      true
+    }, "login successfully")
+  }
+
+  /**
+   * @throws RuntimeException if service is not available after N attempts
+   */
+  private def expectClusterAvailable(): Unit = {
+    Util.retryUntil(() => {
+      val response = restClient.queryMaster()
+      LOG.info(s"cluster is now available with response: $response.")
+      response.aliveFor > 0
+    }, "cluster running")
+  }
+
+  def isAlive: Boolean = {
+    getMasterHosts.exists(nodeIsOnline)
+  }
+
+  def getNetworkGateway: String = {
+    Docker.getNetworkGateway(MASTER_ADDRS.head._1)
+  }
+
+  def shutDown(): Unit = {
+    val removalHosts = (getMasterHosts ++ getWorkerHosts).toSet
+      .filter(nodeIsOnline).toArray
+    if (removalHosts.length > 0) {
+      Docker.killAndRemoveContainer(removalHosts)
+    }
+    workers.clear()
+  }
+
+  def removeMasterNode(host: String): Unit = {
+    Docker.killAndRemoveContainer(host)
+  }
+
+  def removeWorkerNode(host: String): Unit = {
+    workers -= host
+    Docker.killAndRemoveContainer(host)
+  }
+
+  def restart(): Unit = {
+    shutDown()
+    Util.retryUntil(() => {
+      !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists)
+    }, "all docker containers killed")
+    LOG.info("all docker containers have been killed. restarting...")
+    start()
+  }
+
+  def getMastersAddresses: List[(String, Int)] = {
+    MASTER_ADDRS
+  }
+
+  def getMasterHosts: List[String] = {
+    MASTER_ADDRS.map({ case (host, port) => host })
+  }
+
+  def getWorkerHosts: List[String] = {
+    workers.toList
+  }
+
+  def nodeIsOnline(host: String): Boolean = {
+    Docker.containerIsRunning(host)
+  }
+
+  private def builtInJarsUnder(folder: String): Array[String] = {
+    Docker.findJars(getMasterHosts.head, s"$SUT_HOME/$folder")
+  }
+
+  private def queryBuiltInJars(folder: String, subtext: String): Seq[String] = {
+    builtInJarsUnder(folder).filter(_.contains(subtext))
+  }
+
+  def queryBuiltInExampleJars(subtext: String): Seq[String] = {
+    queryBuiltInJars("examples", subtext)
+  }
+
+  def queryBuiltInITJars(subtext: String): Seq[String] = {
+    queryBuiltInJars("integrationtest", subtext)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
new file mode 100644
index 0000000..1b143af
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import scala.reflect.ClassTag
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.log4j.Logger
+import upickle.Js
+import upickle.default._
+
+import org.apache.gearpump.cluster.AppMasterToMaster.MasterData
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData}
+import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics
+import org.apache.gearpump.cluster.master.MasterSummary
+import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
+import org.apache.gearpump.cluster.{AppJar, MasterToAppMaster}
+import org.apache.gearpump.integrationtest.{Docker, Util}
+import org.apache.gearpump.services.AppMasterService.Status
+import org.apache.gearpump.services.MasterService.{AppSubmissionResult, BuiltinPartitioners}
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+import org.apache.gearpump.streaming.ProcessorDescription
+import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
+import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationResult, ReplaceProcessor}
+import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary
+import org.apache.gearpump.streaming.executor.Executor.ExecutorSummary
+import org.apache.gearpump.util.{Constants, Graph}
+
+/**
+ * A REST client to operate a Gearpump cluster
+ */
+class RestClient(host: String, port: Int) {
+
+  private val LOG = Logger.getLogger(getClass)
+
+  private val cookieFile: String = "cookie.txt"
+
+  implicit val graphReader: upickle.default.Reader[Graph[Int, String]] =
+    upickle.default.Reader[Graph[Int, String]] {
+    case Js.Obj(verties, edges) =>
+      val vertexList = upickle.default.readJs[List[Int]](verties._2)
+      val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2)
+      Graph(vertexList, edgeList)
+  }
+
+  private def decodeAs[T](
+      expr: String)(implicit reader: upickle.default.Reader[T], classTag: ClassTag[T]): T = try {
+    read[T](expr)
+  } catch {
+    case ex: Throwable =>
+      LOG.error(s"Failed to decode Rest response to ${classTag.runtimeClass.getSimpleName}")
+      throw ex
+  }
+
+  def queryVersion(): String = {
+    curl("version")
+  }
+
+  def listWorkers(): Array[WorkerSummary] = {
+    val resp = callApi("master/workerlist")
+    decodeAs[List[WorkerSummary]](resp).toArray
+  }
+
+  def listRunningWorkers(): Array[WorkerSummary] = {
+    listWorkers().filter(_.state == MasterToAppMaster.AppMasterActive)
+  }
+
+  def listApps(): Array[AppMasterData] = {
+    val resp = callApi("master/applist")
+    decodeAs[AppMastersData](resp).appMasters.toArray
+  }
+
+  def listRunningApps(): Array[AppMasterData] = {
+    listApps().filter(_.status == MasterToAppMaster.AppMasterActive)
+  }
+
+  def getNextAvailableAppId(): Int = {
+    listApps().length + 1
+  }
+
+  def submitApp(jar: String, executorNum: Int, args: String = "", config: String = "")
+    : Boolean = try {
+    var endpoint = "master/submitapp"
+
+    var options = Seq(s"jar=@$jar")
+    if (config.length > 0) {
+      options :+= s"conf=@$config"
+    }
+
+    options :+= s"executorcount=$executorNum"
+
+    if (args != null && !args.isEmpty) {
+      options :+= "args=\"" + args + "\""
+    }
+
+    val resp = callApi(endpoint, options.map("-F " + _).mkString(" "))
+    val result = decodeAs[AppSubmissionResult](resp)
+    assert(result.success)
+    true
+  } catch {
+    case ex: Throwable =>
+      LOG.warn(s"swallowed an exception: $ex")
+      false
+  }
+
+  def queryApp(appId: Int): AppMasterData = {
+    val resp = callApi(s"appmaster/$appId")
+    decodeAs[AppMasterData](resp)
+  }
+
+  def queryAppMasterConfig(appId: Int): Config = {
+    val resp = callApi(s"appmaster/$appId/config")
+    ConfigFactory.parseString(resp)
+  }
+
+  def queryStreamingAppDetail(appId: Int): StreamAppMasterSummary = {
+    val resp = callApi(s"appmaster/$appId?detail=true")
+    decodeAs[StreamAppMasterSummary](resp)
+  }
+
+  def queryStreamingAppMetrics(appId: Int, current: Boolean, path: String = "processor*")
+    : HistoryMetrics = {
+    val args = if (current) "?readLatest=true" else ""
+    val resp = callApi(s"appmaster/$appId/metrics/app$appId.$path$args")
+    decodeAs[HistoryMetrics](resp)
+  }
+
+  def queryExecutorSummary(appId: Int, executorId: Int): ExecutorSummary = {
+    val resp = callApi(s"appmaster/$appId/executor/$executorId")
+    decodeAs[ExecutorSummary](resp)
+  }
+
+  def queryExecutorBrief(appId: Int): Array[ExecutorBrief] = {
+    queryStreamingAppDetail(appId).executors.toArray
+  }
+
+  def queryExecutorMetrics(appId: Int, current: Boolean): HistoryMetrics = {
+    val args = if (current) "?readLatest=true" else ""
+    val resp = callApi(s"appmaster/$appId/metrics/app$appId.executor*$args")
+    decodeAs[HistoryMetrics](resp)
+  }
+
+  def queryExecutorConfig(appId: Int, executorId: Int): Config = {
+    val resp = callApi(s"appmaster/$appId/executor/$executorId/config")
+    ConfigFactory.parseString(resp)
+  }
+
+  def queryMaster(): MasterSummary = {
+    val resp = callApi("master")
+    decodeAs[MasterData](resp).masterDescription
+  }
+
+  def queryMasterMetrics(current: Boolean): HistoryMetrics = {
+    val args = if (current) "?readLatest=true" else ""
+    val resp = callApi(s"master/metrics/master?$args")
+    decodeAs[HistoryMetrics](resp)
+  }
+
+  def queryMasterConfig(): Config = {
+    val resp = callApi("master/config")
+    ConfigFactory.parseString(resp)
+  }
+
+  def queryWorkerMetrics(workerId: WorkerId, current: Boolean): HistoryMetrics = {
+    val args = if (current) "?readLatest=true" else ""
+    val workerIdStr = WorkerId.render(workerId)
+    val resp = callApi(s"worker/$workerIdStr/metrics/worker$workerIdStr?$args")
+    decodeAs[HistoryMetrics](resp)
+  }
+
+  def queryWorkerConfig(workerId: WorkerId): Config = {
+    val resp = callApi(s"worker/${WorkerId.render(workerId)}/config")
+    ConfigFactory.parseString(resp)
+  }
+
+  def queryBuiltInPartitioners(): Array[String] = {
+    val resp = callApi("master/partitioners")
+    decodeAs[BuiltinPartitioners](resp).partitioners
+  }
+
+  def uploadJar(localFilePath: String): AppJar = {
+    val resp = callApi(s"master/uploadjar -F jar=@$localFilePath", CRUD_POST)
+    decodeAs[AppJar](resp)
+  }
+
+  def replaceStreamingAppProcessor(appId: Int, replaceMe: ProcessorDescription): Boolean = {
+    replaceStreamingAppProcessor(appId, replaceMe, false)
+  }
+
+  def replaceStreamingAppProcessor(
+      appId: Int, replaceMe: ProcessorDescription, inheritConf: Boolean): Boolean = try {
+    val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe, inheritConf)
+    val args = upickle.default.write(replaceOperation)
+    val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + Util.encodeUriComponent(args),
+      CRUD_POST)
+    decodeAs[DAGOperationResult](resp)
+    true
+  } catch {
+    case ex: Throwable =>
+      LOG.warn(s"swallowed an exception: $ex")
+      false
+  }
+
+  def killAppMaster(appId: Int): Boolean = {
+    killExecutor(appId, Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
+  }
+
+  def killExecutor(appId: Int, executorId: Int): Boolean = try {
+    val jvmInfo = queryExecutorSummary(appId, executorId).jvmName.split("@")
+    val pid = jvmInfo(0).toInt
+    val hostname = jvmInfo(1)
+    Docker.killProcess(hostname, pid)
+  } catch {
+    case ex: Throwable =>
+      LOG.warn(s"swallowed an exception: $ex")
+      false
+  }
+
+  def killApp(appId: Int): Boolean = try {
+    val resp = callApi(s"appmaster/$appId", CRUD_DELETE)
+    resp.contains("\"status\":\"success\"")
+  } catch {
+    case ex: Throwable =>
+      LOG.warn(s"swallowed an exception: $ex")
+      false
+  }
+
+  def restartApp(appId: Int): Boolean = try {
+    val resp = callApi(s"appmaster/$appId/restart", CRUD_POST)
+    decodeAs[Status](resp).success
+  } catch {
+    case ex: Throwable =>
+      LOG.warn(s"swallowed an exception: $ex")
+      false
+  }
+
+  private val CRUD_POST = "-X POST"
+  private val CRUD_DELETE = "-X DELETE"
+
+  private def callApi(endpoint: String, option: String = ""): String = {
+    curl(s"api/v1.0/$endpoint", Array(option, s"--cookie $cookieFile"))
+  }
+
+  private def curl(endpoint: String, options: Array[String] = Array.empty[String]): String = {
+    Docker.curl(host, s"http://$host:$port/$endpoint", options)
+  }
+
+  def login(): Unit = {
+    curl("login", Array(CRUD_POST, s"--cookie-jar $cookieFile",
+      "--data username=admin", "--data password=admin"))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
new file mode 100644
index 0000000..79adfc4
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.integrationtest.storm
+
+import scala.util.Random
+
+import backtype.storm.utils.{DRPCClient, Utils}
+
+import org.apache.gearpump.integrationtest.minicluster.{BaseContainer, MiniCluster, RestClient}
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+class StormClient(cluster: MiniCluster, restClient: RestClient) {
+
+  private val masterAddrs: List[(String, Int)] = cluster.getMastersAddresses
+  private val CONFIG_FILE = s"/opt/gearpump/storm${new Random().nextInt()}.yaml"
+  private val DRPC_HOST = "storm0"
+  private val DRPC_PORT = 3772
+  private val DRPC_INVOCATIONS_PORT = 3773
+  private val STORM_DRPC = "storm-drpc"
+  private val NIMBUS_HOST = "storm1"
+  private val STORM_NIMBUS = "storm nimbus"
+  private val STORM_APP = "/opt/start storm app"
+
+  private val drpcContainer = new BaseContainer(DRPC_HOST, STORM_DRPC, masterAddrs,
+    tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT))
+
+  private val nimbusContainer =
+    new BaseContainer(NIMBUS_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE", masterAddrs)
+
+  def start(): Unit = {
+    nimbusContainer.createAndStart()
+    ensureNimbusRunning()
+
+    drpcContainer.createAndStart()
+    ensureDrpcServerRunning()
+  }
+
+  private def ensureNimbusRunning(): Unit = {
+    Util.retryUntil(() => {
+      val response = Docker.execute(NIMBUS_HOST, "grep \"port\" " + CONFIG_FILE)
+      // Parse format nimbus.thrift.port: '39322'
+      val thriftPort = response.split(" ")(1).replace("'", "").toInt
+
+      Docker.executeSilently(NIMBUS_HOST, s"""sh -c "netstat -na | grep $thriftPort" """)
+    }, "Nimbus running")
+  }
+
+  private def ensureDrpcServerRunning(): Unit = {
+    Util.retryUntil(() => {
+      Docker.executeSilently(DRPC_HOST, s"""sh -c "netstat -na | grep $DRPC_PORT " """)
+    }, "DRPC running")
+  }
+
+  def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = {
+    Util.retryUntil(() => {
+      Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " +
+        s"-jar $jar $mainClass $args")
+      restClient.listRunningApps().exists(_.appName == appName)
+    }, "app running")
+    restClient.listRunningApps().filter(_.appName == appName).head.appId
+  }
+
+  def getDRPCClient(drpcServerIp: String): DRPCClient = {
+    val config = Utils.readDefaultConfig()
+    new DRPCClient(config, drpcServerIp, DRPC_PORT)
+  }
+
+  def shutDown(): Unit = {
+
+    // Cleans up the storm.yaml config file
+    Docker.executeSilently(NIMBUS_HOST, s"rm $CONFIG_FILE ")
+    drpcContainer.killAndRemove()
+    nimbusContainer.killAndRemove()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
deleted file mode 100644
index 67a2491..0000000
--- a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.storm
-
-import backtype.storm.topology.base.BaseBasicBolt
-import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
-import backtype.storm.tuple.{Fields, Tuple, Values}
-import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper
-
-class Adaptor extends BaseBasicBolt {
-  private var id = 0L
-
-  override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = {
-    val bytes = tuple.getBinary(0)
-    collector.emit(new Values(s"$id".getBytes, bytes))
-    id += 1
-  }
-
-  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
-    declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY,
-      FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE))
-  }
-}