Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/27 11:38:48 UTC

[1/4] incubator-gearpump git commit: fix GEARPUMP-150 correct the integration test file structure

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master c80c06963 -> e9ea6e2f3


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala b/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
deleted file mode 100644
index a0d9a42..0000000
--- a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.integrationtest.storm
-
-import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
-
-import backtype.storm.topology.TopologyBuilder
-import backtype.storm.{Config, StormSubmitter}
-import storm.kafka.bolt.KafkaBolt
-import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
-
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-
-/**
- * Tests Storm 0.10.x compatibility over Gearpump.
- * This example reads data from Kafka and writes it back.
- */
-object Storm010KafkaTopology extends App with ArgumentsParser {
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "topologyName" -> CLIOption[Int]("<Storm topology name>", required = true),
-    "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required = true),
-    "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = true),
-    "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
-      required = true),
-    "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
-    "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false,
-      defaultValue = Some(1)),
-    "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, defaultValue = Some(1))
-  )
-
-  val configs = parse(args)
-  val topologyName = configs.getString("topologyName")
-  val sourceTopic = configs.getString("sourceTopic")
-  val sinkTopic = configs.getString("sinkTopic")
-  val zookeeperConnect = configs.getString("zookeeperConnect")
-  val brokerList = configs.getString("brokerList")
-  val spoutNum = configs.getInt("spoutNum")
-  val boltNum = configs.getInt("boltNum")
-
-  val topologyBuilder = new TopologyBuilder()
-  val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic,
-    zookeeperConnect, topologyName))
-  val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new KafkaBolt[Array[Byte], Array[Byte]]()
-  val adaptor = new Adaptor
-  topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum)
-  topologyBuilder.setBolt("adaptor", adaptor).localOrShuffleGrouping("kafka_spout")
-  topologyBuilder.setBolt("kafka_bolt", kafkaBolt, boltNum).localOrShuffleGrouping("adaptor")
-  val config = new Config()
-  config.putAll(getBoltConfig(sinkTopic, brokerList))
-  config.put(Config.TOPOLOGY_NAME, topologyName)
-  val topology = topologyBuilder.createTopology()
-  StormSubmitter.submitTopology(topologyName, config, topology)
-
-  def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): SpoutConfig = {
-    val hosts = new ZkHosts(zookeeperConnect)
-    val index = zookeeperConnect.indexOf("/")
-    val zookeeper = zookeeperConnect.take(index)
-    val kafkaRoot = zookeeperConnect.drop(index)
-    val config = new SpoutConfig(hosts, topic, kafkaRoot, id)
-
-    val serverAndPort = zookeeper.split(":")
-    config.zkServers = new JArrayList[String]
-    config.zkServers.add(serverAndPort(0))
-    config.zkPort = serverAndPort(1).toInt
-    config
-  }
-
-  def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = {
-    val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
-    val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
-    brokerConfig.put("metadata.broker.list", brokerList)
-    brokerConfig.put("request.required.acks", "1")
-    kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig)
-    kafkaConfig.put(KafkaBolt.TOPIC, topic)
-    kafkaConfig
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
new file mode 100644
index 0000000..67a2491
--- /dev/null
+++ b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.storm
+
+import backtype.storm.topology.base.BaseBasicBolt
+import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
+import backtype.storm.tuple.{Fields, Tuple, Values}
+import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper
+
+class Adaptor extends BaseBasicBolt {
+  private var id = 0L
+
+  override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = {
+    val bytes = tuple.getBinary(0)
+    collector.emit(new Values(s"$id".getBytes, bytes))
+    id += 1
+  }
+
+  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
+    declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY,
+      FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE))
+  }
+}
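
The Adaptor above sits between the Kafka spout and the Kafka bolt: it tags every payload it receives with a monotonically increasing sequence number, so downstream checks can verify delivery. A Storm-free sketch of that keying scheme (the SequenceKeyer name and tuple shape are illustrative, not project code):

object SequenceKeyerSketch {
  // Illustrative stand-in for Adaptor's keying logic; not project code.
  class SequenceKeyer {
    private var id = 0L
    // Returns (key, value): key is the next sequence number, value the raw payload.
    def key(message: Array[Byte]): (Array[Byte], Array[Byte]) = {
      val pair = (s"$id".getBytes, message)
      id += 1
      pair
    }
  }
}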

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
new file mode 100644
index 0000000..a0d9a42
--- /dev/null
+++ b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.integrationtest.storm
+
+import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
+
+import backtype.storm.topology.TopologyBuilder
+import backtype.storm.{Config, StormSubmitter}
+import storm.kafka.bolt.KafkaBolt
+import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
+
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+
+/**
+ * Tests Storm 0.10.x compatibility over Gearpump.
+ * This example reads data from Kafka and writes it back.
+ */
+object Storm010KafkaTopology extends App with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "topologyName" -> CLIOption[Int]("<Storm topology name>", required = true),
+    "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required = true),
+    "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = true),
+    "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
+      required = true),
+    "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
+    "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false,
+      defaultValue = Some(1)),
+    "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, defaultValue = Some(1))
+  )
+
+  val configs = parse(args)
+  val topologyName = configs.getString("topologyName")
+  val sourceTopic = configs.getString("sourceTopic")
+  val sinkTopic = configs.getString("sinkTopic")
+  val zookeeperConnect = configs.getString("zookeeperConnect")
+  val brokerList = configs.getString("brokerList")
+  val spoutNum = configs.getInt("spoutNum")
+  val boltNum = configs.getInt("boltNum")
+
+  val topologyBuilder = new TopologyBuilder()
+  val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic,
+    zookeeperConnect, topologyName))
+  val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new KafkaBolt[Array[Byte], Array[Byte]]()
+  val adaptor = new Adaptor
+  topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum)
+  topologyBuilder.setBolt("adaptor", adaptor).localOrShuffleGrouping("kafka_spout")
+  topologyBuilder.setBolt("kafka_bolt", kafkaBolt, boltNum).localOrShuffleGrouping("adaptor")
+  val config = new Config()
+  config.putAll(getBoltConfig(sinkTopic, brokerList))
+  config.put(Config.TOPOLOGY_NAME, topologyName)
+  val topology = topologyBuilder.createTopology()
+  StormSubmitter.submitTopology(topologyName, config, topology)
+
+  def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): SpoutConfig = {
+    val hosts = new ZkHosts(zookeeperConnect)
+    val index = zookeeperConnect.indexOf("/")
+    val zookeeper = zookeeperConnect.take(index)
+    val kafkaRoot = zookeeperConnect.drop(index)
+    val config = new SpoutConfig(hosts, topic, kafkaRoot, id)
+
+    val serverAndPort = zookeeper.split(":")
+    config.zkServers = new JArrayList[String]
+    config.zkServers.add(serverAndPort(0))
+    config.zkPort = serverAndPort(1).toInt
+    config
+  }
+
+  def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = {
+    val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
+    val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
+    brokerConfig.put("metadata.broker.list", brokerList)
+    brokerConfig.put("request.required.acks", "1")
+    kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig)
+    kafkaConfig.put(KafkaBolt.TOPIC, topic)
+    kafkaConfig
+  }
+}
+
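
getSpoutConfig above splits the ZooKeeper connect string into a host:port part for the spout's ZooKeeper client and a chroot path under which Kafka registers brokers. A standalone sketch of that parsing, using the example value from the CLI help text (assumes the string always contains a '/'):

object ConnectStringParsingSketch extends App {
  // Mirrors the parsing in getSpoutConfig; assumes the form "host:port/chroot".
  val zookeeperConnect = "localhost:2181/kafka"
  val index = zookeeperConnect.indexOf("/")
  val zookeeper = zookeeperConnect.take(index)   // "localhost:2181"
  val kafkaRoot = zookeeperConnect.drop(index)   // "/kafka"
  val Array(server, port) = zookeeper.split(":")
  println(s"zkServer=$server zkPort=${port.toInt} kafkaRoot=$kafkaRoot")
}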

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
deleted file mode 100644
index 67a2491..0000000
--- a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.storm
-
-import backtype.storm.topology.base.BaseBasicBolt
-import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
-import backtype.storm.tuple.{Fields, Tuple, Values}
-import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper
-
-class Adaptor extends BaseBasicBolt {
-  private var id = 0L
-
-  override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = {
-    val bytes = tuple.getBinary(0)
-    collector.emit(new Values(s"$id".getBytes, bytes))
-    id += 1
-  }
-
-  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
-    declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY,
-      FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala b/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
deleted file mode 100644
index 5b74d60..0000000
--- a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.storm
-
-import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
-
-import backtype.storm.topology.TopologyBuilder
-import backtype.storm.{Config, StormSubmitter}
-import storm.kafka.bolt.KafkaBolt
-import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
-
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-
-/**
- * Tests Storm 0.9.x compatibility over Gearpump.
- * This example reads data from Kafka and writes it back.
- */
-object Storm09KafkaTopology extends App with ArgumentsParser {
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "topologyName" -> CLIOption[Int]("<Storm topology name>", required = true),
-    "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required = true),
-    "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = true),
-    "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
-      required = true),
-    "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
-    "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false,
-      defaultValue = Some(1)),
-    "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, defaultValue = Some(1))
-  )
-
-  val configs = parse(args)
-  val topologyName = configs.getString("topologyName")
-  val sourceTopic = configs.getString("sourceTopic")
-  val sinkTopic = configs.getString("sinkTopic")
-  val zookeeperConnect = configs.getString("zookeeperConnect")
-  val brokerList = configs.getString("brokerList")
-  val spoutNum = configs.getInt("spoutNum")
-  val boltNum = configs.getInt("boltNum")
-
-  val topologyBuilder = new TopologyBuilder()
-  val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic, zookeeperConnect,
-    topologyName))
-  val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new KafkaBolt[Array[Byte], Array[Byte]]()
-  val adaptor = new Adaptor
-  topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum)
-  topologyBuilder.setBolt("adaptor", adaptor).localOrShuffleGrouping("kafka_spout")
-  topologyBuilder.setBolt("kafka_bolt", kafkaBolt, boltNum).localOrShuffleGrouping("adaptor")
-  val config = new Config()
-  config.putAll(getBoltConfig(sinkTopic, brokerList))
-  config.put(Config.TOPOLOGY_NAME, topologyName)
-  val topology = topologyBuilder.createTopology()
-  StormSubmitter.submitTopology(topologyName, config, topology)
-
-  def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): SpoutConfig = {
-    val hosts = new ZkHosts(zookeeperConnect)
-    val index = zookeeperConnect.indexOf("/")
-    val zookeeper = zookeeperConnect.take(index)
-    val kafkaRoot = zookeeperConnect.drop(index)
-    val config = new SpoutConfig(hosts, topic, kafkaRoot, id)
-    config.forceFromStart = true
-
-    val serverAndPort = zookeeper.split(":")
-    config.zkServers = new JArrayList[String]
-    config.zkServers.add(serverAndPort(0))
-    config.zkPort = serverAndPort(1).toInt
-    config
-  }
-
-  def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = {
-    val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
-    val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
-    brokerConfig.put("metadata.broker.list", brokerList)
-    brokerConfig.put("request.required.acks", "1")
-    kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig)
-    kafkaConfig.put(KafkaBolt.TOPIC, topic)
-    kafkaConfig
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
new file mode 100644
index 0000000..67a2491
--- /dev/null
+++ b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.storm
+
+import backtype.storm.topology.base.BaseBasicBolt
+import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
+import backtype.storm.tuple.{Fields, Tuple, Values}
+import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper
+
+class Adaptor extends BaseBasicBolt {
+  private var id = 0L
+
+  override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = {
+    val bytes = tuple.getBinary(0)
+    collector.emit(new Values(s"$id".getBytes, bytes))
+    id += 1
+  }
+
+  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
+    declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY,
+      FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
new file mode 100644
index 0000000..5b74d60
--- /dev/null
+++ b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.storm
+
+import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
+
+import backtype.storm.topology.TopologyBuilder
+import backtype.storm.{Config, StormSubmitter}
+import storm.kafka.bolt.KafkaBolt
+import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
+
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+
+/**
+ * Tests Storm 0.9.x compatibility over Gearpump.
+ * This example reads data from Kafka and writes it back.
+ */
+object Storm09KafkaTopology extends App with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "topologyName" -> CLIOption[Int]("<Storm topology name>", required = true),
+    "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required = true),
+    "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = true),
+    "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
+      required = true),
+    "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
+    "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false,
+      defaultValue = Some(1)),
+    "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, defaultValue = Some(1))
+  )
+
+  val configs = parse(args)
+  val topologyName = configs.getString("topologyName")
+  val sourceTopic = configs.getString("sourceTopic")
+  val sinkTopic = configs.getString("sinkTopic")
+  val zookeeperConnect = configs.getString("zookeeperConnect")
+  val brokerList = configs.getString("brokerList")
+  val spoutNum = configs.getInt("spoutNum")
+  val boltNum = configs.getInt("boltNum")
+
+  val topologyBuilder = new TopologyBuilder()
+  val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic, zookeeperConnect,
+    topologyName))
+  val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new KafkaBolt[Array[Byte], Array[Byte]]()
+  val adaptor = new Adaptor
+  topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum)
+  topologyBuilder.setBolt("adaptor", adaptor).localOrShuffleGrouping("kafka_spout")
+  topologyBuilder.setBolt("kafka_bolt", kafkaBolt, boltNum).localOrShuffleGrouping("adaptor")
+  val config = new Config()
+  config.putAll(getBoltConfig(sinkTopic, brokerList))
+  config.put(Config.TOPOLOGY_NAME, topologyName)
+  val topology = topologyBuilder.createTopology()
+  StormSubmitter.submitTopology(topologyName, config, topology)
+
+  def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): SpoutConfig = {
+    val hosts = new ZkHosts(zookeeperConnect)
+    val index = zookeeperConnect.indexOf("/")
+    val zookeeper = zookeeperConnect.take(index)
+    val kafkaRoot = zookeeperConnect.drop(index)
+    val config = new SpoutConfig(hosts, topic, kafkaRoot, id)
+    config.forceFromStart = true
+
+    val serverAndPort = zookeeper.split(":")
+    config.zkServers = new JArrayList[String]
+    config.zkServers.add(serverAndPort(0))
+    config.zkPort = serverAndPort(1).toInt
+    config
+  }
+
+  def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = {
+    val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
+    val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
+    brokerConfig.put("metadata.broker.list", brokerList)
+    brokerConfig.put("request.required.acks", "1")
+    kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig)
+    kafkaConfig.put(KafkaBolt.TOPIC, topic)
+    kafkaConfig
+  }
+}
+
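
getBoltConfig in both topologies nests the old-style Kafka producer properties inside the map that KafkaBolt reads. A dependency-free sketch of that two-level structure; the literal key strings here are assumptions standing in for the KafkaBolt.KAFKA_BROKER_PROPERTIES and KafkaBolt.TOPIC constants:

import java.util.{HashMap => JHashMap, Map => JMap}

object BoltConfigSketch extends App {
  // Same nested structure as getBoltConfig above; key literals are assumed.
  val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
  brokerConfig.put("metadata.broker.list", "localhost:9092")
  brokerConfig.put("request.required.acks", "1") // wait for leader ack only
  val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
  kafkaConfig.put("kafka.broker.properties", brokerConfig) // assumed value of KafkaBolt.KAFKA_BROKER_PROPERTIES
  kafkaConfig.put("topic", "topic2")                       // assumed value of KafkaBolt.TOPIC
  println(kafkaConfig)
}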


[3/4] incubator-gearpump git commit: fix GEARPUMP-150 correct the integration test file structure

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
new file mode 100644
index 0000000..d8bdc1e
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.scalatest.TestData
+
+import org.apache.gearpump.integrationtest.kafka._
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+
+/**
+ * This test spec checks the Kafka data source connector.
+ */
+class ConnectorKafkaSpec extends TestSpecBase {
+
+  private lazy val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway)
+  private lazy val kafkaJar = cluster.queryBuiltInExampleJars("kafka-").head
+  private var producer: NumericalDataProducer = null
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    kafkaCluster.start()
+  }
+
+  override def afterAll(): Unit = {
+    kafkaCluster.shutDown()
+    super.afterAll()
+  }
+
+  override def afterEach(test: TestData): Unit = {
+    super.afterEach(test)
+    if (producer != null) {
+      producer.stop()
+      producer = null
+    }
+  }
+
+  "KafkaSource and KafkaSink" should {
+    "read from and write to kafka" in {
+      // setup
+      val sourceTopic = "topic1"
+      val sinkTopic = "topic2"
+      val messageNum = 10000
+      kafkaCluster.produceDataToKafka(sourceTopic, messageNum)
+
+      // exercise
+      val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite",
+        "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
+        "-brokerList", kafkaCluster.getBrokerListConnectString,
+        "-sourceTopic", sourceTopic,
+        "-sinkTopic", sinkTopic).mkString(" ")
+      val appId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
+      success shouldBe true
+
+      // verify
+      expectAppIsRunning(appId, "KafkaReadWrite")
+      Util.retryUntil(() => kafkaCluster.getLatestOffset(sinkTopic) == messageNum,
+        "kafka all message written")
+    }
+  }
+
+  "Gearpump with Kafka" should {
+    "support at-least-once message delivery" in {
+      // setup
+      val sourcePartitionNum = 2
+      val sourceTopic = "topic3"
+      val sinkTopic = "topic4"
+      // Generate number sequence (1, 2, 3, ...) to the topic
+      kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
+      producer = new NumericalDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString)
+      producer.start()
+
+      // exercise
+      val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite",
+        "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
+        "-brokerList", kafkaCluster.getBrokerListConnectString,
+        "-sourceTopic", sourceTopic,
+        "-sinkTopic", sinkTopic,
+        "-source", sourcePartitionNum).mkString(" ")
+      val appId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
+      success shouldBe true
+
+      // verify #1
+      expectAppIsRunning(appId, "KafkaReadWrite")
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+
+      // verify #2
+      val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
+      restClient.killExecutor(appId, executorToKill) shouldBe true
+      Util.retryUntil(() => restClient.queryExecutorBrief(appId)
+        .map(_.executorId).max > executorToKill,
+        s"executor $executorToKill killed")
+
+      // verify #3
+      val detector = new MessageLossDetector(producer.lastWriteNum)
+      val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
+        host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
+      Util.retryUntil(() => {
+        kafkaReader.read()
+        detector.allReceived
+      }, "kafka all message read")
+    }
+  }
+}
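
The assertions above poll through Util.retryUntil, which keeps evaluating a condition until it holds or a deadline passes. A minimal sketch of a helper with the same shape (the poll interval and timeout are assumptions; the project's Util may differ):

object RetrySketch {
  // Polls `condition` until true, or fails with `description` after `timeoutMs`.
  // Shape mirrors Util.retryUntil(() => Boolean, String); timings are assumed.
  def retryUntil(condition: () => Boolean, description: String,
      intervalMs: Long = 1000L, timeoutMs: Long = 90000L): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!condition()) {
      if (System.currentTimeMillis() > deadline) {
        throw new AssertionError(s"timed out waiting until: $description")
      }
      Thread.sleep(intervalMs)
    }
  }
}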

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
new file mode 100644
index 0000000..56b33c1
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+import org.apache.gearpump.metrics.Metrics.Meter
+import org.apache.gearpump.streaming._
+import org.apache.gearpump.streaming.appmaster.ProcessorSummary
+
+class DynamicDagSpec extends TestSpecBase {
+
+  lazy val solJar = cluster.queryBuiltInExampleJars("sol-").head
+  val splitTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Split"
+  val sumTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Sum"
+  val solName = "sol"
+
+  "dynamic dag" should {
+    "can retrieve a list of built-in partitioner classes" in {
+      val partitioners = restClient.queryBuiltInPartitioners()
+      partitioners.length should be > 0
+      partitioners.foreach(clazz =>
+        clazz should startWith("org.apache.gearpump.partitioner.")
+      )
+    }
+
+    "can compose a wordcount application from scratch" in {
+      // todo: blocked by #1450
+    }
+
+    "can replace down stream with wordcount's sum processor (new processor will have metrics)" in {
+      // setup
+      val appId = expectSolJarSubmittedWithAppId()
+
+      // exercise
+      val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
+      replaceProcessor(appId, 1, sumTaskClass)
+      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
+      Util.retryUntil(() => {
+        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
+        laterProcessors.size == formerProcessors.size + 1
+      }, "new processor successfully added")
+      processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
+    }
+
+    "can replace up stream with wordcount's split processor (new processor will have metrics)" in {
+      // setup
+      val appId = expectSolJarSubmittedWithAppId()
+
+      // exercise
+      val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
+      replaceProcessor(appId, 0, splitTaskClass)
+      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
+      Util.retryUntil(() => {
+        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
+        laterProcessors.size == formerProcessors.size + 1
+      }, "new processor added")
+      processorHasThroughput(appId, laterProcessors.keySet.max, "sendThroughput")
+    }
+
+    "fall back to last dag version when replacing a processor failid" in {
+      // setup
+      val appId = expectSolJarSubmittedWithAppId()
+
+      // exercise
+      val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
+      replaceProcessor(appId, 1, sumTaskClass)
+      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
+      Util.retryUntil(() => {
+        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
+        laterProcessors.size == formerProcessors.size + 1
+      }, "new processor added")
+      processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
+
+      val fakeTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Fake"
+      replaceProcessor(appId, laterProcessors.keySet.max, fakeTaskClass)
+      Util.retryUntil(() => {
+        val processorsAfterFailure = restClient.queryStreamingAppDetail(appId).processors
+        processorsAfterFailure.size == laterProcessors.size
+      }, "new processor added")
+      val currentClock = restClient.queryStreamingAppDetail(appId).clock
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > currentClock,
+        "app clock is advancing")
+    }
+
+    "fall back to last dag version when AppMaster HA triggered" in {
+      // setup
+      val appId = expectSolJarSubmittedWithAppId()
+
+      // exercise
+      val formerAppMaster = restClient.queryApp(appId).appMasterPath
+      val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
+      replaceProcessor(appId, 1, sumTaskClass)
+      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
+      Util.retryUntil(() => {
+        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
+        laterProcessors.size == formerProcessors.size + 1
+      }, "new processor added")
+      processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
+
+      restClient.killAppMaster(appId) shouldBe true
+      Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster,
+        "new AppMaster created")
+      val processors = restClient.queryStreamingAppDetail(appId).processors
+      processors.size shouldEqual laterProcessors.size
+    }
+  }
+
+  private def expectSolJarSubmittedWithAppId(): Int = {
+    val appId = restClient.getNextAvailableAppId()
+    val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length)
+    success shouldBe true
+    expectAppIsRunning(appId, solName)
+    Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+    appId
+  }
+
+  private def replaceProcessor(
+      appId: Int,
+      formerProcessorId: Int,
+      newTaskClass: String,
+      newProcessorDescription: String = "",
+      newParallelism: Int = 1): Unit = {
+    val uploadedJar = restClient.uploadJar(wordCountJar)
+    val replaceMe = new ProcessorDescription(formerProcessorId, newTaskClass,
+      newParallelism, newProcessorDescription,
+      jar = uploadedJar)
+
+    // exercise
+    val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
+    success shouldBe true
+  }
+
+  private def processorHasThroughput(appId: Int, processorId: Int, metrics: String): Unit = {
+    Util.retryUntil(() => {
+      val actual = restClient.queryStreamingAppMetrics(appId, current = false,
+        path = "processor" + processorId)
+      val throughput = actual.metrics.filter(_.value.name.endsWith(metrics))
+      throughput.size should be > 0
+      throughput.forall(_.value.asInstanceOf[Meter].count > 0L)
+    }, "new processor has message received")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
new file mode 100644
index 0000000..27e4665
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.{Docker, TestSpecBase, Util}
+import org.apache.gearpump.streaming._
+import org.apache.gearpump.streaming.appmaster.ProcessorSummary
+
+/**
+ * This test spec performs destructive operations to check stability.
+ */
+class ExampleSpec extends TestSpecBase {
+
+  private val LOG = Logger.getLogger(getClass)
+
+  "distributed shell" should {
+    "execute commands on machines where its executors are running" in {
+      val distShellJar = cluster.queryBuiltInExampleJars("distributedshell-").head
+      val mainClass = "org.apache.gearpump.examples.distributedshell.DistributedShell"
+      val clientClass = "org.apache.gearpump.examples.distributedshell.DistributedShellClient"
+      val appId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(distShellJar, cluster.getWorkerHosts.length, mainClass)
+      success shouldBe true
+      expectAppIsRunning(appId, "DistributedShell")
+      val args = Array(
+        clientClass,
+        "-appid", appId.toString,
+        "-command", "hostname"
+      )
+
+      val expectedHostNames = cluster.getWorkerHosts.map(Docker.getHostName(_))
+
+      def verify(): Boolean = {
+        val workerNum = cluster.getWorkerHosts.length
+        val result = commandLineClient.submitAppAndCaptureOutput(distShellJar,
+          workerNum, args.mkString(" ")).split("\n").
+          filterNot(line => line.startsWith("[INFO]") || line.isEmpty)
+        expectedHostNames.forall(result.contains)
+      }
+
+      Util.retryUntil(() => verify(),
+        s"executors started on all expected hosts ${expectedHostNames.mkString(", ")}")
+    }
+  }
+
+  "wordcount" should {
+    val wordCountJarNamePrefix = "wordcount-"
+    behave like streamingApplication(wordCountJarNamePrefix, wordCountName)
+
+    "can submit immediately after killing a former one" in {
+      // setup
+      val formerAppId = restClient.getNextAvailableAppId()
+      val formerSubmissionSuccess =
+        restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+      formerSubmissionSuccess shouldBe true
+      expectAppIsRunning(formerAppId, wordCountName)
+      Util.retryUntil(() =>
+        restClient.queryStreamingAppDetail(formerAppId).clock > 0, "app running")
+      restClient.killApp(formerAppId)
+
+      // exercise
+      val appId = formerAppId + 1
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+      success shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+    }
+  }
+
+  "wordcount(java)" should {
+    val wordCountJavaJarNamePrefix = "wordcountjava-"
+    val wordCountJavaName = "wordcountJava"
+    behave like streamingApplication(wordCountJavaJarNamePrefix, wordCountJavaName)
+  }
+
+  "sol" should {
+    val solJarNamePrefix = "sol-"
+    val solName = "sol"
+    behave like streamingApplication(solJarNamePrefix, solName)
+  }
+
+  "complexdag" should {
+    val dynamicDagJarNamePrefix = "complexdag-"
+    val dynamicDagName = "dag"
+    behave like streamingApplication(dynamicDagJarNamePrefix, dynamicDagName)
+  }
+
+  def streamingApplication(jarNamePrefix: String, appName: String): Unit = {
+    lazy val jar = cluster.queryBuiltInExampleJars(jarNamePrefix).head
+
+    "can obtain application clock and the clock will keep changing" in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(jar, cluster.getWorkerHosts.length)
+      success shouldBe true
+      expectAppIsRunning(appId, appName)
+
+      // exercise
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app submitted")
+      val formerClock = restClient.queryStreamingAppDetail(appId).clock
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > formerClock,
+        "app clock is advancing")
+    }
+
+    "can change the parallelism and description of a processor" in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+      val formerSubmissionSuccess = restClient.submitApp(jar, cluster.getWorkerHosts.length)
+      formerSubmissionSuccess shouldBe true
+      expectAppIsRunning(appId, appName)
+      val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
+      val processor0 = formerProcessors.get(0).get
+      val expectedProcessorId = formerProcessors.size
+      val expectedParallelism = processor0.parallelism + 1
+      val expectedDescription = processor0.description + "new"
+      val replaceMe = new ProcessorDescription(processor0.id, processor0.taskClass,
+        expectedParallelism, description = expectedDescription)
+
+      // exercise
+      val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
+      success shouldBe true
+      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
+      Util.retryUntil(() => {
+        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
+        laterProcessors.size == formerProcessors.size + 1
+      }, "new process added")
+      val laterProcessor0 = laterProcessors.get(expectedProcessorId).get
+      laterProcessor0.parallelism shouldEqual expectedParallelism
+      laterProcessor0.description shouldEqual expectedDescription
+    }
+  }
+}
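
ExampleSpec runs the same two tests against four example jars via ScalaTest's shared-behavior idiom: streamingApplication registers the tests, and each "behave like" call re-registers them with different arguments. A condensed, self-contained sketch of the idiom (a hypothetical suite, not project code):

import org.scalatest.WordSpec

class SharedBehaviorSketch extends WordSpec {
  // Shared tests: each call to `behave like counter(...)` registers this test again.
  def counter(start: Int): Unit = {
    "advance past its starting value" in {
      assert(start + 1 > start)
    }
  }

  "a counter starting at 0" should {
    behave like counter(0)
  }
  "a counter starting at 10" should {
    behave like counter(10)
  }
}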

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
new file mode 100644
index 0000000..bb9982a
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.hadoop.HadoopCluster._
+import org.apache.gearpump.integrationtest.kafka.KafkaCluster._
+import org.apache.gearpump.integrationtest.kafka.{ResultVerifier, SimpleKafkaReader}
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+
+/**
+ * Checks message delivery guarantees such as at-least-once and exactly-once.
+ */
+class MessageDeliverySpec extends TestSpecBase {
+
+  private val LOG = Logger.getLogger(getClass)
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+  }
+
+  "Gearpump" should {
+    "support exactly-once message delivery" in {
+      withKafkaCluster(cluster) { kafkaCluster =>
+        // setup
+        val sourcePartitionNum = 1
+        val sourceTopic = "topic1"
+        val sinkTopic = "topic2"
+
+        // Generate number sequence (1, 2, 3, ...) to the topic
+        kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
+
+        withDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString) { producer =>
+
+          withHadoopCluster { hadoopCluster =>
+            // exercise
+            val args = Array("org.apache.gearpump.streaming.examples.state.MessageCountApp",
+              "-defaultFS", hadoopCluster.getDefaultFS,
+              "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
+              "-brokerList", kafkaCluster.getBrokerListConnectString,
+              "-sourceTopic", sourceTopic,
+              "-sinkTopic", sinkTopic,
+              "-sourceTask", sourcePartitionNum).mkString(" ")
+            val appId = restClient.getNextAvailableAppId()
+
+            val stateJar = cluster.queryBuiltInExampleJars("state-").head
+            val success = restClient.submitApp(stateJar, executorNum = 1, args = args)
+            success shouldBe true
+
+            // verify #1
+            expectAppIsRunning(appId, "MessageCount")
+            Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0,
+              "app is running")
+
+            // wait for checkpoint to take place
+            Thread.sleep(1000)
+
+            LOG.info("Trigger message replay by kill and restart the executors")
+            val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
+            restClient.killExecutor(appId, executorToKill) shouldBe true
+            Util.retryUntil(() => restClient.queryExecutorBrief(appId)
+              .map(_.executorId).max > executorToKill, s"executor $executorToKill killed")
+
+            producer.stop()
+            val producedNumbers = producer.producedNumbers
+            LOG.info(s"In total, numbers in range[${producedNumbers.start}" +
+              s", ${producedNumbers.end - 1}] have been written to Kafka")
+
+            // verify #3
+            val kafkaSourceOffset = kafkaCluster.getLatestOffset(sourceTopic)
+
+            assert(producedNumbers.size == kafkaSourceOffset,
+              "produced message should match Kafka queue size")
+
+            LOG.info(s"The Kafka source topic $sourceTopic offset is " + kafkaSourceOffset)
+
+            // The sink processor of this job (MessageCountApp) writes total message
+            // count to Kafka Sink periodically (once every checkpoint interval).
+            // The detector keeps a record of the latest message count.
+            val detector = new ResultVerifier {
+              var latestMessageCount: Int = 0
+              override def onNext(messageCount: Int): Unit = {
+                this.latestMessageCount = messageCount
+              }
+            }
+
+            val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
+              host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
+            Util.retryUntil(() => {
+              kafkaReader.read()
+              LOG.info(s"Received message count: ${detector.latestMessageCount}, " +
+                s"expect: ${producedNumbers.size}")
+              detector.latestMessageCount == producedNumbers.size
+            }, "MessageCountApp calculated message count matches " +
+              "expected in case of message replay")
+          }
+        }
+      }
+    }
+  }
+}
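
MessageDeliverySpec borrows its Kafka cluster, data producer, and Hadoop cluster through loan-pattern helpers (withKafkaCluster, withDataProducer, withHadoopCluster), which guarantees teardown even when an assertion throws mid-test. A generic sketch of that pattern (the Resource trait is a hypothetical stand-in):

object LoanPatternSketch {
  // Hypothetical stand-in for KafkaCluster, HadoopCluster, NumericalDataProducer, etc.
  trait Resource {
    def start(): Unit
    def shutDown(): Unit
  }

  // Starts the resource, loans it to `body`, and always shuts it down afterwards.
  def withResource[R <: Resource, T](resource: R)(body: R => T): T = {
    resource.start()
    try body(resource)
    finally resource.shutDown()
  }
}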

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
new file mode 100644
index 0000000..2f5bb64
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import scala.concurrent.duration._
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.cluster.master.MasterStatus
+import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+
+/**
+ * This test spec checks REST service usage.
+ */
+class RestServiceSpec extends TestSpecBase {
+
+  "query system version" should {
+    "retrieve the current version number" in {
+      restClient.queryVersion() should not be empty
+    }
+  }
+
+  "list applications" should {
+    "retrieve 0 application after cluster just started" in {
+      restClient.listRunningApps().length shouldEqual 0
+    }
+
+    "retrieve 1 application after the first application submission" in {
+      // exercise
+      val appId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+      success shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+      restClient.listRunningApps().length shouldEqual 1
+    }
+  }
+
+  "submit application (wordcount)" should {
+    "find a running application after submission" in {
+      // exercise
+      val appId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+      success shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+    }
+
+    "reject a repeated submission request while the application is running" in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+      val formerSubmissionSuccess = restClient.submitApp(wordCountJar,
+        cluster.getWorkerHosts.length)
+      formerSubmissionSuccess shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+
+      // exercise
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+      success shouldBe false
+    }
+
+    "reject an invalid submission (the jar file path is incorrect)" in {
+      // exercise
+      val success = restClient.submitApp(wordCountJar + ".missing", cluster.getWorkerHosts.length)
+      success shouldBe false
+    }
+
+    "submit a wordcount application with 4 split and 3 sum processors and expect " +
+      "parallelism of processors match the given number" in {
+      // setup
+      val splitNum = 4
+      val sumNum = 3
+      val appId = restClient.getNextAvailableAppId()
+
+      // exercise
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length,
+        s"-split $splitNum -sum $sumNum")
+      success shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+      val processors = restClient.queryStreamingAppDetail(appId).processors
+      processors.size shouldEqual 2
+      val splitProcessor = processors.get(0).get
+      splitProcessor.parallelism shouldEqual splitNum
+      val sumProcessor = processors.get(1).get
+      sumProcessor.parallelism shouldEqual sumNum
+    }
+
+    "can obtain application metrics and the metrics will keep changing" in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+      success shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+
+      // exercise
+      expectMetricsAvailable(
+        restClient.queryStreamingAppMetrics(appId, current = true).metrics.nonEmpty,
+        "metrics available")
+      val actual = restClient.queryStreamingAppMetrics(appId, current = true)
+      actual.path shouldEqual s"app$appId.processor*"
+      actual.metrics.foreach(metric => {
+        metric.time should be > 0L
+        metric.value should not be null
+      })
+      val formerMetricsDump = actual.metrics.toString()
+
+      expectMetricsAvailable({
+        val laterMetrics = restClient.queryStreamingAppMetrics(appId, current = true).metrics
+        laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
+      }, "metrics available")
+    }
+
+    "can obtain application corresponding executors' metrics and " +
+      "the metrics will keep changing" in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+      success shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+
+      // exercise
+      expectMetricsAvailable(
+        restClient.queryExecutorMetrics(appId, current = true).metrics.nonEmpty,
+        "metrics available")
+      val actual = restClient.queryExecutorMetrics(appId, current = true)
+      actual.path shouldEqual s"app$appId.executor*"
+      actual.metrics.foreach(metric => {
+        metric.time should be > 0L
+        metric.value should not be null
+      })
+      val formerMetricsDump = actual.metrics.toString()
+
+      expectMetricsAvailable({
+        val laterMetrics = restClient.queryExecutorMetrics(appId, current = true).metrics
+        laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
+      }, "metrics available")
+    }
+  }
+
+  "kill application" should {
+    "a running application should be killed" in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+      success shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+
+      // exercise
+      killAppAndVerify(appId)
+    }
+
+    "should fail when attempting to kill a stopped application" in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+      val submissionSuccess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+      submissionSuccess shouldBe true
+      expectAppIsRunning(appId, wordCountName)
+      killAppAndVerify(appId)
+
+      // exercise
+      val success = restClient.killApp(appId)
+      success shouldBe false
+    }
+
+    "should fail when attempting to kill a non-exist application" in {
+      // setup
+      val freeAppId = restClient.listApps().length + 1
+
+      // exercise
+      val success = restClient.killApp(freeAppId)
+      success shouldBe false
+    }
+  }
+
+  "cluster information" should {
+    "retrieve 1 master for a non-HA cluster" in {
+      // exercise
+      val masterSummary = restClient.queryMaster()
+      masterSummary.cluster.map(_.toTuple) shouldEqual cluster.getMastersAddresses
+      masterSummary.aliveFor should be > 0L
+      masterSummary.masterStatus shouldEqual MasterStatus.Synced
+    }
+
+    "retrieve the same number of workers as cluster has" in {
+      // setup
+      val expectedWorkersCount = cluster.getWorkerHosts.size
+
+      // exercise
+      var runningWorkers: Array[WorkerSummary] = Array.empty
+      Util.retryUntil(() => {
+        runningWorkers = restClient.listRunningWorkers()
+        runningWorkers.length == expectedWorkersCount
+      }, "all workers running")
+      runningWorkers.foreach { worker =>
+        worker.state shouldEqual MasterToAppMaster.AppMasterActive
+      }
+    }
+
+    "find a newly added worker instance" in {
+      // setup
+      restartClusterRequired = true
+      val formerWorkersCount = cluster.getWorkerHosts.length
+      Util.retryUntil(() => restClient.listRunningWorkers().length == formerWorkersCount,
+        "all workers running")
+      val workerName = "newWorker"
+
+      // exercise
+      cluster.addWorkerNode(workerName)
+      Util.retryUntil(() => restClient.listRunningWorkers().length > formerWorkersCount,
+        "new worker added")
+      cluster.getWorkerHosts.length shouldEqual formerWorkersCount + 1
+      restClient.listRunningWorkers().length shouldEqual formerWorkersCount + 1
+    }
+
+    "retrieve 0 worker, if cluster is started without any workers" in {
+      // setup
+      restartClusterRequired = true
+      cluster.shutDown()
+
+      // exercise
+      cluster.start(workerNum = 0)
+      cluster.getWorkerHosts.length shouldEqual 0
+      restClient.listRunningWorkers().length shouldEqual 0
+    }
+
+    "can obtain master's metrics and the metrics will keep changing" in {
+      // exercise
+      expectMetricsAvailable(
+        restClient.queryMasterMetrics(current = true).metrics.nonEmpty, "metrics available")
+      val actual = restClient.queryMasterMetrics(current = true)
+      actual.path shouldEqual s"master"
+      actual.metrics.foreach(metric => {
+        metric.time should be > 0L
+        metric.value should not be null
+      })
+      val formerMetricsDump = actual.metrics.toString()
+
+      expectMetricsAvailable({
+        val laterMetrics = restClient.queryMasterMetrics(current = true).metrics
+        laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
+      }, "metrics available")
+    }
+
+    "can obtain workers' metrics and the metrics will keep changing" in {
+      // exercise
+      restClient.listRunningWorkers().foreach { worker =>
+        val workerId = worker.workerId
+        expectMetricsAvailable(
+          restClient.queryWorkerMetrics(workerId, current = true).metrics.nonEmpty,
+          "metrics available")
+        val actual = restClient.queryWorkerMetrics(workerId, current = true)
+        actual.path shouldEqual s"worker${WorkerId.render(workerId)}"
+        actual.metrics.foreach(metric => {
+          metric.time should be > 0L
+          metric.value should not be null
+        })
+        val formerMetricsDump = actual.metrics.toString()
+
+        expectMetricsAvailable({
+          val laterMetrics = restClient.queryWorkerMetrics(workerId, current = true).metrics
+          laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
+        }, "metrics available")
+      }
+    }
+  }
+
+  "configuration" should {
+    "retrieve the configuration of master and match particular values" in {
+      // exercise
+      val actual = restClient.queryMasterConfig()
+      actual.hasPath("gearpump") shouldBe true
+      actual.hasPath("gearpump.cluster") shouldBe true
+      actual.getString("gearpump.hostname") shouldEqual cluster.getMasterHosts.mkString(",")
+    }
+
+    "retrieve the configuration of worker X and match particular values" in {
+      // exercise
+      restClient.listRunningWorkers().foreach { worker =>
+        val actual = restClient.queryWorkerConfig(worker.workerId)
+        actual.hasPath("gearpump") shouldBe true
+        actual.hasPath("gearpump.worker") shouldBe true
+      }
+    }
+
+    "retrieve the configuration of executor X and match particular values" in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+
+      // exercise
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+      success shouldBe true
+      restClient.queryExecutorBrief(appId).foreach { executor =>
+        val executorId = executor.executorId
+        val actual = restClient.queryExecutorConfig(appId, executorId)
+        actual.hasPath("gearpump") shouldBe true
+        actual.hasPath("gearpump.executor") shouldBe true
+        actual.getInt("gearpump.applicationId") shouldEqual appId
+        actual.getInt("gearpump.executorId") shouldEqual executorId
+      }
+    }
+
+    "retrieve the configuration of application X and match particular values" in {
+      // setup
+      val appId = restClient.getNextAvailableAppId()
+
+      // exercise
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+      success shouldBe true
+      val actual = restClient.queryAppMasterConfig(appId)
+      actual.hasPath("gearpump") shouldBe true
+      actual.hasPath("gearpump.appmaster") shouldBe true
+    }
+  }
+
+  "application life-cycle" should {
+    "newly started application should be configured same as the previous one, after restart" in {
+      // setup
+      val originSplitNum = 4
+      val originSumNum = 3
+      val originAppId = restClient.getNextAvailableAppId()
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length,
+        s"-split $originSplitNum -sum $originSumNum")
+      success shouldBe true
+      expectAppIsRunning(originAppId, wordCountName)
+      val originAppDetail = restClient.queryStreamingAppDetail(originAppId)
+
+      // exercise
+      Util.retryUntil(() => restClient.restartApp(originAppId), "app restarted")
+      val killedApp = restClient.queryApp(originAppId)
+      killedApp.appId shouldEqual originAppId
+      killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive
+      val runningApps = restClient.listRunningApps()
+      runningApps.length shouldEqual 1
+      val newAppDetail = restClient.queryStreamingAppDetail(runningApps.head.appId)
+      newAppDetail.appName shouldEqual originAppDetail.appName
+      newAppDetail.processors.size shouldEqual originAppDetail.processors.size
+      newAppDetail.processors.get(0).get.parallelism shouldEqual originSplitNum
+      newAppDetail.processors.get(1).get.parallelism shouldEqual originSumNum
+    }
+  }
+
+  private def killAppAndVerify(appId: Int): Unit = {
+    val success = restClient.killApp(appId)
+    success shouldBe true
+
+    val actualApp = restClient.queryApp(appId)
+    actualApp.appId shouldEqual appId
+    actualApp.status shouldEqual MasterToAppMaster.AppMasterInActive
+  }
+
+  private def expectMetricsAvailable(condition: => Boolean, conditionDescription: String): Unit = {
+    val config = restClient.queryMasterConfig()
+    val reportInterval = Duration(config.getString("gearpump.metrics.report-interval-ms") + "ms")
+    Util.retryUntil(() => condition, conditionDescription, interval = reportInterval)
+  }
+}
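
A side note on the expectMetricsAvailable helper above: it derives its polling
interval from the master's configured metrics report interval. A minimal
standalone sketch of that parsing step (the config value below is hypothetical):

    import scala.concurrent.duration.Duration
    import com.typesafe.config.ConfigFactory

    object ReportIntervalSketch extends App {
      // A hypothetical snippet mirroring gearpump.metrics.report-interval-ms
      val config = ConfigFactory.parseString("gearpump.metrics.report-interval-ms = 3000")
      // The stored value is a bare number of milliseconds, so append the unit
      // before handing it to Duration's string parser.
      val interval = Duration(config.getString("gearpump.metrics.report-interval-ms") + "ms")
      assert(interval.toMillis == 3000L)
    }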

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
new file mode 100644
index 0000000..4b15055
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import scala.concurrent.duration.Duration
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+import org.apache.gearpump.util.{Constants, LogUtil}
+
+/**
+ * The test spec performs destructive operations to check stability. The operations
+ * include shutting down the appmaster, an executor, or a worker.
+ */
+class StabilitySpec extends TestSpecBase {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  "kill appmaster" should {
+    "restart the whole application" in {
+      // setup
+      val appId = commandLineClient.submitApp(wordCountJar)
+      val formerAppMaster = restClient.queryApp(appId).appMasterPath
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+      ensureClockStoredInMaster()
+
+      // exercise
+      restClient.killAppMaster(appId) shouldBe true
+      // TODO: how long until the master begins to recover, and how long does recovery take?
+      Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster,
+        "appmaster killed and restarted")
+
+      // verify
+      val laterAppMaster = restClient.queryStreamingAppDetail(appId)
+      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
+      laterAppMaster.clock should be > 0L
+    }
+  }
+
+  "kill executor" should {
+    "will create a new executor and application will replay from the latest application clock" in {
+      // setup
+      val appId = commandLineClient.submitApp(wordCountJar)
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+      val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
+      ensureClockStoredInMaster()
+
+      // exercise
+      restClient.killExecutor(appId, executorToKill) shouldBe true
+      // TODO: how long until the appmaster begins to recover, and how long does recovery take?
+      Util.retryUntil(() => restClient.queryExecutorBrief(appId)
+        .map(_.executorId).max > executorToKill,
+        s"executor $executorToKill killed and restarted")
+
+      // verify
+      val laterAppMaster = restClient.queryStreamingAppDetail(appId)
+      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
+      laterAppMaster.clock should be > 0L
+    }
+  }
+
+  private def hostName(workerId: WorkerId): String = {
+    val worker = restClient.listRunningWorkers().find(_.workerId == workerId)
+    // Parse hostname from JVM info (in format: PID@hostname)
+    val hostname = worker.get.jvmName.split("@")(1)
+    hostname
+  }
+
+  "kill worker" should {
+    "worker will not recover but all its executors will be migrated to other workers" in {
+      // setup
+      restartClusterRequired = true
+      val appId = commandLineClient.submitApp(wordCountJar)
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+
+      val allExecutors = restClient.queryExecutorBrief(appId)
+      val maxExecutor = allExecutors.sortBy(_.executorId).last
+      ensureClockStoredInMaster()
+
+      val appMaster = allExecutors.find(_.executorId == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
+
+      LOG.info(s"Max executor id is ${maxExecutor.executorId}, " +
+        s"on worker ${maxExecutor.workerId}")
+      val executorsSharingSameWorker = allExecutors
+        .filter(_.workerId == maxExecutor.workerId).map(_.executorId)
+      LOG.info(s"Executors sharing the same worker id ${maxExecutor.workerId}: " +
+        s"${executorsSharingSameWorker.mkString(",")}")
+
+      // Kill the worker and expect all killed executors to be restarted on other workers.
+      val workerIdToKill = maxExecutor.workerId
+      cluster.removeWorkerNode(hostName(workerIdToKill))
+
+      val appMasterKilled = executorsSharingSameWorker
+        .exists(_ == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
+
+      def executorsMigrated(): Boolean = {
+        val executors = restClient.queryExecutorBrief(appId)
+        val newAppMaster = executors.find(_.executorId == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
+
+        if (appMasterKilled) {
+          newAppMaster.get.workerId != appMaster.get.workerId
+        } else {
+          // New executors will be started to replace killed executors.
+          // The new executors will be assigned larger executor ids. We use this trick to detect
+          // whether the new executors have been started successfully.
+          executors.map(_.executorId).max > maxExecutor.executorId
+        }
+      }
+
+      Util.retryUntil(() => executorsMigrated(),
+        s"new executor created with id > ${maxExecutor.executorId} after the worker was killed")
+
+      // verify
+      val laterAppMaster = restClient.queryStreamingAppDetail(appId)
+      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
+      laterAppMaster.clock should be > 0L
+    }
+  }
+
+  "kill master" should {
+    "master will be down and all workers will attempt to reconnect and suicide after X seconds" in {
+      // setup
+      restartClusterRequired = true
+      val masters = cluster.getMasterHosts
+      val config = restClient.queryMasterConfig()
+      val shutDownTimeout = Duration(config.getString("akka.cluster.auto-down-unreachable-after"))
+
+      // exercise
+      masters.foreach(cluster.removeMasterNode)
+      info(s"will sleep ${shutDownTimeout.toSeconds}s and then check workers are down")
+      Thread.sleep(shutDownTimeout.toMillis)
+
+      // verify
+      val aliveWorkers = cluster.getWorkerHosts
+      Util.retryUntil(() => aliveWorkers.forall(worker => !cluster.nodeIsOnline(worker)),
+        "all workers down")
+    }
+  }
+
+  private def ensureClockStoredInMaster(): Unit = {
+    // TODO: 5000 ms is the fixed sync period of the clock service.
+    // We wait that long and then assume the clock has been stored.
+    Thread.sleep(5000)
+  }
+}
\ No newline at end of file
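
The hostName helper above assumes the worker reports its JVM name in the
standard "PID@hostname" form returned by RuntimeMXBean.getName. A minimal
sketch of the same parsing, with a hypothetical value:

    object JvmNameSketch extends App {
      val jvmName = "12345@worker-node-1" // hypothetical "PID@hostname" value
      val hostname = jvmName.split("@")(1) // take the part after the '@'
      assert(hostname == "worker-node-1")
    }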

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
new file mode 100644
index 0000000..b327bf4
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.gearpump.integrationtest.kafka.{KafkaCluster, MessageLossDetector, SimpleKafkaReader}
+import org.apache.gearpump.integrationtest.storm.StormClient
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+
+/**
+ * The test spec checks the compatibility of running Storm applications
+ */
+class StormCompatibilitySpec extends TestSpecBase {
+
+  private lazy val stormClient = {
+    new StormClient(cluster, restClient)
+  }
+
+  val `version0.9` = "09"
+  val `version0.10` = "010"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    stormClient.start()
+  }
+
+  override def afterAll(): Unit = {
+    stormClient.shutDown()
+    super.afterAll()
+  }
+
+  def withStorm(testCode: String => Unit): Unit = {
+    testCode(`version0.9`)
+    testCode(`version0.10`)
+  }
+
+  def getTopologyName(name: String, stormVersion: String): String = {
+    s"${name}_$stormVersion"
+  }
+
+  def getStormJar(stormVersion: String): String = {
+    cluster.queryBuiltInITJars(s"storm$stormVersion-").head
+  }
+
+  "Storm over Gearpump" should withStorm {
+    stormVersion =>
+      s"support basic topologies ($stormVersion)" in {
+        val stormJar = getStormJar(stormVersion)
+        val topologyName = getTopologyName("exclamation", stormVersion)
+
+        // exercise
+        val appId = stormClient.submitStormApp(
+          jar = stormJar,
+          mainClass = "storm.starter.ExclamationTopology",
+          args = topologyName,
+          appName = topologyName)
+
+        // verify
+        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+      }
+
+      s"support to run a python version of wordcount ($stormVersion)" in {
+        val stormJar = getStormJar(stormVersion)
+        val topologyName = getTopologyName("wordcount", stormVersion)
+
+        // exercise
+        val appId = stormClient.submitStormApp(
+          jar = stormJar,
+          mainClass = "storm.starter.WordCountTopology",
+          args = topologyName,
+          appName = topologyName)
+
+        // verify
+        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+      }
+
+      s"support DRPC ($stormVersion)" in {
+        // ReachTopology computes the reach of a Twitter URL across users and their
+        // followers using Storm's Distributed RPC feature.
+        // The input (user and follower) data is already prepared in memory.
+        val stormJar = getStormJar(stormVersion)
+        val topologyName = getTopologyName("reach", stormVersion)
+        stormClient.submitStormApp(
+          jar = stormJar,
+          mainClass = "storm.starter.ReachTopology",
+          args = topologyName,
+          appName = topologyName)
+        val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway)
+
+        // verify
+        Util.retryUntil(() => {
+          drpcClient.execute("reach", "notaurl.com") == "0"
+        }, "drpc reach == 0")
+        drpcClient.execute("reach", "foo.com/blog/1") shouldEqual "16"
+        drpcClient.execute("reach", "engineering.twitter.com/blog/5") shouldEqual "14"
+      }
+
+      s"support tick tuple ($stormVersion)" in {
+        val stormJar = getStormJar(stormVersion)
+        val topologyName = getTopologyName("slidingWindowCounts", stormVersion)
+
+        // exercise
+        val appId = stormClient.submitStormApp(
+          jar = stormJar,
+          mainClass = "storm.starter.RollingTopWords",
+          args = s"$topologyName remote",
+          appName = topologyName)
+
+        // verify
+        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+      }
+
+      s"support at-least-once semantics with Storm's Kafka connector ($stormVersion)" in {
+
+        val stormJar = getStormJar(stormVersion)
+        val topologyName = getTopologyName("storm_kafka", stormVersion)
+        val stormKafkaTopology =
+          s"org.apache.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology"
+
+        import org.apache.gearpump.integrationtest.kafka.KafkaCluster._
+        withKafkaCluster(cluster) {
+          kafkaCluster =>
+            val sourcePartitionNum = 2
+            val sinkPartitionNum = 1
+            val zookeeper = kafkaCluster.getZookeeperConnectString
+            val brokerList = kafkaCluster.getBrokerListConnectString
+            val sourceTopic = "topic1"
+            val sinkTopic = "topic2"
+
+            val args = Array("-topologyName", topologyName, "-sourceTopic", sourceTopic,
+              "-sinkTopic", sinkTopic, "-zookeeperConnect", zookeeper, "-brokerList", brokerList,
+              "-spoutNum", s"$sourcePartitionNum", "-boltNum", s"$sinkPartitionNum"
+            )
+
+            kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
+
+            // generate the number sequence (1, 2, 3, ...) into the topic
+            withDataProducer(sourceTopic, brokerList) { producer =>
+
+              val appId = stormClient.submitStormApp(
+                jar = stormJar,
+                mainClass = stormKafkaTopology,
+                args = args.mkString(" "),
+                appName = topologyName)
+
+              Util.retryUntil(() =>
+                restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+
+              // kill an executor and verify at-least-once is guaranteed on application restart
+              val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
+              restClient.killExecutor(appId, executorToKill) shouldBe true
+              Util.retryUntil(() =>
+                restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill,
+                s"executor $executorToKill killed")
+
+              // verify no message loss
+              val detector = new MessageLossDetector(producer.lastWriteNum)
+              val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
+                host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
+
+              Util.retryUntil(() => {
+                kafkaReader.read()
+                detector.allReceived
+              }, "all kafka message read")
+            }
+        }
+      }
+  }
+}
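
The Kafka test above composes withKafkaCluster and withDataProducer, both
instances of the loan pattern: acquire a resource, lend it to the test body,
and release it in a finally block so cleanup happens even when the body throws.
A generic sketch of the idiom (all names illustrative, not part of the test API):

    object LoanPatternSketch {
      // Acquire a resource, pass it to the body, and always release it afterwards.
      def withResource[R, T](acquire: => R)(release: R => Unit)(body: R => T): T = {
        val resource = acquire
        try {
          body(resource)
        } finally {
          release(resource)
        }
      }
    }

Used as withResource(acquireThing())(_.close()) { thing => ... }, the pattern
guarantees that containers and producers started for a test are torn down even
when an assertion fails mid-test.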

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala
new file mode 100644
index 0000000..7942308
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.suites
+
+import org.scalatest._
+
+import org.apache.gearpump.integrationtest.MiniClusterProvider
+import org.apache.gearpump.integrationtest.checklist._
+import org.apache.gearpump.integrationtest.minicluster.MiniCluster
+
+/**
+ * Launches a Gearpump cluster in standalone mode and runs all test specs. To run a
+ * specific test spec only, comment out the other entries in the list below.
+ */
+class StandaloneModeSuite extends Suites(
+  new CommandLineSpec,
+  new RestServiceSpec,
+  new ExampleSpec,
+  new DynamicDagSpec,
+  new StormCompatibilitySpec,
+  new StabilitySpec,
+  new ConnectorKafkaSpec,
+  new MessageDeliverySpec
+) with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    MiniClusterProvider.managed = true
+    MiniClusterProvider.set(new MiniCluster).start()
+  }
+
+  override def afterAll(): Unit = {
+    MiniClusterProvider.get.shutDown()
+    super.afterAll()
+  }
+}
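
As the class comment notes, individual specs can be exercised by trimming the
Suites list. For illustration only (the suite name below is hypothetical), a
suite that runs just the REST checks would keep the same lifecycle hooks:

    class RestOnlySuite extends Suites(
      new RestServiceSpec
    ) with BeforeAndAfterAll {

      override def beforeAll(): Unit = {
        super.beforeAll()
        MiniClusterProvider.managed = true
        MiniClusterProvider.set(new MiniCluster).start()
      }

      override def afterAll(): Unit = {
        MiniClusterProvider.get.shutDown()
        super.afterAll()
      }
    }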

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala
deleted file mode 100644
index f315ad3..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import org.apache.log4j.Logger
-
-/**
- * The class is used to execute Docker commands.
- */
-object Docker {
-
-  private val LOG = Logger.getLogger(getClass)
-
-  /**
-   * @throws RuntimeException in case retval != 0
-   */
-  private def doExecute(container: String, command: String): String = {
-    ShellExec.execAndCaptureOutput(s"docker exec $container $command", s"EXEC $container")
-  }
-
-  private def doExecuteSilently(container: String, command: String): Boolean = {
-    ShellExec.exec(s"docker exec $container $command", s"EXEC $container")
-  }
-
-  /**
-   * @throws RuntimeException in case retval != 0
-   */
-  final def execute(container: String, command: String): String = {
-    trace(container, s"Execute $command") {
-      doExecute(container, command)
-    }
-  }
-
-  final def executeSilently(container: String, command: String): Boolean = {
-    trace(container, s"Execute silently $command") {
-      doExecuteSilently(container, command)
-    }
-  }
-
-  final def listContainers(): Seq[String] = {
-    trace("", s"Listing how many containers...") {
-      ShellExec.execAndCaptureOutput("docker ps -q -a", "LIST")
-        .split("\n").filter(_.nonEmpty)
-    }
-  }
-
-  final def containerIsRunning(name: String): Boolean = {
-    trace(name, s"Check container running or not...") {
-      ShellExec.execAndCaptureOutput(s"docker ps -q --filter name=$name", s"FIND $name").nonEmpty
-    }
-  }
-
-  final def getContainerIPAddr(name: String): String = {
-    trace(name, s"Get Ip Address") {
-      Docker.inspect(name, "--format={{.NetworkSettings.IPAddress}}")
-    }
-  }
-
-  final def containerExists(name: String): Boolean = {
-    trace(name, s"Check container existing or not...") {
-      ShellExec.execAndCaptureOutput(s"docker ps -q -a --filter name=$name", s"FIND $name").nonEmpty
-    }
-  }
-
-  /**
-   * @throws RuntimeException in case the container already exists
-   */
-  final def createAndStartContainer(name: String, image: String, command: String,
-      environ: Map[String, String] = Map.empty, // key, value
-      volumes: Map[String, String] = Map.empty, // from, to
-      knownHosts: Set[String] = Set.empty,
-      tunnelPorts: Set[Int] = Set.empty): String = {
-
-    if (containerExists(name)) {
-      killAndRemoveContainer(name)
-    }
-
-    trace(name, s"Create and start $name ($image)...") {
-
-      val optsBuilder = new StringBuilder
-      optsBuilder.append("-d") // run in background
-      optsBuilder.append(" -h " + name) // use container name as hostname
-      optsBuilder.append(" -v /etc/localtime:/etc/localtime:ro") // synchronize timezone settings
-
-      environ.foreach { case (key, value) =>
-        optsBuilder.append(s" -e $key=$value")
-      }
-      volumes.foreach { case (from, to) =>
-        optsBuilder.append(s" -v $from:$to")
-      }
-      knownHosts.foreach(host =>
-        optsBuilder.append(" --link " + host)
-      )
-      tunnelPorts.foreach(port =>
-        optsBuilder.append(s" -p $port:$port")
-      )
-      createAndStartContainer(name, optsBuilder.toString(), command, image)
-    }
-  }
-
-  /**
-   * @throws RuntimeException in case the container already exists
-   */
-  private def createAndStartContainer(
-      name: String, options: String, command: String, image: String): String = {
-    ShellExec.execAndCaptureOutput(s"docker run $options " +
-      s"--name $name $image $command", s"MAKE $name")
-  }
-
-  final def killAndRemoveContainer(name: String): Boolean = {
-    trace(name, s"kill and remove container") {
-      ShellExec.exec(s"docker rm -f $name", s"STOP $name")
-    }
-  }
-
-  final def killAndRemoveContainer(names: Array[String]): Boolean = {
-    assert(names.length > 0)
-    val args = names.mkString(" ")
-    trace(names.mkString(","), s"kill and remove containers") {
-      ShellExec.exec(s"docker rm -f $args", s"STOP $args.")
-    }
-  }
-
-  private def inspect(container: String, option: String): String = {
-    ShellExec.execAndCaptureOutput(s"docker inspect $option $container", s"EXEC $container")
-  }
-
-  final def curl(container: String, url: String, options: Array[String] = Array.empty[String])
-    : String = {
-    trace(container, s"curl $url") {
-      doExecute(container, s"curl -s ${options.mkString(" ")} $url")
-    }
-  }
-
-  final def getHostName(container: String): String = {
-    trace(container, s"Get hostname of container...") {
-      doExecute(container, "hostname")
-    }
-  }
-
-  final def getNetworkGateway(container: String): String = {
-    trace(container, s"Get gateway of container...") {
-      doExecute(container, "ip route").split("\\s+")(2)
-    }
-  }
-  final def killProcess(container: String, pid: Int, signal: String = "SIGKILL"): Boolean = {
-    trace(container, s"Kill process pid: $pid") {
-      doExecuteSilently(container, s"kill -$signal $pid")
-    }
-  }
-
-  final def findJars(container: String, folder: String): Array[String] = {
-    trace(container, s"Find jars under $folder") {
-      doExecute(container, s"find $folder")
-        .split("\n").filter(_.endsWith(".jar"))
-    }
-  }
-
-  private def trace[T](container: String, msg: String)(fun: => T): T = {
-    // scalastyle:off println
-    Console.println() // An empty line to make the output look better.
-    // scalastyle:on println
-    LOG.debug(s"Container $container====>> $msg")
-    LOG.debug("INPUT==>>")
-    val response = fun
-    LOG.debug("<<==OUTPUT")
-
-    LOG.debug(brief(response))
-
-    LOG.debug(s"<<====Command END. Container $container, $msg \n")
-    response
-  }
-
-  private val PREVIEW_MAX_LENGTH = 1024
-
-  private def brief[T](input: T): String = {
-    val output = input match {
-      case true =>
-        "Success|True"
-      case false =>
-        "Failure|False"
-      case x: Array[Any] =>
-        "Success: [" + x.mkString(",") + "]"
-      case x =>
-        x.toString
-    }
-
-    val preview = if (output.length > PREVIEW_MAX_LENGTH) {
-      output.substring(0, PREVIEW_MAX_LENGTH) + "..."
-    }
-    else {
-      output
-    }
-    preview
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
deleted file mode 100644
index 25d7ee3..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent._
-import scala.concurrent.duration._
-import scala.sys.process._
-
-import org.apache.log4j.Logger
-import org.apache.storm.shade.org.eclipse.jetty.util.QuotedStringTokenizer
-
-/**
- * The class is used to execute commands in a shell.
- */
-object ShellExec {
-
-  private val LOG = Logger.getLogger(getClass)
-  private val PROCESS_TIMEOUT = 2.minutes
-
-  /**
-   * The built-in command-line parser of ProcessBuilder (implicit sys.process) doesn't
-   * respect quote characters (' and ")
-   */
-  private def splitQuotedString(str: String): List[String] = {
-    val splitter = new QuotedStringTokenizer(str, " \t\n\r")
-    splitter.asInstanceOf[java.util.Enumeration[String]].asScala.toList
-  }
-
-  def exec(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT): Boolean = {
-    LOG.debug(s"$sender => `$command`")
-
-    val p = splitQuotedString(command).run()
-    val f = Future(blocking(p.exitValue())) // wrap in Future
-    val retval = {
-      try {
-        Await.result(f, timeout)
-      } catch {
-        case _: TimeoutException =>
-          LOG.error(s"timeout to execute command `$command`")
-          p.destroy()
-          p.exitValue()
-      }
-    }
-    LOG.debug(s"$sender <= exit $retval")
-    retval == 0
-  }
-
-  def execAndCaptureOutput(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT)
-    : String = {
-    LOG.debug(s"$sender => `$command`")
-
-    val buf = new StringBuilder
-    val processLogger = ProcessLogger((o: String) => buf.append(o).append("\n"),
-      (e: String) => buf.append(e).append("\n"))
-    val p = splitQuotedString(command).run(processLogger)
-    val f = Future(blocking(p.exitValue())) // wrap in Future
-    val retval = {
-      try {
-        Await.result(f, timeout)
-      } catch {
-        case _: TimeoutException =>
-          p.destroy()
-          p.exitValue()
-      }
-    }
-    val output = buf.toString().trim
-    val PREVIEW_MAX_LENGTH = 200
-    val preview = if (output.length > PREVIEW_MAX_LENGTH) {
-      output.substring(0, PREVIEW_MAX_LENGTH) + "..."
-    } else {
-      output
-    }
-
-    LOG.debug(s"$sender <= `$preview` exit $retval")
-    if (retval != 0) {
-      throw new RuntimeException(
-        s"exited ($retval) by executing `$command`")
-    }
-    output
-  }
-}
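
Both methods above bound the external process with the same trick: wrap the
blocking exitValue() call in a Future and Await it with a timeout, destroying
the process if the wait expires. A self-contained sketch of that pattern,
using a hypothetical command:

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.{Await, Future, TimeoutException, blocking}
    import scala.concurrent.duration._
    import scala.sys.process._

    object TimedExecSketch extends App {
      val p = Seq("sleep", "10").run() // hypothetical long-running command
      val exitCode = Future(blocking(p.exitValue())) // wrap the blocking wait
      val retval = try {
        Await.result(exitCode, 2.seconds) // bounded wait
      } catch {
        case _: TimeoutException =>
          p.destroy() // kill the process once the timeout fires
          p.exitValue()
      }
      println(s"exit code: $retval")
    }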

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala
deleted file mode 100644
index 7e7085d..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import scala.concurrent.duration._
-import scala.util.{Failure, Success, Try}
-
-import org.apache.log4j.Logger
-
-object Util {
-
-  private val LOG = Logger.getLogger(getClass)
-
-  def encodeUriComponent(s: String): String = {
-    try {
-      java.net.URLEncoder.encode(s, "UTF-8")
-        .replaceAll("\\+", "%20")
-        .replaceAll("\\%21", "!")
-        .replaceAll("\\%27", "'")
-        .replaceAll("\\%28", "(")
-        .replaceAll("\\%29", ")")
-        .replaceAll("\\%7E", "~")
-    } catch {
-      case ex: Throwable => s
-    }
-  }
-
-  def retryUntil(
-      condition: () => Boolean, conditionDescription: String, maxTries: Int = 15,
-      interval: Duration = 10.seconds): Unit = {
-    var met = false
-    var tries = 0
-
-    while (!met && tries < maxTries) {
-
-      met = Try(condition()) match {
-        case Success(true) => true
-        case Success(false) => false
-        case Failure(ex) => false
-      }
-
-      tries += 1
-
-      if (!met) {
-        LOG.error(s"Failed due to (false == $conditionDescription), " +
-          s"retrying for the ${tries} times...")
-        Thread.sleep(interval.toMillis)
-      } else {
-        LOG.info(s"Success ($conditionDescription) after ${tries} retries")
-      }
-    }
-
-    if (!met) {
-      throw new Exception(s"Failed after ${tries} retries, ($conditionDescription) == false")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
deleted file mode 100644
index f836abd..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.integrationtest.hadoop
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.{Docker, Util}
-
-object HadoopCluster {
-
-  /** Starts a Hadoop cluster */
-  def withHadoopCluster(testCode: HadoopCluster => Unit): Unit = {
-    val hadoopCluster = new HadoopCluster
-    try {
-      hadoopCluster.start()
-      testCode(hadoopCluster)
-    } finally {
-      hadoopCluster.shutDown()
-    }
-  }
-}
-/**
- * This class maintains a single node Hadoop cluster
- */
-class HadoopCluster {
-
-  private val LOG = Logger.getLogger(getClass)
-  private val HADOOP_DOCKER_IMAGE = "sequenceiq/hadoop-docker:2.6.0"
-  private val HADOOP_HOST = "hadoop0"
-
-  def start(): Unit = {
-    Docker.createAndStartContainer(HADOOP_HOST, HADOOP_DOCKER_IMAGE, "")
-
-    Util.retryUntil(() => isAlive, "Hadoop cluster is alive")
-    LOG.info("Hadoop cluster is started.")
-  }
-
-  // Checks whether the cluster is alive by listing "/"
-  private def isAlive: Boolean = {
-    Docker.executeSilently(HADOOP_HOST, "/usr/local/hadoop/bin/hadoop fs -ls /")
-  }
-
-  def getDefaultFS: String = {
-    val hostIPAddr = Docker.getContainerIPAddr(HADOOP_HOST)
-    s"hdfs://$hostIPAddr:9000"
-  }
-
-  def shutDown(): Unit = {
-    Docker.killAndRemoveContainer(HADOOP_HOST)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
deleted file mode 100644
index 15ba084..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.kafka
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.minicluster.MiniCluster
-import org.apache.gearpump.integrationtest.{Docker, Util}
-
-object KafkaCluster {
-
-  /** Starts a Kafka cluster */
-  def withKafkaCluster(cluster: MiniCluster)(testCode: KafkaCluster => Unit): Unit = {
-    val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka")
-    try {
-      kafkaCluster.start()
-      testCode(kafkaCluster)
-    } finally {
-      kafkaCluster.shutDown()
-    }
-  }
-
-  def withDataProducer(topic: String, brokerList: String)
-    (testCode: NumericalDataProducer => Unit): Unit = {
-    val producer = new NumericalDataProducer(topic, brokerList)
-    try {
-      producer.start()
-      testCode(producer)
-    } finally {
-      producer.stop()
-    }
-  }
-}
-
-/**
- * This class maintains a single node Kafka cluster with integrated Zookeeper.
- */
-class KafkaCluster(val advertisedHost: String, zkChroot: String = "") {
-
-  private val LOG = Logger.getLogger(getClass)
-  private val KAFKA_DOCKER_IMAGE = "spotify/kafka"
-  private val KAFKA_HOST = "kafka0"
-  private val KAFKA_HOME = "/opt/kafka_2.11-0.8.2.1/"
-  private val ZOOKEEPER_PORT = 2181
-  private val BROKER_PORT = 9092
-  val advertisedPort = BROKER_PORT
-
-  def start(): Unit = {
-    Docker.createAndStartContainer(KAFKA_HOST, KAFKA_DOCKER_IMAGE, "",
-      environ = Map(
-        "ADVERTISED_HOST" -> advertisedHost,
-        "ADVERTISED_PORT" -> BROKER_PORT.toString,
-        "ZK_CHROOT" -> zkChroot),
-      tunnelPorts = Set(ZOOKEEPER_PORT, BROKER_PORT)
-    )
-    Util.retryUntil(() => isAlive, "kafka cluster is alive")
-    LOG.debug("kafka cluster is started.")
-  }
-
-  def isAlive: Boolean = {
-    !listTopics().contains("Connection refused")
-  }
-
-  def shutDown(): Unit = {
-    Docker.killAndRemoveContainer(KAFKA_HOST)
-  }
-
-  private lazy val hostIPAddr = Docker.getContainerIPAddr(KAFKA_HOST)
-
-  def listTopics(): String = {
-    kafkaListTopics(KAFKA_HOST, KAFKA_HOME, getZookeeperConnectString)
-  }
-
-  def getZookeeperConnectString: String = {
-    s"$hostIPAddr:$ZOOKEEPER_PORT/$zkChroot"
-  }
-
-  def getBrokerListConnectString: String = {
-    s"$hostIPAddr:$BROKER_PORT"
-  }
-
-  def createTopic(topic: String, partitions: Int = 1): Unit = {
-    LOG.debug(s"|=> Create kafka topic $topic with $partitions partitions")
-
-    Docker.executeSilently(KAFKA_HOST,
-      s"$KAFKA_HOME/bin/kafka-topics.sh" +
-        s" --zookeeper $getZookeeperConnectString" +
-        s" --create --topic $topic --partitions $partitions --replication-factor 1")
-  }
-
-  def produceDataToKafka(topic: String, messageNum: Int): Unit = {
-    Docker.executeSilently(KAFKA_HOST,
-      s"$KAFKA_HOME/bin/kafka-topics.sh" +
-        s" --zookeeper $getZookeeperConnectString" +
-        s" --create --topic $topic --partitions 1 --replication-factor 1")
-
-    Docker.executeSilently(KAFKA_HOST,
-      s"$KAFKA_HOME/bin/kafka-producer-perf-test.sh" +
-        s" --broker-list $getBrokerListConnectString" +
-        s" --topic $topic --messages $messageNum")
-  }
-
-  def getLatestOffset(topic: String): Int = {
-    kafkaFetchLatestOffset(KAFKA_HOST, topic, KAFKA_HOME, getBrokerListConnectString)
-  }
-
-  private def kafkaListTopics(
-      container: String, kafkaHome: String, zookeeperConnectionString: String): String = {
-
-    LOG.debug(s"|=> Kafka list topics...")
-    Docker.execute(container,
-      s"$kafkaHome/bin/kafka-topics.sh" +
-        s" --zookeeper $zookeeperConnectionString -list")
-  }
-
-  private def kafkaFetchLatestOffset(
-      container: String, topic: String, kafkaHome: String, brokersList: String): Int = {
-    LOG.debug(s"|=> Get latest offset of topic $topic...")
-    val output = Docker.execute(container,
-      s"$kafkaHome/bin/kafka-run-class.sh kafka.tools.GetOffsetShell" +
-        s" --broker-list $brokersList " +
-        s" --topic $topic --time -1")
-    output.split(":")(2).toInt
-  }
-}
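
kafkaFetchLatestOffset above parses the output of kafka.tools.GetOffsetShell,
which prints one topic:partition:offset triple per line. A minimal sketch of
that parsing, with a hypothetical sample line:

    object OffsetParseSketch extends App {
      val line = "topic2:0:42" // hypothetical GetOffsetShell output line
      val latestOffset = line.split(":")(2).toInt // the third field is the offset
      assert(latestOffset == 42)
    }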

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
deleted file mode 100644
index 1cf3125..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.kafka
-
-import java.util.Properties
-
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.streaming.serializer.ChillSerializer
-
-class NumericalDataProducer(topic: String, bootstrapServers: String) {
-
-  private val LOG = Logger.getLogger(getClass)
-  private val producer = createProducer
-  private val WRITE_SLEEP_NANOS = 10
-  private val serializer = new ChillSerializer[Int]
-  var lastWriteNum = 0
-
-  def start(): Unit = {
-    produceThread.start()
-  }
-
-  def stop(): Unit = {
-    if (produceThread.isAlive) {
-      produceThread.interrupt()
-      produceThread.join()
-    }
-    producer.close()
-  }
-
-  /** How many messages we have written in total */
-  def producedNumbers: Range = {
-    Range(1, lastWriteNum + 1)
-  }
-
-  private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
-    val properties = new Properties()
-    properties.setProperty("bootstrap.servers", bootstrapServers)
-    new KafkaProducer[Array[Byte], Array[Byte]](properties,
-      new ByteArraySerializer, new ByteArraySerializer)
-  }
-
-  private val produceThread = new Thread(new Runnable {
-    override def run(): Unit = {
-      try {
-        while (!Thread.currentThread.isInterrupted) {
-          lastWriteNum += 1
-          val msg = serializer.serialize(lastWriteNum)
-          val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, msg)
-          producer.send(record)
-          Thread.sleep(0, WRITE_SLEEP_NANOS)
-        }
-      } catch {
-        case ex: InterruptedException =>
-          LOG.error("message producing is stopped by an interrupt")
-      }
-    }
-  })
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
deleted file mode 100644
index 1f773d3..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.kafka
-
-import scala.collection.mutable
-
-trait ResultVerifier {
-  def onNext(num: Int): Unit
-}
-
-class MessageLossDetector(totalNum: Int) extends ResultVerifier {
-  private val bitSets = new mutable.BitSet(totalNum)
-  var result = List.empty[Int]
-
-  override def onNext(num: Int): Unit = {
-    bitSets.add(num)
-    result :+= num
-  }
-
-  def allReceived: Boolean = {
-    1.to(totalNum).forall(bitSets)
-  }
-
-  def received(num: Int): Boolean = {
-    bitSets(num)
-  }
-}
\ No newline at end of file
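
MessageLossDetector tracks received sequence numbers in a BitSet and reports
success once every number in 1..totalNum has been seen. A small usage sketch
with hypothetical totals:

    object DetectorSketch extends App {
      val detector = new MessageLossDetector(5)
      Seq(1, 2, 3, 5).foreach(detector.onNext)
      assert(!detector.allReceived) // 4 has not arrived yet
      detector.onNext(4)
      assert(detector.allReceived) // all of 1..5 seen, no loss
    }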

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
deleted file mode 100644
index 392ca86..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.kafka
-
-import scala.util.{Failure, Success}
-
-import kafka.api.FetchRequestBuilder
-import kafka.consumer.SimpleConsumer
-import kafka.utils.Utils
-
-import org.apache.gearpump.streaming.serializer.ChillSerializer
-
-class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: Int = 0,
-    host: String, port: Int) {
-
-  private val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "")
-  private val serializer = new ChillSerializer[Int]
-  private var offset = 0L
-
-  def read(): Unit = {
-    val messageSet = consumer.fetch(
-      new FetchRequestBuilder().addFetch(topic, partition, offset, Int.MaxValue).build()
-    ).messageSet(topic, partition)
-
-    for (messageAndOffset <- messageSet) {
-      serializer.deserialize(Utils.readBytes(messageAndOffset.message.payload)) match {
-        case Success(msg) =>
-          offset = messageAndOffset.nextOffset
-          verifier.onNext(msg)
-        case Failure(e) => throw e
-      }
-    }
-  }
-}
\ No newline at end of file

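A sketch of how the reader cooperates with a MessageLossDetector in the specs below; the host, port, topic, and expected message count are placeholders:

  val detector = new MessageLossDetector(totalNum = 10000)
  val reader = new SimpleKafkaReader(detector, "sink-topic",
    host = "localhost", port = 9092)
  while (!detector.allReceived) {
    reader.read()      // feeds each newly fetched message to the detector
    Thread.sleep(100)  // avoid busy-polling the broker
  }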


[4/4] incubator-gearpump git commit: fix GEARPUMP-150 correct the integration test file structure

Posted by ma...@apache.org.
fix GEARPUMP-150 correct the integration test file structure

Author: huafengw <fv...@gmail.com>

Closes #26 from huafengw/name.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/e9ea6e2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/e9ea6e2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/e9ea6e2f

Branch: refs/heads/master
Commit: e9ea6e2f3366ede2aa350204e8b3854a6d0081ca
Parents: c80c069
Author: huafengw <fv...@gmail.com>
Authored: Fri May 27 19:38:40 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri May 27 19:38:40 2016 +0800

----------------------------------------------------------------------
 .../integrationtest/MiniClusterProvider.scala   |  41 ---
 .../gearpump/integrationtest/TestSpecBase.scala |  92 -----
 .../checklist/CommandLineSpec.scala             | 133 -------
 .../checklist/ConnectorKafkaSpec.scala          | 120 ------
 .../checklist/DynamicDagSpec.scala              | 157 --------
 .../integrationtest/checklist/ExampleSpec.scala | 148 --------
 .../checklist/MessageDeliverySpec.scala         | 122 ------
 .../checklist/RestServiceSpec.scala             | 369 -------------------
 .../checklist/StabilitySpec.scala               | 162 --------
 .../checklist/StormCompatibilitySpec.scala      | 185 ----------
 .../suites/StandaloneModeSuite.scala            |  51 ---
 .../integrationtest/MiniClusterProvider.scala   |  41 +++
 .../gearpump/integrationtest/TestSpecBase.scala |  92 +++++
 .../checklist/CommandLineSpec.scala             | 133 +++++++
 .../checklist/ConnectorKafkaSpec.scala          | 120 ++++++
 .../checklist/DynamicDagSpec.scala              | 157 ++++++++
 .../integrationtest/checklist/ExampleSpec.scala | 148 ++++++++
 .../checklist/MessageDeliverySpec.scala         | 122 ++++++
 .../checklist/RestServiceSpec.scala             | 369 +++++++++++++++++++
 .../checklist/StabilitySpec.scala               | 162 ++++++++
 .../checklist/StormCompatibilitySpec.scala      | 185 ++++++++++
 .../suites/StandaloneModeSuite.scala            |  51 +++
 .../io/gearpump/integrationtest/Docker.scala    | 211 -----------
 .../io/gearpump/integrationtest/ShellExec.scala |  98 -----
 .../io/gearpump/integrationtest/Util.scala      |  72 ----
 .../integrationtest/hadoop/HadoopCluster.scala  |  67 ----
 .../integrationtest/kafka/KafkaCluster.scala    | 140 -------
 .../kafka/NumericalDataProducer.scala           |  76 ----
 .../integrationtest/kafka/ResultVerifier.scala  |  42 ---
 .../kafka/SimpleKafkaReader.scala               |  49 ---
 .../minicluster/BaseContainer.scala             |  60 ---
 .../minicluster/CommandLineClient.scala         |  84 -----
 .../minicluster/MiniCluster.scala               | 189 ----------
 .../minicluster/RestClient.scala                | 268 --------------
 .../integrationtest/storm/StormClient.scala     |  91 -----
 .../gearpump/integrationtest/Docker.scala       | 211 +++++++++++
 .../gearpump/integrationtest/ShellExec.scala    |  98 +++++
 .../apache/gearpump/integrationtest/Util.scala  |  72 ++++
 .../integrationtest/hadoop/HadoopCluster.scala  |  67 ++++
 .../integrationtest/kafka/KafkaCluster.scala    | 140 +++++++
 .../kafka/NumericalDataProducer.scala           |  76 ++++
 .../integrationtest/kafka/ResultVerifier.scala  |  42 +++
 .../kafka/SimpleKafkaReader.scala               |  49 +++
 .../minicluster/BaseContainer.scala             |  60 +++
 .../minicluster/CommandLineClient.scala         |  84 +++++
 .../minicluster/MiniCluster.scala               | 189 ++++++++++
 .../minicluster/RestClient.scala                | 268 ++++++++++++++
 .../integrationtest/storm/StormClient.scala     |  91 +++++
 .../integrationtest/storm/Adaptor.scala         |  38 --
 .../storm/Storm010KafkaTopology.scala           |  95 -----
 .../integrationtest/storm/Adaptor.scala         |  38 ++
 .../storm/Storm010KafkaTopology.scala           |  95 +++++
 .../integrationtest/storm/Adaptor.scala         |  38 --
 .../storm/Storm09KafkaTopology.scala            |  95 -----
 .../integrationtest/storm/Adaptor.scala         |  38 ++
 .../storm/Storm09KafkaTopology.scala            |  95 +++++
 56 files changed, 3293 insertions(+), 3293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
deleted file mode 100644
index 4a161c7..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import org.apache.gearpump.integrationtest.minicluster.MiniCluster
-
-/**
- * Provides a mini cluster of Gearpump, which contains one or more masters and workers.
- */
-object MiniClusterProvider {
-
-  private var instance = new MiniCluster
-
-  def get: MiniCluster = instance
-
-  def set(instance: MiniCluster): MiniCluster = {
-    this.instance = instance
-    instance
-  }
-
-  /**
-   * Indicates whether the test suite manages the cluster itself. If false, every
-   * test spec is responsible for creating its own cluster.
-   */
-  var managed = false
-}
\ No newline at end of file

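A sketch of how a suite takes ownership of the cluster through the provider above, so that TestSpecBase (next file) skips its default per-suite setup; the start/shutDown calls mirror what TestSpecBase does when unmanaged:

  val suiteCluster = MiniClusterProvider.set(new MiniCluster)
  MiniClusterProvider.managed = true   // specs will not start their own cluster
  suiteCluster.start()
  try {
    // ... run the nested specs ...
  } finally {
    suiteCluster.shutDown()
  }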
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
deleted file mode 100644
index dabcc71..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import org.scalatest._
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData
-import org.apache.gearpump.util.LogUtil
-
-/**
- * The abstract test spec
- */
-trait TestSpecBase
-  extends WordSpec with Matchers with BeforeAndAfterEachTestData with BeforeAndAfterAll {
-
-  private def LOGGER = LogUtil.getLogger(getClass)
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    if (!MiniClusterProvider.managed) {
-      LOGGER.info("Will test with a default standalone mini cluster")
-      MiniClusterProvider.get.start()
-    }
-  }
-
-  override def afterAll(): Unit = {
-    if (!MiniClusterProvider.managed) {
-      LOGGER.info("Will shutdown the default mini cluster")
-      MiniClusterProvider.get.shutDown()
-    }
-    super.afterAll()
-  }
-
-  lazy val cluster = MiniClusterProvider.get
-  lazy val commandLineClient = cluster.commandLineClient
-  lazy val restClient = cluster.restClient
-
-  lazy val wordCountJar = cluster.queryBuiltInExampleJars("wordcount-").head
-  lazy val wordCountName = "wordCount"
-
-  var restartClusterRequired: Boolean = false
-
-  override def beforeEach(td: TestData): Unit = {
-
-    LOGGER.debug(s">### =============================================================")
-    LOGGER.debug(s">###1 Prepare test: ${td.name}\n")
-
-    assert(cluster != null, "Configure MiniCluster properly in suite spec")
-    cluster.isAlive shouldBe true
-    restClient.listRunningApps().isEmpty shouldBe true
-    LOGGER.debug(s">### =============================================================")
-    LOGGER.debug(s">###2 Start test: ${td.name}\n")
-  }
-
-  override def afterEach(td: TestData): Unit = {
-    LOGGER.debug(s"<### =============================================================")
-    LOGGER.debug(s"<###3 End test: ${td.name}\n")
-
-    if (restartClusterRequired || !cluster.isAlive) {
-      restartClusterRequired = false
-      LOGGER.info("Will restart the cluster for next test case")
-      cluster.restart()
-    } else {
-      restClient.listRunningApps().foreach(app => {
-        commandLineClient.killApp(app.appId) shouldBe true
-      })
-    }
-  }
-
-  def expectAppIsRunning(appId: Int, expectedAppName: String): AppMasterData = {
-    val app = restClient.queryApp(appId)
-    app.status shouldEqual MasterToAppMaster.AppMasterActive
-    app.appName shouldEqual expectedAppName
-    app
-  }
-}
\ No newline at end of file

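Everything a concrete spec needs (cluster, REST client, command-line client, lifecycle hooks) comes from this trait. A minimal sketch; the spec name and test body are illustrative:

  class SmokeSpec extends TestSpecBase {
    "the mini cluster" should {
      "expose a REST service version" in {
        restClient.queryVersion() should not be empty
      }
    }
  }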
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
deleted file mode 100644
index eabc684..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.integrationtest.TestSpecBase
-
-/**
- * The test spec checks the command-line usage
- */
-class CommandLineSpec extends TestSpecBase {
-
-  "use `gear info` to list applications" should {
-    "retrieve 0 application after cluster just started" in {
-      // exercise
-      getRunningAppCount shouldEqual 0
-    }
-
-    "retrieve 1 application after the first application submission" in {
-      // setup
-      val appId = expectSubmitAppSuccess(wordCountJar)
-      expectAppIsRunningByParsingOutput(appId, wordCountName)
-
-      // exercise
-      getRunningAppCount shouldEqual 1
-    }
-  }
-
-  "use `gear app` to submit application" should {
-    "find a running application after submission" in {
-      // exercise
-      val appId = expectSubmitAppSuccess(wordCountJar)
-      expectAppIsRunningByParsingOutput(appId, wordCountName)
-    }
-
-    "reject a repeated submission request while the application is running" in {
-      // setup
-      val appId = expectSubmitAppSuccess(wordCountJar)
-      expectAppIsRunningByParsingOutput(appId, wordCountName)
-
-      // exercise
-      val actualAppId = commandLineClient.submitApp(wordCountJar)
-      actualAppId shouldEqual -1
-    }
-
-    "reject an invalid submission (the jar file path is incorrect)" in {
-      // exercise
-      val actualAppId = commandLineClient.submitApp(wordCountJar + ".missing")
-      actualAppId shouldEqual -1
-    }
-  }
-
-  "use `gear kill` to kill application" should {
-    "a running application should be killed" in {
-      // setup
-      val appId = expectSubmitAppSuccess(wordCountJar)
-
-      // exercise
-      val success = commandLineClient.killApp(appId)
-      success shouldBe true
-    }
-
-    "should fail when attempting to kill a stopped application" in {
-      // setup
-      val appId = expectSubmitAppSuccess(wordCountJar)
-      var success = commandLineClient.killApp(appId)
-      success shouldBe true
-
-      // exercise
-      success = commandLineClient.killApp(appId)
-      success shouldBe false
-    }
-
-    "the EmbededCluster can be used as embedded cluster in process" in {
-      // setup
-      val args = "-debug true -sleep 10"
-      val appId = expectSubmitAppSuccess(wordCountJar, args)
-      val success = commandLineClient.killApp(appId)
-      success shouldBe true
-    }
-
-    "should fail when attempting to kill a non-exist application" in {
-      // setup
-      val freeAppId = getNextAvailableAppId
-
-      // exercise
-      val success = commandLineClient.killApp(freeAppId)
-      success shouldBe false
-    }
-  }
-
-  "use `gear replay` to replay the application from current min clock" should {
-    "todo: description" in {
-      // todo: test code
-    }
-  }
-
-  private def getRunningAppCount: Int = {
-    commandLineClient.listRunningApps().length
-  }
-
-  private def getNextAvailableAppId: Int = {
-    commandLineClient.listApps().length + 1
-  }
-
-  private def expectSubmitAppSuccess(jar: String, args: String = ""): Int = {
-    val appId = commandLineClient.submitApp(jar)
-    appId should not equal -1
-    appId
-  }
-
-  private def expectAppIsRunningByParsingOutput(appId: Int, expectedName: String): Unit = {
-    val actual = commandLineClient.queryApp(appId)
-    actual should include(s"application: $appId, ")
-    actual should include(s"name: $expectedName, ")
-    actual should include(s"status: ${MasterToAppMaster.AppMasterActive}")
-  }
-}

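One wrinkle in the spec above: expectSubmitAppSuccess accepts an args parameter but never forwards it to submitApp, so the "-debug true -sleep 10" flags in the EmbeddedCluster case are silently dropped. A hedged sketch of the intended forwarding, assuming CommandLineClient.submitApp has an overload that takes an argument string (that overload is not shown in this diff):

  private def expectSubmitAppSuccess(jar: String, args: String = ""): Int = {
    // Assumption: submitApp(jar, args) exists; only submitApp(jar) appears above.
    val appId =
      if (args.isEmpty) commandLineClient.submitApp(jar)
      else commandLineClient.submitApp(jar, args)
    appId should not equal -1
    appId
  }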
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
deleted file mode 100644
index d8bdc1e..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.scalatest.TestData
-
-import org.apache.gearpump.integrationtest.kafka._
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-
-/**
- * The test spec checks the Kafka datasource connector
- */
-class ConnectorKafkaSpec extends TestSpecBase {
-
-  private lazy val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway)
-  private lazy val kafkaJar = cluster.queryBuiltInExampleJars("kafka-").head
-  private var producer: NumericalDataProducer = null
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    kafkaCluster.start()
-  }
-
-  override def afterAll(): Unit = {
-    kafkaCluster.shutDown()
-    super.afterAll()
-  }
-
-  override def afterEach(test: TestData): Unit = {
-    super.afterEach(test)
-    if (producer != null) {
-      producer.stop()
-      producer = null
-    }
-  }
-
-  "KafkaSource and KafkaSink" should {
-    "read from and write to kafka" in {
-      // setup
-      val sourceTopic = "topic1"
-      val sinkTopic = "topic2"
-      val messageNum = 10000
-      kafkaCluster.produceDataToKafka(sourceTopic, messageNum)
-
-      // exercise
-      val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite",
-        "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
-        "-brokerList", kafkaCluster.getBrokerListConnectString,
-        "-sourceTopic", sourceTopic,
-        "-sinkTopic", sinkTopic).mkString(" ")
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
-      success shouldBe true
-
-      // verify
-      expectAppIsRunning(appId, "KafkaReadWrite")
-      Util.retryUntil(() => kafkaCluster.getLatestOffset(sinkTopic) == messageNum,
-        "kafka all message written")
-    }
-  }
-
-  "Gearpump with Kafka" should {
-    "support at-least-once message delivery" in {
-      // setup
-      val sourcePartitionNum = 2
-      val sourceTopic = "topic3"
-      val sinkTopic = "topic4"
-      // Generate number sequence (1, 2, 3, ...) to the topic
-      kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
-      producer = new NumericalDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString)
-      producer.start()
-
-      // exercise
-      val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite",
-        "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
-        "-brokerList", kafkaCluster.getBrokerListConnectString,
-        "-sourceTopic", sourceTopic,
-        "-sinkTopic", sinkTopic,
-        "-source", sourcePartitionNum).mkString(" ")
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
-      success shouldBe true
-
-      // verify #1
-      expectAppIsRunning(appId, "KafkaReadWrite")
-      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
-
-      // verify #2
-      val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
-      restClient.killExecutor(appId, executorToKill) shouldBe true
-      Util.retryUntil(() => restClient.queryExecutorBrief(appId)
-        .map(_.executorId).max > executorToKill,
-        s"executor $executorToKill killed")
-
-      // verify #3
-      val detector = new MessageLossDetector(producer.lastWriteNum)
-      val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
-        host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
-      Util.retryUntil(() => {
-        kafkaReader.read()
-        detector.allReceived
-      }, "kafka all message read")
-    }
-  }
-}

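The spec above leans heavily on Util.retryUntil. A standalone sketch of the shape implied by its call sites, polling a condition until it holds or a deadline passes; this is not the actual Util implementation, and the default interval and timeout are assumptions:

  import scala.concurrent.duration._

  def retryUntil(condition: () => Boolean, description: String,
      interval: FiniteDuration = 1.second,
      timeout: FiniteDuration = 90.seconds): Unit = {
    val deadline = timeout.fromNow
    while (!condition()) {
      if (deadline.isOverdue()) {
        throw new AssertionError(s"timed out waiting until: $description")
      }
      Thread.sleep(interval.toMillis)
    }
  }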
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
deleted file mode 100644
index 56b33c1..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-import org.apache.gearpump.metrics.Metrics.Meter
-import org.apache.gearpump.streaming._
-import org.apache.gearpump.streaming.appmaster.ProcessorSummary
-
-class DynamicDagSpec extends TestSpecBase {
-
-  lazy val solJar = cluster.queryBuiltInExampleJars("sol-").head
-  val splitTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Split"
-  val sumTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Sum"
-  val solName = "sol"
-
-  "dynamic dag" should {
-    "can retrieve a list of built-in partitioner classes" in {
-      val partitioners = restClient.queryBuiltInPartitioners()
-      partitioners.length should be > 0
-      partitioners.foreach(clazz =>
-        clazz should startWith("org.apache.gearpump.partitioner.")
-      )
-    }
-
-    "can compose a wordcount application from scratch" in {
-      // todo: blocked by #1450
-    }
-
-    "can replace down stream with wordcount's sum processor (new processor will have metrics)" in {
-      // setup
-      val appId = expectSolJarSubmittedWithAppId()
-
-      // exercise
-      val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
-      replaceProcessor(appId, 1, sumTaskClass)
-      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(() => {
-        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
-        laterProcessors.size == formerProcessors.size + 1
-      }, "new processor successfully added")
-      processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
-    }
-
-    "can replace up stream with wordcount's split processor (new processor will have metrics)" in {
-      // setup
-      val appId = expectSolJarSubmittedWithAppId()
-
-      // exercise
-      val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
-      replaceProcessor(appId, 0, splitTaskClass)
-      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(() => {
-        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
-        laterProcessors.size == formerProcessors.size + 1
-      }, "new processor added")
-      processorHasThroughput(appId, laterProcessors.keySet.max, "sendThroughput")
-    }
-
-    "fall back to last dag version when replacing a processor failid" in {
-      // setup
-      val appId = expectSolJarSubmittedWithAppId()
-
-      // exercise
-      val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
-      replaceProcessor(appId, 1, sumTaskClass)
-      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(() => {
-        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
-        laterProcessors.size == formerProcessors.size + 1
-      }, "new processor added")
-      processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
-
-      val fakeTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Fake"
-      replaceProcessor(appId, laterProcessors.keySet.max, fakeTaskClass)
-      Util.retryUntil(() => {
-        val processorsAfterFailure = restClient.queryStreamingAppDetail(appId).processors
-        processorsAfterFailure.size == laterProcessors.size
-      }, "new processor added")
-      val currentClock = restClient.queryStreamingAppDetail(appId).clock
-      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > currentClock,
-        "app clock is advancing")
-    }
-
-    "fall back to last dag version when AppMaster HA triggered" in {
-      // setup
-      val appId = expectSolJarSubmittedWithAppId()
-
-      // exercise
-      val formerAppMaster = restClient.queryApp(appId).appMasterPath
-      val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
-      replaceProcessor(appId, 1, sumTaskClass)
-      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(() => {
-        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
-        laterProcessors.size == formerProcessors.size + 1
-      }, "new processor added")
-      processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
-
-      restClient.killAppMaster(appId) shouldBe true
-      Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster,
-        "new AppMaster created")
-      val processors = restClient.queryStreamingAppDetail(appId).processors
-      processors.size shouldEqual laterProcessors.size
-    }
-  }
-
-  private def expectSolJarSubmittedWithAppId(): Int = {
-    val appId = restClient.getNextAvailableAppId()
-    val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length)
-    success shouldBe true
-    expectAppIsRunning(appId, solName)
-    Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
-    appId
-  }
-
-  private def replaceProcessor(
-      appId: Int,
-      formerProcessorId: Int,
-      newTaskClass: String,
-      newProcessorDescription: String = "",
-      newParallelism: Int = 1): Unit = {
-    val uploadedJar = restClient.uploadJar(wordCountJar)
-    val replaceMe = new ProcessorDescription(formerProcessorId, newTaskClass,
-      newParallelism, newProcessorDescription,
-      jar = uploadedJar)
-
-    // exercise
-    val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
-    success shouldBe true
-  }
-
-  private def processorHasThroughput(appId: Int, processorId: Int, metrics: String): Unit = {
-    Util.retryUntil(() => {
-      val actual = restClient.queryStreamingAppMetrics(appId, current = false,
-        path = "processor" + processorId)
-      val throughput = actual.metrics.filter(_.value.name.endsWith(metrics))
-      throughput.size should be > 0
-      throughput.forall(_.value.asInstanceOf[Meter].count > 0L)
-    }, "new processor has message received")
-  }
-}

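For reference, the shape of a single replacement request as issued by replaceProcessor above; the processor id, task class, and parallelism are placeholders, and appId is assumed to belong to a running application:

  val uploadedJar = restClient.uploadJar(wordCountJar)
  val replacement = new ProcessorDescription(0,
    "org.apache.gearpump.streaming.examples.wordcount.Split", 1,
    description = "replaced split", jar = uploadedJar)
  restClient.replaceStreamingAppProcessor(appId, replacement) shouldBe true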
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
deleted file mode 100644
index 27e4665..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.{Docker, TestSpecBase, Util}
-import org.apache.gearpump.streaming._
-import org.apache.gearpump.streaming.appmaster.ProcessorSummary
-
-/**
- * The test spec will perform destructive operations to check the stability
- */
-class ExampleSpec extends TestSpecBase {
-
-  private val LOG = Logger.getLogger(getClass)
-
-  "distributed shell" should {
-    "execute commands on machines where its executors are running" in {
-      val distShellJar = cluster.queryBuiltInExampleJars("distributedshell-").head
-      val mainClass = "org.apache.gearpump.examples.distributedshell.DistributedShell"
-      val clientClass = "org.apache.gearpump.examples.distributedshell.DistributedShellClient"
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(distShellJar, cluster.getWorkerHosts.length, mainClass)
-      success shouldBe true
-      expectAppIsRunning(appId, "DistributedShell")
-      val args = Array(
-        clientClass,
-        "-appid", appId.toString,
-        "-command", "hostname"
-      )
-
-      val expectedHostNames = cluster.getWorkerHosts.map(Docker.getHostName(_))
-
-      def verify(): Boolean = {
-        val workerNum = cluster.getWorkerHosts.length
-        val result = commandLineClient.submitAppAndCaptureOutput(distShellJar,
-          workerNum, args.mkString(" ")).split("\n").
-          filterNot(line => line.startsWith("[INFO]") || line.isEmpty)
-        expectedHostNames.forall(result.contains)
-      }
-
-      Util.retryUntil(() => verify(),
-        s"executors started on all expected hosts ${expectedHostNames.mkString(", ")}")
-    }
-  }
-
-  "wordcount" should {
-    val wordCountJarNamePrefix = "wordcount-"
-    behave like streamingApplication(wordCountJarNamePrefix, wordCountName)
-
-    "can submit immediately after killing a former one" in {
-      // setup
-      val formerAppId = restClient.getNextAvailableAppId()
-      val formerSubmissionSuccess =
-        restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
-      formerSubmissionSuccess shouldBe true
-      expectAppIsRunning(formerAppId, wordCountName)
-      Util.retryUntil(() =>
-        restClient.queryStreamingAppDetail(formerAppId).clock > 0, "app running")
-      restClient.killApp(formerAppId)
-
-      // exercise
-      val appId = formerAppId + 1
-      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
-      success shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-    }
-  }
-
-  "wordcount(java)" should {
-    val wordCountJavaJarNamePrefix = "wordcountjava-"
-    val wordCountJavaName = "wordcountJava"
-    behave like streamingApplication(wordCountJavaJarNamePrefix, wordCountJavaName)
-  }
-
-  "sol" should {
-    val solJarNamePrefix = "sol-"
-    val solName = "sol"
-    behave like streamingApplication(solJarNamePrefix, solName)
-  }
-
-  "complexdag" should {
-    val dynamicDagJarNamePrefix = "complexdag-"
-    val dynamicDagName = "dag"
-    behave like streamingApplication(dynamicDagJarNamePrefix, dynamicDagName)
-  }
-
-  def streamingApplication(jarNamePrefix: String, appName: String): Unit = {
-    lazy val jar = cluster.queryBuiltInExampleJars(jarNamePrefix).head
-
-    "can obtain application clock and the clock will keep changing" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(jar, cluster.getWorkerHosts.length)
-      success shouldBe true
-      expectAppIsRunning(appId, appName)
-
-      // exercise
-      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app submitted")
-      val formerClock = restClient.queryStreamingAppDetail(appId).clock
-      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > formerClock,
-        "app clock is advancing")
-    }
-
-    "can change the parallelism and description of a processor" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-      val formerSubmissionSuccess = restClient.submitApp(jar, cluster.getWorkerHosts.length)
-      formerSubmissionSuccess shouldBe true
-      expectAppIsRunning(appId, appName)
-      val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
-      val processor0 = formerProcessors.get(0).get
-      val expectedProcessorId = formerProcessors.size
-      val expectedParallelism = processor0.parallelism + 1
-      val expectedDescription = processor0.description + "new"
-      val replaceMe = new ProcessorDescription(processor0.id, processor0.taskClass,
-        expectedParallelism, description = expectedDescription)
-
-      // exercise
-      val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
-      success shouldBe true
-      var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(() => {
-        laterProcessors = restClient.queryStreamingAppDetail(appId).processors
-        laterProcessors.size == formerProcessors.size + 1
-      }, "new process added")
-      val laterProcessor0 = laterProcessors.get(expectedProcessorId).get
-      laterProcessor0.parallelism shouldEqual expectedParallelism
-      laterProcessor0.description shouldEqual expectedDescription
-    }
-  }
-}

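The "behave like" lines above use ScalaTest's shared-behavior mechanism: a def that registers tests once and is invoked from several should-blocks. A minimal standalone sketch of the pattern; the names are illustrative:

  import org.scalatest.{Matchers, WordSpec}

  class SharedBehaviorSketch extends WordSpec with Matchers {

    // Registers one test for whatever jar name it is given.
    def nonEmptyJarName(jarName: String): Unit = {
      "resolve to a non-empty jar name" in {
        jarName should not be empty
      }
    }

    "wordcount" should {
      behave like nonEmptyJarName("wordcount-example.jar")
    }

    "sol" should {
      behave like nonEmptyJarName("sol-example.jar")
    }
  }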
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
deleted file mode 100644
index bb9982a..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.hadoop.HadoopCluster._
-import org.apache.gearpump.integrationtest.kafka.KafkaCluster._
-import org.apache.gearpump.integrationtest.kafka.{ResultVerifier, SimpleKafkaReader}
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-
-/**
- * Checks message delivery guarantees, such as at-least-once and exactly-once.
- */
-class MessageDeliverySpec extends TestSpecBase {
-
-  private val LOG = Logger.getLogger(getClass)
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-  }
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-  }
-
-  "Gearpump" should {
-    "support exactly-once message delivery" in {
-      withKafkaCluster(cluster) { kafkaCluster =>
-        // setup
-        val sourcePartitionNum = 1
-        val sourceTopic = "topic1"
-        val sinkTopic = "topic2"
-
-        // Generate number sequence (1, 2, 3, ...) to the topic
-        kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
-
-        withDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString) { producer =>
-
-          withHadoopCluster { hadoopCluster =>
-            // exercise
-            val args = Array("org.apache.gearpump.streaming.examples.state.MessageCountApp",
-              "-defaultFS", hadoopCluster.getDefaultFS,
-              "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
-              "-brokerList", kafkaCluster.getBrokerListConnectString,
-              "-sourceTopic", sourceTopic,
-              "-sinkTopic", sinkTopic,
-              "-sourceTask", sourcePartitionNum).mkString(" ")
-            val appId = restClient.getNextAvailableAppId()
-
-            val stateJar = cluster.queryBuiltInExampleJars("state-").head
-            val success = restClient.submitApp(stateJar, executorNum = 1, args = args)
-            success shouldBe true
-
-            // verify #1
-            expectAppIsRunning(appId, "MessageCount")
-            Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0,
-              "app is running")
-
-            // wait for checkpoint to take place
-            Thread.sleep(1000)
-
-            LOG.info("Trigger message replay by kill and restart the executors")
-            val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
-            restClient.killExecutor(appId, executorToKill) shouldBe true
-            Util.retryUntil(() => restClient.queryExecutorBrief(appId)
-              .map(_.executorId).max > executorToKill, s"executor $executorToKill killed")
-
-            producer.stop()
-            val producedNumbers = producer.producedNumbers
-            LOG.info(s"In total, numbers in range[${producedNumbers.start}" +
-              s", ${producedNumbers.end - 1}] have been written to Kafka")
-
-            // verify #3
-            val kafkaSourceOffset = kafkaCluster.getLatestOffset(sourceTopic)
-
-            assert(producedNumbers.size == kafkaSourceOffset,
-              "produced message should match Kafka queue size")
-
-            LOG.info(s"The Kafka source topic $sourceTopic offset is " + kafkaSourceOffset)
-
-            // The sink processor of this job (MessageCountApp) writes total message
-            // count to Kafka Sink periodically (once every checkpoint interval).
-            // The detector keeps a record of the latest message count.
-            val detector = new ResultVerifier {
-              var latestMessageCount: Int = 0
-              override def onNext(messageCount: Int): Unit = {
-                this.latestMessageCount = messageCount
-              }
-            }
-
-            val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
-              host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
-            Util.retryUntil(() => {
-              kafkaReader.read()
-              LOG.info(s"Received message count: ${detector.latestMessageCount}, " +
-                s"expect: ${producedNumbers.size}")
-              detector.latestMessageCount == producedNumbers.size
-            }, "MessageCountApp calculated message count matches " +
-              "expected in case of message replay")
-          }
-        }
-      }
-    }
-  }
-}

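withKafkaCluster, withHadoopCluster, and withDataProducer above are loan-pattern fixtures: each creates a resource, lends it to the test body, and tears it down afterwards even on failure. A generic sketch of that shape; the helper name is illustrative:

  def withResource[R](open: => R)(close: R => Unit)(body: R => Unit): Unit = {
    val resource = open
    try {
      body(resource)
    } finally {
      close(resource)  // torn down even if the test body throws
    }
  }

  // e.g. withResource(new NumericalDataProducer(topic, brokers))(_.stop()) { p =>
  //   p.start(); /* exercise and verify */
  // }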
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
deleted file mode 100644
index 2f5bb64..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
+++ /dev/null
@@ -1,369 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import scala.concurrent.duration._
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.cluster.master.MasterStatus
-import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-
-/**
- * The test spec checks REST service usage
- */
-class RestServiceSpec extends TestSpecBase {
-
-  "query system version" should {
-    "retrieve the current version number" in {
-      restClient.queryVersion() should not be empty
-    }
-  }
-
-  "list applications" should {
-    "retrieve 0 application after cluster just started" in {
-      restClient.listRunningApps().length shouldEqual 0
-    }
-
-    "retrieve 1 application after the first application submission" in {
-      // exercise
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
-      success shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-      restClient.listRunningApps().length shouldEqual 1
-    }
-  }
-
-  "submit application (wordcount)" should {
-    "find a running application after submission" in {
-      // exercise
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
-      success shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-    }
-
-    "reject a repeated submission request while the application is running" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-      val formerSubmissionSuccess = restClient.submitApp(wordCountJar,
-        cluster.getWorkerHosts.length)
-      formerSubmissionSuccess shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-
-      // exercise
-      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
-      success shouldBe false
-    }
-
-    "reject an invalid submission (the jar file path is incorrect)" in {
-      // exercise
-      val success = restClient.submitApp(wordCountJar + ".missing", cluster.getWorkerHosts.length)
-      success shouldBe false
-    }
-
-    "submit a wordcount application with 4 split and 3 sum processors and expect " +
-      "parallelism of processors match the given number" in {
-      // setup
-      val splitNum = 4
-      val sumNum = 3
-      val appId = restClient.getNextAvailableAppId()
-
-      // exercise
-      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length,
-        s"-split $splitNum -sum $sumNum")
-      success shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-      val processors = restClient.queryStreamingAppDetail(appId).processors
-      processors.size shouldEqual 2
-      val splitProcessor = processors.get(0).get
-      splitProcessor.parallelism shouldEqual splitNum
-      val sumProcessor = processors.get(1).get
-      sumProcessor.parallelism shouldEqual sumNum
-    }
-
-    "can obtain application metrics and the metrics will keep changing" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
-      success shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-
-      // exercise
-      expectMetricsAvailable(
-        restClient.queryStreamingAppMetrics(appId, current = true).metrics.nonEmpty,
-        "metrics available")
-      val actual = restClient.queryStreamingAppMetrics(appId, current = true)
-      actual.path shouldEqual s"app$appId.processor*"
-      actual.metrics.foreach(metric => {
-        metric.time should be > 0L
-        metric.value should not be null
-      })
-      val formerMetricsDump = actual.metrics.toString()
-
-      expectMetricsAvailable({
-        val laterMetrics = restClient.queryStreamingAppMetrics(appId, current = true).metrics
-        laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
-      }, "metrics available")
-    }
-
-    "can obtain application corresponding executors' metrics and " +
-      "the metrics will keep changing" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
-      success shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-
-      // exercise
-      expectMetricsAvailable(
-        restClient.queryExecutorMetrics(appId, current = true).metrics.nonEmpty,
-        "metrics available")
-      val actual = restClient.queryExecutorMetrics(appId, current = true)
-      actual.path shouldEqual s"app$appId.executor*"
-      actual.metrics.foreach(metric => {
-        metric.time should be > 0L
-        metric.value should not be null
-      })
-      val formerMetricsDump = actual.metrics.toString()
-
-      expectMetricsAvailable({
-        val laterMetrics = restClient.queryExecutorMetrics(appId, current = true).metrics
-        laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
-      }, "metrics available")
-    }
-  }
-
-  "kill application" should {
-    "a running application should be killed" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
-      success shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-
-      // exercise
-      killAppAndVerify(appId)
-    }
-
-    "should fail when attempting to kill a stopped application" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-      val submissionSuccess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
-      submissionSuccess shouldBe true
-      expectAppIsRunning(appId, wordCountName)
-      killAppAndVerify(appId)
-
-      // exercise
-      val success = restClient.killApp(appId)
-      success shouldBe false
-    }
-
-    "should fail when attempting to kill a non-exist application" in {
-      // setup
-      val freeAppId = restClient.listApps().length + 1
-
-      // exercise
-      val success = restClient.killApp(freeAppId)
-      success shouldBe false
-    }
-  }
-
-  "cluster information" should {
-    "retrieve 1 master for a non-HA cluster" in {
-      // exercise
-      val masterSummary = restClient.queryMaster()
-      masterSummary.cluster.map(_.toTuple) shouldEqual cluster.getMastersAddresses
-      masterSummary.aliveFor should be > 0L
-      masterSummary.masterStatus shouldEqual MasterStatus.Synced
-    }
-
-    "retrieve the same number of workers as cluster has" in {
-      // setup
-      val expectedWorkersCount = cluster.getWorkerHosts.size
-
-      // exercise
-      var runningWorkers: Array[WorkerSummary] = Array.empty
-      Util.retryUntil(() => {
-        runningWorkers = restClient.listRunningWorkers()
-        runningWorkers.length == expectedWorkersCount
-      }, "all workers running")
-      runningWorkers.foreach { worker =>
-        worker.state shouldEqual MasterToAppMaster.AppMasterActive
-      }
-    }
-
-    "find a newly added worker instance" in {
-      // setup
-      restartClusterRequired = true
-      val formerWorkersCount = cluster.getWorkerHosts.length
-      Util.retryUntil(() => restClient.listRunningWorkers().length == formerWorkersCount,
-        "all workers running")
-      val workerName = "newWorker"
-
-      // exercise
-      cluster.addWorkerNode(workerName)
-      Util.retryUntil(() => restClient.listRunningWorkers().length > formerWorkersCount,
-        "new worker added")
-      cluster.getWorkerHosts.length shouldEqual formerWorkersCount + 1
-      restClient.listRunningWorkers().length shouldEqual formerWorkersCount + 1
-    }
-
-    "retrieve 0 worker, if cluster is started without any workers" in {
-      // setup
-      restartClusterRequired = true
-      cluster.shutDown()
-
-      // exercise
-      cluster.start(workerNum = 0)
-      cluster.getWorkerHosts.length shouldEqual 0
-      restClient.listRunningWorkers().length shouldEqual 0
-    }
-
-    "can obtain master's metrics and the metrics will keep changing" in {
-      // exercise
-      expectMetricsAvailable(
-        restClient.queryMasterMetrics(current = true).metrics.nonEmpty, "metrics available")
-      val actual = restClient.queryMasterMetrics(current = true)
-      actual.path shouldEqual s"master"
-      actual.metrics.foreach(metric => {
-        metric.time should be > 0L
-        metric.value should not be null
-      })
-      val formerMetricsDump = actual.metrics.toString()
-
-      expectMetricsAvailable({
-        val laterMetrics = restClient.queryMasterMetrics(current = true).metrics
-        laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
-      }, "metrics available")
-    }
-
-    "can obtain workers' metrics and the metrics will keep changing" in {
-      // exercise
-      restClient.listRunningWorkers().foreach { worker =>
-        val workerId = worker.workerId
-        expectMetricsAvailable(
-          restClient.queryWorkerMetrics(workerId, current = true).metrics.nonEmpty,
-          "metrics available")
-        val actual = restClient.queryWorkerMetrics(workerId, current = true)
-        actual.path shouldEqual s"worker${WorkerId.render(workerId)}"
-        actual.metrics.foreach(metric => {
-          metric.time should be > 0L
-          metric.value should not be null
-        })
-        val formerMetricsDump = actual.metrics.toString()
-
-        expectMetricsAvailable({
-          val laterMetrics = restClient.queryWorkerMetrics(workerId, current = true).metrics
-          laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
-        }, "metrics available")
-      }
-    }
-  }
-
-  "configuration" should {
-    "retrieve the configuration of master and match particular values" in {
-      // exercise
-      val actual = restClient.queryMasterConfig()
-      actual.hasPath("gearpump") shouldBe true
-      actual.hasPath("gearpump.cluster") shouldBe true
-      actual.getString("gearpump.hostname") shouldEqual cluster.getMasterHosts.mkString(",")
-    }
-
-    "retrieve the configuration of worker X and match particular values" in {
-      // exercise
-      restClient.listRunningWorkers().foreach { worker =>
-        val actual = restClient.queryWorkerConfig(worker.workerId)
-        actual.hasPath("gearpump") shouldBe true
-        actual.hasPath("gearpump.worker") shouldBe true
-      }
-    }
-
-    "retrieve the configuration of executor X and match particular values" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-
-      // exercise
-      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
-      success shouldBe true
-      restClient.queryExecutorBrief(appId).foreach { executor =>
-        val executorId = executor.executorId
-        val actual = restClient.queryExecutorConfig(appId, executorId)
-        actual.hasPath("gearpump") shouldBe true
-        actual.hasPath("gearpump.executor") shouldBe true
-        actual.getInt("gearpump.applicationId") shouldEqual appId
-        actual.getInt("gearpump.executorId") shouldEqual executorId
-      }
-    }
-
-    "retrieve the configuration of application X and match particular values" in {
-      // setup
-      val appId = restClient.getNextAvailableAppId()
-
-      // exercise
-      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
-      success shouldBe true
-      val actual = restClient.queryAppMasterConfig(appId)
-      actual.hasPath("gearpump") shouldBe true
-      actual.hasPath("gearpump.appmaster") shouldBe true
-    }
-  }
-
-  "application life-cycle" should {
-    "newly started application should be configured same as the previous one, after restart" in {
-      // setup
-      val originSplitNum = 4
-      val originSumNum = 3
-      val originAppId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length,
-        s"-split $originSplitNum -sum $originSumNum")
-      success shouldBe true
-      expectAppIsRunning(originAppId, wordCountName)
-      val originAppDetail = restClient.queryStreamingAppDetail(originAppId)
-
-      // exercise
-      Util.retryUntil(() => restClient.restartApp(originAppId), "app restarted")
-      val killedApp = restClient.queryApp(originAppId)
-      killedApp.appId shouldEqual originAppId
-      killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive
-      val runningApps = restClient.listRunningApps()
-      runningApps.length shouldEqual 1
-      val newAppDetail = restClient.queryStreamingAppDetail(runningApps.head.appId)
-      newAppDetail.appName shouldEqual originAppDetail.appName
-      newAppDetail.processors.size shouldEqual originAppDetail.processors.size
-      newAppDetail.processors.get(0).get.parallelism shouldEqual originSplitNum
-      newAppDetail.processors.get(1).get.parallelism shouldEqual originSumNum
-    }
-  }
-
-  private def killAppAndVerify(appId: Int): Unit = {
-    val success = restClient.killApp(appId)
-    success shouldBe true
-
-    val actualApp = restClient.queryApp(appId)
-    actualApp.appId shouldEqual appId
-    actualApp.status shouldEqual MasterToAppMaster.AppMasterInActive
-  }
-
-  private def expectMetricsAvailable(condition: => Boolean, conditionDescription: String): Unit = {
-    val config = restClient.queryMasterConfig()
-    val reportInterval = Duration(config.getString("gearpump.metrics.report-interval-ms") + "ms")
-    Util.retryUntil(() => condition, conditionDescription, interval = reportInterval)
-  }
-}

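The metrics assertions in the spec above poll the REST API until a fresh report arrives. Below is a minimal sketch of that polling pattern, assuming a simplified standalone helper (the real Util.retryUntil in this repository may differ in signature and retry policy):

    import scala.concurrent.duration._

    object RetrySketch {
      // Polls `condition` at a fixed interval until it holds or attempts run out.
      def retryUntil(condition: () => Boolean, description: String,
          maxAttempts: Int = 15, interval: FiniteDuration = 1.second): Unit = {
        var attempts = 0
        while (!condition() && attempts < maxAttempts) {
          attempts += 1
          Thread.sleep(interval.toMillis)
        }
        assert(condition(), s"'$description' still false after $maxAttempts attempts")
      }

      def main(args: Array[String]): Unit = {
        val deadline = System.currentTimeMillis() + 3000
        retryUntil(() => System.currentTimeMillis() > deadline, "clock advanced")
      }
    }
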
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
deleted file mode 100644
index 4b15055..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import scala.concurrent.duration.Duration
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.cluster.worker.WorkerId
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-import org.apache.gearpump.util.{Constants, LogUtil}
-
-/**
- * The test spec performs destructive operations to check stability. These operations
- * include shutting down the appmaster, executors, or workers.
- */
-class StabilitySpec extends TestSpecBase {
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  "kill appmaster" should {
-    "restart the whole application" in {
-      // setup
-      val appId = commandLineClient.submitApp(wordCountJar)
-      val formerAppMaster = restClient.queryApp(appId).appMasterPath
-      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
-      ensureClockStoredInMaster()
-
-      // exercise
-      restClient.killAppMaster(appId) shouldBe true
-      // todo: how long master will begin to recover and how much time for the recovering?
-      Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster,
-        "appmaster killed and restarted")
-
-      // verify
-      val laterAppMaster = restClient.queryStreamingAppDetail(appId)
-      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
-      laterAppMaster.clock should be > 0L
-    }
-  }
-
-  "kill executor" should {
-    "will create a new executor and application will replay from the latest application clock" in {
-      // setup
-      val appId = commandLineClient.submitApp(wordCountJar)
-      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
-      val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
-      ensureClockStoredInMaster()
-
-      // exercise
-      restClient.killExecutor(appId, executorToKill) shouldBe true
-      // todo: how long appmaster will begin to recover and how much time for the recovering?
-      Util.retryUntil(() => restClient.queryExecutorBrief(appId)
-        .map(_.executorId).max > executorToKill,
-        s"executor $executorToKill killed and restarted")
-
-      // verify
-      val laterAppMaster = restClient.queryStreamingAppDetail(appId)
-      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
-      laterAppMaster.clock should be > 0L
-    }
-  }
-
-  private def hostName(workerId: WorkerId): String = {
-    val worker = restClient.listRunningWorkers().find(_.workerId == workerId)
-    // Parse hostname from JVM info (in format: PID@hostname)
-    val hostname = worker.get.jvmName.split("@")(1)
-    hostname
-  }
-
-  "kill worker" should {
-    "worker will not recover but all its executors will be migrated to other workers" in {
-      // setup
-      restartClusterRequired = true
-      val appId = commandLineClient.submitApp(wordCountJar)
-      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
-
-      val allExecutors = restClient.queryExecutorBrief(appId)
-      val maxExecutor = allExecutors.sortBy(_.executorId).last
-      ensureClockStoredInMaster()
-
-      val appMaster = allExecutors.find(_.executorId == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
-
-      LOG.info(s"Max executor Id is executor: ${maxExecutor.executorId}, " +
-        s"worker: ${maxExecutor.workerId}")
-      val executorsSharingSameWorker = allExecutors
-        .filter(_.workerId == maxExecutor.workerId).map(_.executorId)
-      LOG.info(s"These executors share the same worker Id ${maxExecutor.workerId}:" +
-        s" ${executorsSharingSameWorker.mkString(",")}")
-
-      // kill the worker and expect restarting all killed executors on other workers.
-      val workerIdToKill = maxExecutor.workerId
-      cluster.removeWorkerNode(hostName(workerIdToKill))
-
-      val appMasterKilled = executorsSharingSameWorker
-        .exists(_ == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
-
-      def executorsMigrated(): Boolean = {
-        val executors = restClient.queryExecutorBrief(appId)
-        val newAppMaster = executors.find(_.executorId == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
-
-        if (appMasterKilled) {
-          newAppMaster.get.workerId != appMaster.get.workerId
-        } else {
-          // New executors will be started to replace the killed ones. The new
-          // executors are assigned larger executor ids, and we use this trick to
-          // detect whether the new executors have been started successfully.
-          executors.map(_.executorId).max > maxExecutor.executorId
-        }
-      }
-
-      Util.retryUntil(() => executorsMigrated(),
-        s"new executor created with id > ${maxExecutor.executorId} when worker is killed")
-
-      // verify
-      val laterAppMaster = restClient.queryStreamingAppDetail(appId)
-      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
-      laterAppMaster.clock should be > 0L
-    }
-  }
-
-  "kill master" should {
-    "master will be down and all workers will attempt to reconnect and suicide after X seconds" in {
-      // setup
-      restartClusterRequired = true
-      val masters = cluster.getMasterHosts
-      val config = restClient.queryMasterConfig()
-      val shutDownTimeout = Duration(config.getString("akka.cluster.auto-down-unreachable-after"))
-
-      // exercise
-      masters.foreach(cluster.removeMasterNode)
-      info(s"will sleep ${shutDownTimeout.toSeconds}s and then check workers are down")
-      Thread.sleep(shutDownTimeout.toMillis)
-
-      // verify
-      val aliveWorkers = cluster.getWorkerHosts
-      Util.retryUntil(() => aliveWorkers.forall(worker => !cluster.nodeIsOnline(worker)),
-        "all workers down")
-    }
-  }
-
-  private def ensureClockStoredInMaster(): Unit = {
-    // TODO: 5000ms is the fixed sync period of the clock service.
-    // We wait 5000ms so that we can assume the clock has been stored.
-    Thread.sleep(5000)
-  }
-}
\ No newline at end of file

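The kill-worker test above relies on executor ids being assigned monotonically: replacements for executors lost with a worker receive ids larger than the previous maximum. A self-contained sketch of that detection trick (ExecutorBrief here is a simplified stand-in for the real class, and the ids are illustrative):

    object MigrationCheckSketch {
      final case class ExecutorBrief(executorId: Int, workerId: Int)

      // Replacements for killed executors surface with ids above the old maximum.
      def executorsMigrated(before: Seq[ExecutorBrief], after: Seq[ExecutorBrief]): Boolean =
        after.map(_.executorId).max > before.map(_.executorId).max

      def main(args: Array[String]): Unit = {
        val before = Seq(ExecutorBrief(1, 0), ExecutorBrief(2, 1))
        val after = Seq(ExecutorBrief(1, 0), ExecutorBrief(3, 0)) // executor 2 replaced by 3
        assert(executorsMigrated(before, after))
      }
    }
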
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
deleted file mode 100644
index b327bf4..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.gearpump.integrationtest.kafka.{KafkaCluster, MessageLossDetector, SimpleKafkaReader}
-import org.apache.gearpump.integrationtest.storm.StormClient
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-
-/**
- * The test spec checks the compatibility of running Storm applications
- */
-class StormCompatibilitySpec extends TestSpecBase {
-
-  private lazy val stormClient = {
-    new StormClient(cluster, restClient)
-  }
-
-  val `version0.9` = "09"
-  val `version0.10` = "010"
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    stormClient.start()
-  }
-
-  override def afterAll(): Unit = {
-    stormClient.shutDown()
-    super.afterAll()
-  }
-
-  def withStorm(testCode: String => Unit): Unit = {
-    testCode(`version0.9`)
-    testCode(`version0.10`)
-  }
-
-  def getTopologyName(name: String, stormVersion: String): String = {
-    s"${name}_$stormVersion"
-  }
-
-  def getStormJar(stormVersion: String): String = {
-    cluster.queryBuiltInITJars(s"storm$stormVersion-").head
-  }
-
-  "Storm over Gearpump" should withStorm {
-    stormVersion =>
-      s"support basic topologies ($stormVersion)" in {
-        val stormJar = getStormJar(stormVersion)
-        val topologyName = getTopologyName("exclamation", stormVersion)
-
-        // exercise
-        val appId = stormClient.submitStormApp(
-          jar = stormJar,
-          mainClass = "storm.starter.ExclamationTopology",
-          args = topologyName,
-          appName = topologyName)
-
-        // verify
-        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
-      }
-
-      s"support to run a python version of wordcount ($stormVersion)" in {
-        val stormJar = getStormJar(stormVersion)
-        val topologyName = getTopologyName("wordcount", stormVersion)
-
-        // exercise
-        val appId = stormClient.submitStormApp(
-          jar = stormJar,
-          mainClass = "storm.starter.WordCountTopology",
-          args = topologyName,
-          appName = topologyName)
-
-        // verify
-        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
-      }
-
-      s"support DRPC ($stormVersion)" in {
-        // ReachTopology computes the reach of a Twitter URL across users and their
-        // followers using Storm's Distributed RPC feature. The input (user and
-        // follower) data is already prepared in memory.
-        val stormJar = getStormJar(stormVersion)
-        val topologyName = getTopologyName("reach", stormVersion)
-        stormClient.submitStormApp(
-          jar = stormJar,
-          mainClass = "storm.starter.ReachTopology",
-          args = topologyName,
-          appName = topologyName)
-        val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway)
-
-        // verify
-        Util.retryUntil(() => {
-          drpcClient.execute("reach", "notaurl.com") == "0"
-        }, "drpc reach == 0")
-        drpcClient.execute("reach", "foo.com/blog/1") shouldEqual "16"
-        drpcClient.execute("reach", "engineering.twitter.com/blog/5") shouldEqual "14"
-      }
-
-      s"support tick tuple ($stormVersion)" in {
-        val stormJar = getStormJar(stormVersion)
-        val topologyName = getTopologyName("slidingWindowCounts", stormVersion)
-
-        // exercise
-        val appId = stormClient.submitStormApp(
-          jar = stormJar,
-          mainClass = "storm.starter.RollingTopWords",
-          args = s"$topologyName remote",
-          appName = topologyName)
-
-        // verify
-        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
-      }
-
-      s"support at-least-once semantics with Storm's Kafka connector ($stormVersion)" in {
-
-        val stormJar = getStormJar(stormVersion)
-        val topologyName = getTopologyName("storm_kafka", stormVersion)
-        val stormKafkaTopology =
-          s"org.apache.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology"
-
-        import org.apache.gearpump.integrationtest.kafka.KafkaCluster._
-        withKafkaCluster(cluster) {
-          kafkaCluster =>
-            val sourcePartitionNum = 2
-            val sinkPartitionNum = 1
-            val zookeeper = kafkaCluster.getZookeeperConnectString
-            val brokerList = kafkaCluster.getBrokerListConnectString
-            val sourceTopic = "topic1"
-            val sinkTopic = "topic2"
-
-            val args = Array("-topologyName", topologyName, "-sourceTopic", sourceTopic,
-              "-sinkTopic", sinkTopic, "-zookeeperConnect", zookeeper, "-brokerList", brokerList,
-              "-spoutNum", s"$sourcePartitionNum", "-boltNum", s"$sinkPartitionNum"
-            )
-
-            kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
-
-            // generate number sequence (1, 2, 3, ...) to the topic
-            withDataProducer(sourceTopic, brokerList) { producer =>
-
-              val appId = stormClient.submitStormApp(
-                jar = stormJar,
-                mainClass = stormKafkaTopology,
-                args = args.mkString(" "),
-                appName = topologyName)
-
-              Util.retryUntil(() =>
-                restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
-
-              // kill executor and verify at-least-once is guaranteed on application restart
-              val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
-              restClient.killExecutor(appId, executorToKill) shouldBe true
-              Util.retryUntil(() =>
-                restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill,
-                s"executor $executorToKill killed")
-
-              // verify no message loss
-              val detector = new MessageLossDetector(producer.lastWriteNum)
-              val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
-                host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
-
-              Util.retryUntil(() => {
-                kafkaReader.read()
-                detector.allReceived
-              }, "all kafka message read")
-            }
-        }
-      }
-  }
-}

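The at-least-once check above works because the producer writes the number sequence 1, 2, 3, ... to the source topic; the guarantee holds when every number is eventually seen in the sink topic, duplicates permitted. A simplified, self-contained stand-in for the MessageLossDetector used above:

    object AtLeastOnceSketch {
      class LossDetector(lastWriteNum: Int) {
        private val seen = new Array[Boolean](lastWriteNum + 1)
        def receive(num: Int): Unit =
          if (num >= 1 && num <= lastWriteNum) seen(num) = true
        def allReceived: Boolean = (1 to lastWriteNum).forall(i => seen(i))
      }

      def main(args: Array[String]): Unit = {
        val detector = new LossDetector(5)
        Seq(1, 2, 2, 3, 4, 5, 3).foreach(detector.receive) // duplicates are fine
        assert(detector.allReceived)
      }
    }
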
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
deleted file mode 100644
index 7942308..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.suites
-
-import org.scalatest._
-
-import org.apache.gearpump.integrationtest.MiniClusterProvider
-import org.apache.gearpump.integrationtest.checklist._
-import org.apache.gearpump.integrationtest.minicluster.MiniCluster
-
-/**
- * Launches a Gearpump cluster in standalone mode and runs all test specs. To run
- * only a specific test spec, comment out the other entries below.
- */
-class StandaloneModeSuite extends Suites(
-  new CommandLineSpec,
-  new RestServiceSpec,
-  new ExampleSpec,
-  new DynamicDagSpec,
-  new StormCompatibilitySpec,
-  new StabilitySpec,
-  new ConnectorKafkaSpec,
-  new MessageDeliverySpec
-) with BeforeAndAfterAll {
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    MiniClusterProvider.managed = true
-    MiniClusterProvider.set(new MiniCluster).start()
-  }
-
-  override def afterAll(): Unit = {
-    MiniClusterProvider.get.shutDown()
-    super.afterAll()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala
new file mode 100644
index 0000000..4a161c7
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import org.apache.gearpump.integrationtest.minicluster.MiniCluster
+
+/**
+ * Provides a mini cluster of Gearpump, which contains one or more masters and workers.
+ */
+object MiniClusterProvider {
+
+  private var instance = new MiniCluster
+
+  def get: MiniCluster = instance
+
+  def set(instance: MiniCluster): MiniCluster = {
+    this.instance = instance
+    instance
+  }
+
+  /**
+   * Indicates whether the test suite manages cluster creation. If false, every
+   * test spec is responsible for creating its own cluster.
+   */
+  var managed = false
+}
\ No newline at end of file

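As an illustration of the managed flag's contract, here is a minimal self-contained sketch; FakeCluster and Provider are hypothetical stand-ins for MiniCluster and MiniClusterProvider:

    object ManagedFlagSketch {
      class FakeCluster {
        def start(): Unit = println("cluster started")
        def shutDown(): Unit = println("cluster stopped")
      }

      object Provider {
        private var instance = new FakeCluster
        var managed = false
        def get: FakeCluster = instance
        def set(i: FakeCluster): FakeCluster = { instance = i; instance }
      }

      def main(args: Array[String]): Unit = {
        Provider.managed = true // a suite claims ownership and starts the cluster once
        Provider.set(new FakeCluster).start()
        if (!Provider.managed) { // specs only start a private cluster when unmanaged
          Provider.get.start()
        }
        Provider.get.shutDown()
      }
    }
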
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
new file mode 100644
index 0000000..dabcc71
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import org.scalatest._
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * The abstract base spec shared by all integration test specs.
+ */
+trait TestSpecBase
+  extends WordSpec with Matchers with BeforeAndAfterEachTestData with BeforeAndAfterAll {
+
+  private def LOGGER = LogUtil.getLogger(getClass)
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    if (!MiniClusterProvider.managed) {
+      LOGGER.info("Will test with a default standalone mini cluster")
+      MiniClusterProvider.get.start()
+    }
+  }
+
+  override def afterAll(): Unit = {
+    if (!MiniClusterProvider.managed) {
+      LOGGER.info("Will shutdown the default mini cluster")
+      MiniClusterProvider.get.shutDown()
+    }
+    super.afterAll()
+  }
+
+  lazy val cluster = MiniClusterProvider.get
+  lazy val commandLineClient = cluster.commandLineClient
+  lazy val restClient = cluster.restClient
+
+  lazy val wordCountJar = cluster.queryBuiltInExampleJars("wordcount-").head
+  lazy val wordCountName = "wordCount"
+
+  var restartClusterRequired: Boolean = false
+
+  override def beforeEach(td: TestData): Unit = {
+
+    LOGGER.debug(s">### =============================================================")
+    LOGGER.debug(s">###1 Prepare test: ${td.name}\n")
+
+    assert(cluster != null, "Configure MiniCluster properly in suite spec")
+    cluster.isAlive shouldBe true
+    restClient.listRunningApps().isEmpty shouldBe true
+    LOGGER.debug(s">### =============================================================")
+    LOGGER.debug(s">###2 Start test: ${td.name}\n")
+  }
+
+  override def afterEach(td: TestData): Unit = {
+    LOGGER.debug(s"<### =============================================================")
+    LOGGER.debug(s"<###3 End test: ${td.name}\n")
+
+    if (restartClusterRequired || !cluster.isAlive) {
+      restartClusterRequired = false
+      LOGGER.info("Will restart the cluster for next test case")
+      cluster.restart()
+    } else {
+      restClient.listRunningApps().foreach(app => {
+        commandLineClient.killApp(app.appId) shouldBe true
+      })
+    }
+  }
+
+  def expectAppIsRunning(appId: Int, expectedAppName: String): AppMasterData = {
+    val app = restClient.queryApp(appId)
+    app.status shouldEqual MasterToAppMaster.AppMasterActive
+    app.appName shouldEqual expectedAppName
+    app
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
new file mode 100644
index 0000000..eabc684
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.integrationtest.TestSpecBase
+
+/**
+ * The test spec checks the command-line usage
+ */
+class CommandLineSpec extends TestSpecBase {
+
+  "use `gear info` to list applications" should {
+    "retrieve 0 application after cluster just started" in {
+      // exercise
+      getRunningAppCount shouldEqual 0
+    }
+
+    "retrieve 1 application after the first application submission" in {
+      // setup
+      val appId = expectSubmitAppSuccess(wordCountJar)
+      expectAppIsRunningByParsingOutput(appId, wordCountName)
+
+      // exercise
+      getRunningAppCount shouldEqual 1
+    }
+  }
+
+  "use `gear app` to submit application" should {
+    "find a running application after submission" in {
+      // exercise
+      val appId = expectSubmitAppSuccess(wordCountJar)
+      expectAppIsRunningByParsingOutput(appId, wordCountName)
+    }
+
+    "reject a repeated submission request while the application is running" in {
+      // setup
+      val appId = expectSubmitAppSuccess(wordCountJar)
+      expectAppIsRunningByParsingOutput(appId, wordCountName)
+
+      // exercise
+      val actualAppId = commandLineClient.submitApp(wordCountJar)
+      actualAppId shouldEqual -1
+    }
+
+    "reject an invalid submission (the jar file path is incorrect)" in {
+      // exercise
+      val actualAppId = commandLineClient.submitApp(wordCountJar + ".missing")
+      actualAppId shouldEqual -1
+    }
+  }
+
+  "use `gear kill` to kill application" should {
+    "a running application should be killed" in {
+      // setup
+      val appId = expectSubmitAppSuccess(wordCountJar)
+
+      // exercise
+      val success = commandLineClient.killApp(appId)
+      success shouldBe true
+    }
+
+    "should fail when attempting to kill a stopped application" in {
+      // setup
+      val appId = expectSubmitAppSuccess(wordCountJar)
+      var success = commandLineClient.killApp(appId)
+      success shouldBe true
+
+      // exercise
+      success = commandLineClient.killApp(appId)
+      success shouldBe false
+    }
+
+    "the EmbededCluster can be used as embedded cluster in process" in {
+      // setup
+      val args = "-debug true -sleep 10"
+      val appId = expectSubmitAppSuccess(wordCountJar, args)
+      var success = commandLineClient.killApp(appId)
+      success shouldBe true
+    }
+
+    "should fail when attempting to kill a non-exist application" in {
+      // setup
+      val freeAppId = getNextAvailableAppId
+
+      // exercise
+      val success = commandLineClient.killApp(freeAppId)
+      success shouldBe false
+    }
+  }
+
+  "use `gear replay` to replay the application from current min clock" should {
+    "todo: description" in {
+      // todo: test code
+    }
+  }
+
+  private def getRunningAppCount: Int = {
+    commandLineClient.listRunningApps().length
+  }
+
+  private def getNextAvailableAppId: Int = {
+    commandLineClient.listApps().length + 1
+  }
+
+  private def expectSubmitAppSuccess(jar: String, args: String = ""): Int = {
+    val appId = commandLineClient.submitApp(jar, args)
+    appId should not equal -1
+    appId
+  }
+
+  private def expectAppIsRunningByParsingOutput(appId: Int, expectedName: String): Unit = {
+    val actual = commandLineClient.queryApp(appId)
+    actual should include(s"application: $appId, ")
+    actual should include(s"name: $expectedName, ")
+    actual should include(s"status: ${MasterToAppMaster.AppMasterActive}")
+  }
+}


[2/4] incubator-gearpump git commit: fix GEARPUMP-150 correct the integration test file structure

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
deleted file mode 100644
index 73413da..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import scala.sys.process._
-
-import org.apache.gearpump.integrationtest.Docker
-
-/**
- * A helper to instantiate containers from the base Docker image for different usages.
- */
-class BaseContainer(val host: String, command: String,
-    masterAddrs: List[(String, Int)],
-    tunnelPorts: Set[Int] = Set.empty) {
-
-  private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher"
-  private val DOCKER_IMAGE_GEARPUMP_HOME = "/opt/gearpump"
-  private val DOCKER_IMAGE_LOG_HOME = "/var/log/gearpump"
-  private val HOST_GEARPUMP_HOME = "pwd".!!.trim + "/output/target/pack"
-  private val HOST_LOG_HOME = {
-    val dir = "/tmp/gearpump"
-    s"mkdir -p $dir".!!
-    s"mktemp -p $dir -d".!!.trim
-  }
-
-  private val CLUSTER_OPTS = {
-    masterAddrs.zipWithIndex.map { case (hostPort, index) =>
-      s"-Dgearpump.cluster.masters.$index=${hostPort._1}:${hostPort._2}"
-    }.mkString(" ")
-  }
-
-  def createAndStart(): String = {
-    Docker.createAndStartContainer(host, IMAGE_NAME, command,
-      environ = Map("JAVA_OPTS" -> CLUSTER_OPTS),
-      volumes = Map(
-        HOST_GEARPUMP_HOME -> DOCKER_IMAGE_GEARPUMP_HOME,
-        HOST_LOG_HOME -> DOCKER_IMAGE_LOG_HOME),
-      knownHosts = masterAddrs.map(_._1).filter(_ != host).toSet,
-      tunnelPorts = tunnelPorts)
-  }
-
-  def killAndRemove(): Unit = {
-    Docker.killAndRemoveContainer(host)
-  }
-}
\ No newline at end of file

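The CLUSTER_OPTS value above turns the master address list into the -D JVM options the launcher image expects. A small sketch of that rendering, runnable on its own:

    object ClusterOptsSketch {
      // Renders master addresses into -Dgearpump.cluster.masters.N options.
      def clusterOpts(masterAddrs: List[(String, Int)]): String =
        masterAddrs.zipWithIndex.map { case ((host, port), index) =>
          s"-Dgearpump.cluster.masters.$index=$host:$port"
        }.mkString(" ")

      def main(args: Array[String]): Unit = {
        assert(clusterOpts(List(("master0", 3000), ("master1", 3000))) ==
          "-Dgearpump.cluster.masters.0=master0:3000 " +
            "-Dgearpump.cluster.masters.1=master1:3000")
      }
    }
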
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
deleted file mode 100644
index 884a8d1..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.integrationtest.Docker
-
-/**
- * A command-line client to operate a Gearpump cluster
- */
-class CommandLineClient(host: String) {
-
-  private val LOG = Logger.getLogger(getClass)
-
-  def listApps(): Array[String] = {
-    gearCommand(host, "gear info").split("\n").filter(
-      _.startsWith("application: ")
-    )
-  }
-
-  def listRunningApps(): Array[String] =
-    listApps().filter(
-      _.contains(s", status: ${MasterToAppMaster.AppMasterActive}")
-    )
-
-  def queryApp(appId: Int): String = try {
-    listApps().filter(
-      _.startsWith(s"application: $appId")
-    ).head
-  } catch {
-    case ex: Throwable =>
-      LOG.warn(s"swallowed an exception: $ex")
-      ""
-  }
-
-  def submitAppAndCaptureOutput(jar: String, executorNum: Int, args: String = ""): String = {
-    gearCommand(host, s"gear app -verbose true -jar $jar -executors $executorNum $args")
-  }
-
-  def submitApp(jar: String, args: String = ""): Int = {
-    LOG.debug(s"|=> Submit Application $jar...")
-    submitAppUse("gear app", jar, args)
-  }
-
-  private def submitAppUse(launcher: String, jar: String, args: String = ""): Int = try {
-    gearCommand(host, s"$launcher -jar $jar $args").split("\n")
-      .filter(_.contains("The application id is ")).head.split(" ").last.toInt
-  } catch {
-    case ex: Throwable =>
-      LOG.warn(s"swallowed an exception: $ex")
-      -1
-  }
-
-  def killApp(appId: Int): Boolean = {
-    tryGearCommand(host, s"gear kill -appid $appId")
-  }
-
-  private def gearCommand(container: String, command: String): String = {
-    LOG.debug(s"|=> Gear command $command in container $container...")
-    Docker.execute(container, s"/opt/start $command")
-  }
-
-  private def tryGearCommand(container: String, command: String): Boolean = {
-    LOG.debug(s"|=> Gear command $command in container $container...")
-    Docker.executeSilently(container, s"/opt/start $command")
-  }
-}

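submitAppUse above extracts the application id by scanning console output for the line containing "The application id is ". A sketch of that parsing, with an illustrative (not captured) sample:

    object SubmitOutputParsingSketch {
      def parseAppId(output: String): Int =
        output.split("\n")
          .find(_.contains("The application id is "))
          .map(_.split(" ").last.toInt)
          .getOrElse(-1) // mirrors the -1 returned on a failed submission

      def main(args: Array[String]): Unit = {
        assert(parseAppId("Submitting...\nThe application id is 7") == 7)
        assert(parseAppId("submission failed") == -1)
      }
    }
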
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
deleted file mode 100644
index 4d439e8..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import java.io.IOException
-import scala.collection.mutable.ListBuffer
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.{Docker, Util}
-
-/**
- * This class is a test driver for end-to-end integration tests.
- */
-class MiniCluster {
-
-  private val LOG = Logger.getLogger(getClass)
-  private val SUT_HOME = "/opt/gearpump"
-
-  private val REST_SERVICE_PORT = 8090
-  private val MASTER_PORT = 3000
-  private val MASTER_ADDRS: List[(String, Int)] = {
-    (0 to 0).map(index =>
-      ("master" + index, MASTER_PORT)
-    ).toList
-  }
-
-  lazy val commandLineClient: CommandLineClient = new CommandLineClient(getMasterHosts.head)
-
-  lazy val restClient: RestClient = {
-    val client = new RestClient(getMasterHosts.head, REST_SERVICE_PORT)
-    client
-  }
-
-  private var workers: ListBuffer[String] = ListBuffer.empty
-
-  def start(workerNum: Int = 2): Unit = {
-
-    // Kill master
-    MASTER_ADDRS.foreach { case (host, _) =>
-      if (Docker.containerExists(host)) {
-        Docker.killAndRemoveContainer(host)
-      }
-    }
-
-    // Kill existing workers
-    workers ++= (0 until workerNum).map("worker" + _)
-    workers.foreach { worker =>
-      if (Docker.containerExists(worker)) {
-        Docker.killAndRemoveContainer(worker)
-      }
-    }
-
-    // Start Masters
-    MASTER_ADDRS.foreach({ case (host, port) =>
-      addMasterNode(host, port)
-    })
-
-    // Start Workers
-    workers.foreach { worker =>
-      val container = new BaseContainer(worker, "worker", MASTER_ADDRS)
-      container.createAndStart()
-    }
-
-    // Check cluster status
-    expectRestClientAuthenticated()
-    expectClusterAvailable()
-  }
-
-  private def addMasterNode(host: String, port: Int): Unit = {
-    val container = new BaseContainer(host, s"master -ip $host -port $port", MASTER_ADDRS)
-    container.createAndStart()
-  }
-
-  def addWorkerNode(host: String): Unit = {
-    if (!workers.contains(host)) {
-      val container = new BaseContainer(host, "worker", MASTER_ADDRS)
-      container.createAndStart()
-      workers += host
-    } else {
-      throw new IOException(s"Cannot add new worker $host, " +
-        s"as a worker with the same hostname already exists")
-    }
-  }
-
-  /**
-   * @throws RuntimeException if rest client is not authenticated after N attempts
-   */
-  private def expectRestClientAuthenticated(): Unit = {
-    Util.retryUntil(() => {
-      restClient.login()
-      LOG.info("rest client has been authenticated")
-      true
-    }, "login successfully")
-  }
-
-  /**
-   * @throws RuntimeException if service is not available after N attempts
-   */
-  private def expectClusterAvailable(): Unit = {
-    Util.retryUntil(() => {
-      val response = restClient.queryMaster()
-      LOG.info(s"cluster is now available with response: $response.")
-      response.aliveFor > 0
-    }, "cluster running")
-  }
-
-  def isAlive: Boolean = {
-    getMasterHosts.exists(nodeIsOnline)
-  }
-
-  def getNetworkGateway: String = {
-    Docker.getNetworkGateway(MASTER_ADDRS.head._1)
-  }
-
-  def shutDown(): Unit = {
-    val removalHosts = (getMasterHosts ++ getWorkerHosts).toSet
-      .filter(nodeIsOnline).toArray
-    if (removalHosts.length > 0) {
-      Docker.killAndRemoveContainer(removalHosts)
-    }
-    workers.clear()
-  }
-
-  def removeMasterNode(host: String): Unit = {
-    Docker.killAndRemoveContainer(host)
-  }
-
-  def removeWorkerNode(host: String): Unit = {
-    workers -= host
-    Docker.killAndRemoveContainer(host)
-  }
-
-  def restart(): Unit = {
-    shutDown()
-    Util.retryUntil(() => {
-      !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists)
-    }, "all docker containers killed")
-    LOG.info("all docker containers have been killed. restarting...")
-    start()
-  }
-
-  def getMastersAddresses: List[(String, Int)] = {
-    MASTER_ADDRS
-  }
-
-  def getMasterHosts: List[String] = {
-    MASTER_ADDRS.map({ case (host, port) => host })
-  }
-
-  def getWorkerHosts: List[String] = {
-    workers.toList
-  }
-
-  def nodeIsOnline(host: String): Boolean = {
-    Docker.containerIsRunning(host)
-  }
-
-  private def builtInJarsUnder(folder: String): Array[String] = {
-    Docker.findJars(getMasterHosts.head, s"$SUT_HOME/$folder")
-  }
-
-  private def queryBuiltInJars(folder: String, subtext: String): Seq[String] = {
-    builtInJarsUnder(folder).filter(_.contains(subtext))
-  }
-
-  def queryBuiltInExampleJars(subtext: String): Seq[String] = {
-    queryBuiltInJars("examples", subtext)
-  }
-
-  def queryBuiltInITJars(subtext: String): Seq[String] = {
-    queryBuiltInJars("integrationtest", subtext)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
deleted file mode 100644
index 1b143af..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import scala.reflect.ClassTag
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.log4j.Logger
-import upickle.Js
-import upickle.default._
-
-import org.apache.gearpump.cluster.AppMasterToMaster.MasterData
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData}
-import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics
-import org.apache.gearpump.cluster.master.MasterSummary
-import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import org.apache.gearpump.cluster.{AppJar, MasterToAppMaster}
-import org.apache.gearpump.integrationtest.{Docker, Util}
-import org.apache.gearpump.services.AppMasterService.Status
-import org.apache.gearpump.services.MasterService.{AppSubmissionResult, BuiltinPartitioners}
-// NOTE: This cannot be removed!!!
-import org.apache.gearpump.services.util.UpickleUtil._
-import org.apache.gearpump.streaming.ProcessorDescription
-import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
-import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationResult, ReplaceProcessor}
-import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary
-import org.apache.gearpump.streaming.executor.Executor.ExecutorSummary
-import org.apache.gearpump.util.{Constants, Graph}
-
-/**
- * A REST client to operate a Gearpump cluster
- */
-class RestClient(host: String, port: Int) {
-
-  private val LOG = Logger.getLogger(getClass)
-
-  private val cookieFile: String = "cookie.txt"
-
-  implicit val graphReader: upickle.default.Reader[Graph[Int, String]] =
-    upickle.default.Reader[Graph[Int, String]] {
-    case Js.Obj(vertices, edges) =>
-      val vertexList = upickle.default.readJs[List[Int]](vertices._2)
-      val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2)
-      Graph(vertexList, edgeList)
-  }
-
-  private def decodeAs[T](
-      expr: String)(implicit reader: upickle.default.Reader[T], classTag: ClassTag[T]): T = try {
-    read[T](expr)
-  } catch {
-    case ex: Throwable =>
-      LOG.error(s"Failed to decode Rest response to ${classTag.runtimeClass.getSimpleName}")
-      throw ex
-  }
-
-  def queryVersion(): String = {
-    curl("version")
-  }
-
-  def listWorkers(): Array[WorkerSummary] = {
-    val resp = callApi("master/workerlist")
-    decodeAs[List[WorkerSummary]](resp).toArray
-  }
-
-  def listRunningWorkers(): Array[WorkerSummary] = {
-    listWorkers().filter(_.state == MasterToAppMaster.AppMasterActive)
-  }
-
-  def listApps(): Array[AppMasterData] = {
-    val resp = callApi("master/applist")
-    decodeAs[AppMastersData](resp).appMasters.toArray
-  }
-
-  def listRunningApps(): Array[AppMasterData] = {
-    listApps().filter(_.status == MasterToAppMaster.AppMasterActive)
-  }
-
-  def getNextAvailableAppId(): Int = {
-    listApps().length + 1
-  }
-
-  def submitApp(jar: String, executorNum: Int, args: String = "", config: String = "")
-    : Boolean = try {
-    val endpoint = "master/submitapp"
-
-    var options = Seq(s"jar=@$jar")
-    if (config.length > 0) {
-      options :+= s"conf=@$config"
-    }
-
-    options :+= s"executorcount=$executorNum"
-
-    if (args != null && !args.isEmpty) {
-      options :+= "args=\"" + args + "\""
-    }
-
-    val resp = callApi(endpoint, options.map("-F " + _).mkString(" "))
-    val result = decodeAs[AppSubmissionResult](resp)
-    assert(result.success)
-    true
-  } catch {
-    case ex: Throwable =>
-      LOG.warn(s"swallowed an exception: $ex")
-      false
-  }
-
-  def queryApp(appId: Int): AppMasterData = {
-    val resp = callApi(s"appmaster/$appId")
-    decodeAs[AppMasterData](resp)
-  }
-
-  def queryAppMasterConfig(appId: Int): Config = {
-    val resp = callApi(s"appmaster/$appId/config")
-    ConfigFactory.parseString(resp)
-  }
-
-  def queryStreamingAppDetail(appId: Int): StreamAppMasterSummary = {
-    val resp = callApi(s"appmaster/$appId?detail=true")
-    decodeAs[StreamAppMasterSummary](resp)
-  }
-
-  def queryStreamingAppMetrics(appId: Int, current: Boolean, path: String = "processor*")
-    : HistoryMetrics = {
-    val args = if (current) "?readLatest=true" else ""
-    val resp = callApi(s"appmaster/$appId/metrics/app$appId.$path$args")
-    decodeAs[HistoryMetrics](resp)
-  }
-
-  def queryExecutorSummary(appId: Int, executorId: Int): ExecutorSummary = {
-    val resp = callApi(s"appmaster/$appId/executor/$executorId")
-    decodeAs[ExecutorSummary](resp)
-  }
-
-  def queryExecutorBrief(appId: Int): Array[ExecutorBrief] = {
-    queryStreamingAppDetail(appId).executors.toArray
-  }
-
-  def queryExecutorMetrics(appId: Int, current: Boolean): HistoryMetrics = {
-    val args = if (current) "?readLatest=true" else ""
-    val resp = callApi(s"appmaster/$appId/metrics/app$appId.executor*$args")
-    decodeAs[HistoryMetrics](resp)
-  }
-
-  def queryExecutorConfig(appId: Int, executorId: Int): Config = {
-    val resp = callApi(s"appmaster/$appId/executor/$executorId/config")
-    ConfigFactory.parseString(resp)
-  }
-
-  def queryMaster(): MasterSummary = {
-    val resp = callApi("master")
-    decodeAs[MasterData](resp).masterDescription
-  }
-
-  def queryMasterMetrics(current: Boolean): HistoryMetrics = {
-    val args = if (current) "?readLatest=true" else ""
-    val resp = callApi(s"master/metrics/master?$args")
-    decodeAs[HistoryMetrics](resp)
-  }
-
-  def queryMasterConfig(): Config = {
-    val resp = callApi("master/config")
-    ConfigFactory.parseString(resp)
-  }
-
-  def queryWorkerMetrics(workerId: WorkerId, current: Boolean): HistoryMetrics = {
-    val args = if (current) "?readLatest=true" else ""
-    val workerIdStr = WorkerId.render(workerId)
-    val resp = callApi(s"worker/$workerIdStr/metrics/worker$workerIdStr?$args")
-    decodeAs[HistoryMetrics](resp)
-  }
-
-  def queryWorkerConfig(workerId: WorkerId): Config = {
-    val resp = callApi(s"worker/${WorkerId.render(workerId)}/config")
-    ConfigFactory.parseString(resp)
-  }
-
-  def queryBuiltInPartitioners(): Array[String] = {
-    val resp = callApi("master/partitioners")
-    decodeAs[BuiltinPartitioners](resp).partitioners
-  }
-
-  def uploadJar(localFilePath: String): AppJar = {
-    val resp = callApi(s"master/uploadjar -F jar=@$localFilePath", CRUD_POST)
-    decodeAs[AppJar](resp)
-  }
-
-  def replaceStreamingAppProcessor(appId: Int, replaceMe: ProcessorDescription): Boolean = {
-    replaceStreamingAppProcessor(appId, replaceMe, false)
-  }
-
-  def replaceStreamingAppProcessor(
-      appId: Int, replaceMe: ProcessorDescription, inheritConf: Boolean): Boolean = try {
-    val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe, inheritConf)
-    val args = upickle.default.write(replaceOperation)
-    val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + Util.encodeUriComponent(args),
-      CRUD_POST)
-    decodeAs[DAGOperationResult](resp)
-    true
-  } catch {
-    case ex: Throwable =>
-      LOG.warn(s"swallowed an exception: $ex")
-      false
-  }
-
-  def killAppMaster(appId: Int): Boolean = {
-    killExecutor(appId, Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
-  }
-
-  def killExecutor(appId: Int, executorId: Int): Boolean = try {
-    val jvmInfo = queryExecutorSummary(appId, executorId).jvmName.split("@")
-    val pid = jvmInfo(0).toInt
-    val hostname = jvmInfo(1)
-    Docker.killProcess(hostname, pid)
-  } catch {
-    case ex: Throwable =>
-      LOG.warn(s"swallowed an exception: $ex")
-      false
-  }
-
-  def killApp(appId: Int): Boolean = try {
-    val resp = callApi(s"appmaster/$appId", CRUD_DELETE)
-    resp.contains("\"status\":\"success\"")
-  } catch {
-    case ex: Throwable =>
-      LOG.warn(s"swallowed an exception: $ex")
-      false
-  }
-
-  def restartApp(appId: Int): Boolean = try {
-    val resp = callApi(s"appmaster/$appId/restart", CRUD_POST)
-    decodeAs[Status](resp).success
-  } catch {
-    case ex: Throwable =>
-      LOG.warn(s"swallowed an exception: $ex")
-      false
-  }
-
-  private val CRUD_POST = "-X POST"
-  private val CRUD_DELETE = "-X DELETE"
-
-  private def callApi(endpoint: String, option: String = ""): String = {
-    curl(s"api/v1.0/$endpoint", Array(option, s"--cookie $cookieFile"))
-  }
-
-  private def curl(endpoint: String, options: Array[String] = Array.empty[String]): String = {
-    Docker.curl(host, s"http://$host:$port/$endpoint", options)
-  }
-
-  def login(): Unit = {
-    curl("login", Array(CRUD_POST, s"--cookie-jar $cookieFile",
-      "--data username=admin", "--data password=admin"))
-  }
-}
\ No newline at end of file

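killExecutor above (like StabilitySpec.hostName) parses the JVM name reported by the REST API, which follows the RuntimeMXBean convention "PID@hostname". A tiny sketch of that parsing:

    object JvmNameParsingSketch {
      def pidAndHost(jvmName: String): (Int, String) = {
        val Array(pid, hostname) = jvmName.split("@", 2)
        (pid.toInt, hostname)
      }

      def main(args: Array[String]): Unit = {
        assert(pidAndHost("1234@worker0") == ((1234, "worker0")))
      }
    }
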
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
deleted file mode 100644
index 79adfc4..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.integrationtest.storm
-
-import scala.util.Random
-
-import backtype.storm.utils.{DRPCClient, Utils}
-
-import org.apache.gearpump.integrationtest.minicluster.{BaseContainer, MiniCluster, RestClient}
-import org.apache.gearpump.integrationtest.{Docker, Util}
-
-class StormClient(cluster: MiniCluster, restClient: RestClient) {
-
-  private val masterAddrs: List[(String, Int)] = cluster.getMastersAddresses
-  private val CONFIG_FILE = s"/opt/gearpump/storm${new Random().nextInt()}.yaml"
-  private val DRPC_HOST = "storm0"
-  private val DRPC_PORT = 3772
-  private val DRPC_INVOCATIONS_PORT = 3773
-  private val STORM_DRPC = "storm-drpc"
-  private val NIMBUS_HOST = "storm1"
-  private val STORM_NIMBUS = "storm nimbus"
-  private val STORM_APP = "/opt/start storm app"
-
-  private val drpcContainer = new BaseContainer(DRPC_HOST, STORM_DRPC, masterAddrs,
-    tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT))
-
-  private val nimbusContainer =
-    new BaseContainer(NIMBUS_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE", masterAddrs)
-
-  def start(): Unit = {
-    nimbusContainer.createAndStart()
-    ensureNimbusRunning()
-
-    drpcContainer.createAndStart()
-    ensureDrpcServerRunning()
-  }
-
-  private def ensureNimbusRunning(): Unit = {
-    Util.retryUntil(() => {
-      val response = Docker.execute(NIMBUS_HOST, "grep \"port\" " + CONFIG_FILE)
-      // Parse format nimbus.thrift.port: '39322'
-      val thriftPort = response.split(" ")(1).replace("'", "").toInt
-
-      Docker.executeSilently(NIMBUS_HOST, s"""sh -c "netstat -na | grep $thriftPort" """)
-    }, "Nimbus running")
-  }
-
-  private def ensureDrpcServerRunning(): Unit = {
-    Util.retryUntil(() => {
-      Docker.executeSilently(DRPC_HOST, s"""sh -c "netstat -na | grep $DRPC_PORT " """)
-    }, "DRPC running")
-  }
-
-  def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = {
-    Util.retryUntil(() => {
-      Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " +
-        s"-jar $jar $mainClass $args")
-      restClient.listRunningApps().exists(_.appName == appName)
-    }, "app running")
-    restClient.listRunningApps().filter(_.appName == appName).head.appId
-  }
-
-  def getDRPCClient(drpcServerIp: String): DRPCClient = {
-    val config = Utils.readDefaultConfig()
-    new DRPCClient(config, drpcServerIp, DRPC_PORT)
-  }
-
-  def shutDown(): Unit = {
-
-    // Cleans up the storm.yaml config file
-    Docker.executeSilently(NIMBUS_HOST, s"rm $CONFIG_FILE ")
-    drpcContainer.killAndRemove()
-    nimbusContainer.killAndRemove()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala
new file mode 100644
index 0000000..f315ad3
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import org.apache.log4j.Logger
+
+/**
+ * The class is used to execute Docker commands.
+ */
+object Docker {
+
+  private val LOG = Logger.getLogger(getClass)
+
+  /**
+   * @throws RuntimeException in case retval != 0
+   */
+  private def doExecute(container: String, command: String): String = {
+    ShellExec.execAndCaptureOutput(s"docker exec $container $command", s"EXEC $container")
+  }
+
+  private def doExecuteSilently(container: String, command: String): Boolean = {
+    ShellExec.exec(s"docker exec $container $command", s"EXEC $container")
+  }
+
+  /**
+   * @throws RuntimeException in case retval != 0
+   */
+  final def execute(container: String, command: String): String = {
+    trace(container, s"Execute $command") {
+      doExecute(container, command)
+    }
+  }
+
+  final def executeSilently(container: String, command: String): Boolean = {
+    trace(container, s"Execute silently $command") {
+      doExecuteSilently(container, command)
+    }
+  }
+
+  final def listContainers(): Seq[String] = {
+    trace("", s"Listing how many containers...") {
+      ShellExec.execAndCaptureOutput("docker ps -q -a", "LIST")
+        .split("\n").filter(_.nonEmpty)
+    }
+  }
+
+  final def containerIsRunning(name: String): Boolean = {
+    trace(name, s"Check container running or not...") {
+      ShellExec.execAndCaptureOutput(s"docker ps -q --filter name=$name", s"FIND $name").nonEmpty
+    }
+  }
+
+  final def getContainerIPAddr(name: String): String = {
+    trace(name, s"Get Ip Address") {
+      Docker.inspect(name, "--format={{.NetworkSettings.IPAddress}}")
+    }
+  }
+
+  final def containerExists(name: String): Boolean = {
+    trace(name, s"Check container existing or not...") {
+      ShellExec.execAndCaptureOutput(s"docker ps -q -a --filter name=$name", s"FIND $name").nonEmpty
+    }
+  }
+
+  /**
+   * @throws RuntimeException in case particular container is created already
+   */
+  final def createAndStartContainer(name: String, image: String, command: String,
+      environ: Map[String, String] = Map.empty, // key, value
+      volumes: Map[String, String] = Map.empty, // from, to
+      knownHosts: Set[String] = Set.empty,
+      tunnelPorts: Set[Int] = Set.empty): String = {
+
+    if (containerExists(name)) {
+      killAndRemoveContainer(name)
+    }
+
+    trace(name, s"Create and start $name ($image)...") {
+
+      val optsBuilder = new StringBuilder
+      optsBuilder.append("-d") // run in background
+      optsBuilder.append(" -h " + name) // use container name as hostname
+      optsBuilder.append(" -v /etc/localtime:/etc/localtime:ro") // synchronize timezone settings
+
+      environ.foreach { case (key, value) =>
+        optsBuilder.append(s" -e $key=$value")
+      }
+      volumes.foreach { case (from, to) =>
+        optsBuilder.append(s" -v $from:$to")
+      }
+      knownHosts.foreach(host =>
+        optsBuilder.append(" --link " + host)
+      )
+      tunnelPorts.foreach(port =>
+        optsBuilder.append(s" -p $port:$port")
+      )
+      createAndStartContainer(name, optsBuilder.toString(), command, image)
+    }
+  }
+
+  /**
+   * @throws RuntimeException in case particular container is created already
+   */
+  private def createAndStartContainer(
+      name: String, options: String, command: String, image: String): String = {
+    ShellExec.execAndCaptureOutput(s"docker run $options " +
+      s"--name $name $image $command", s"MAKE $name")
+  }
+
+  final def killAndRemoveContainer(name: String): Boolean = {
+    trace(name, s"kill and remove container") {
+      ShellExec.exec(s"docker rm -f $name", s"STOP $name")
+    }
+  }
+
+  final def killAndRemoveContainer(names: Array[String]): Boolean = {
+    assert(names.length > 0)
+    val args = names.mkString(" ")
+    trace(names.mkString(","), s"kill and remove containers") {
+      ShellExec.exec(s"docker rm -f $args", s"STOP $args.")
+    }
+  }
+
+  private def inspect(container: String, option: String): String = {
+    ShellExec.execAndCaptureOutput(s"docker inspect $option $container", s"EXEC $container")
+  }
+
+  final def curl(container: String, url: String, options: Array[String] = Array.empty[String])
+    : String = {
+    trace(container, s"curl $url") {
+      doExecute(container, s"curl -s ${options.mkString(" ")} $url")
+    }
+  }
+
+  final def getHostName(container: String): String = {
+    trace(container, s"Get hostname of container...") {
+      doExecute(container, "hostname")
+    }
+  }
+
+  final def getNetworkGateway(container: String): String = {
+    trace(container, s"Get gateway of container...") {
+      doExecute(container, "ip route").split("\\s+")(2)
+    }
+  }
+  final def killProcess(container: String, pid: Int, signal: String = "SIGKILL"): Boolean = {
+    trace(container, s"Kill process pid: $pid") {
+      doExecuteSilently(container, s"kill -$signal $pid")
+    }
+  }
+
+  final def findJars(container: String, folder: String): Array[String] = {
+    trace(container, s"Find jars under $folder") {
+      doExecute(container, s"find $folder")
+        .split("\n").filter(_.endsWith(".jar"))
+    }
+  }
+
+  private def trace[T](container: String, msg: String)(fun: => T): T = {
+    // scalastyle:off println
+    Console.println() // An empty line to make the output look better.
+    // scalastyle:on println
+    LOG.debug(s"Container $container====>> $msg")
+    LOG.debug("INPUT==>>")
+    val response = fun
+    LOG.debug("<<==OUTPUT")
+
+    LOG.debug(brief(response))
+
+    LOG.debug(s"<<====Command END. Container $container, $msg \n")
+    response
+  }
+
+  private val PREVIEW_MAX_LENGTH = 1024
+
+  private def brief[T](input: T): String = {
+    val output = input match {
+      case true =>
+        "Success|True"
+      case false =>
+        "Failure|False"
+      case x: Array[Any] =>
+        "Success: [" + x.mkString(",") + "]"
+      case x =>
+        x.toString
+    }
+
+    val preview = if (output.length > PREVIEW_MAX_LENGTH) {
+      output.substring(0, PREVIEW_MAX_LENGTH) + "..."
+    }
+    else {
+      output
+    }
+    preview
+  }
+}
\ No newline at end of file
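
A minimal usage sketch of the Docker helper above (illustrative only; the
container name and image are made-up examples, not part of the test suite):

    import org.apache.gearpump.integrationtest.Docker

    object DockerUsageSketch extends App {
      // Creates and starts a throwaway container running a long sleep.
      Docker.createAndStartContainer("demo0", "ubuntu:14.04", "sleep 3600")

      // execute() returns stdout and throws if the exit code is non-zero.
      println(Docker.getHostName("demo0"))

      Docker.killAndRemoveContainer("demo0")
    }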

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala
new file mode 100644
index 0000000..25d7ee3
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.sys.process._
+
+import org.apache.log4j.Logger
+import org.apache.storm.shade.org.eclipse.jetty.util.QuotedStringTokenizer
+
+/**
+ * The class is used to execute commands in a shell.
+ */
+object ShellExec {
+
+  private val LOG = Logger.getLogger(getClass)
+  private val PROCESS_TIMEOUT = 2.minutes
+
+  /**
+   * The built-in command line parser of ProcessBuilder (implicit sys.process) doesn't
+   * respect quote characters (' and ").
+   */
+  private def splitQuotedString(str: String): List[String] = {
+    val splitter = new QuotedStringTokenizer(str, " \t\n\r")
+    splitter.asInstanceOf[java.util.Enumeration[String]].asScala.toList
+  }
+
+  def exec(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT): Boolean = {
+    LOG.debug(s"$sender => `$command`")
+
+    val p = splitQuotedString(command).run()
+    val f = Future(blocking(p.exitValue())) // wrap in Future
+    val retval = {
+      try {
+        Await.result(f, timeout)
+      } catch {
+        case _: TimeoutException =>
+          LOG.error(s"timeout to execute command `$command`")
+          p.destroy()
+          p.exitValue()
+      }
+    }
+    LOG.debug(s"$sender <= exit $retval")
+    retval == 0
+  }
+
+  def execAndCaptureOutput(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT)
+    : String = {
+    LOG.debug(s"$sender => `$command`")
+
+    val buf = new StringBuilder
+    val processLogger = ProcessLogger((o: String) => buf.append(o).append("\n"),
+      (e: String) => buf.append(e).append("\n"))
+    val p = splitQuotedString(command).run(processLogger)
+    val f = Future(blocking(p.exitValue())) // wrap in Future
+    val retval = {
+      try {
+        Await.result(f, timeout)
+      } catch {
+        case _: TimeoutException =>
+          p.destroy()
+          p.exitValue()
+      }
+    }
+    val output = buf.toString().trim
+    val PREVIEW_MAX_LENGTH = 200
+    val preview = if (output.length > PREVIEW_MAX_LENGTH) {
+      output.substring(0, PREVIEW_MAX_LENGTH) + "..."
+    } else {
+      output
+    }
+
+    LOG.debug(s"$sender <= `$preview` exit $retval")
+    if (retval != 0) {
+      throw new RuntimeException(
+        s"exited ($retval) by executing `$command`")
+    }
+    output
+  }
+}
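
A short sketch of how the two entry points behave (the commands here are
hypothetical, shown only to illustrate the quote handling splitQuotedString
provides):

    import org.apache.gearpump.integrationtest.ShellExec

    object ShellExecSketch extends App {
      // exec() returns true iff the process exits with code 0.
      val ok = ShellExec.exec("echo hello", "DEMO")

      // Quoted segments stay intact, so sh -c receives a single argument.
      val out = ShellExec.execAndCaptureOutput("sh -c \"echo quoted arg\"", "DEMO")
      println(s"ok=$ok, out=$out")
    }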

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala
new file mode 100644
index 0000000..7e7085d
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import scala.concurrent.duration._
+import scala.util.{Failure, Success, Try}
+
+import org.apache.log4j.Logger
+
+object Util {
+
+  private val LOG = Logger.getLogger(getClass)
+
+  def encodeUriComponent(s: String): String = {
+    try {
+      java.net.URLEncoder.encode(s, "UTF-8")
+        .replaceAll("\\+", "%20")
+        .replaceAll("\\%21", "!")
+        .replaceAll("\\%27", "'")
+        .replaceAll("\\%28", "(")
+        .replaceAll("\\%29", ")")
+        .replaceAll("\\%7E", "~")
+    } catch {
+      case ex: Throwable => s
+    }
+  }
+
+  def retryUntil(
+      condition: () => Boolean, conditionDescription: String, maxTries: Int = 15,
+      interval: Duration = 10.seconds): Unit = {
+    var met = false
+    var tries = 0
+
+    while (!met && tries < maxTries) {
+
+      met = Try(condition()) match {
+        case Success(true) => true
+        case Success(false) => false
+        case Failure(ex) => false
+      }
+
+      tries += 1
+
+      if (!met) {
+        LOG.error(s"Failed due to (false == $conditionDescription), " +
+          s"retrying for the ${tries} times...")
+        Thread.sleep(interval.toMillis)
+      } else {
+        LOG.info(s"Success ($conditionDescription) after ${tries} retries")
+      }
+    }
+
+    if (!met) {
+      throw new Exception(s"Failed after ${tries} retries, ($conditionDescription) == false")
+    }
+  }
+}
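
For reference, a sketch of how retryUntil is typically called (the condition
and description here are invented for illustration):

    import scala.concurrent.duration._
    import org.apache.gearpump.integrationtest.Util

    object RetrySketch extends App {
      val deadline = System.currentTimeMillis() + 5000

      // Polls every 2 seconds, at most 10 times; throws if never satisfied.
      Util.retryUntil(() => System.currentTimeMillis() > deadline,
        "five seconds have passed", maxTries = 10, interval = 2.seconds)
    }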

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala
new file mode 100644
index 0000000..f836abd
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.integrationtest.hadoop
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+object HadoopCluster {
+
+  /** Starts a Hadoop cluster */
+  def withHadoopCluster(testCode: HadoopCluster => Unit): Unit = {
+    val hadoopCluster = new HadoopCluster
+    try {
+      hadoopCluster.start()
+      testCode(hadoopCluster)
+    } finally {
+      hadoopCluster.shutDown()
+    }
+  }
+}
+/**
+ * This class maintains a single-node Hadoop cluster.
+ */
+class HadoopCluster {
+
+  private val LOG = Logger.getLogger(getClass)
+  private val HADOOP_DOCKER_IMAGE = "sequenceiq/hadoop-docker:2.6.0"
+  private val HADOOP_HOST = "hadoop0"
+
+  def start(): Unit = {
+    Docker.createAndStartContainer(HADOOP_HOST, HADOOP_DOCKER_IMAGE, "")
+
+    Util.retryUntil(() => isAlive, "Hadoop cluster is alive")
+    LOG.info("Hadoop cluster is started.")
+  }
+
+  // Checks whether the cluster is alive by listing "/"
+  private def isAlive: Boolean = {
+    Docker.executeSilently(HADOOP_HOST, "/usr/local/hadoop/bin/hadoop fs -ls /")
+  }
+
+  def getDefaultFS: String = {
+    val hostIPAddr = Docker.getContainerIPAddr(HADOOP_HOST)
+    s"hdfs://$hostIPAddr:9000"
+  }
+
+  def shutDown(): Unit = {
+    Docker.killAndRemoveContainer(HADOOP_HOST)
+  }
+}
\ No newline at end of file
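
A sketch of the intended loan-pattern usage (the body is illustrative):

    import org.apache.gearpump.integrationtest.hadoop.HadoopCluster._

    // shutDown() is guaranteed even if the test body throws.
    withHadoopCluster { hadoop =>
      println(s"default FS: ${hadoop.getDefaultFS}")
    }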

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala
new file mode 100644
index 0000000..15ba084
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.minicluster.MiniCluster
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+object KafkaCluster {
+
+  /** Starts a Kafka cluster */
+  def withKafkaCluster(cluster: MiniCluster)(testCode: KafkaCluster => Unit): Unit = {
+    val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka")
+    try {
+      kafkaCluster.start()
+      testCode(kafkaCluster)
+    } finally {
+      kafkaCluster.shutDown()
+    }
+  }
+
+  def withDataProducer(topic: String, brokerList: String)
+    (testCode: NumericalDataProducer => Unit): Unit = {
+    val producer = new NumericalDataProducer(topic, brokerList)
+    try {
+      producer.start()
+      testCode(producer)
+    } finally {
+      producer.stop()
+    }
+  }
+}
+
+/**
+ * This class maintains a single-node Kafka cluster with an integrated Zookeeper.
+ */
+class KafkaCluster(val advertisedHost: String, zkChroot: String = "") {
+
+  private val LOG = Logger.getLogger(getClass)
+  private val KAFKA_DOCKER_IMAGE = "spotify/kafka"
+  private val KAFKA_HOST = "kafka0"
+  private val KAFKA_HOME = "/opt/kafka_2.11-0.8.2.1/"
+  private val ZOOKEEPER_PORT = 2181
+  private val BROKER_PORT = 9092
+  val advertisedPort = BROKER_PORT
+
+  def start(): Unit = {
+    Docker.createAndStartContainer(KAFKA_HOST, KAFKA_DOCKER_IMAGE, "",
+      environ = Map(
+        "ADVERTISED_HOST" -> advertisedHost,
+        "ADVERTISED_PORT" -> BROKER_PORT.toString,
+        "ZK_CHROOT" -> zkChroot),
+      tunnelPorts = Set(ZOOKEEPER_PORT, BROKER_PORT)
+    )
+    Util.retryUntil(() => isAlive, "kafka cluster is alive")
+    LOG.debug("kafka cluster is started.")
+  }
+
+  def isAlive: Boolean = {
+    !listTopics().contains("Connection refused")
+  }
+
+  def shutDown(): Unit = {
+    Docker.killAndRemoveContainer(KAFKA_HOST)
+  }
+
+  private lazy val hostIPAddr = Docker.getContainerIPAddr(KAFKA_HOST)
+
+  def listTopics(): String = {
+    kafkaListTopics(KAFKA_HOST, KAFKA_HOME, getZookeeperConnectString)
+  }
+
+  def getZookeeperConnectString: String = {
+    s"$hostIPAddr:$ZOOKEEPER_PORT/$zkChroot"
+  }
+
+  def getBrokerListConnectString: String = {
+    s"$hostIPAddr:$BROKER_PORT"
+  }
+
+  def createTopic(topic: String, partitions: Int = 1): Unit = {
+    LOG.debug(s"|=> Create kafka topic $topic with $partitions partitions")
+
+    Docker.executeSilently(KAFKA_HOST,
+      s"$KAFKA_HOME/bin/kafka-topics.sh" +
+        s" --zookeeper $getZookeeperConnectString" +
+        s" --create --topic $topic --partitions $partitions --replication-factor 1")
+  }
+
+  def produceDataToKafka(topic: String, messageNum: Int): Unit = {
+    Docker.executeSilently(KAFKA_HOST,
+      s"$KAFKA_HOME/bin/kafka-topics.sh" +
+        s" --zookeeper $getZookeeperConnectString" +
+        s" --create --topic $topic --partitions 1 --replication-factor 1")
+
+    Docker.executeSilently(KAFKA_HOST,
+      s"$KAFKA_HOME/bin/kafka-producer-perf-test.sh" +
+        s" --broker-list $getBrokerListConnectString" +
+        s" --topic $topic --messages $messageNum")
+  }
+
+  def getLatestOffset(topic: String): Int = {
+    kafkaFetchLatestOffset(KAFKA_HOST, topic, KAFKA_HOME, getBrokerListConnectString)
+  }
+
+  private def kafkaListTopics(
+      container: String, kafkaHome: String, zookeeperConnectionString: String): String = {
+
+    LOG.debug(s"|=> Kafka list topics...")
+    Docker.execute(container,
+      s"$kafkaHome/bin/kafka-topics.sh" +
+        s" --zookeeper $zookeeperConnectionString -list")
+  }
+
+  private def kafkaFetchLatestOffset(
+      container: String, topic: String, kafkaHome: String, brokersList: String): Int = {
+    LOG.debug(s"|=> Get latest offset of topic $topic...")
+    val output = Docker.execute(container,
+      s"$kafkaHome/bin/kafka-run-class.sh kafka.tools.GetOffsetShell" +
+        s" --broker-list $brokersList " +
+        s" --topic $topic --time -1")
+    output.split(":")(2).toInt
+  }
+}
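
A sketch of typical usage (assumes `cluster` is an already-started
MiniCluster, defined later in this commit; the topic name is made up):

    import org.apache.gearpump.integrationtest.kafka.KafkaCluster

    KafkaCluster.withKafkaCluster(cluster) { kafka =>
      // Creates the topic and writes 1000 perf-test messages into it.
      kafka.produceDataToKafka("demo_topic", messageNum = 1000)
      println(s"latest offset: ${kafka.getLatestOffset("demo_topic")}")
    }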

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala
new file mode 100644
index 0000000..1cf3125
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import java.util.Properties
+
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.streaming.serializer.ChillSerializer
+
+class NumericalDataProducer(topic: String, bootstrapServers: String) {
+
+  private val LOG = Logger.getLogger(getClass)
+  private val producer = createProducer
+  private val WRITE_SLEEP_NANOS = 10
+  private val serializer = new ChillSerializer[Int]
+  var lastWriteNum = 0
+
+  def start(): Unit = {
+    produceThread.start()
+  }
+
+  def stop(): Unit = {
+    if (produceThread.isAlive) {
+      produceThread.interrupt()
+      produceThread.join()
+    }
+    producer.close()
+  }
+
+  /** The numbers written so far, as the range 1 to lastWriteNum */
+  def producedNumbers: Range = {
+    Range(1, lastWriteNum + 1)
+  }
+
+  private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
+    val properties = new Properties()
+    properties.setProperty("bootstrap.servers", bootstrapServers)
+    new KafkaProducer[Array[Byte], Array[Byte]](properties,
+      new ByteArraySerializer, new ByteArraySerializer)
+  }
+
+  private val produceThread = new Thread(new Runnable {
+    override def run(): Unit = {
+      try {
+        while (!Thread.currentThread.isInterrupted) {
+          lastWriteNum += 1
+          val msg = serializer.serialize(lastWriteNum)
+          val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, msg)
+          producer.send(record)
+          Thread.sleep(0, WRITE_SLEEP_NANOS)
+        }
+      } catch {
+        case ex: InterruptedException =>
+          LOG.error("message producing is stopped by an interrupt")
+      }
+    }
+  })
+}
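
A sketch of pairing the producer with the loan helper above (assumes `kafka`
is a started KafkaCluster; the topic name is illustrative):

    import org.apache.gearpump.integrationtest.kafka.KafkaCluster

    KafkaCluster.withDataProducer("numbers", kafka.getBrokerListConnectString) {
      producer =>
        Thread.sleep(3000) // let the background thread publish for a while
        println(s"wrote ${producer.lastWriteNum} messages")
    }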

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala
new file mode 100644
index 0000000..1f773d3
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import scala.collection.mutable
+
+trait ResultVerifier {
+  def onNext(num: Int): Unit
+}
+
+class MessageLossDetector(totalNum: Int) extends ResultVerifier {
+  private val bitSets = new mutable.BitSet(totalNum)
+  var result = List.empty[Int]
+
+  override def onNext(num: Int): Unit = {
+    bitSets.add(num)
+    result :+= num
+  }
+
+  def allReceived: Boolean = {
+    1.to(totalNum).forall(bitSets)
+  }
+
+  def received(num: Int): Boolean = {
+    bitSets(num)
+  }
+}
\ No newline at end of file
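
A small sketch of the detector's contract (values invented for illustration):

    import org.apache.gearpump.integrationtest.kafka.MessageLossDetector

    val detector = new MessageLossDetector(totalNum = 3)
    Seq(1, 2, 3).foreach(detector.onNext)
    assert(detector.allReceived) // every number in 1..totalNum was seen
    assert(detector.received(2))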

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
new file mode 100644
index 0000000..392ca86
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import scala.util.{Failure, Success}
+
+import kafka.api.FetchRequestBuilder
+import kafka.consumer.SimpleConsumer
+import kafka.utils.Utils
+
+import org.apache.gearpump.streaming.serializer.ChillSerializer
+
+class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: Int = 0,
+    host: String, port: Int) {
+
+  private val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "")
+  private val serializer = new ChillSerializer[Int]
+  private var offset = 0L
+
+  def read(): Unit = {
+    val messageSet = consumer.fetch(
+      new FetchRequestBuilder().addFetch(topic, partition, offset, Int.MaxValue).build()
+    ).messageSet(topic, partition)
+
+    for (messageAndOffset <- messageSet) {
+      serializer.deserialize(Utils.readBytes(messageAndOffset.message.payload)) match {
+        case Success(msg) =>
+          offset = messageAndOffset.nextOffset
+          verifier.onNext(msg)
+        case Failure(e) => throw e
+      }
+    }
+  }
+}
\ No newline at end of file
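
A sketch of wiring the reader to the verifier above (the broker host and port
are assumptions; in the tests they come from KafkaCluster):

    import org.apache.gearpump.integrationtest.kafka.{MessageLossDetector, SimpleKafkaReader}

    val detector = new MessageLossDetector(totalNum = 100)
    val reader = new SimpleKafkaReader(detector, "numbers",
      host = "kafka0", port = 9092)
    while (!detector.allReceived) {
      reader.read() // drains whatever is available and feeds the verifier
    }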

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala
new file mode 100644
index 0000000..73413da
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import scala.sys.process._
+
+import org.apache.gearpump.integrationtest.Docker
+
+/**
+ * A helper to instantiate the base image for different usage.
+ */
+class BaseContainer(val host: String, command: String,
+    masterAddrs: List[(String, Int)],
+    tunnelPorts: Set[Int] = Set.empty) {
+
+  private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher"
+  private val DOCKER_IMAGE_GEARPUMP_HOME = "/opt/gearpump"
+  private val DOCKER_IMAGE_LOG_HOME = "/var/log/gearpump"
+  private val HOST_GEARPUMP_HOME = "pwd".!!.trim + "/output/target/pack"
+  private val HOST_LOG_HOME = {
+    val dir = "/tmp/gearpump"
+    s"mkdir -p $dir".!!
+    s"mktemp -p $dir -d".!!.trim
+  }
+
+  private val CLUSTER_OPTS = {
+    masterAddrs.zipWithIndex.map { case (hostPort, index) =>
+      s"-Dgearpump.cluster.masters.$index=${hostPort._1}:${hostPort._2}"
+    }.mkString(" ")
+  }
+
+  def createAndStart(): String = {
+    Docker.createAndStartContainer(host, IMAGE_NAME, command,
+      environ = Map("JAVA_OPTS" -> CLUSTER_OPTS),
+      volumes = Map(
+        HOST_GEARPUMP_HOME -> DOCKER_IMAGE_GEARPUMP_HOME,
+        HOST_LOG_HOME -> DOCKER_IMAGE_LOG_HOME),
+      knownHosts = masterAddrs.map(_._1).filter(_ != host).toSet,
+      tunnelPorts = tunnelPorts)
+  }
+
+  def killAndRemove(): Unit = {
+    Docker.killAndRemoveContainer(host)
+  }
+}
\ No newline at end of file
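
A sketch of launching a worker through this helper (the master address is
illustrative; MiniCluster below does the same internally):

    import org.apache.gearpump.integrationtest.minicluster.BaseContainer

    val worker = new BaseContainer("worker9", "worker", List(("master0", 3000)))
    worker.createAndStart()
    // ... exercise the cluster ...
    worker.killAndRemove()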

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
new file mode 100644
index 0000000..884a8d1
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.integrationtest.Docker
+
+/**
+ * A command-line client to operate a Gearpump cluster
+ */
+class CommandLineClient(host: String) {
+
+  private val LOG = Logger.getLogger(getClass)
+
+  def listApps(): Array[String] = {
+    gearCommand(host, "gear info").split("\n").filter(
+      _.startsWith("application: ")
+    )
+  }
+
+  def listRunningApps(): Array[String] =
+    listApps().filter(
+      _.contains(s", status: ${MasterToAppMaster.AppMasterActive}")
+    )
+
+  def queryApp(appId: Int): String = try {
+    listApps().filter(
+      _.startsWith(s"application: $appId")
+    ).head
+  } catch {
+    case ex: Throwable =>
+      LOG.warn(s"swallowed an exception: $ex")
+      ""
+  }
+
+  def submitAppAndCaptureOutput(jar: String, executorNum: Int, args: String = ""): String = {
+    gearCommand(host, s"gear app -verbose true -jar $jar -executors $executorNum $args")
+  }
+
+  def submitApp(jar: String, args: String = ""): Int = {
+    LOG.debug(s"|=> Submit Application $jar...")
+    submitAppUse("gear app", jar, args)
+  }
+
+  private def submitAppUse(launcher: String, jar: String, args: String = ""): Int = try {
+    gearCommand(host, s"$launcher -jar $jar $args").split("\n")
+      .filter(_.contains("The application id is ")).head.split(" ").last.toInt
+  } catch {
+    case ex: Throwable =>
+      LOG.warn(s"swallowed an exception: $ex")
+      -1
+  }
+
+  def killApp(appId: Int): Boolean = {
+    tryGearCommand(host, s"gear kill -appid $appId")
+  }
+
+  private def gearCommand(container: String, command: String): String = {
+    LOG.debug(s"|=> Gear command $command in container $container...")
+    Docker.execute(container, s"/opt/start $command")
+  }
+
+  private def tryGearCommand(container: String, command: String): Boolean = {
+    LOG.debug(s"|=> Gear command $command in container $container...")
+    Docker.executeSilently(container, s"/opt/start $command")
+  }
+}
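
A sketch of driving the CLI client (the jar path is a made-up example):

    import org.apache.gearpump.integrationtest.minicluster.CommandLineClient

    val cli = new CommandLineClient("master0")
    val appId = cli.submitApp("/opt/gearpump/examples/wordcount.jar")
    if (appId > 0) { // submitApp returns -1 on failure
      println(cli.queryApp(appId))
      cli.killApp(appId)
    }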

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala
new file mode 100644
index 0000000..4d439e8
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import java.io.IOException
+import scala.collection.mutable.ListBuffer
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+/**
+ * This class is a test driver for end-to-end integration test.
+ */
+class MiniCluster {
+
+  private val LOG = Logger.getLogger(getClass)
+  private val SUT_HOME = "/opt/gearpump"
+
+  private val REST_SERVICE_PORT = 8090
+  private val MASTER_PORT = 3000
+  private val MASTER_ADDRS: List[(String, Int)] = {
+    (0 to 0).map(index =>
+      ("master" + index, MASTER_PORT)
+    ).toList
+  }
+
+  lazy val commandLineClient: CommandLineClient = new CommandLineClient(getMasterHosts.head)
+
+  lazy val restClient: RestClient = {
+    val client = new RestClient(getMasterHosts.head, REST_SERVICE_PORT)
+    client
+  }
+
+  private var workers: ListBuffer[String] = ListBuffer.empty
+
+  def start(workerNum: Int = 2): Unit = {
+
+    // Kill existing masters
+    MASTER_ADDRS.foreach { case (host, _) =>
+      if (Docker.containerExists(host)) {
+        Docker.killAndRemoveContainer(host)
+      }
+    }
+
+    // Kill existing workers
+    workers ++= (0 until workerNum).map("worker" + _)
+    workers.foreach { worker =>
+      if (Docker.containerExists(worker)) {
+        Docker.killAndRemoveContainer(worker)
+      }
+    }
+
+    // Start Masters
+    MASTER_ADDRS.foreach({ case (host, port) =>
+      addMasterNode(host, port)
+    })
+
+    // Start Workers
+    workers.foreach { worker =>
+      val container = new BaseContainer(worker, "worker", MASTER_ADDRS)
+      container.createAndStart()
+    }
+
+    // Check cluster status
+    expectRestClientAuthenticated()
+    expectClusterAvailable()
+  }
+
+  private def addMasterNode(host: String, port: Int): Unit = {
+    val container = new BaseContainer(host, s"master -ip $host -port $port", MASTER_ADDRS)
+    container.createAndStart()
+  }
+
+  def addWorkerNode(host: String): Unit = {
+    if (!workers.contains(host)) {
+      val container = new BaseContainer(host, "worker", MASTER_ADDRS)
+      container.createAndStart()
+      workers += host
+    } else {
+      throw new IOException(s"Cannot add new worker $host, " +
+        s"as worker with same hostname already exists")
+    }
+  }
+
+  /**
+   * @throws RuntimeException if rest client is not authenticated after N attempts
+   */
+  private def expectRestClientAuthenticated(): Unit = {
+    Util.retryUntil(() => {
+      restClient.login()
+      LOG.info("rest client has been authenticated")
+      true
+    }, "login successfully")
+  }
+
+  /**
+   * @throws RuntimeException if service is not available after N attempts
+   */
+  private def expectClusterAvailable(): Unit = {
+    Util.retryUntil(() => {
+      val response = restClient.queryMaster()
+      LOG.info(s"cluster is now available with response: $response.")
+      response.aliveFor > 0
+    }, "cluster running")
+  }
+
+  def isAlive: Boolean = {
+    getMasterHosts.exists(nodeIsOnline)
+  }
+
+  def getNetworkGateway: String = {
+    Docker.getNetworkGateway(MASTER_ADDRS.head._1)
+  }
+
+  def shutDown(): Unit = {
+    val removalHosts = (getMasterHosts ++ getWorkerHosts).toSet
+      .filter(nodeIsOnline).toArray
+    if (removalHosts.length > 0) {
+      Docker.killAndRemoveContainer(removalHosts)
+    }
+    workers.clear()
+  }
+
+  def removeMasterNode(host: String): Unit = {
+    Docker.killAndRemoveContainer(host)
+  }
+
+  def removeWorkerNode(host: String): Unit = {
+    workers -= host
+    Docker.killAndRemoveContainer(host)
+  }
+
+  def restart(): Unit = {
+    shutDown()
+    Util.retryUntil(() => {
+      !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists)
+    }, "all docker containers killed")
+    LOG.info("all docker containers have been killed. restarting...")
+    start()
+  }
+
+  def getMastersAddresses: List[(String, Int)] = {
+    MASTER_ADDRS
+  }
+
+  def getMasterHosts: List[String] = {
+    MASTER_ADDRS.map({ case (host, port) => host })
+  }
+
+  def getWorkerHosts: List[String] = {
+    workers.toList
+  }
+
+  def nodeIsOnline(host: String): Boolean = {
+    Docker.containerIsRunning(host)
+  }
+
+  private def builtInJarsUnder(folder: String): Array[String] = {
+    Docker.findJars(getMasterHosts.head, s"$SUT_HOME/$folder")
+  }
+
+  private def queryBuiltInJars(folder: String, subtext: String): Seq[String] = {
+    builtInJarsUnder(folder).filter(_.contains(subtext))
+  }
+
+  def queryBuiltInExampleJars(subtext: String): Seq[String] = {
+    queryBuiltInJars("examples", subtext)
+  }
+
+  def queryBuiltInITJars(subtext: String): Seq[String] = {
+    queryBuiltInJars("integrationtest", subtext)
+  }
+}
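
A sketch of the driver's lifecycle as the test specs use it:

    import org.apache.gearpump.integrationtest.minicluster.MiniCluster

    val cluster = new MiniCluster
    cluster.start(workerNum = 2)
    try {
      assert(cluster.isAlive)
      println(s"workers: ${cluster.getWorkerHosts.mkString(", ")}")
    } finally {
      cluster.shutDown()
    }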

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
new file mode 100644
index 0000000..1b143af
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import scala.reflect.ClassTag
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.log4j.Logger
+import upickle.Js
+import upickle.default._
+
+import org.apache.gearpump.cluster.AppMasterToMaster.MasterData
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData}
+import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics
+import org.apache.gearpump.cluster.master.MasterSummary
+import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
+import org.apache.gearpump.cluster.{AppJar, MasterToAppMaster}
+import org.apache.gearpump.integrationtest.{Docker, Util}
+import org.apache.gearpump.services.AppMasterService.Status
+import org.apache.gearpump.services.MasterService.{AppSubmissionResult, BuiltinPartitioners}
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+import org.apache.gearpump.streaming.ProcessorDescription
+import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
+import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationResult, ReplaceProcessor}
+import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary
+import org.apache.gearpump.streaming.executor.Executor.ExecutorSummary
+import org.apache.gearpump.util.{Constants, Graph}
+
+/**
+ * A REST client to operate a Gearpump cluster
+ */
+class RestClient(host: String, port: Int) {
+
+  private val LOG = Logger.getLogger(getClass)
+
+  private val cookieFile: String = "cookie.txt"
+
+  implicit val graphReader: upickle.default.Reader[Graph[Int, String]] =
+    upickle.default.Reader[Graph[Int, String]] {
+    case Js.Obj(vertices, edges) =>
+      val vertexList = upickle.default.readJs[List[Int]](vertices._2)
+      val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2)
+      Graph(vertexList, edgeList)
+  }
+
+  private def decodeAs[T](
+      expr: String)(implicit reader: upickle.default.Reader[T], classTag: ClassTag[T]): T = try {
+    read[T](expr)
+  } catch {
+    case ex: Throwable =>
+      LOG.error(s"Failed to decode Rest response to ${classTag.runtimeClass.getSimpleName}")
+      throw ex
+  }
+
+  def queryVersion(): String = {
+    curl("version")
+  }
+
+  def listWorkers(): Array[WorkerSummary] = {
+    val resp = callApi("master/workerlist")
+    decodeAs[List[WorkerSummary]](resp).toArray
+  }
+
+  def listRunningWorkers(): Array[WorkerSummary] = {
+    listWorkers().filter(_.state == MasterToAppMaster.AppMasterActive)
+  }
+
+  def listApps(): Array[AppMasterData] = {
+    val resp = callApi("master/applist")
+    decodeAs[AppMastersData](resp).appMasters.toArray
+  }
+
+  def listRunningApps(): Array[AppMasterData] = {
+    listApps().filter(_.status == MasterToAppMaster.AppMasterActive)
+  }
+
+  def getNextAvailableAppId(): Int = {
+    listApps().length + 1
+  }
+
+  def submitApp(jar: String, executorNum: Int, args: String = "", config: String = "")
+    : Boolean = try {
+    var endpoint = "master/submitapp"
+
+    var options = Seq(s"jar=@$jar")
+    if (config.length > 0) {
+      options :+= s"conf=@$config"
+    }
+
+    options :+= s"executorcount=$executorNum"
+
+    if (args != null && !args.isEmpty) {
+      options :+= "args=\"" + args + "\""
+    }
+
+    val resp = callApi(endpoint, options.map("-F " + _).mkString(" "))
+    val result = decodeAs[AppSubmissionResult](resp)
+    assert(result.success)
+    true
+  } catch {
+    case ex: Throwable =>
+      LOG.warn(s"swallowed an exception: $ex")
+      false
+  }
+
+  def queryApp(appId: Int): AppMasterData = {
+    val resp = callApi(s"appmaster/$appId")
+    decodeAs[AppMasterData](resp)
+  }
+
+  def queryAppMasterConfig(appId: Int): Config = {
+    val resp = callApi(s"appmaster/$appId/config")
+    ConfigFactory.parseString(resp)
+  }
+
+  def queryStreamingAppDetail(appId: Int): StreamAppMasterSummary = {
+    val resp = callApi(s"appmaster/$appId?detail=true")
+    decodeAs[StreamAppMasterSummary](resp)
+  }
+
+  def queryStreamingAppMetrics(appId: Int, current: Boolean, path: String = "processor*")
+    : HistoryMetrics = {
+    val args = if (current) "?readLatest=true" else ""
+    val resp = callApi(s"appmaster/$appId/metrics/app$appId.$path$args")
+    decodeAs[HistoryMetrics](resp)
+  }
+
+  def queryExecutorSummary(appId: Int, executorId: Int): ExecutorSummary = {
+    val resp = callApi(s"appmaster/$appId/executor/$executorId")
+    decodeAs[ExecutorSummary](resp)
+  }
+
+  def queryExecutorBrief(appId: Int): Array[ExecutorBrief] = {
+    queryStreamingAppDetail(appId).executors.toArray
+  }
+
+  def queryExecutorMetrics(appId: Int, current: Boolean): HistoryMetrics = {
+    val args = if (current) "?readLatest=true" else ""
+    val resp = callApi(s"appmaster/$appId/metrics/app$appId.executor*$args")
+    decodeAs[HistoryMetrics](resp)
+  }
+
+  def queryExecutorConfig(appId: Int, executorId: Int): Config = {
+    val resp = callApi(s"appmaster/$appId/executor/$executorId/config")
+    ConfigFactory.parseString(resp)
+  }
+
+  def queryMaster(): MasterSummary = {
+    val resp = callApi("master")
+    decodeAs[MasterData](resp).masterDescription
+  }
+
+  def queryMasterMetrics(current: Boolean): HistoryMetrics = {
+    val args = if (current) "?readLatest=true" else ""
+    val resp = callApi(s"master/metrics/master?$args")
+    decodeAs[HistoryMetrics](resp)
+  }
+
+  def queryMasterConfig(): Config = {
+    val resp = callApi("master/config")
+    ConfigFactory.parseString(resp)
+  }
+
+  def queryWorkerMetrics(workerId: WorkerId, current: Boolean): HistoryMetrics = {
+    val args = if (current) "?readLatest=true" else ""
+    val workerIdStr = WorkerId.render(workerId)
+    val resp = callApi(s"worker/$workerIdStr/metrics/worker$workerIdStr?$args")
+    decodeAs[HistoryMetrics](resp)
+  }
+
+  def queryWorkerConfig(workerId: WorkerId): Config = {
+    val resp = callApi(s"worker/${WorkerId.render(workerId)}/config")
+    ConfigFactory.parseString(resp)
+  }
+
+  def queryBuiltInPartitioners(): Array[String] = {
+    val resp = callApi("master/partitioners")
+    decodeAs[BuiltinPartitioners](resp).partitioners
+  }
+
+  def uploadJar(localFilePath: String): AppJar = {
+    val resp = callApi(s"master/uploadjar -F jar=@$localFilePath", CRUD_POST)
+    decodeAs[AppJar](resp)
+  }
+
+  def replaceStreamingAppProcessor(appId: Int, replaceMe: ProcessorDescription): Boolean = {
+    replaceStreamingAppProcessor(appId, replaceMe, false)
+  }
+
+  def replaceStreamingAppProcessor(
+      appId: Int, replaceMe: ProcessorDescription, inheritConf: Boolean): Boolean = try {
+    val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe, inheritConf)
+    val args = upickle.default.write(replaceOperation)
+    val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + Util.encodeUriComponent(args),
+      CRUD_POST)
+    decodeAs[DAGOperationResult](resp)
+    true
+  } catch {
+    case ex: Throwable =>
+      LOG.warn(s"swallowed an exception: $ex")
+      false
+  }
+
+  def killAppMaster(appId: Int): Boolean = {
+    killExecutor(appId, Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
+  }
+
+  def killExecutor(appId: Int, executorId: Int): Boolean = try {
+    val jvmInfo = queryExecutorSummary(appId, executorId).jvmName.split("@")
+    val pid = jvmInfo(0).toInt
+    val hostname = jvmInfo(1)
+    Docker.killProcess(hostname, pid)
+  } catch {
+    case ex: Throwable =>
+      LOG.warn(s"swallowed an exception: $ex")
+      false
+  }
+
+  def killApp(appId: Int): Boolean = try {
+    val resp = callApi(s"appmaster/$appId", CRUD_DELETE)
+    resp.contains("\"status\":\"success\"")
+  } catch {
+    case ex: Throwable =>
+      LOG.warn(s"swallowed an exception: $ex")
+      false
+  }
+
+  def restartApp(appId: Int): Boolean = try {
+    val resp = callApi(s"appmaster/$appId/restart", CRUD_POST)
+    decodeAs[Status](resp).success
+  } catch {
+    case ex: Throwable =>
+      LOG.warn(s"swallowed an exception: $ex")
+      false
+  }
+
+  private val CRUD_POST = "-X POST"
+  private val CRUD_DELETE = "-X DELETE"
+
+  private def callApi(endpoint: String, option: String = ""): String = {
+    curl(s"api/v1.0/$endpoint", Array(option, s"--cookie $cookieFile"))
+  }
+
+  private def curl(endpoint: String, options: Array[String] = Array.empty[String]): String = {
+    Docker.curl(host, s"http://$host:$port/$endpoint", options)
+  }
+
+  def login(): Unit = {
+    curl("login", Array(CRUD_POST, s"--cookie-jar $cookieFile",
+      "--data username=admin", "--data password=admin"))
+  }
+}
\ No newline at end of file
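
A sketch of a typical REST session (reuses the `cluster` value from the
MiniCluster sketch above):

    val rest = cluster.restClient
    rest.login() // stores the session cookie used by subsequent calls
    println(s"version: ${rest.queryVersion()}")
    rest.listRunningApps().foreach(app =>
      println(s"app ${app.appId}: ${app.appName}"))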

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
new file mode 100644
index 0000000..79adfc4
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.integrationtest.storm
+
+import scala.util.Random
+
+import backtype.storm.utils.{DRPCClient, Utils}
+
+import org.apache.gearpump.integrationtest.minicluster.{BaseContainer, MiniCluster, RestClient}
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+class StormClient(cluster: MiniCluster, restClient: RestClient) {
+
+  private val masterAddrs: List[(String, Int)] = cluster.getMastersAddresses
+  private val CONFIG_FILE = s"/opt/gearpump/storm${new Random().nextInt()}.yaml"
+  private val DRPC_HOST = "storm0"
+  private val DRPC_PORT = 3772
+  private val DRPC_INVOCATIONS_PORT = 3773
+  private val STORM_DRPC = "storm-drpc"
+  private val NIMBUS_HOST = "storm1"
+  private val STORM_NIMBUS = "storm nimbus"
+  private val STORM_APP = "/opt/start storm app"
+
+  private val drpcContainer = new BaseContainer(DRPC_HOST, STORM_DRPC, masterAddrs,
+    tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT))
+
+  private val nimbusContainer =
+    new BaseContainer(NIMBUS_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE", masterAddrs)
+
+  def start(): Unit = {
+    nimbusContainer.createAndStart()
+    ensureNimbusRunning()
+
+    drpcContainer.createAndStart()
+    ensureDrpcServerRunning()
+  }
+
+  private def ensureNimbusRunning(): Unit = {
+    Util.retryUntil(() => {
+      val response = Docker.execute(NIMBUS_HOST, "grep \"port\" " + CONFIG_FILE)
+      // Parse format nimbus.thrift.port: '39322'
+      val thriftPort = response.split(" ")(1).replace("'", "").toInt
+
+      Docker.executeSilently(NIMBUS_HOST, s"""sh -c "netstat -na | grep $thriftPort" """)
+    }, "Nimbus running")
+  }
+
+  private def ensureDrpcServerRunning(): Unit = {
+    Util.retryUntil(() => {
+      Docker.executeSilently(DRPC_HOST, s"""sh -c "netstat -na | grep $DRPC_PORT " """)
+    }, "DRPC running")
+  }
+
+  def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = {
+    Util.retryUntil(() => {
+      Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " +
+        s"-jar $jar $mainClass $args")
+      restClient.listRunningApps().exists(_.appName == appName)
+    }, "app running")
+    restClient.listRunningApps().filter(_.appName == appName).head.appId
+  }
+
+  def getDRPCClient(drpcServerIp: String): DRPCClient = {
+    val config = Utils.readDefaultConfig()
+    new DRPCClient(config, drpcServerIp, DRPC_PORT)
+  }
+
+  def shutDown(): Unit = {
+
+    // Cleans up the storm.yaml config file
+    Docker.executeSilently(NIMBUS_HOST, s"rm $CONFIG_FILE ")
+    drpcContainer.killAndRemove()
+    nimbusContainer.killAndRemove()
+  }
+}
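
A sketch of the client's lifecycle (the jar, main class and app name are
placeholders; assumes a started MiniCluster named `cluster`):

    import org.apache.gearpump.integrationtest.storm.StormClient

    val storm = new StormClient(cluster, cluster.restClient)
    storm.start()
    val appId = storm.submitStormApp(
      jar = "/opt/gearpump/integrationtest/storm010.jar",
      mainClass = "org.example.DemoTopology", args = "", appName = "demo")
    // ... verify the app via cluster.restClient ...
    storm.shutDown()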

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
deleted file mode 100644
index 67a2491..0000000
--- a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.storm
-
-import backtype.storm.topology.base.BaseBasicBolt
-import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
-import backtype.storm.tuple.{Fields, Tuple, Values}
-import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper
-
-class Adaptor extends BaseBasicBolt {
-  private var id = 0L
-
-  override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = {
-    val bytes = tuple.getBinary(0)
-    collector.emit(new Values(s"$id".getBytes, bytes))
-    id += 1
-  }
-
-  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
-    declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY,
-      FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE))
-  }
-}