You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/27 11:38:48 UTC
[1/4] incubator-gearpump git commit: fix GEARPUMP-150 correct the
integration test file structure
Repository: incubator-gearpump
Updated Branches:
refs/heads/master c80c06963 -> e9ea6e2f3
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala b/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
deleted file mode 100644
index a0d9a42..0000000
--- a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.integrationtest.storm
-
-import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
-
-import backtype.storm.topology.TopologyBuilder
-import backtype.storm.{Config, StormSubmitter}
-import storm.kafka.bolt.KafkaBolt
-import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
-
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-
-/**
- * Tests Storm 0.10.x compatibility over Gearpump
- * this example reads data from Kafka and writes back to it
- */
-object Storm010KafkaTopology extends App with ArgumentsParser {
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- "topologyName" -> CLIOption[Int]("<Storm topology name>", required = true),
- "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required = true),
- "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = true),
- "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
- required = true),
- "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
- "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false,
- defaultValue = Some(1)),
- "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, defaultValue = Some(1))
- )
-
- val configs = parse(args)
- val topologyName = configs.getString("topologyName")
- val sourceTopic = configs.getString("sourceTopic")
- val sinkTopic = configs.getString("sinkTopic")
- val zookeeperConnect = configs.getString("zookeeperConnect")
- val brokerList = configs.getString("brokerList")
- val spoutNum = configs.getInt("spoutNum")
- val boltNum = configs.getInt("boltNum")
-
- val topologyBuilder = new TopologyBuilder()
- val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic,
- zookeeperConnect, topologyName))
- val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new KafkaBolt[Array[Byte], Array[Byte]]()
- val adaptor = new Adaptor
- topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum)
- topologyBuilder.setBolt("adaptor", adaptor).localOrShuffleGrouping("kafka_spout")
- topologyBuilder.setBolt("kafka_bolt", kafkaBolt, boltNum).localOrShuffleGrouping("adaptor")
- val config = new Config()
- config.putAll(getBoltConfig(sinkTopic, brokerList))
- config.put(Config.TOPOLOGY_NAME, topologyName)
- val topology = topologyBuilder.createTopology()
- StormSubmitter.submitTopology(topologyName, config, topology)
-
- def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): SpoutConfig = {
- val hosts = new ZkHosts(zookeeperConnect)
- val index = zookeeperConnect.indexOf("/")
- val zookeeper = zookeeperConnect.take(index)
- val kafkaRoot = zookeeperConnect.drop(index)
- val config = new SpoutConfig(hosts, topic, kafkaRoot, id)
-
- val serverAndPort = zookeeper.split(":")
- config.zkServers = new JArrayList[String]
- config.zkServers.add(serverAndPort(0))
- config.zkPort = serverAndPort(1).toInt
- config
- }
-
- def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = {
- val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
- val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
- brokerConfig.put("metadata.broker.list", brokerList)
- brokerConfig.put("request.required.acks", "1")
- kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig)
- kafkaConfig.put(KafkaBolt.TOPIC, topic)
- kafkaConfig
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
new file mode 100644
index 0000000..67a2491
--- /dev/null
+++ b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.storm
+
+import backtype.storm.topology.base.BaseBasicBolt
+import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
+import backtype.storm.tuple.{Fields, Tuple, Values}
+import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper
+
+class Adaptor extends BaseBasicBolt {
+ private var id = 0L
+
+ override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = {
+ val bytes = tuple.getBinary(0)
+ collector.emit(new Values(s"$id".getBytes, bytes))
+ id += 1
+ }
+
+ override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
+ declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY,
+ FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
new file mode 100644
index 0000000..a0d9a42
--- /dev/null
+++ b/integrationtest/storm010/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm010KafkaTopology.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.integrationtest.storm
+
+import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
+
+import backtype.storm.topology.TopologyBuilder
+import backtype.storm.{Config, StormSubmitter}
+import storm.kafka.bolt.KafkaBolt
+import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
+
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+
+/**
+ * Tests Storm 0.10.x compatibility over Gearpump
+ * this example reads data from Kafka and writes back to it
+ */
+object Storm010KafkaTopology extends App with ArgumentsParser {
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "topologyName" -> CLIOption[Int]("<Storm topology name>", required = true),
+ "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required = true),
+ "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = true),
+ "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
+ required = true),
+ "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
+ "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false,
+ defaultValue = Some(1)),
+ "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, defaultValue = Some(1))
+ )
+
+ val configs = parse(args)
+ val topologyName = configs.getString("topologyName")
+ val sourceTopic = configs.getString("sourceTopic")
+ val sinkTopic = configs.getString("sinkTopic")
+ val zookeeperConnect = configs.getString("zookeeperConnect")
+ val brokerList = configs.getString("brokerList")
+ val spoutNum = configs.getInt("spoutNum")
+ val boltNum = configs.getInt("boltNum")
+
+ val topologyBuilder = new TopologyBuilder()
+ val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic,
+ zookeeperConnect, topologyName))
+ val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new KafkaBolt[Array[Byte], Array[Byte]]()
+ val adaptor = new Adaptor
+ topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum)
+ topologyBuilder.setBolt("adaptor", adaptor).localOrShuffleGrouping("kafka_spout")
+ topologyBuilder.setBolt("kafka_bolt", kafkaBolt, boltNum).localOrShuffleGrouping("adaptor")
+ val config = new Config()
+ config.putAll(getBoltConfig(sinkTopic, brokerList))
+ config.put(Config.TOPOLOGY_NAME, topologyName)
+ val topology = topologyBuilder.createTopology()
+ StormSubmitter.submitTopology(topologyName, config, topology)
+
+ def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): SpoutConfig = {
+ val hosts = new ZkHosts(zookeeperConnect)
+ val index = zookeeperConnect.indexOf("/")
+ val zookeeper = zookeeperConnect.take(index)
+ val kafkaRoot = zookeeperConnect.drop(index)
+ val config = new SpoutConfig(hosts, topic, kafkaRoot, id)
+
+ val serverAndPort = zookeeper.split(":")
+ config.zkServers = new JArrayList[String]
+ config.zkServers.add(serverAndPort(0))
+ config.zkPort = serverAndPort(1).toInt
+ config
+ }
+
+ def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = {
+ val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
+ val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
+ brokerConfig.put("metadata.broker.list", brokerList)
+ brokerConfig.put("request.required.acks", "1")
+ kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig)
+ kafkaConfig.put(KafkaBolt.TOPIC, topic)
+ kafkaConfig
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
deleted file mode 100644
index 67a2491..0000000
--- a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.storm
-
-import backtype.storm.topology.base.BaseBasicBolt
-import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
-import backtype.storm.tuple.{Fields, Tuple, Values}
-import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper
-
-class Adaptor extends BaseBasicBolt {
- private var id = 0L
-
- override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = {
- val bytes = tuple.getBinary(0)
- collector.emit(new Values(s"$id".getBytes, bytes))
- id += 1
- }
-
- override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
- declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY,
- FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala b/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
deleted file mode 100644
index 5b74d60..0000000
--- a/integrationtest/storm09/src/main/scala/io/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.storm
-
-import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
-
-import backtype.storm.topology.TopologyBuilder
-import backtype.storm.{Config, StormSubmitter}
-import storm.kafka.bolt.KafkaBolt
-import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
-
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-
-/**
- * Tests Storm 0.9.x compatibility over Gearpump
- * this example reads data from Kafka and writes back to it
- */
-object Storm09KafkaTopology extends App with ArgumentsParser {
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- "topologyName" -> CLIOption[Int]("<Storm topology name>", required = true),
- "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required = true),
- "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = true),
- "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
- required = true),
- "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
- "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false,
- defaultValue = Some(1)),
- "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, defaultValue = Some(1))
- )
-
- val configs = parse(args)
- val topologyName = configs.getString("topologyName")
- val sourceTopic = configs.getString("sourceTopic")
- val sinkTopic = configs.getString("sinkTopic")
- val zookeeperConnect = configs.getString("zookeeperConnect")
- val brokerList = configs.getString("brokerList")
- val spoutNum = configs.getInt("spoutNum")
- val boltNum = configs.getInt("boltNum")
-
- val topologyBuilder = new TopologyBuilder()
- val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic, zookeeperConnect,
- topologyName))
- val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new KafkaBolt[Array[Byte], Array[Byte]]()
- val adaptor = new Adaptor
- topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum)
- topologyBuilder.setBolt("adaptor", adaptor).localOrShuffleGrouping("kafka_spout")
- topologyBuilder.setBolt("kafka_bolt", kafkaBolt, boltNum).localOrShuffleGrouping("adaptor")
- val config = new Config()
- config.putAll(getBoltConfig(sinkTopic, brokerList))
- config.put(Config.TOPOLOGY_NAME, topologyName)
- val topology = topologyBuilder.createTopology()
- StormSubmitter.submitTopology(topologyName, config, topology)
-
- def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): SpoutConfig = {
- val hosts = new ZkHosts(zookeeperConnect)
- val index = zookeeperConnect.indexOf("/")
- val zookeeper = zookeeperConnect.take(index)
- val kafkaRoot = zookeeperConnect.drop(index)
- val config = new SpoutConfig(hosts, topic, kafkaRoot, id)
- config.forceFromStart = true
-
- val serverAndPort = zookeeper.split(":")
- config.zkServers = new JArrayList[String]
- config.zkServers.add(serverAndPort(0))
- config.zkPort = serverAndPort(1).toInt
- config
- }
-
- def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = {
- val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
- val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
- brokerConfig.put("metadata.broker.list", brokerList)
- brokerConfig.put("request.required.acks", "1")
- kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig)
- kafkaConfig.put(KafkaBolt.TOPIC, topic)
- kafkaConfig
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
new file mode 100644
index 0000000..67a2491
--- /dev/null
+++ b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Adaptor.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.storm
+
+import backtype.storm.topology.base.BaseBasicBolt
+import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
+import backtype.storm.tuple.{Fields, Tuple, Values}
+import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper
+
+class Adaptor extends BaseBasicBolt {
+ private var id = 0L
+
+ override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = {
+ val bytes = tuple.getBinary(0)
+ collector.emit(new Values(s"$id".getBytes, bytes))
+ id += 1
+ }
+
+ override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
+ declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY,
+ FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE))
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
new file mode 100644
index 0000000..5b74d60
--- /dev/null
+++ b/integrationtest/storm09/src/main/scala/org/apache/gearpump/integrationtest/storm/Storm09KafkaTopology.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.storm
+
+import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
+
+import backtype.storm.topology.TopologyBuilder
+import backtype.storm.{Config, StormSubmitter}
+import storm.kafka.bolt.KafkaBolt
+import storm.kafka.{KafkaSpout, SpoutConfig, ZkHosts}
+
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+
+/**
+ * Tests Storm 0.9.x compatibility over Gearpump
+ * this example reads data from Kafka and writes back to it
+ */
+object Storm09KafkaTopology extends App with ArgumentsParser {
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "topologyName" -> CLIOption[Int]("<Storm topology name>", required = true),
+ "sourceTopic" -> CLIOption[String]("<Kafka topic to read data>", required = true),
+ "sinkTopic" -> CLIOption[String]("<Kafka topic to write data>", required = true),
+ "zookeeperConnect" -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
+ required = true),
+ "brokerList" -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
+ "spoutNum" -> CLIOption[Int]("<how many spout tasks>", required = false,
+ defaultValue = Some(1)),
+ "boltNum" -> CLIOption[Int]("<how many bolt tasks>", required = false, defaultValue = Some(1))
+ )
+
+ val configs = parse(args)
+ val topologyName = configs.getString("topologyName")
+ val sourceTopic = configs.getString("sourceTopic")
+ val sinkTopic = configs.getString("sinkTopic")
+ val zookeeperConnect = configs.getString("zookeeperConnect")
+ val brokerList = configs.getString("brokerList")
+ val spoutNum = configs.getInt("spoutNum")
+ val boltNum = configs.getInt("boltNum")
+
+ val topologyBuilder = new TopologyBuilder()
+ val kafkaSpout: KafkaSpout = new KafkaSpout(getSpoutConfig(sourceTopic, zookeeperConnect,
+ topologyName))
+ val kafkaBolt: KafkaBolt[Array[Byte], Array[Byte]] = new KafkaBolt[Array[Byte], Array[Byte]]()
+ val adaptor = new Adaptor
+ topologyBuilder.setSpout("kafka_spout", kafkaSpout, spoutNum)
+ topologyBuilder.setBolt("adaptor", adaptor).localOrShuffleGrouping("kafka_spout")
+ topologyBuilder.setBolt("kafka_bolt", kafkaBolt, boltNum).localOrShuffleGrouping("adaptor")
+ val config = new Config()
+ config.putAll(getBoltConfig(sinkTopic, brokerList))
+ config.put(Config.TOPOLOGY_NAME, topologyName)
+ val topology = topologyBuilder.createTopology()
+ StormSubmitter.submitTopology(topologyName, config, topology)
+
+ def getSpoutConfig(topic: String, zookeeperConnect: String, id: String): SpoutConfig = {
+ val hosts = new ZkHosts(zookeeperConnect)
+ val index = zookeeperConnect.indexOf("/")
+ val zookeeper = zookeeperConnect.take(index)
+ val kafkaRoot = zookeeperConnect.drop(index)
+ val config = new SpoutConfig(hosts, topic, kafkaRoot, id)
+ config.forceFromStart = true
+
+ val serverAndPort = zookeeper.split(":")
+ config.zkServers = new JArrayList[String]
+ config.zkServers.add(serverAndPort(0))
+ config.zkPort = serverAndPort(1).toInt
+ config
+ }
+
+ def getBoltConfig(topic: String, brokerList: String): JMap[String, AnyRef] = {
+ val kafkaConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
+ val brokerConfig: JMap[String, AnyRef] = new JHashMap[String, AnyRef]
+ brokerConfig.put("metadata.broker.list", brokerList)
+ brokerConfig.put("request.required.acks", "1")
+ kafkaConfig.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, brokerConfig)
+ kafkaConfig.put(KafkaBolt.TOPIC, topic)
+ kafkaConfig
+ }
+}
+
[3/4] incubator-gearpump git commit: fix GEARPUMP-150 correct the
integration test file structure
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
new file mode 100644
index 0000000..d8bdc1e
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.scalatest.TestData
+
+import org.apache.gearpump.integrationtest.kafka._
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+
+/**
+ * The test spec checks the Kafka datasource connector
+ */
+class ConnectorKafkaSpec extends TestSpecBase {
+
+ private lazy val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway)
+ private lazy val kafkaJar = cluster.queryBuiltInExampleJars("kafka-").head
+ private var producer: NumericalDataProducer = null
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ kafkaCluster.start()
+ }
+
+ override def afterAll(): Unit = {
+ kafkaCluster.shutDown()
+ super.afterAll()
+ }
+
+ override def afterEach(test: TestData): Unit = {
+ super.afterEach(test)
+ if (producer != null) {
+ producer.stop()
+ producer = null
+ }
+ }
+
+ "KafkaSource and KafkaSink" should {
+ "read from and write to kafka" in {
+ // setup
+ val sourceTopic = "topic1"
+ val sinkTopic = "topic2"
+ val messageNum = 10000
+ kafkaCluster.produceDataToKafka(sourceTopic, messageNum)
+
+ // exercise
+ val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite",
+ "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
+ "-brokerList", kafkaCluster.getBrokerListConnectString,
+ "-sourceTopic", sourceTopic,
+ "-sinkTopic", sinkTopic).mkString(" ")
+ val appId = restClient.getNextAvailableAppId()
+ val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
+ success shouldBe true
+
+ // verify
+ expectAppIsRunning(appId, "KafkaReadWrite")
+ Util.retryUntil(() => kafkaCluster.getLatestOffset(sinkTopic) == messageNum,
+ "kafka all message written")
+ }
+ }
+
+ "Gearpump with Kafka" should {
+ "support at-least-once message delivery" in {
+ // setup
+ val sourcePartitionNum = 2
+ val sourceTopic = "topic3"
+ val sinkTopic = "topic4"
+ // Generate number sequence (1, 2, 3, ...) to the topic
+ kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
+ producer = new NumericalDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString)
+ producer.start()
+
+ // exercise
+ val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite",
+ "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
+ "-brokerList", kafkaCluster.getBrokerListConnectString,
+ "-sourceTopic", sourceTopic,
+ "-sinkTopic", sinkTopic,
+ "-source", sourcePartitionNum).mkString(" ")
+ val appId = restClient.getNextAvailableAppId()
+ val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
+ success shouldBe true
+
+ // verify #1
+ expectAppIsRunning(appId, "KafkaReadWrite")
+ Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+
+ // verify #2
+ val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
+ restClient.killExecutor(appId, executorToKill) shouldBe true
+ Util.retryUntil(() => restClient.queryExecutorBrief(appId)
+ .map(_.executorId).max > executorToKill,
+ s"executor $executorToKill killed")
+
+ // verify #3
+ val detector = new MessageLossDetector(producer.lastWriteNum)
+ val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
+ host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
+ Util.retryUntil(() => {
+ kafkaReader.read()
+ detector.allReceived
+ }, "kafka all message read")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
new file mode 100644
index 0000000..56b33c1
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+import org.apache.gearpump.metrics.Metrics.Meter
+import org.apache.gearpump.streaming._
+import org.apache.gearpump.streaming.appmaster.ProcessorSummary
+
+class DynamicDagSpec extends TestSpecBase {
+
+ lazy val solJar = cluster.queryBuiltInExampleJars("sol-").head
+ val splitTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Split"
+ val sumTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Sum"
+ val solName = "sol"
+
+ "dynamic dag" should {
+ "can retrieve a list of built-in partitioner classes" in {
+ val partitioners = restClient.queryBuiltInPartitioners()
+ partitioners.length should be > 0
+ partitioners.foreach(clazz =>
+ clazz should startWith("org.apache.gearpump.partitioner.")
+ )
+ }
+
+ "can compose a wordcount application from scratch" in {
+ // todo: blocked by #1450
+ }
+
+ "can replace down stream with wordcount's sum processor (new processor will have metrics)" in {
+ // setup
+ val appId = expectSolJarSubmittedWithAppId()
+
+ // exercise
+ val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
+ replaceProcessor(appId, 1, sumTaskClass)
+ var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
+ Util.retryUntil(() => {
+ laterProcessors = restClient.queryStreamingAppDetail(appId).processors
+ laterProcessors.size == formerProcessors.size + 1
+ }, "new processor successfully added")
+ processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
+ }
+
+ "can replace up stream with wordcount's split processor (new processor will have metrics)" in {
+ // setup
+ val appId = expectSolJarSubmittedWithAppId()
+
+ // exercise
+ val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
+ replaceProcessor(appId, 0, splitTaskClass)
+ var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
+ Util.retryUntil(() => {
+ laterProcessors = restClient.queryStreamingAppDetail(appId).processors
+ laterProcessors.size == formerProcessors.size + 1
+ }, "new processor added")
+ processorHasThroughput(appId, laterProcessors.keySet.max, "sendThroughput")
+ }
+
+ "fall back to last dag version when replacing a processor failid" in {
+ // setup
+ val appId = expectSolJarSubmittedWithAppId()
+
+ // exercise
+ val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
+ replaceProcessor(appId, 1, sumTaskClass)
+ var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
+ Util.retryUntil(() => {
+ laterProcessors = restClient.queryStreamingAppDetail(appId).processors
+ laterProcessors.size == formerProcessors.size + 1
+ }, "new processor added")
+ processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
+
+ val fakeTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Fake"
+ replaceProcessor(appId, laterProcessors.keySet.max, fakeTaskClass)
+ Util.retryUntil(() => {
+ val processorsAfterFailure = restClient.queryStreamingAppDetail(appId).processors
+ processorsAfterFailure.size == laterProcessors.size
+ }, "new processor added")
+ val currentClock = restClient.queryStreamingAppDetail(appId).clock
+ Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > currentClock,
+ "app clock is advancing")
+ }
+
+ "fall back to last dag version when AppMaster HA triggered" in {
+ // setup
+ val appId = expectSolJarSubmittedWithAppId()
+
+ // exercise
+ val formerAppMaster = restClient.queryApp(appId).appMasterPath
+ val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
+ replaceProcessor(appId, 1, sumTaskClass)
+ var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
+ Util.retryUntil(() => {
+ laterProcessors = restClient.queryStreamingAppDetail(appId).processors
+ laterProcessors.size == formerProcessors.size + 1
+ }, "new processor added")
+ processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
+
+ restClient.killAppMaster(appId) shouldBe true
+ Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster,
+ "new AppMaster created")
+ val processors = restClient.queryStreamingAppDetail(appId).processors
+ processors.size shouldEqual laterProcessors.size
+ }
+ }
+
+ private def expectSolJarSubmittedWithAppId(): Int = {
+ val appId = restClient.getNextAvailableAppId()
+ val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length)
+ success shouldBe true
+ expectAppIsRunning(appId, solName)
+ Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+ appId
+ }
+
+ private def replaceProcessor(
+ appId: Int,
+ formerProcessorId: Int,
+ newTaskClass: String,
+ newProcessorDescription: String = "",
+ newParallelism: Int = 1): Unit = {
+ val uploadedJar = restClient.uploadJar(wordCountJar)
+ val replaceMe = new ProcessorDescription(formerProcessorId, newTaskClass,
+ newParallelism, newProcessorDescription,
+ jar = uploadedJar)
+
+ // exercise
+ val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
+ success shouldBe true
+ }
+
+ private def processorHasThroughput(appId: Int, processorId: Int, metrics: String): Unit = {
+ Util.retryUntil(() => {
+ val actual = restClient.queryStreamingAppMetrics(appId, current = false,
+ path = "processor" + processorId)
+ val throughput = actual.metrics.filter(_.value.name.endsWith(metrics))
+ throughput.size should be > 0
+ throughput.forall(_.value.asInstanceOf[Meter].count > 0L)
+ }, "new processor has message received")
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
new file mode 100644
index 0000000..27e4665
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.{Docker, TestSpecBase, Util}
+import org.apache.gearpump.streaming._
+import org.apache.gearpump.streaming.appmaster.ProcessorSummary
+
+/**
+ * The test spec will perform destructive operations to check the stability
+ */
+class ExampleSpec extends TestSpecBase {
+
+ private val LOG = Logger.getLogger(getClass)
+
+ "distributed shell" should {
+ "execute commands on machines where its executors are running" in {
+ val distShellJar = cluster.queryBuiltInExampleJars("distributedshell-").head
+ val mainClass = "org.apache.gearpump.examples.distributedshell.DistributedShell"
+ val clientClass = "org.apache.gearpump.examples.distributedshell.DistributedShellClient"
+ val appId = restClient.getNextAvailableAppId()
+ val success = restClient.submitApp(distShellJar, cluster.getWorkerHosts.length, mainClass)
+ success shouldBe true
+ expectAppIsRunning(appId, "DistributedShell")
+ val args = Array(
+ clientClass,
+ "-appid", appId.toString,
+ "-command", "hostname"
+ )
+
+ val expectedHostNames = cluster.getWorkerHosts.map(Docker.getHostName(_))
+
+ def verify(): Boolean = {
+ val workerNum = cluster.getWorkerHosts.length
+ val result = commandLineClient.submitAppAndCaptureOutput(distShellJar,
+ workerNum, args.mkString(" ")).split("\n").
+ filterNot(line => line.startsWith("[INFO]") || line.isEmpty)
+ expectedHostNames.forall(result.contains)
+ }
+
+ Util.retryUntil(() => verify(),
+ s"executors started on all expected hosts ${expectedHostNames.mkString(", ")}")
+ }
+ }
+
+ "wordcount" should {
+ val wordCountJarNamePrefix = "wordcount-"
+ behave like streamingApplication(wordCountJarNamePrefix, wordCountName)
+
+ "can submit immediately after killing a former one" in {
+ // setup
+ val formerAppId = restClient.getNextAvailableAppId()
+ val formerSubmissionSuccess =
+ restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+ formerSubmissionSuccess shouldBe true
+ expectAppIsRunning(formerAppId, wordCountName)
+ Util.retryUntil(() =>
+ restClient.queryStreamingAppDetail(formerAppId).clock > 0, "app running")
+ restClient.killApp(formerAppId)
+
+ // exercise
+ val appId = formerAppId + 1
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+ success shouldBe true
+ expectAppIsRunning(appId, wordCountName)
+ }
+ }
+
+ "wordcount(java)" should {
+ val wordCountJavaJarNamePrefix = "wordcountjava-"
+ val wordCountJavaName = "wordcountJava"
+ behave like streamingApplication(wordCountJavaJarNamePrefix, wordCountJavaName)
+ }
+
+ "sol" should {
+ val solJarNamePrefix = "sol-"
+ val solName = "sol"
+ behave like streamingApplication(solJarNamePrefix, solName)
+ }
+
+ "complexdag" should {
+ val dynamicDagJarNamePrefix = "complexdag-"
+ val dynamicDagName = "dag"
+ behave like streamingApplication(dynamicDagJarNamePrefix, dynamicDagName)
+ }
+
+ def streamingApplication(jarNamePrefix: String, appName: String): Unit = {
+ lazy val jar = cluster.queryBuiltInExampleJars(jarNamePrefix).head
+
+ "can obtain application clock and the clock will keep changing" in {
+ // setup
+ val appId = restClient.getNextAvailableAppId()
+ val success = restClient.submitApp(jar, cluster.getWorkerHosts.length)
+ success shouldBe true
+ expectAppIsRunning(appId, appName)
+
+ // exercise
+ Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app submitted")
+ val formerClock = restClient.queryStreamingAppDetail(appId).clock
+ Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > formerClock,
+ "app clock is advancing")
+ }
+
+ "can change the parallelism and description of a processor" in {
+ // setup
+ val appId = restClient.getNextAvailableAppId()
+ val formerSubmissionSuccess = restClient.submitApp(jar, cluster.getWorkerHosts.length)
+ formerSubmissionSuccess shouldBe true
+ expectAppIsRunning(appId, appName)
+ val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
+ val processor0 = formerProcessors.get(0).get
+ val expectedProcessorId = formerProcessors.size
+ val expectedParallelism = processor0.parallelism + 1
+ val expectedDescription = processor0.description + "new"
+ val replaceMe = new ProcessorDescription(processor0.id, processor0.taskClass,
+ expectedParallelism, description = expectedDescription)
+
+ // exercise
+ val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
+ success shouldBe true
+ var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
+ Util.retryUntil(() => {
+ laterProcessors = restClient.queryStreamingAppDetail(appId).processors
+ laterProcessors.size == formerProcessors.size + 1
+ }, "new process added")
+ val laterProcessor0 = laterProcessors.get(expectedProcessorId).get
+ laterProcessor0.parallelism shouldEqual expectedParallelism
+ laterProcessor0.description shouldEqual expectedDescription
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
new file mode 100644
index 0000000..bb9982a
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.hadoop.HadoopCluster._
+import org.apache.gearpump.integrationtest.kafka.KafkaCluster._
+import org.apache.gearpump.integrationtest.kafka.{ResultVerifier, SimpleKafkaReader}
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+
+/**
+ * Checks message delivery consistency, like at-least-once, and exactly-once.
+ */
+class MessageDeliverySpec extends TestSpecBase {
+
+ private val LOG = Logger.getLogger(getClass)
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ }
+
+ "Gearpump" should {
+ "support exactly-once message delivery" in {
+ withKafkaCluster(cluster) { kafkaCluster =>
+ // setup
+ val sourcePartitionNum = 1
+ val sourceTopic = "topic1"
+ val sinkTopic = "topic2"
+
+ // Generate number sequence (1, 2, 3, ...) to the topic
+ kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
+
+ withDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString) { producer =>
+
+ withHadoopCluster { hadoopCluster =>
+ // exercise
+ val args = Array("org.apache.gearpump.streaming.examples.state.MessageCountApp",
+ "-defaultFS", hadoopCluster.getDefaultFS,
+ "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
+ "-brokerList", kafkaCluster.getBrokerListConnectString,
+ "-sourceTopic", sourceTopic,
+ "-sinkTopic", sinkTopic,
+ "-sourceTask", sourcePartitionNum).mkString(" ")
+ val appId = restClient.getNextAvailableAppId()
+
+ val stateJar = cluster.queryBuiltInExampleJars("state-").head
+ val success = restClient.submitApp(stateJar, executorNum = 1, args = args)
+ success shouldBe true
+
+ // verify #1
+ expectAppIsRunning(appId, "MessageCount")
+ Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0,
+ "app is running")
+
+ // wait for checkpoint to take place
+ Thread.sleep(1000)
+
+ LOG.info("Trigger message replay by kill and restart the executors")
+ val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
+ restClient.killExecutor(appId, executorToKill) shouldBe true
+ Util.retryUntil(() => restClient.queryExecutorBrief(appId)
+ .map(_.executorId).max > executorToKill, s"executor $executorToKill killed")
+
+ producer.stop()
+ val producedNumbers = producer.producedNumbers
+ LOG.info(s"In total, numbers in range[${producedNumbers.start}" +
+ s", ${producedNumbers.end - 1}] have been written to Kafka")
+
+ // verify #3
+ val kafkaSourceOffset = kafkaCluster.getLatestOffset(sourceTopic)
+
+ assert(producedNumbers.size == kafkaSourceOffset,
+ "produced message should match Kafka queue size")
+
+ LOG.info(s"The Kafka source topic $sourceTopic offset is " + kafkaSourceOffset)
+
+ // The sink processor of this job (MessageCountApp) writes total message
+ // count to Kafka Sink periodically (once every checkpoint interval).
+ // The detector keep record of latest message count.
+ val detector = new ResultVerifier {
+ var latestMessageCount: Int = 0
+ override def onNext(messageCount: Int): Unit = {
+ this.latestMessageCount = messageCount
+ }
+ }
+
+ val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
+ host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
+ Util.retryUntil(() => {
+ kafkaReader.read()
+ LOG.info(s"Received message count: ${detector.latestMessageCount}, " +
+ s"expect: ${producedNumbers.size}")
+ detector.latestMessageCount == producedNumbers.size
+ }, "MessageCountApp calculated message count matches " +
+ "expected in case of message replay")
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
new file mode 100644
index 0000000..2f5bb64
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import scala.concurrent.duration._
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.cluster.master.MasterStatus
+import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+
+/**
+ * The test spec checks REST service usage
+ */
+class RestServiceSpec extends TestSpecBase {
+
+ "query system version" should {
+ "retrieve the current version number" in {
+ restClient.queryVersion() should not be empty
+ }
+ }
+
+ "list applications" should {
+ "retrieve 0 application after cluster just started" in {
+ restClient.listRunningApps().length shouldEqual 0
+ }
+
+ "retrieve 1 application after the first application submission" in {
+ // exercise
+ val appId = restClient.getNextAvailableAppId()
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+ success shouldBe true
+ expectAppIsRunning(appId, wordCountName)
+ restClient.listRunningApps().length shouldEqual 1
+ }
+ }
+
+ "submit application (wordcount)" should {
+ "find a running application after submission" in {
+ // exercise
+ val appId = restClient.getNextAvailableAppId()
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+ success shouldBe true
+ expectAppIsRunning(appId, wordCountName)
+ }
+
+ "reject a repeated submission request while the application is running" in {
+ // setup
+ val appId = restClient.getNextAvailableAppId()
+ val formerSubmissionSuccess = restClient.submitApp(wordCountJar,
+ cluster.getWorkerHosts.length)
+ formerSubmissionSuccess shouldBe true
+ expectAppIsRunning(appId, wordCountName)
+
+ // exercise
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+ success shouldBe false
+ }
+
+ "reject an invalid submission (the jar file path is incorrect)" in {
+ // exercise
+ val success = restClient.submitApp(wordCountJar + ".missing", cluster.getWorkerHosts.length)
+ success shouldBe false
+ }
+
+ "submit a wordcount application with 4 split and 3 sum processors and expect " +
+ "parallelism of processors match the given number" in {
+ // setup
+ val splitNum = 4
+ val sumNum = 3
+ val appId = restClient.getNextAvailableAppId()
+
+ // exercise
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length,
+ s"-split $splitNum -sum $sumNum")
+ success shouldBe true
+ expectAppIsRunning(appId, wordCountName)
+ val processors = restClient.queryStreamingAppDetail(appId).processors
+ processors.size shouldEqual 2
+ val splitProcessor = processors.get(0).get
+ splitProcessor.parallelism shouldEqual splitNum
+ val sumProcessor = processors.get(1).get
+ sumProcessor.parallelism shouldEqual sumNum
+ }
+
+ "can obtain application metrics and the metrics will keep changing" in {
+ // setup
+ val appId = restClient.getNextAvailableAppId()
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+ success shouldBe true
+ expectAppIsRunning(appId, wordCountName)
+
+ // exercise
+ expectMetricsAvailable(
+ restClient.queryStreamingAppMetrics(appId, current = true).metrics.nonEmpty,
+ "metrics available")
+ val actual = restClient.queryStreamingAppMetrics(appId, current = true)
+ actual.path shouldEqual s"app$appId.processor*"
+ actual.metrics.foreach(metric => {
+ metric.time should be > 0L
+ metric.value should not be null
+ })
+ val formerMetricsDump = actual.metrics.toString()
+
+ expectMetricsAvailable({
+ val laterMetrics = restClient.queryStreamingAppMetrics(appId, current = true).metrics
+ laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
+ }, "metrics available")
+ }
+
+ "can obtain application corresponding executors' metrics and " +
+ "the metrics will keep changing" in {
+ // setup
+ val appId = restClient.getNextAvailableAppId()
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+ success shouldBe true
+ expectAppIsRunning(appId, wordCountName)
+
+ // exercise
+ expectMetricsAvailable(
+ restClient.queryExecutorMetrics(appId, current = true).metrics.nonEmpty,
+ "metrics available")
+ val actual = restClient.queryExecutorMetrics(appId, current = true)
+ actual.path shouldEqual s"app$appId.executor*"
+ actual.metrics.foreach(metric => {
+ metric.time should be > 0L
+ metric.value should not be null
+ })
+ val formerMetricsDump = actual.metrics.toString()
+
+ expectMetricsAvailable({
+ val laterMetrics = restClient.queryExecutorMetrics(appId, current = true).metrics
+ laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
+ }, "metrics available")
+ }
+ }
+
+ "kill application" should {
+ "a running application should be killed" in {
+ // setup
+ val appId = restClient.getNextAvailableAppId()
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+ success shouldBe true
+ expectAppIsRunning(appId, wordCountName)
+
+ // exercise
+ killAppAndVerify(appId)
+ }
+
+ "should fail when attempting to kill a stopped application" in {
+ // setup
+ val appId = restClient.getNextAvailableAppId()
+ val submissionSucess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+ submissionSucess shouldBe true
+ expectAppIsRunning(appId, wordCountName)
+ killAppAndVerify(appId)
+
+ // exercise
+ val success = restClient.killApp(appId)
+ success shouldBe false
+ }
+
+ "should fail when attempting to kill a non-exist application" in {
+ // setup
+ val freeAppId = restClient.listApps().length + 1
+
+ // exercise
+ val success = restClient.killApp(freeAppId)
+ success shouldBe false
+ }
+ }
+
+ "cluster information" should {
+ "retrieve 1 master for a non-HA cluster" in {
+ // exercise
+ val masterSummary = restClient.queryMaster()
+ masterSummary.cluster.map(_.toTuple) shouldEqual cluster.getMastersAddresses
+ masterSummary.aliveFor should be > 0L
+ masterSummary.masterStatus shouldEqual MasterStatus.Synced
+ }
+
+ "retrieve the same number of workers as cluster has" in {
+ // setup
+ val expectedWorkersCount = cluster.getWorkerHosts.size
+
+ // exercise
+ var runningWorkers: Array[WorkerSummary] = Array.empty
+ Util.retryUntil(() => {
+ runningWorkers = restClient.listRunningWorkers()
+ runningWorkers.length == expectedWorkersCount
+ }, "all workers running")
+ runningWorkers.foreach { worker =>
+ worker.state shouldEqual MasterToAppMaster.AppMasterActive
+ }
+ }
+
+ "find a newly added worker instance" in {
+ // setup
+ restartClusterRequired = true
+ val formerWorkersCount = cluster.getWorkerHosts.length
+ Util.retryUntil(() => restClient.listRunningWorkers().length == formerWorkersCount,
+ "all workers running")
+ val workerName = "newWorker"
+
+ // exercise
+ cluster.addWorkerNode(workerName)
+ Util.retryUntil(() => restClient.listRunningWorkers().length > formerWorkersCount,
+ "new worker added")
+ cluster.getWorkerHosts.length shouldEqual formerWorkersCount + 1
+ restClient.listRunningWorkers().length shouldEqual formerWorkersCount + 1
+ }
+
+ "retrieve 0 worker, if cluster is started without any workers" in {
+ // setup
+ restartClusterRequired = true
+ cluster.shutDown()
+
+ // exercise
+ cluster.start(workerNum = 0)
+ cluster.getWorkerHosts.length shouldEqual 0
+ restClient.listRunningWorkers().length shouldEqual 0
+ }
+
+ "can obtain master's metrics and the metrics will keep changing" in {
+ // exercise
+ expectMetricsAvailable(
+ restClient.queryMasterMetrics(current = true).metrics.nonEmpty, "metrics available")
+ val actual = restClient.queryMasterMetrics(current = true)
+ actual.path shouldEqual s"master"
+ actual.metrics.foreach(metric => {
+ metric.time should be > 0L
+ metric.value should not be null
+ })
+ val formerMetricsDump = actual.metrics.toString()
+
+ expectMetricsAvailable({
+ val laterMetrics = restClient.queryMasterMetrics(current = true).metrics
+ laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
+ }, "metrics available")
+ }
+
+ "can obtain workers' metrics and the metrics will keep changing" in {
+ // exercise
+ restClient.listRunningWorkers().foreach { worker =>
+ val workerId = worker.workerId
+ expectMetricsAvailable(
+ restClient.queryWorkerMetrics(workerId, current = true).metrics.nonEmpty,
+ "metrics available")
+ val actual = restClient.queryWorkerMetrics(workerId, current = true)
+ actual.path shouldEqual s"worker${WorkerId.render(workerId)}"
+ actual.metrics.foreach(metric => {
+ metric.time should be > 0L
+ metric.value should not be null
+ })
+ val formerMetricsDump = actual.metrics.toString()
+
+ expectMetricsAvailable({
+ val laterMetrics = restClient.queryWorkerMetrics(workerId, current = true).metrics
+ laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
+ }, "metrics available")
+ }
+ }
+ }
+
+ "configuration" should {
+ "retrieve the configuration of master and match particular values" in {
+ // exercise
+ val actual = restClient.queryMasterConfig()
+ actual.hasPath("gearpump") shouldBe true
+ actual.hasPath("gearpump.cluster") shouldBe true
+ actual.getString("gearpump.hostname") shouldEqual cluster.getMasterHosts.mkString(",")
+ }
+
+ "retrieve the configuration of worker X and match particular values" in {
+ // exercise
+ restClient.listRunningWorkers().foreach { worker =>
+ val actual = restClient.queryWorkerConfig(worker.workerId)
+ actual.hasPath("gearpump") shouldBe true
+ actual.hasPath("gearpump.worker") shouldBe true
+ }
+ }
+
+ "retrieve the configuration of executor X and match particular values" in {
+ // setup
+ val appId = restClient.getNextAvailableAppId()
+
+ // exercise
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+ success shouldBe true
+ restClient.queryExecutorBrief(appId).foreach { executor =>
+ val executorId = executor.executorId
+ val actual = restClient.queryExecutorConfig(appId, executorId)
+ actual.hasPath("gearpump") shouldBe true
+ actual.hasPath("gearpump.executor") shouldBe true
+ actual.getInt("gearpump.applicationId") shouldEqual appId
+ actual.getInt("gearpump.executorId") shouldEqual executorId
+ }
+ }
+
+ "retrieve the configuration of application X and match particular values" in {
+ // setup
+ val appId = restClient.getNextAvailableAppId()
+
+ // exercise
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
+ success shouldBe true
+ val actual = restClient.queryAppMasterConfig(appId)
+ actual.hasPath("gearpump") shouldBe true
+ actual.hasPath("gearpump.appmaster") shouldBe true
+ }
+ }
+
+ "application life-cycle" should {
+ "newly started application should be configured same as the previous one, after restart" in {
+ // setup
+ val originSplitNum = 4
+ val originSumNum = 3
+ val originAppId = restClient.getNextAvailableAppId()
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length,
+ s"-split $originSplitNum -sum $originSumNum")
+ success shouldBe true
+ expectAppIsRunning(originAppId, wordCountName)
+ val originAppDetail = restClient.queryStreamingAppDetail(originAppId)
+
+ // exercise
+ Util.retryUntil(() => restClient.restartApp(originAppId), "app restarted")
+ val killedApp = restClient.queryApp(originAppId)
+ killedApp.appId shouldEqual originAppId
+ killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive
+ val runningApps = restClient.listRunningApps()
+ runningApps.length shouldEqual 1
+ val newAppDetail = restClient.queryStreamingAppDetail(runningApps.head.appId)
+ newAppDetail.appName shouldEqual originAppDetail.appName
+ newAppDetail.processors.size shouldEqual originAppDetail.processors.size
+ newAppDetail.processors.get(0).get.parallelism shouldEqual originSplitNum
+ newAppDetail.processors.get(1).get.parallelism shouldEqual originSumNum
+ }
+ }
+
+ private def killAppAndVerify(appId: Int): Unit = {
+ val success = restClient.killApp(appId)
+ success shouldBe true
+
+ val actualApp = restClient.queryApp(appId)
+ actualApp.appId shouldEqual appId
+ actualApp.status shouldEqual MasterToAppMaster.AppMasterInActive
+ }
+
+ private def expectMetricsAvailable(condition: => Boolean, conditionDescription: String): Unit = {
+ val config = restClient.queryMasterConfig()
+ val reportInterval = Duration(config.getString("gearpump.metrics.report-interval-ms") + "ms")
+ Util.retryUntil(() => condition, conditionDescription, interval = reportInterval)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
new file mode 100644
index 0000000..4b15055
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import scala.concurrent.duration.Duration
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+import org.apache.gearpump.util.{Constants, LogUtil}
+
+/**
+ * The test spec will perform destructive operations to check the stability. Operations
+ * contains shutting-down appmaster, executor, or worker, and etc..
+ */
+class StabilitySpec extends TestSpecBase {
+
+ private val LOG = LogUtil.getLogger(getClass)
+
+ "kill appmaster" should {
+ "restart the whole application" in {
+ // setup
+ val appId = commandLineClient.submitApp(wordCountJar)
+ val formerAppMaster = restClient.queryApp(appId).appMasterPath
+ Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+ ensureClockStoredInMaster()
+
+ // exercise
+ restClient.killAppMaster(appId) shouldBe true
+ // todo: how long master will begin to recover and how much time for the recovering?
+ Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster,
+ "appmaster killed and restarted")
+
+ // verify
+ val laterAppMaster = restClient.queryStreamingAppDetail(appId)
+ laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
+ laterAppMaster.clock should be > 0L
+ }
+ }
+
+ "kill executor" should {
+ "will create a new executor and application will replay from the latest application clock" in {
+ // setup
+ val appId = commandLineClient.submitApp(wordCountJar)
+ Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+ val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
+ ensureClockStoredInMaster()
+
+ // exercise
+ restClient.killExecutor(appId, executorToKill) shouldBe true
+ // todo: how long appmaster will begin to recover and how much time for the recovering?
+ Util.retryUntil(() => restClient.queryExecutorBrief(appId)
+ .map(_.executorId).max > executorToKill,
+ s"executor $executorToKill killed and restarted")
+
+ // verify
+ val laterAppMaster = restClient.queryStreamingAppDetail(appId)
+ laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
+ laterAppMaster.clock should be > 0L
+ }
+ }
+
+ private def hostName(workerId: WorkerId): String = {
+ val worker = restClient.listRunningWorkers().find(_.workerId == workerId)
+ // Parse hostname from JVM info (in format: PID@hostname)
+ val hostname = worker.get.jvmName.split("@")(1)
+ hostname
+ }
+
+ "kill worker" should {
+ "worker will not recover but all its executors will be migrated to other workers" in {
+ // setup
+ restartClusterRequired = true
+ val appId = commandLineClient.submitApp(wordCountJar)
+ Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+
+ val allexecutors = restClient.queryExecutorBrief(appId)
+ val maxExecutor = allexecutors.sortBy(_.executorId).last
+ ensureClockStoredInMaster()
+
+ val appMaster = allexecutors.find(_.executorId == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
+
+ LOG.info(s"Max executor Id is executor: ${maxExecutor.executorId}, " +
+ s"worker: ${maxExecutor.workerId}")
+ val executorsSharingSameWorker = allexecutors
+ .filter(_.workerId == maxExecutor.workerId).map(_.executorId)
+ LOG.info(s"These executors sharing the same worker Id ${maxExecutor.workerId}," +
+ s" ${executorsSharingSameWorker.mkString(",")}")
+
+ // kill the worker and expect restarting all killed executors on other workers.
+ val workerIdToKill = maxExecutor.workerId
+ cluster.removeWorkerNode(hostName(workerIdToKill))
+
+ val appMasterKilled = executorsSharingSameWorker
+ .exists(_ == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
+
+ def executorsMigrated(): Boolean = {
+ val executors = restClient.queryExecutorBrief(appId)
+ val newAppMaster = executors.find(_.executorId == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
+
+ if (appMasterKilled) {
+ newAppMaster.get.workerId != appMaster.get.workerId
+ } else {
+ // New executors will be started to replace killed executors.
+ // The new executors will be assigned larger executor Id. We use this trick to detect
+ // Whether new executors have been started successfully.
+ executors.map(_.executorId).max > maxExecutor.executorId
+ }
+ }
+
+ Util.retryUntil(() => {
+ executorsMigrated()
+ }, s"new executor created with id > ${maxExecutor.executorId} when worker is killed")
+
+ // verify
+ val laterAppMaster = restClient.queryStreamingAppDetail(appId)
+ laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
+ laterAppMaster.clock should be > 0L
+ }
+ }
+
+ "kill master" should {
+ "master will be down and all workers will attempt to reconnect and suicide after X seconds" in {
+ // setup
+ restartClusterRequired = true
+ val masters = cluster.getMasterHosts
+ val config = restClient.queryMasterConfig()
+ val shutDownTimeout = Duration(config.getString("akka.cluster.auto-down-unreachable-after"))
+
+ // exercise
+ masters.foreach(cluster.removeMasterNode)
+ info(s"will sleep ${shutDownTimeout.toSeconds}s and then check workers are down")
+ Thread.sleep(shutDownTimeout.toMillis)
+
+ // verify
+ val aliveWorkers = cluster.getWorkerHosts
+ Util.retryUntil(() => aliveWorkers.forall(worker => !cluster.nodeIsOnline(worker)),
+ "all workers down")
+ }
+ }
+
+ private def ensureClockStoredInMaster(): Unit = {
+ // TODO: 5000ms is a fixed sync period in clock service.
+ // we wait for 5000ms to assume the clock is stored
+ Thread.sleep(5000)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
new file mode 100644
index 0000000..b327bf4
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.gearpump.integrationtest.kafka.{KafkaCluster, MessageLossDetector, SimpleKafkaReader}
+import org.apache.gearpump.integrationtest.storm.StormClient
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
+
+/**
+ * The test spec checks the compatibility of running Storm applications
+ */
+class StormCompatibilitySpec extends TestSpecBase {
+
+ private lazy val stormClient = {
+ new StormClient(cluster, restClient)
+ }
+
+ val `version0.9` = "09"
+ val `version0.10` = "010"
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ stormClient.start()
+ }
+
+ override def afterAll(): Unit = {
+ stormClient.shutDown()
+ super.afterAll()
+ }
+
+ def withStorm(testCode: String => Unit): Unit = {
+ testCode(`version0.9`)
+ testCode(`version0.10`)
+ }
+
+ def getTopologyName(name: String, stormVersion: String): String = {
+ s"${name}_$stormVersion"
+ }
+
+ def getStormJar(stormVersion: String): String = {
+ cluster.queryBuiltInITJars(s"storm$stormVersion-").head
+ }
+
+ "Storm over Gearpump" should withStorm {
+ stormVersion =>
+ s"support basic topologies ($stormVersion)" in {
+ val stormJar = getStormJar(stormVersion)
+ val topologyName = getTopologyName("exclamation", stormVersion)
+
+ // exercise
+ val appId = stormClient.submitStormApp(
+ jar = stormJar,
+ mainClass = "storm.starter.ExclamationTopology",
+ args = topologyName,
+ appName = topologyName)
+
+ // verify
+ Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+ }
+
+ s"support to run a python version of wordcount ($stormVersion)" in {
+ val stormJar = getStormJar(stormVersion)
+ val topologyName = getTopologyName("wordcount", stormVersion)
+
+ // exercise
+ val appId = stormClient.submitStormApp(
+ jar = stormJar,
+ mainClass = "storm.starter.WordCountTopology",
+ args = topologyName,
+ appName = topologyName)
+
+ // verify
+ Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+ }
+
+ s"support DRPC ($stormVersion)" in {
+ // ReachTopology computes the Twitter url reached by users and their followers
+ // using Storm Distributed RPC feature
+ // input (user and follower) data are already prepared in memory
+ val stormJar = getStormJar(stormVersion)
+ val topologyName = getTopologyName("reach", stormVersion)
+ stormClient.submitStormApp(
+ jar = stormJar,
+ mainClass = "storm.starter.ReachTopology",
+ args = topologyName,
+ appName = topologyName)
+ val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway)
+
+ // verify
+ Util.retryUntil(() => {
+ drpcClient.execute("reach", "notaurl.com") == "0"
+ }, "drpc reach == 0")
+ drpcClient.execute("reach", "foo.com/blog/1") shouldEqual "16"
+ drpcClient.execute("reach", "engineering.twitter.com/blog/5") shouldEqual "14"
+ }
+
+ s"support tick tuple ($stormVersion)" in {
+ val stormJar = getStormJar(stormVersion)
+ val topologyName = getTopologyName("slidingWindowCounts", stormVersion)
+
+ // exercise
+ val appId = stormClient.submitStormApp(
+ jar = stormJar,
+ mainClass = "storm.starter.RollingTopWords",
+ args = s"$topologyName remote",
+ appName = topologyName)
+
+ // verify
+ Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+ }
+
+ s"support at-least-once semantics with Storm's Kafka connector ($stormVersion)" in {
+
+ val stormJar = getStormJar(stormVersion)
+ val topologyName = getTopologyName("storm_kafka", stormVersion)
+ val stormKafkaTopology =
+ s"org.apache.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology"
+
+ import org.apache.gearpump.integrationtest.kafka.KafkaCluster._
+ withKafkaCluster(cluster) {
+ kafkaCluster =>
+ val sourcePartitionNum = 2
+ val sinkPartitionNum = 1
+ val zookeeper = kafkaCluster.getZookeeperConnectString
+ val brokerList = kafkaCluster.getBrokerListConnectString
+ val sourceTopic = "topic1"
+ val sinkTopic = "topic2"
+
+ val args = Array("-topologyName", topologyName, "-sourceTopic", sourceTopic,
+ "-sinkTopic", sinkTopic, "-zookeeperConnect", zookeeper, "-brokerList", brokerList,
+ "-spoutNum", s"$sourcePartitionNum", "-boltNum", s"$sinkPartitionNum"
+ )
+
+ kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
+
+ // generate number sequence (1, 2, 3, ...) to the topic
+ withDataProducer(sourceTopic, brokerList) { producer =>
+
+ val appId = stormClient.submitStormApp(
+ jar = stormJar,
+ mainClass = stormKafkaTopology,
+ args = args.mkString(" "),
+ appName = topologyName)
+
+ Util.retryUntil(() =>
+ restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
+
+ // kill executor and verify at-least-once is guaranteed on application restart
+ val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
+ restClient.killExecutor(appId, executorToKill) shouldBe true
+ Util.retryUntil(() =>
+ restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill,
+ s"executor $executorToKill killed")
+
+ // verify no message loss
+ val detector = new
+ MessageLossDetector(producer.lastWriteNum)
+ val kafkaReader = new
+ SimpleKafkaReader(detector, sinkTopic, host = kafkaCluster.advertisedHost,
+ port = kafkaCluster.advertisedPort)
+
+ Util.retryUntil(() => {
+ kafkaReader.read()
+ detector.allReceived
+ }, "all kafka message read")
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala
new file mode 100644
index 0000000..7942308
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.suites
+
+import org.scalatest._
+
+import org.apache.gearpump.integrationtest.MiniClusterProvider
+import org.apache.gearpump.integrationtest.checklist._
+import org.apache.gearpump.integrationtest.minicluster.MiniCluster
+
+/**
+ * Launch a Gearpump cluster in standalone mode and run all test specs. To test a specific
+ * test spec, you need to comment out other lines.
+ */
+class StandaloneModeSuite extends Suites(
+ new CommandLineSpec,
+ new RestServiceSpec,
+ new ExampleSpec,
+ new DynamicDagSpec,
+ new StormCompatibilitySpec,
+ new StabilitySpec,
+ new ConnectorKafkaSpec,
+ new MessageDeliverySpec
+) with BeforeAndAfterAll {
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ MiniClusterProvider.managed = true
+ MiniClusterProvider.set(new MiniCluster).start()
+ }
+
+ override def afterAll(): Unit = {
+ MiniClusterProvider.get.shutDown()
+ super.afterAll()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala
deleted file mode 100644
index f315ad3..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import org.apache.log4j.Logger
-
-/**
- * The class is used to execute Docker commands.
- */
-object Docker {
-
- private val LOG = Logger.getLogger(getClass)
-
- /**
- * @throws RuntimeException in case retval != 0
- */
- private def doExecute(container: String, command: String): String = {
- ShellExec.execAndCaptureOutput(s"docker exec $container $command", s"EXEC $container")
- }
-
- private def doExecuteSilently(container: String, command: String): Boolean = {
- ShellExec.exec(s"docker exec $container $command", s"EXEC $container")
- }
-
- /**
- * @throws RuntimeException in case retval != 0
- */
- final def execute(container: String, command: String): String = {
- trace(container, s"Execute $command") {
- doExecute(container, command)
- }
- }
-
- final def executeSilently(container: String, command: String): Boolean = {
- trace(container, s"Execute silently $command") {
- doExecuteSilently(container, command)
- }
- }
-
- final def listContainers(): Seq[String] = {
- trace("", s"Listing how many containers...") {
- ShellExec.execAndCaptureOutput("docker ps -q -a", "LIST")
- .split("\n").filter(_.nonEmpty)
- }
- }
-
- final def containerIsRunning(name: String): Boolean = {
- trace(name, s"Check container running or not...") {
- ShellExec.execAndCaptureOutput(s"docker ps -q --filter name=$name", s"FIND $name").nonEmpty
- }
- }
-
- final def getContainerIPAddr(name: String): String = {
- trace(name, s"Get Ip Address") {
- Docker.inspect(name, "--format={{.NetworkSettings.IPAddress}}")
- }
- }
-
- final def containerExists(name: String): Boolean = {
- trace(name, s"Check container existing or not...") {
- ShellExec.execAndCaptureOutput(s"docker ps -q -a --filter name=$name", s"FIND $name").nonEmpty
- }
- }
-
- /**
- * @throws RuntimeException in case particular container is created already
- */
- final def createAndStartContainer(name: String, image: String, command: String,
- environ: Map[String, String] = Map.empty, // key, value
- volumes: Map[String, String] = Map.empty, // from, to
- knownHosts: Set[String] = Set.empty,
- tunnelPorts: Set[Int] = Set.empty): String = {
-
- if (containerExists(name)) {
- killAndRemoveContainer(name)
- }
-
- trace(name, s"Create and start $name ($image)...") {
-
- val optsBuilder = new StringBuilder
- optsBuilder.append("-d") // run in background
- optsBuilder.append(" -h " + name) // use container name as hostname
- optsBuilder.append(" -v /etc/localtime:/etc/localtime:ro") // synchronize timezone settings
-
- environ.foreach { case (key, value) =>
- optsBuilder.append(s" -e $key=$value")
- }
- volumes.foreach { case (from, to) =>
- optsBuilder.append(s" -v $from:$to")
- }
- knownHosts.foreach(host =>
- optsBuilder.append(" --link " + host)
- )
- tunnelPorts.foreach(port =>
- optsBuilder.append(s" -p $port:$port")
- )
- createAndStartContainer(name, optsBuilder.toString(), command, image)
- }
- }
-
- /**
- * @throws RuntimeException in case particular container is created already
- */
- private def createAndStartContainer(
- name: String, options: String, command: String, image: String): String = {
- ShellExec.execAndCaptureOutput(s"docker run $options " +
- s"--name $name $image $command", s"MAKE $name")
- }
-
- final def killAndRemoveContainer(name: String): Boolean = {
- trace(name, s"kill and remove container") {
- ShellExec.exec(s"docker rm -f $name", s"STOP $name")
- }
- }
-
- final def killAndRemoveContainer(names: Array[String]): Boolean = {
- assert(names.length > 0)
- val args = names.mkString(" ")
- trace(names.mkString(","), s"kill and remove containers") {
- ShellExec.exec(s"docker rm -f $args", s"STOP $args.")
- }
- }
-
- private def inspect(container: String, option: String): String = {
- ShellExec.execAndCaptureOutput(s"docker inspect $option $container", s"EXEC $container")
- }
-
- final def curl(container: String, url: String, options: Array[String] = Array.empty[String])
- : String = {
- trace(container, s"curl $url") {
- doExecute(container, s"curl -s ${options.mkString(" ")} $url")
- }
- }
-
- final def getHostName(container: String): String = {
- trace(container, s"Get hostname of container...") {
- doExecute(container, "hostname")
- }
- }
-
- final def getNetworkGateway(container: String): String = {
- trace(container, s"Get gateway of container...") {
- doExecute(container, "ip route").split("\\s+")(2)
- }
- }
- final def killProcess(container: String, pid: Int, signal: String = "SIGKILL"): Boolean = {
- trace(container, s"Kill process pid: $pid") {
- doExecuteSilently(container, s"kill -$signal $pid")
- }
- }
-
- final def findJars(container: String, folder: String): Array[String] = {
- trace(container, s"Find jars under $folder") {
- doExecute(container, s"find $folder")
- .split("\n").filter(_.endsWith(".jar"))
- }
- }
-
- private def trace[T](container: String, msg: String)(fun: => T): T = {
- // scalastyle:off println
- Console.println() // A empty line to let the output looks better.
- // scalastyle:on println
- LOG.debug(s"Container $container====>> $msg")
- LOG.debug("INPUT==>>")
- val response = fun
- LOG.debug("<<==OUTPUT")
-
- LOG.debug(brief(response))
-
- LOG.debug(s"<<====Command END. Container $container, $msg \n")
- response
- }
-
- private val PREVIEW_MAX_LENGTH = 1024
-
- private def brief[T](input: T): String = {
- val output = input match {
- case true =>
- "Success|True"
- case false =>
- "Failure|False"
- case x: Array[Any] =>
- "Success: [" + x.mkString(",") + "]"
- case x =>
- x.toString
- }
-
- val preview = if (output.length > PREVIEW_MAX_LENGTH) {
- output.substring(0, PREVIEW_MAX_LENGTH) + "..."
- }
- else {
- output
- }
- preview
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
deleted file mode 100644
index 25d7ee3..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent._
-import scala.concurrent.duration._
-import scala.sys.process._
-
-import org.apache.log4j.Logger
-import org.apache.storm.shade.org.eclipse.jetty.util.QuotedStringTokenizer
-
-/**
- * The class is used to execute command in a shell
- */
-object ShellExec {
-
- private val LOG = Logger.getLogger(getClass)
- private val PROCESS_TIMEOUT = 2.minutes
-
- /**
- * The builtin command line parser by ProcessBuilder (implicit sys.process) don't
- * respect the quote chars (' and ")
- */
- private def splitQuotedString(str: String): List[String] = {
- val splitter = new QuotedStringTokenizer(str, " \t\n\r")
- splitter.asInstanceOf[java.util.Enumeration[String]].asScala.toList
- }
-
- def exec(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT): Boolean = {
- LOG.debug(s"$sender => `$command`")
-
- val p = splitQuotedString(command).run()
- val f = Future(blocking(p.exitValue())) // wrap in Future
- val retval = {
- try {
- Await.result(f, timeout)
- } catch {
- case _: TimeoutException =>
- LOG.error(s"timeout to execute command `$command`")
- p.destroy()
- p.exitValue()
- }
- }
- LOG.debug(s"$sender <= exit $retval")
- retval == 0
- }
-
- def execAndCaptureOutput(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT)
- : String = {
- LOG.debug(s"$sender => `$command`")
-
- val buf = new StringBuilder
- val processLogger = ProcessLogger((o: String) => buf.append(o).append("\n"),
- (e: String) => buf.append(e).append("\n"))
- val p = splitQuotedString(command).run(processLogger)
- val f = Future(blocking(p.exitValue())) // wrap in Future
- val retval = {
- try {
- Await.result(f, timeout)
- } catch {
- case _: TimeoutException =>
- p.destroy()
- p.exitValue()
- }
- }
- val output = buf.toString().trim
- val PREVIEW_MAX_LENGTH = 200
- val preview = if (output.length > PREVIEW_MAX_LENGTH) {
- output.substring(0, PREVIEW_MAX_LENGTH) + "..."
- } else {
- output
- }
-
- LOG.debug(s"$sender <= `$preview` exit $retval")
- if (retval != 0) {
- throw new RuntimeException(
- s"exited ($retval) by executing `$command`")
- }
- output
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala
deleted file mode 100644
index 7e7085d..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import scala.concurrent.duration._
-import scala.util.{Failure, Success, Try}
-
-import org.apache.log4j.Logger
-
-object Util {
-
- private val LOG = Logger.getLogger(getClass)
-
- def encodeUriComponent(s: String): String = {
- try {
- java.net.URLEncoder.encode(s, "UTF-8")
- .replaceAll("\\+", "%20")
- .replaceAll("\\%21", "!")
- .replaceAll("\\%27", "'")
- .replaceAll("\\%28", "(")
- .replaceAll("\\%29", ")")
- .replaceAll("\\%7E", "~")
- } catch {
- case ex: Throwable => s
- }
- }
-
- def retryUntil(
- condition: () => Boolean, conditionDescription: String, maxTries: Int = 15,
- interval: Duration = 10.seconds): Unit = {
- var met = false
- var tries = 0
-
- while (!met && tries < maxTries) {
-
- met = Try(condition()) match {
- case Success(true) => true
- case Success(false) => false
- case Failure(ex) => false
- }
-
- tries += 1
-
- if (!met) {
- LOG.error(s"Failed due to (false == $conditionDescription), " +
- s"retrying for the ${tries} times...")
- Thread.sleep(interval.toMillis)
- } else {
- LOG.info(s"Success ($conditionDescription) after ${tries} retries")
- }
- }
-
- if (!met) {
- throw new Exception(s"Failed after ${tries} retries, ($conditionDescription) == false")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
deleted file mode 100644
index f836abd..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.integrationtest.hadoop
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.{Docker, Util}
-
-object HadoopCluster {
-
- /** Starts a Hadoop cluster */
- def withHadoopCluster(testCode: HadoopCluster => Unit): Unit = {
- val hadoopCluster = new HadoopCluster
- try {
- hadoopCluster.start()
- testCode(hadoopCluster)
- } finally {
- hadoopCluster.shutDown()
- }
- }
-}
-/**
- * This class maintains a single node Hadoop cluster
- */
-class HadoopCluster {
-
- private val LOG = Logger.getLogger(getClass)
- private val HADOOP_DOCKER_IMAGE = "sequenceiq/hadoop-docker:2.6.0"
- private val HADOOP_HOST = "hadoop0"
-
- def start(): Unit = {
- Docker.createAndStartContainer(HADOOP_HOST, HADOOP_DOCKER_IMAGE, "")
-
- Util.retryUntil(() => isAlive, "Hadoop cluster is alive")
- LOG.info("Hadoop cluster is started.")
- }
-
- // Checks whether the cluster is alive by listing "/"
- private def isAlive: Boolean = {
- Docker.executeSilently(HADOOP_HOST, "/usr/local/hadoop/bin/hadoop fs -ls /")
- }
-
- def getDefaultFS: String = {
- val hostIPAddr = Docker.getContainerIPAddr(HADOOP_HOST)
- s"hdfs://$hostIPAddr:9000"
- }
-
- def shutDown(): Unit = {
- Docker.killAndRemoveContainer(HADOOP_HOST)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
deleted file mode 100644
index 15ba084..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.kafka
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.minicluster.MiniCluster
-import org.apache.gearpump.integrationtest.{Docker, Util}
-
-object KafkaCluster {
-
- /** Starts a Kafka cluster */
- def withKafkaCluster(cluster: MiniCluster)(testCode: KafkaCluster => Unit): Unit = {
- val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka")
- try {
- kafkaCluster.start()
- testCode(kafkaCluster)
- } finally {
- kafkaCluster.shutDown()
- }
- }
-
- def withDataProducer(topic: String, brokerList: String)
- (testCode: NumericalDataProducer => Unit): Unit = {
- val producer = new NumericalDataProducer(topic, brokerList)
- try {
- producer.start()
- testCode(producer)
- } finally {
- producer.stop()
- }
- }
-}
-
-/**
- * This class maintains a single node Kafka cluster with integrated Zookeeper.
- */
-class KafkaCluster(val advertisedHost: String, zkChroot: String = "") {
-
- private val LOG = Logger.getLogger(getClass)
- private val KAFKA_DOCKER_IMAGE = "spotify/kafka"
- private val KAFKA_HOST = "kafka0"
- private val KAFKA_HOME = "/opt/kafka_2.11-0.8.2.1/"
- private val ZOOKEEPER_PORT = 2181
- private val BROKER_PORT = 9092
- val advertisedPort = BROKER_PORT
-
- def start(): Unit = {
- Docker.createAndStartContainer(KAFKA_HOST, KAFKA_DOCKER_IMAGE, "",
- environ = Map(
- "ADVERTISED_HOST" -> advertisedHost,
- "ADVERTISED_PORT" -> BROKER_PORT.toString,
- "ZK_CHROOT" -> zkChroot),
- tunnelPorts = Set(ZOOKEEPER_PORT, BROKER_PORT)
- )
- Util.retryUntil(() => isAlive, "kafka cluster is alive")
- LOG.debug("kafka cluster is started.")
- }
-
- def isAlive: Boolean = {
- !listTopics().contains("Connection refused")
- }
-
- def shutDown(): Unit = {
- Docker.killAndRemoveContainer(KAFKA_HOST)
- }
-
- private lazy val hostIPAddr = Docker.getContainerIPAddr(KAFKA_HOST)
-
- def listTopics(): String = {
- kafkaListTopics(KAFKA_HOST, KAFKA_HOME, getZookeeperConnectString)
- }
-
- def getZookeeperConnectString: String = {
- s"$hostIPAddr:$ZOOKEEPER_PORT/$zkChroot"
- }
-
- def getBrokerListConnectString: String = {
- s"$hostIPAddr:$BROKER_PORT"
- }
-
- def createTopic(topic: String, partitions: Int = 1): Unit = {
- LOG.debug(s"|=> Create kafka topic $topic with $partitions partitions")
-
- Docker.executeSilently(KAFKA_HOST,
- s"$KAFKA_HOME/bin/kafka-topics.sh" +
- s" --zookeeper $getZookeeperConnectString" +
- s" --create --topic $topic --partitions $partitions --replication-factor 1")
- }
-
- def produceDataToKafka(topic: String, messageNum: Int): Unit = {
- Docker.executeSilently(KAFKA_HOST,
- s"$KAFKA_HOME/bin/kafka-topics.sh" +
- s" --zookeeper $getZookeeperConnectString" +
- s" --create --topic $topic --partitions 1 --replication-factor 1")
-
- Docker.executeSilently(KAFKA_HOST,
- s"$KAFKA_HOME/bin/kafka-producer-perf-test.sh" +
- s" --broker-list $getBrokerListConnectString" +
- s" --topic $topic --messages $messageNum")
- }
-
- def getLatestOffset(topic: String): Int = {
- kafkaFetchLatestOffset(KAFKA_HOST, topic, KAFKA_HOME, getBrokerListConnectString)
- }
-
- private def kafkaListTopics(
- container: String, kafkaHome: String, zookeeperConnectionString: String): String = {
-
- LOG.debug(s"|=> Kafka list topics...")
- Docker.execute(container,
- s"$kafkaHome/bin/kafka-topics.sh" +
- s" --zookeeper $zookeeperConnectionString -list")
- }
-
- private def kafkaFetchLatestOffset(
- container: String, topic: String, kafkaHome: String, brokersList: String): Int = {
- LOG.debug(s"|=> Get latest offset of topic $topic...")
- val output = Docker.execute(container,
- s"$kafkaHome/bin/kafka-run-class.sh kafka.tools.GetOffsetShell" +
- s" --broker-list $brokersList " +
- s" --topic $topic --time -1")
- output.split(":")(2).toInt
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
deleted file mode 100644
index 1cf3125..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.kafka
-
-import java.util.Properties
-
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.streaming.serializer.ChillSerializer
-
-class NumericalDataProducer(topic: String, bootstrapServers: String) {
-
- private val LOG = Logger.getLogger(getClass)
- private val producer = createProducer
- private val WRITE_SLEEP_NANOS = 10
- private val serializer = new ChillSerializer[Int]
- var lastWriteNum = 0
-
- def start(): Unit = {
- produceThread.start()
- }
-
- def stop(): Unit = {
- if (produceThread.isAlive) {
- produceThread.interrupt()
- produceThread.join()
- }
- producer.close()
- }
-
- /** How many message we have written in total */
- def producedNumbers: Range = {
- Range(1, lastWriteNum + 1)
- }
-
- private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
- val properties = new Properties()
- properties.setProperty("bootstrap.servers", bootstrapServers)
- new KafkaProducer[Array[Byte], Array[Byte]](properties,
- new ByteArraySerializer, new ByteArraySerializer)
- }
-
- private val produceThread = new Thread(new Runnable {
- override def run(): Unit = {
- try {
- while (!Thread.currentThread.isInterrupted) {
- lastWriteNum += 1
- val msg = serializer.serialize(lastWriteNum)
- val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, msg)
- producer.send(record)
- Thread.sleep(0, WRITE_SLEEP_NANOS)
- }
- } catch {
- case ex: InterruptedException =>
- LOG.error("message producing is stopped by an interrupt")
- }
- }
- })
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
deleted file mode 100644
index 1f773d3..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.kafka
-
-import scala.collection.mutable
-
-trait ResultVerifier {
- def onNext(num: Int): Unit
-}
-
-class MessageLossDetector(totalNum: Int) extends ResultVerifier {
- private val bitSets = new mutable.BitSet(totalNum)
- var result = List.empty[Int]
-
- override def onNext(num: Int): Unit = {
- bitSets.add(num)
- result :+= num
- }
-
- def allReceived: Boolean = {
- 1.to(totalNum).forall(bitSets)
- }
-
- def received(num: Int): Boolean = {
- bitSets(num)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
deleted file mode 100644
index 392ca86..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.kafka
-
-import scala.util.{Failure, Success}
-
-import kafka.api.FetchRequestBuilder
-import kafka.consumer.SimpleConsumer
-import kafka.utils.Utils
-
-import org.apache.gearpump.streaming.serializer.ChillSerializer
-
-class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: Int = 0,
- host: String, port: Int) {
-
- private val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "")
- private val serializer = new ChillSerializer[Int]
- private var offset = 0L
-
- def read(): Unit = {
- val messageSet = consumer.fetch(
- new FetchRequestBuilder().addFetch(topic, partition, offset, Int.MaxValue).build()
- ).messageSet(topic, partition)
-
- for (messageAndOffset <- messageSet) {
- serializer.deserialize(Utils.readBytes(messageAndOffset.message.payload)) match {
- case Success(msg) =>
- offset = messageAndOffset.nextOffset
- verifier.onNext(msg)
- case Failure(e) => throw e
- }
- }
- }
-}
\ No newline at end of file
[4/4] incubator-gearpump git commit: fix GEARPUMP-150 correct the
integration test file structure
Posted by ma...@apache.org.
fix GEARPUMP-150 correct the integration test file structure
Author: huafengw <fv...@gmail.com>
Closes #26 from huafengw/name.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/e9ea6e2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/e9ea6e2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/e9ea6e2f
Branch: refs/heads/master
Commit: e9ea6e2f3366ede2aa350204e8b3854a6d0081ca
Parents: c80c069
Author: huafengw <fv...@gmail.com>
Authored: Fri May 27 19:38:40 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri May 27 19:38:40 2016 +0800
----------------------------------------------------------------------
.../integrationtest/MiniClusterProvider.scala | 41 ---
.../gearpump/integrationtest/TestSpecBase.scala | 92 -----
.../checklist/CommandLineSpec.scala | 133 -------
.../checklist/ConnectorKafkaSpec.scala | 120 ------
.../checklist/DynamicDagSpec.scala | 157 --------
.../integrationtest/checklist/ExampleSpec.scala | 148 --------
.../checklist/MessageDeliverySpec.scala | 122 ------
.../checklist/RestServiceSpec.scala | 369 -------------------
.../checklist/StabilitySpec.scala | 162 --------
.../checklist/StormCompatibilitySpec.scala | 185 ----------
.../suites/StandaloneModeSuite.scala | 51 ---
.../integrationtest/MiniClusterProvider.scala | 41 +++
.../gearpump/integrationtest/TestSpecBase.scala | 92 +++++
.../checklist/CommandLineSpec.scala | 133 +++++++
.../checklist/ConnectorKafkaSpec.scala | 120 ++++++
.../checklist/DynamicDagSpec.scala | 157 ++++++++
.../integrationtest/checklist/ExampleSpec.scala | 148 ++++++++
.../checklist/MessageDeliverySpec.scala | 122 ++++++
.../checklist/RestServiceSpec.scala | 369 +++++++++++++++++++
.../checklist/StabilitySpec.scala | 162 ++++++++
.../checklist/StormCompatibilitySpec.scala | 185 ++++++++++
.../suites/StandaloneModeSuite.scala | 51 +++
.../io/gearpump/integrationtest/Docker.scala | 211 -----------
.../io/gearpump/integrationtest/ShellExec.scala | 98 -----
.../io/gearpump/integrationtest/Util.scala | 72 ----
.../integrationtest/hadoop/HadoopCluster.scala | 67 ----
.../integrationtest/kafka/KafkaCluster.scala | 140 -------
.../kafka/NumericalDataProducer.scala | 76 ----
.../integrationtest/kafka/ResultVerifier.scala | 42 ---
.../kafka/SimpleKafkaReader.scala | 49 ---
.../minicluster/BaseContainer.scala | 60 ---
.../minicluster/CommandLineClient.scala | 84 -----
.../minicluster/MiniCluster.scala | 189 ----------
.../minicluster/RestClient.scala | 268 --------------
.../integrationtest/storm/StormClient.scala | 91 -----
.../gearpump/integrationtest/Docker.scala | 211 +++++++++++
.../gearpump/integrationtest/ShellExec.scala | 98 +++++
.../apache/gearpump/integrationtest/Util.scala | 72 ++++
.../integrationtest/hadoop/HadoopCluster.scala | 67 ++++
.../integrationtest/kafka/KafkaCluster.scala | 140 +++++++
.../kafka/NumericalDataProducer.scala | 76 ++++
.../integrationtest/kafka/ResultVerifier.scala | 42 +++
.../kafka/SimpleKafkaReader.scala | 49 +++
.../minicluster/BaseContainer.scala | 60 +++
.../minicluster/CommandLineClient.scala | 84 +++++
.../minicluster/MiniCluster.scala | 189 ++++++++++
.../minicluster/RestClient.scala | 268 ++++++++++++++
.../integrationtest/storm/StormClient.scala | 91 +++++
.../integrationtest/storm/Adaptor.scala | 38 --
.../storm/Storm010KafkaTopology.scala | 95 -----
.../integrationtest/storm/Adaptor.scala | 38 ++
.../storm/Storm010KafkaTopology.scala | 95 +++++
.../integrationtest/storm/Adaptor.scala | 38 --
.../storm/Storm09KafkaTopology.scala | 95 -----
.../integrationtest/storm/Adaptor.scala | 38 ++
.../storm/Storm09KafkaTopology.scala | 95 +++++
56 files changed, 3293 insertions(+), 3293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
deleted file mode 100644
index 4a161c7..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import org.apache.gearpump.integrationtest.minicluster.MiniCluster
-
-/**
- * Provides a min cluster of Gearpump, which contains one or more masters, and workers.
- */
-object MiniClusterProvider {
-
- private var instance = new MiniCluster
-
- def get: MiniCluster = instance
-
- def set(instance: MiniCluster): MiniCluster = {
- this.instance = instance
- instance
- }
-
- /**
- * Indicates whether test suite should create particular cluster. In case of false, every
- * test spec will be responsible for cluster creation.
- */
- var managed = false
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
deleted file mode 100644
index dabcc71..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest
-
-import org.scalatest._
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData
-import org.apache.gearpump.util.LogUtil
-
-/**
- * The abstract test spec
- */
-trait TestSpecBase
- extends WordSpec with Matchers with BeforeAndAfterEachTestData with BeforeAndAfterAll {
-
- private def LOGGER = LogUtil.getLogger(getClass)
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- if (!MiniClusterProvider.managed) {
- LOGGER.info("Will test with a default standalone mini cluster")
- MiniClusterProvider.get.start()
- }
- }
-
- override def afterAll(): Unit = {
- if (!MiniClusterProvider.managed) {
- LOGGER.info("Will shutdown the default mini cluster")
- MiniClusterProvider.get.shutDown()
- }
- super.afterAll()
- }
-
- lazy val cluster = MiniClusterProvider.get
- lazy val commandLineClient = cluster.commandLineClient
- lazy val restClient = cluster.restClient
-
- lazy val wordCountJar = cluster.queryBuiltInExampleJars("wordcount-").head
- lazy val wordCountName = "wordCount"
-
- var restartClusterRequired: Boolean = false
-
- override def beforeEach(td: TestData): Unit = {
-
- LOGGER.debug(s">### =============================================================")
- LOGGER.debug(s">###1 Prepare test: ${td.name}\n")
-
- assert(cluster != null, "Configure MiniCluster properly in suite spec")
- cluster.isAlive shouldBe true
- restClient.listRunningApps().isEmpty shouldBe true
- LOGGER.debug(s">### =============================================================")
- LOGGER.debug(s">###2 Start test: ${td.name}\n")
- }
-
- override def afterEach(td: TestData): Unit = {
- LOGGER.debug(s"<### =============================================================")
- LOGGER.debug(s"<###3 End test: ${td.name}\n")
-
- if (restartClusterRequired || !cluster.isAlive) {
- restartClusterRequired = false
- LOGGER.info("Will restart the cluster for next test case")
- cluster.restart()
- } else {
- restClient.listRunningApps().foreach(app => {
- commandLineClient.killApp(app.appId) shouldBe true
- })
- }
- }
-
- def expectAppIsRunning(appId: Int, expectedAppName: String): AppMasterData = {
- val app = restClient.queryApp(appId)
- app.status shouldEqual MasterToAppMaster.AppMasterActive
- app.appName shouldEqual expectedAppName
- app
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
deleted file mode 100644
index eabc684..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.integrationtest.TestSpecBase
-
-/**
- * The test spec checks the command-line usage
- */
-class CommandLineSpec extends TestSpecBase {
-
- "use `gear info` to list applications" should {
- "retrieve 0 application after cluster just started" in {
- // exercise
- getRunningAppCount shouldEqual 0
- }
-
- "retrieve 1 application after the first application submission" in {
- // setup
- val appId = expectSubmitAppSuccess(wordCountJar)
- expectAppIsRunningByParsingOutput(appId, wordCountName)
-
- // exercise
- getRunningAppCount shouldEqual 1
- }
- }
-
- "use `gear app` to submit application" should {
- "find a running application after submission" in {
- // exercise
- val appId = expectSubmitAppSuccess(wordCountJar)
- expectAppIsRunningByParsingOutput(appId, wordCountName)
- }
-
- "reject a repeated submission request while the application is running" in {
- // setup
- val appId = expectSubmitAppSuccess(wordCountJar)
- expectAppIsRunningByParsingOutput(appId, wordCountName)
-
- // exercise
- val actualAppId = commandLineClient.submitApp(wordCountJar)
- actualAppId shouldEqual -1
- }
-
- "reject an invalid submission (the jar file path is incorrect)" in {
- // exercise
- val actualAppId = commandLineClient.submitApp(wordCountJar + ".missing")
- actualAppId shouldEqual -1
- }
- }
-
- "use `gear kill` to kill application" should {
- "a running application should be killed" in {
- // setup
- val appId = expectSubmitAppSuccess(wordCountJar)
-
- // exercise
- val success = commandLineClient.killApp(appId)
- success shouldBe true
- }
-
- "should fail when attempting to kill a stopped application" in {
- // setup
- val appId = expectSubmitAppSuccess(wordCountJar)
- var success = commandLineClient.killApp(appId)
- success shouldBe true
-
- // exercise
- success = commandLineClient.killApp(appId)
- success shouldBe false
- }
-
- "the EmbededCluster can be used as embedded cluster in process" in {
- // setup
- val args = "-debug true -sleep 10"
- val appId = expectSubmitAppSuccess(wordCountJar, args)
- var success = commandLineClient.killApp(appId)
- success shouldBe true
- }
-
- "should fail when attempting to kill a non-exist application" in {
- // setup
- val freeAppId = getNextAvailableAppId
-
- // exercise
- val success = commandLineClient.killApp(freeAppId)
- success shouldBe false
- }
- }
-
- "use `gear replay` to replay the application from current min clock" should {
- "todo: description" in {
- // todo: test code
- }
- }
-
- private def getRunningAppCount: Int = {
- commandLineClient.listRunningApps().length
- }
-
- private def getNextAvailableAppId: Int = {
- commandLineClient.listApps().length + 1
- }
-
- private def expectSubmitAppSuccess(jar: String, args: String = ""): Int = {
- val appId = commandLineClient.submitApp(jar)
- appId should not equal -1
- appId
- }
-
- private def expectAppIsRunningByParsingOutput(appId: Int, expectedName: String): Unit = {
- val actual = commandLineClient.queryApp(appId)
- actual should include(s"application: $appId, ")
- actual should include(s"name: $expectedName, ")
- actual should include(s"status: ${MasterToAppMaster.AppMasterActive}")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
deleted file mode 100644
index d8bdc1e..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.scalatest.TestData
-
-import org.apache.gearpump.integrationtest.kafka._
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-
-/**
- * The test spec checks the Kafka datasource connector
- */
-class ConnectorKafkaSpec extends TestSpecBase {
-
- private lazy val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway)
- private lazy val kafkaJar = cluster.queryBuiltInExampleJars("kafka-").head
- private var producer: NumericalDataProducer = null
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- kafkaCluster.start()
- }
-
- override def afterAll(): Unit = {
- kafkaCluster.shutDown()
- super.afterAll()
- }
-
- override def afterEach(test: TestData): Unit = {
- super.afterEach(test)
- if (producer != null) {
- producer.stop()
- producer = null
- }
- }
-
- "KafkaSource and KafkaSink" should {
- "read from and write to kafka" in {
- // setup
- val sourceTopic = "topic1"
- val sinkTopic = "topic2"
- val messageNum = 10000
- kafkaCluster.produceDataToKafka(sourceTopic, messageNum)
-
- // exercise
- val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite",
- "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
- "-brokerList", kafkaCluster.getBrokerListConnectString,
- "-sourceTopic", sourceTopic,
- "-sinkTopic", sinkTopic).mkString(" ")
- val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
- success shouldBe true
-
- // verify
- expectAppIsRunning(appId, "KafkaReadWrite")
- Util.retryUntil(() => kafkaCluster.getLatestOffset(sinkTopic) == messageNum,
- "kafka all message written")
- }
- }
-
- "Gearpump with Kafka" should {
- "support at-least-once message delivery" in {
- // setup
- val sourcePartitionNum = 2
- val sourceTopic = "topic3"
- val sinkTopic = "topic4"
- // Generate number sequence (1, 2, 3, ...) to the topic
- kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
- producer = new NumericalDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString)
- producer.start()
-
- // exercise
- val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite",
- "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
- "-brokerList", kafkaCluster.getBrokerListConnectString,
- "-sourceTopic", sourceTopic,
- "-sinkTopic", sinkTopic,
- "-source", sourcePartitionNum).mkString(" ")
- val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
- success shouldBe true
-
- // verify #1
- expectAppIsRunning(appId, "KafkaReadWrite")
- Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
-
- // verify #2
- val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
- restClient.killExecutor(appId, executorToKill) shouldBe true
- Util.retryUntil(() => restClient.queryExecutorBrief(appId)
- .map(_.executorId).max > executorToKill,
- s"executor $executorToKill killed")
-
- // verify #3
- val detector = new MessageLossDetector(producer.lastWriteNum)
- val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
- host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
- Util.retryUntil(() => {
- kafkaReader.read()
- detector.allReceived
- }, "kafka all message read")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
deleted file mode 100644
index 56b33c1..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-import org.apache.gearpump.metrics.Metrics.Meter
-import org.apache.gearpump.streaming._
-import org.apache.gearpump.streaming.appmaster.ProcessorSummary
-
-class DynamicDagSpec extends TestSpecBase {
-
- lazy val solJar = cluster.queryBuiltInExampleJars("sol-").head
- val splitTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Split"
- val sumTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Sum"
- val solName = "sol"
-
- "dynamic dag" should {
- "can retrieve a list of built-in partitioner classes" in {
- val partitioners = restClient.queryBuiltInPartitioners()
- partitioners.length should be > 0
- partitioners.foreach(clazz =>
- clazz should startWith("org.apache.gearpump.partitioner.")
- )
- }
-
- "can compose a wordcount application from scratch" in {
- // todo: blocked by #1450
- }
-
- "can replace down stream with wordcount's sum processor (new processor will have metrics)" in {
- // setup
- val appId = expectSolJarSubmittedWithAppId()
-
- // exercise
- val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
- replaceProcessor(appId, 1, sumTaskClass)
- var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
- Util.retryUntil(() => {
- laterProcessors = restClient.queryStreamingAppDetail(appId).processors
- laterProcessors.size == formerProcessors.size + 1
- }, "new processor successfully added")
- processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
- }
-
- "can replace up stream with wordcount's split processor (new processor will have metrics)" in {
- // setup
- val appId = expectSolJarSubmittedWithAppId()
-
- // exercise
- val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
- replaceProcessor(appId, 0, splitTaskClass)
- var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
- Util.retryUntil(() => {
- laterProcessors = restClient.queryStreamingAppDetail(appId).processors
- laterProcessors.size == formerProcessors.size + 1
- }, "new processor added")
- processorHasThroughput(appId, laterProcessors.keySet.max, "sendThroughput")
- }
-
- "fall back to last dag version when replacing a processor failid" in {
- // setup
- val appId = expectSolJarSubmittedWithAppId()
-
- // exercise
- val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
- replaceProcessor(appId, 1, sumTaskClass)
- var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
- Util.retryUntil(() => {
- laterProcessors = restClient.queryStreamingAppDetail(appId).processors
- laterProcessors.size == formerProcessors.size + 1
- }, "new processor added")
- processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
-
- val fakeTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Fake"
- replaceProcessor(appId, laterProcessors.keySet.max, fakeTaskClass)
- Util.retryUntil(() => {
- val processorsAfterFailure = restClient.queryStreamingAppDetail(appId).processors
- processorsAfterFailure.size == laterProcessors.size
- }, "new processor added")
- val currentClock = restClient.queryStreamingAppDetail(appId).clock
- Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > currentClock,
- "app clock is advancing")
- }
-
- "fall back to last dag version when AppMaster HA triggered" in {
- // setup
- val appId = expectSolJarSubmittedWithAppId()
-
- // exercise
- val formerAppMaster = restClient.queryApp(appId).appMasterPath
- val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
- replaceProcessor(appId, 1, sumTaskClass)
- var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
- Util.retryUntil(() => {
- laterProcessors = restClient.queryStreamingAppDetail(appId).processors
- laterProcessors.size == formerProcessors.size + 1
- }, "new processor added")
- processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
-
- restClient.killAppMaster(appId) shouldBe true
- Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster,
- "new AppMaster created")
- val processors = restClient.queryStreamingAppDetail(appId).processors
- processors.size shouldEqual laterProcessors.size
- }
- }
-
- private def expectSolJarSubmittedWithAppId(): Int = {
- val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length)
- success shouldBe true
- expectAppIsRunning(appId, solName)
- Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
- appId
- }
-
- private def replaceProcessor(
- appId: Int,
- formerProcessorId: Int,
- newTaskClass: String,
- newProcessorDescription: String = "",
- newParallelism: Int = 1): Unit = {
- val uploadedJar = restClient.uploadJar(wordCountJar)
- val replaceMe = new ProcessorDescription(formerProcessorId, newTaskClass,
- newParallelism, newProcessorDescription,
- jar = uploadedJar)
-
- // exercise
- val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
- success shouldBe true
- }
-
- private def processorHasThroughput(appId: Int, processorId: Int, metrics: String): Unit = {
- Util.retryUntil(() => {
- val actual = restClient.queryStreamingAppMetrics(appId, current = false,
- path = "processor" + processorId)
- val throughput = actual.metrics.filter(_.value.name.endsWith(metrics))
- throughput.size should be > 0
- throughput.forall(_.value.asInstanceOf[Meter].count > 0L)
- }, "new processor has message received")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
deleted file mode 100644
index 27e4665..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.{Docker, TestSpecBase, Util}
-import org.apache.gearpump.streaming._
-import org.apache.gearpump.streaming.appmaster.ProcessorSummary
-
-/**
- * The test spec will perform destructive operations to check the stability
- */
-class ExampleSpec extends TestSpecBase {
-
- private val LOG = Logger.getLogger(getClass)
-
- "distributed shell" should {
- "execute commands on machines where its executors are running" in {
- val distShellJar = cluster.queryBuiltInExampleJars("distributedshell-").head
- val mainClass = "org.apache.gearpump.examples.distributedshell.DistributedShell"
- val clientClass = "org.apache.gearpump.examples.distributedshell.DistributedShellClient"
- val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(distShellJar, cluster.getWorkerHosts.length, mainClass)
- success shouldBe true
- expectAppIsRunning(appId, "DistributedShell")
- val args = Array(
- clientClass,
- "-appid", appId.toString,
- "-command", "hostname"
- )
-
- val expectedHostNames = cluster.getWorkerHosts.map(Docker.getHostName(_))
-
- def verify(): Boolean = {
- val workerNum = cluster.getWorkerHosts.length
- val result = commandLineClient.submitAppAndCaptureOutput(distShellJar,
- workerNum, args.mkString(" ")).split("\n").
- filterNot(line => line.startsWith("[INFO]") || line.isEmpty)
- expectedHostNames.forall(result.contains)
- }
-
- Util.retryUntil(() => verify(),
- s"executors started on all expected hosts ${expectedHostNames.mkString(", ")}")
- }
- }
-
- "wordcount" should {
- val wordCountJarNamePrefix = "wordcount-"
- behave like streamingApplication(wordCountJarNamePrefix, wordCountName)
-
- "can submit immediately after killing a former one" in {
- // setup
- val formerAppId = restClient.getNextAvailableAppId()
- val formerSubmissionSuccess =
- restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
- formerSubmissionSuccess shouldBe true
- expectAppIsRunning(formerAppId, wordCountName)
- Util.retryUntil(() =>
- restClient.queryStreamingAppDetail(formerAppId).clock > 0, "app running")
- restClient.killApp(formerAppId)
-
- // exercise
- val appId = formerAppId + 1
- val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
- success shouldBe true
- expectAppIsRunning(appId, wordCountName)
- }
- }
-
- "wordcount(java)" should {
- val wordCountJavaJarNamePrefix = "wordcountjava-"
- val wordCountJavaName = "wordcountJava"
- behave like streamingApplication(wordCountJavaJarNamePrefix, wordCountJavaName)
- }
-
- "sol" should {
- val solJarNamePrefix = "sol-"
- val solName = "sol"
- behave like streamingApplication(solJarNamePrefix, solName)
- }
-
- "complexdag" should {
- val dynamicDagJarNamePrefix = "complexdag-"
- val dynamicDagName = "dag"
- behave like streamingApplication(dynamicDagJarNamePrefix, dynamicDagName)
- }
-
- def streamingApplication(jarNamePrefix: String, appName: String): Unit = {
- lazy val jar = cluster.queryBuiltInExampleJars(jarNamePrefix).head
-
- "can obtain application clock and the clock will keep changing" in {
- // setup
- val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(jar, cluster.getWorkerHosts.length)
- success shouldBe true
- expectAppIsRunning(appId, appName)
-
- // exercise
- Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app submitted")
- val formerClock = restClient.queryStreamingAppDetail(appId).clock
- Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > formerClock,
- "app clock is advancing")
- }
-
- "can change the parallelism and description of a processor" in {
- // setup
- val appId = restClient.getNextAvailableAppId()
- val formerSubmissionSuccess = restClient.submitApp(jar, cluster.getWorkerHosts.length)
- formerSubmissionSuccess shouldBe true
- expectAppIsRunning(appId, appName)
- val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
- val processor0 = formerProcessors.get(0).get
- val expectedProcessorId = formerProcessors.size
- val expectedParallelism = processor0.parallelism + 1
- val expectedDescription = processor0.description + "new"
- val replaceMe = new ProcessorDescription(processor0.id, processor0.taskClass,
- expectedParallelism, description = expectedDescription)
-
- // exercise
- val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
- success shouldBe true
- var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
- Util.retryUntil(() => {
- laterProcessors = restClient.queryStreamingAppDetail(appId).processors
- laterProcessors.size == formerProcessors.size + 1
- }, "new process added")
- val laterProcessor0 = laterProcessors.get(expectedProcessorId).get
- laterProcessor0.parallelism shouldEqual expectedParallelism
- laterProcessor0.description shouldEqual expectedDescription
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
deleted file mode 100644
index bb9982a..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.hadoop.HadoopCluster._
-import org.apache.gearpump.integrationtest.kafka.KafkaCluster._
-import org.apache.gearpump.integrationtest.kafka.{ResultVerifier, SimpleKafkaReader}
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-
-/**
- * Checks message delivery consistency, like at-least-once, and exactly-once.
- */
-class MessageDeliverySpec extends TestSpecBase {
-
- private val LOG = Logger.getLogger(getClass)
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
- }
-
- "Gearpump" should {
- "support exactly-once message delivery" in {
- withKafkaCluster(cluster) { kafkaCluster =>
- // setup
- val sourcePartitionNum = 1
- val sourceTopic = "topic1"
- val sinkTopic = "topic2"
-
- // Generate number sequence (1, 2, 3, ...) to the topic
- kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
-
- withDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString) { producer =>
-
- withHadoopCluster { hadoopCluster =>
- // exercise
- val args = Array("org.apache.gearpump.streaming.examples.state.MessageCountApp",
- "-defaultFS", hadoopCluster.getDefaultFS,
- "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
- "-brokerList", kafkaCluster.getBrokerListConnectString,
- "-sourceTopic", sourceTopic,
- "-sinkTopic", sinkTopic,
- "-sourceTask", sourcePartitionNum).mkString(" ")
- val appId = restClient.getNextAvailableAppId()
-
- val stateJar = cluster.queryBuiltInExampleJars("state-").head
- val success = restClient.submitApp(stateJar, executorNum = 1, args = args)
- success shouldBe true
-
- // verify #1
- expectAppIsRunning(appId, "MessageCount")
- Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0,
- "app is running")
-
- // wait for checkpoint to take place
- Thread.sleep(1000)
-
- LOG.info("Trigger message replay by kill and restart the executors")
- val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
- restClient.killExecutor(appId, executorToKill) shouldBe true
- Util.retryUntil(() => restClient.queryExecutorBrief(appId)
- .map(_.executorId).max > executorToKill, s"executor $executorToKill killed")
-
- producer.stop()
- val producedNumbers = producer.producedNumbers
- LOG.info(s"In total, numbers in range[${producedNumbers.start}" +
- s", ${producedNumbers.end - 1}] have been written to Kafka")
-
- // verify #3
- val kafkaSourceOffset = kafkaCluster.getLatestOffset(sourceTopic)
-
- assert(producedNumbers.size == kafkaSourceOffset,
- "produced message should match Kafka queue size")
-
- LOG.info(s"The Kafka source topic $sourceTopic offset is " + kafkaSourceOffset)
-
- // The sink processor of this job (MessageCountApp) writes total message
- // count to Kafka Sink periodically (once every checkpoint interval).
- // The detector keep record of latest message count.
- val detector = new ResultVerifier {
- var latestMessageCount: Int = 0
- override def onNext(messageCount: Int): Unit = {
- this.latestMessageCount = messageCount
- }
- }
-
- val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
- host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
- Util.retryUntil(() => {
- kafkaReader.read()
- LOG.info(s"Received message count: ${detector.latestMessageCount}, " +
- s"expect: ${producedNumbers.size}")
- detector.latestMessageCount == producedNumbers.size
- }, "MessageCountApp calculated message count matches " +
- "expected in case of message replay")
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
deleted file mode 100644
index 2f5bb64..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
+++ /dev/null
@@ -1,369 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import scala.concurrent.duration._
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.cluster.master.MasterStatus
-import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-
-/**
- * The test spec checks REST service usage
- */
-class RestServiceSpec extends TestSpecBase {
-
- "query system version" should {
- "retrieve the current version number" in {
- restClient.queryVersion() should not be empty
- }
- }
-
- "list applications" should {
- "retrieve 0 application after cluster just started" in {
- restClient.listRunningApps().length shouldEqual 0
- }
-
- "retrieve 1 application after the first application submission" in {
- // exercise
- val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
- success shouldBe true
- expectAppIsRunning(appId, wordCountName)
- restClient.listRunningApps().length shouldEqual 1
- }
- }
-
- "submit application (wordcount)" should {
- "find a running application after submission" in {
- // exercise
- val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
- success shouldBe true
- expectAppIsRunning(appId, wordCountName)
- }
-
- "reject a repeated submission request while the application is running" in {
- // setup
- val appId = restClient.getNextAvailableAppId()
- val formerSubmissionSuccess = restClient.submitApp(wordCountJar,
- cluster.getWorkerHosts.length)
- formerSubmissionSuccess shouldBe true
- expectAppIsRunning(appId, wordCountName)
-
- // exercise
- val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
- success shouldBe false
- }
-
- "reject an invalid submission (the jar file path is incorrect)" in {
- // exercise
- val success = restClient.submitApp(wordCountJar + ".missing", cluster.getWorkerHosts.length)
- success shouldBe false
- }
-
- "submit a wordcount application with 4 split and 3 sum processors and expect " +
- "parallelism of processors match the given number" in {
- // setup
- val splitNum = 4
- val sumNum = 3
- val appId = restClient.getNextAvailableAppId()
-
- // exercise
- val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length,
- s"-split $splitNum -sum $sumNum")
- success shouldBe true
- expectAppIsRunning(appId, wordCountName)
- val processors = restClient.queryStreamingAppDetail(appId).processors
- processors.size shouldEqual 2
- val splitProcessor = processors.get(0).get
- splitProcessor.parallelism shouldEqual splitNum
- val sumProcessor = processors.get(1).get
- sumProcessor.parallelism shouldEqual sumNum
- }
-
- "can obtain application metrics and the metrics will keep changing" in {
- // setup
- val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
- success shouldBe true
- expectAppIsRunning(appId, wordCountName)
-
- // exercise
- expectMetricsAvailable(
- restClient.queryStreamingAppMetrics(appId, current = true).metrics.nonEmpty,
- "metrics available")
- val actual = restClient.queryStreamingAppMetrics(appId, current = true)
- actual.path shouldEqual s"app$appId.processor*"
- actual.metrics.foreach(metric => {
- metric.time should be > 0L
- metric.value should not be null
- })
- val formerMetricsDump = actual.metrics.toString()
-
- expectMetricsAvailable({
- val laterMetrics = restClient.queryStreamingAppMetrics(appId, current = true).metrics
- laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
- }, "metrics available")
- }
-
- "can obtain application corresponding executors' metrics and " +
- "the metrics will keep changing" in {
- // setup
- val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
- success shouldBe true
- expectAppIsRunning(appId, wordCountName)
-
- // exercise
- expectMetricsAvailable(
- restClient.queryExecutorMetrics(appId, current = true).metrics.nonEmpty,
- "metrics available")
- val actual = restClient.queryExecutorMetrics(appId, current = true)
- actual.path shouldEqual s"app$appId.executor*"
- actual.metrics.foreach(metric => {
- metric.time should be > 0L
- metric.value should not be null
- })
- val formerMetricsDump = actual.metrics.toString()
-
- expectMetricsAvailable({
- val laterMetrics = restClient.queryExecutorMetrics(appId, current = true).metrics
- laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
- }, "metrics available")
- }
- }
-
- "kill application" should {
- "a running application should be killed" in {
- // setup
- val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
- success shouldBe true
- expectAppIsRunning(appId, wordCountName)
-
- // exercise
- killAppAndVerify(appId)
- }
-
- "should fail when attempting to kill a stopped application" in {
- // setup
- val appId = restClient.getNextAvailableAppId()
- val submissionSucess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
- submissionSucess shouldBe true
- expectAppIsRunning(appId, wordCountName)
- killAppAndVerify(appId)
-
- // exercise
- val success = restClient.killApp(appId)
- success shouldBe false
- }
-
- "should fail when attempting to kill a non-exist application" in {
- // setup
- val freeAppId = restClient.listApps().length + 1
-
- // exercise
- val success = restClient.killApp(freeAppId)
- success shouldBe false
- }
- }
-
- "cluster information" should {
- "retrieve 1 master for a non-HA cluster" in {
- // exercise
- val masterSummary = restClient.queryMaster()
- masterSummary.cluster.map(_.toTuple) shouldEqual cluster.getMastersAddresses
- masterSummary.aliveFor should be > 0L
- masterSummary.masterStatus shouldEqual MasterStatus.Synced
- }
-
- "retrieve the same number of workers as cluster has" in {
- // setup
- val expectedWorkersCount = cluster.getWorkerHosts.size
-
- // exercise
- var runningWorkers: Array[WorkerSummary] = Array.empty
- Util.retryUntil(() => {
- runningWorkers = restClient.listRunningWorkers()
- runningWorkers.length == expectedWorkersCount
- }, "all workers running")
- runningWorkers.foreach { worker =>
- worker.state shouldEqual MasterToAppMaster.AppMasterActive
- }
- }
-
- "find a newly added worker instance" in {
- // setup
- restartClusterRequired = true
- val formerWorkersCount = cluster.getWorkerHosts.length
- Util.retryUntil(() => restClient.listRunningWorkers().length == formerWorkersCount,
- "all workers running")
- val workerName = "newWorker"
-
- // exercise
- cluster.addWorkerNode(workerName)
- Util.retryUntil(() => restClient.listRunningWorkers().length > formerWorkersCount,
- "new worker added")
- cluster.getWorkerHosts.length shouldEqual formerWorkersCount + 1
- restClient.listRunningWorkers().length shouldEqual formerWorkersCount + 1
- }
-
- "retrieve 0 worker, if cluster is started without any workers" in {
- // setup
- restartClusterRequired = true
- cluster.shutDown()
-
- // exercise
- cluster.start(workerNum = 0)
- cluster.getWorkerHosts.length shouldEqual 0
- restClient.listRunningWorkers().length shouldEqual 0
- }
-
- "can obtain master's metrics and the metrics will keep changing" in {
- // exercise
- expectMetricsAvailable(
- restClient.queryMasterMetrics(current = true).metrics.nonEmpty, "metrics available")
- val actual = restClient.queryMasterMetrics(current = true)
- actual.path shouldEqual s"master"
- actual.metrics.foreach(metric => {
- metric.time should be > 0L
- metric.value should not be null
- })
- val formerMetricsDump = actual.metrics.toString()
-
- expectMetricsAvailable({
- val laterMetrics = restClient.queryMasterMetrics(current = true).metrics
- laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
- }, "metrics available")
- }
-
- "can obtain workers' metrics and the metrics will keep changing" in {
- // exercise
- restClient.listRunningWorkers().foreach { worker =>
- val workerId = worker.workerId
- expectMetricsAvailable(
- restClient.queryWorkerMetrics(workerId, current = true).metrics.nonEmpty,
- "metrics available")
- val actual = restClient.queryWorkerMetrics(workerId, current = true)
- actual.path shouldEqual s"worker${WorkerId.render(workerId)}"
- actual.metrics.foreach(metric => {
- metric.time should be > 0L
- metric.value should not be null
- })
- val formerMetricsDump = actual.metrics.toString()
-
- expectMetricsAvailable({
- val laterMetrics = restClient.queryWorkerMetrics(workerId, current = true).metrics
- laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump
- }, "metrics available")
- }
- }
- }
-
- "configuration" should {
- "retrieve the configuration of master and match particular values" in {
- // exercise
- val actual = restClient.queryMasterConfig()
- actual.hasPath("gearpump") shouldBe true
- actual.hasPath("gearpump.cluster") shouldBe true
- actual.getString("gearpump.hostname") shouldEqual cluster.getMasterHosts.mkString(",")
- }
-
- "retrieve the configuration of worker X and match particular values" in {
- // exercise
- restClient.listRunningWorkers().foreach { worker =>
- val actual = restClient.queryWorkerConfig(worker.workerId)
- actual.hasPath("gearpump") shouldBe true
- actual.hasPath("gearpump.worker") shouldBe true
- }
- }
-
- "retrieve the configuration of executor X and match particular values" in {
- // setup
- val appId = restClient.getNextAvailableAppId()
-
- // exercise
- val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
- success shouldBe true
- restClient.queryExecutorBrief(appId).foreach { executor =>
- val executorId = executor.executorId
- val actual = restClient.queryExecutorConfig(appId, executorId)
- actual.hasPath("gearpump") shouldBe true
- actual.hasPath("gearpump.executor") shouldBe true
- actual.getInt("gearpump.applicationId") shouldEqual appId
- actual.getInt("gearpump.executorId") shouldEqual executorId
- }
- }
-
- "retrieve the configuration of application X and match particular values" in {
- // setup
- val appId = restClient.getNextAvailableAppId()
-
- // exercise
- val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
- success shouldBe true
- val actual = restClient.queryAppMasterConfig(appId)
- actual.hasPath("gearpump") shouldBe true
- actual.hasPath("gearpump.appmaster") shouldBe true
- }
- }
-
- "application life-cycle" should {
- "newly started application should be configured same as the previous one, after restart" in {
- // setup
- val originSplitNum = 4
- val originSumNum = 3
- val originAppId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length,
- s"-split $originSplitNum -sum $originSumNum")
- success shouldBe true
- expectAppIsRunning(originAppId, wordCountName)
- val originAppDetail = restClient.queryStreamingAppDetail(originAppId)
-
- // exercise
- Util.retryUntil(() => restClient.restartApp(originAppId), "app restarted")
- val killedApp = restClient.queryApp(originAppId)
- killedApp.appId shouldEqual originAppId
- killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive
- val runningApps = restClient.listRunningApps()
- runningApps.length shouldEqual 1
- val newAppDetail = restClient.queryStreamingAppDetail(runningApps.head.appId)
- newAppDetail.appName shouldEqual originAppDetail.appName
- newAppDetail.processors.size shouldEqual originAppDetail.processors.size
- newAppDetail.processors.get(0).get.parallelism shouldEqual originSplitNum
- newAppDetail.processors.get(1).get.parallelism shouldEqual originSumNum
- }
- }
-
- private def killAppAndVerify(appId: Int): Unit = {
- val success = restClient.killApp(appId)
- success shouldBe true
-
- val actualApp = restClient.queryApp(appId)
- actualApp.appId shouldEqual appId
- actualApp.status shouldEqual MasterToAppMaster.AppMasterInActive
- }
-
- private def expectMetricsAvailable(condition: => Boolean, conditionDescription: String): Unit = {
- val config = restClient.queryMasterConfig()
- val reportInterval = Duration(config.getString("gearpump.metrics.report-interval-ms") + "ms")
- Util.retryUntil(() => condition, conditionDescription, interval = reportInterval)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
deleted file mode 100644
index 4b15055..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import scala.concurrent.duration.Duration
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.cluster.worker.WorkerId
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-import org.apache.gearpump.util.{Constants, LogUtil}
-
-/**
- * The test spec will perform destructive operations to check the stability. Operations
- * contains shutting-down appmaster, executor, or worker, and etc..
- */
-class StabilitySpec extends TestSpecBase {
-
- private val LOG = LogUtil.getLogger(getClass)
-
- "kill appmaster" should {
- "restart the whole application" in {
- // setup
- val appId = commandLineClient.submitApp(wordCountJar)
- val formerAppMaster = restClient.queryApp(appId).appMasterPath
- Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
- ensureClockStoredInMaster()
-
- // exercise
- restClient.killAppMaster(appId) shouldBe true
- // todo: how long master will begin to recover and how much time for the recovering?
- Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster,
- "appmaster killed and restarted")
-
- // verify
- val laterAppMaster = restClient.queryStreamingAppDetail(appId)
- laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
- laterAppMaster.clock should be > 0L
- }
- }
-
- "kill executor" should {
- "will create a new executor and application will replay from the latest application clock" in {
- // setup
- val appId = commandLineClient.submitApp(wordCountJar)
- Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
- val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
- ensureClockStoredInMaster()
-
- // exercise
- restClient.killExecutor(appId, executorToKill) shouldBe true
- // todo: how long appmaster will begin to recover and how much time for the recovering?
- Util.retryUntil(() => restClient.queryExecutorBrief(appId)
- .map(_.executorId).max > executorToKill,
- s"executor $executorToKill killed and restarted")
-
- // verify
- val laterAppMaster = restClient.queryStreamingAppDetail(appId)
- laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
- laterAppMaster.clock should be > 0L
- }
- }
-
- private def hostName(workerId: WorkerId): String = {
- val worker = restClient.listRunningWorkers().find(_.workerId == workerId)
- // Parse hostname from JVM info (in format: PID@hostname)
- val hostname = worker.get.jvmName.split("@")(1)
- hostname
- }
-
- "kill worker" should {
- "worker will not recover but all its executors will be migrated to other workers" in {
- // setup
- restartClusterRequired = true
- val appId = commandLineClient.submitApp(wordCountJar)
- Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
-
- val allexecutors = restClient.queryExecutorBrief(appId)
- val maxExecutor = allexecutors.sortBy(_.executorId).last
- ensureClockStoredInMaster()
-
- val appMaster = allexecutors.find(_.executorId == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
-
- LOG.info(s"Max executor Id is executor: ${maxExecutor.executorId}, " +
- s"worker: ${maxExecutor.workerId}")
- val executorsSharingSameWorker = allexecutors
- .filter(_.workerId == maxExecutor.workerId).map(_.executorId)
- LOG.info(s"These executors sharing the same worker Id ${maxExecutor.workerId}," +
- s" ${executorsSharingSameWorker.mkString(",")}")
-
- // kill the worker and expect restarting all killed executors on other workers.
- val workerIdToKill = maxExecutor.workerId
- cluster.removeWorkerNode(hostName(workerIdToKill))
-
- val appMasterKilled = executorsSharingSameWorker
- .exists(_ == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
-
- def executorsMigrated(): Boolean = {
- val executors = restClient.queryExecutorBrief(appId)
- val newAppMaster = executors.find(_.executorId == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
-
- if (appMasterKilled) {
- newAppMaster.get.workerId != appMaster.get.workerId
- } else {
- // New executors will be started to replace killed executors.
- // The new executors will be assigned larger executor Id. We use this trick to detect
- // Whether new executors have been started successfully.
- executors.map(_.executorId).max > maxExecutor.executorId
- }
- }
-
- Util.retryUntil(() => {
- executorsMigrated()
- }, s"new executor created with id > ${maxExecutor.executorId} when worker is killed")
-
- // verify
- val laterAppMaster = restClient.queryStreamingAppDetail(appId)
- laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
- laterAppMaster.clock should be > 0L
- }
- }
-
- "kill master" should {
- "master will be down and all workers will attempt to reconnect and suicide after X seconds" in {
- // setup
- restartClusterRequired = true
- val masters = cluster.getMasterHosts
- val config = restClient.queryMasterConfig()
- val shutDownTimeout = Duration(config.getString("akka.cluster.auto-down-unreachable-after"))
-
- // exercise
- masters.foreach(cluster.removeMasterNode)
- info(s"will sleep ${shutDownTimeout.toSeconds}s and then check workers are down")
- Thread.sleep(shutDownTimeout.toMillis)
-
- // verify
- val aliveWorkers = cluster.getWorkerHosts
- Util.retryUntil(() => aliveWorkers.forall(worker => !cluster.nodeIsOnline(worker)),
- "all workers down")
- }
- }
-
- private def ensureClockStoredInMaster(): Unit = {
- // TODO: 5000ms is a fixed sync period in clock service.
- // we wait for 5000ms to assume the clock is stored
- Thread.sleep(5000)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
deleted file mode 100644
index b327bf4..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.checklist
-
-import org.apache.gearpump.integrationtest.kafka.{KafkaCluster, MessageLossDetector, SimpleKafkaReader}
-import org.apache.gearpump.integrationtest.storm.StormClient
-import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
-
-/**
- * The test spec checks the compatibility of running Storm applications
- */
-class StormCompatibilitySpec extends TestSpecBase {
-
- private lazy val stormClient = {
- new StormClient(cluster, restClient)
- }
-
- val `version0.9` = "09"
- val `version0.10` = "010"
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- stormClient.start()
- }
-
- override def afterAll(): Unit = {
- stormClient.shutDown()
- super.afterAll()
- }
-
- def withStorm(testCode: String => Unit): Unit = {
- testCode(`version0.9`)
- testCode(`version0.10`)
- }
-
- def getTopologyName(name: String, stormVersion: String): String = {
- s"${name}_$stormVersion"
- }
-
- def getStormJar(stormVersion: String): String = {
- cluster.queryBuiltInITJars(s"storm$stormVersion-").head
- }
-
- "Storm over Gearpump" should withStorm {
- stormVersion =>
- s"support basic topologies ($stormVersion)" in {
- val stormJar = getStormJar(stormVersion)
- val topologyName = getTopologyName("exclamation", stormVersion)
-
- // exercise
- val appId = stormClient.submitStormApp(
- jar = stormJar,
- mainClass = "storm.starter.ExclamationTopology",
- args = topologyName,
- appName = topologyName)
-
- // verify
- Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
- }
-
- s"support to run a python version of wordcount ($stormVersion)" in {
- val stormJar = getStormJar(stormVersion)
- val topologyName = getTopologyName("wordcount", stormVersion)
-
- // exercise
- val appId = stormClient.submitStormApp(
- jar = stormJar,
- mainClass = "storm.starter.WordCountTopology",
- args = topologyName,
- appName = topologyName)
-
- // verify
- Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
- }
-
- s"support DRPC ($stormVersion)" in {
- // ReachTopology computes the Twitter url reached by users and their followers
- // using Storm Distributed RPC feature
- // input (user and follower) data are already prepared in memory
- val stormJar = getStormJar(stormVersion)
- val topologyName = getTopologyName("reach", stormVersion)
- stormClient.submitStormApp(
- jar = stormJar,
- mainClass = "storm.starter.ReachTopology",
- args = topologyName,
- appName = topologyName)
- val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway)
-
- // verify
- Util.retryUntil(() => {
- drpcClient.execute("reach", "notaurl.com") == "0"
- }, "drpc reach == 0")
- drpcClient.execute("reach", "foo.com/blog/1") shouldEqual "16"
- drpcClient.execute("reach", "engineering.twitter.com/blog/5") shouldEqual "14"
- }
-
- s"support tick tuple ($stormVersion)" in {
- val stormJar = getStormJar(stormVersion)
- val topologyName = getTopologyName("slidingWindowCounts", stormVersion)
-
- // exercise
- val appId = stormClient.submitStormApp(
- jar = stormJar,
- mainClass = "storm.starter.RollingTopWords",
- args = s"$topologyName remote",
- appName = topologyName)
-
- // verify
- Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
- }
-
- s"support at-least-once semantics with Storm's Kafka connector ($stormVersion)" in {
-
- val stormJar = getStormJar(stormVersion)
- val topologyName = getTopologyName("storm_kafka", stormVersion)
- val stormKafkaTopology =
- s"org.apache.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology"
-
- import org.apache.gearpump.integrationtest.kafka.KafkaCluster._
- withKafkaCluster(cluster) {
- kafkaCluster =>
- val sourcePartitionNum = 2
- val sinkPartitionNum = 1
- val zookeeper = kafkaCluster.getZookeeperConnectString
- val brokerList = kafkaCluster.getBrokerListConnectString
- val sourceTopic = "topic1"
- val sinkTopic = "topic2"
-
- val args = Array("-topologyName", topologyName, "-sourceTopic", sourceTopic,
- "-sinkTopic", sinkTopic, "-zookeeperConnect", zookeeper, "-brokerList", brokerList,
- "-spoutNum", s"$sourcePartitionNum", "-boltNum", s"$sinkPartitionNum"
- )
-
- kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
-
- // generate number sequence (1, 2, 3, ...) to the topic
- withDataProducer(sourceTopic, brokerList) { producer =>
-
- val appId = stormClient.submitStormApp(
- jar = stormJar,
- mainClass = stormKafkaTopology,
- args = args.mkString(" "),
- appName = topologyName)
-
- Util.retryUntil(() =>
- restClient.queryStreamingAppDetail(appId).clock > 0, "app running")
-
- // kill executor and verify at-least-once is guaranteed on application restart
- val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
- restClient.killExecutor(appId, executorToKill) shouldBe true
- Util.retryUntil(() =>
- restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill,
- s"executor $executorToKill killed")
-
- // verify no message loss
- val detector = new
- MessageLossDetector(producer.lastWriteNum)
- val kafkaReader = new
- SimpleKafkaReader(detector, sinkTopic, host = kafkaCluster.advertisedHost,
- port = kafkaCluster.advertisedPort)
-
- Util.retryUntil(() => {
- kafkaReader.read()
- detector.allReceived
- }, "all kafka message read")
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
deleted file mode 100644
index 7942308..0000000
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.suites
-
-import org.scalatest._
-
-import org.apache.gearpump.integrationtest.MiniClusterProvider
-import org.apache.gearpump.integrationtest.checklist._
-import org.apache.gearpump.integrationtest.minicluster.MiniCluster
-
-/**
- * Launch a Gearpump cluster in standalone mode and run all test specs. To test a specific
- * test spec, you need to comment out other lines.
- */
-class StandaloneModeSuite extends Suites(
- new CommandLineSpec,
- new RestServiceSpec,
- new ExampleSpec,
- new DynamicDagSpec,
- new StormCompatibilitySpec,
- new StabilitySpec,
- new ConnectorKafkaSpec,
- new MessageDeliverySpec
-) with BeforeAndAfterAll {
-
- override def beforeAll(): Unit = {
- super.beforeAll()
- MiniClusterProvider.managed = true
- MiniClusterProvider.set(new MiniCluster).start()
- }
-
- override def afterAll(): Unit = {
- MiniClusterProvider.get.shutDown()
- super.afterAll()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala
new file mode 100644
index 0000000..4a161c7
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import org.apache.gearpump.integrationtest.minicluster.MiniCluster
+
+/**
+ * Provides a min cluster of Gearpump, which contains one or more masters, and workers.
+ */
+object MiniClusterProvider {
+
+ private var instance = new MiniCluster
+
+ def get: MiniCluster = instance
+
+ def set(instance: MiniCluster): MiniCluster = {
+ this.instance = instance
+ instance
+ }
+
+ /**
+ * Indicates whether test suite should create particular cluster. In case of false, every
+ * test spec will be responsible for cluster creation.
+ */
+ var managed = false
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
new file mode 100644
index 0000000..dabcc71
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import org.scalatest._
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * The abstract test spec
+ */
+trait TestSpecBase
+ extends WordSpec with Matchers with BeforeAndAfterEachTestData with BeforeAndAfterAll {
+
+ private def LOGGER = LogUtil.getLogger(getClass)
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ if (!MiniClusterProvider.managed) {
+ LOGGER.info("Will test with a default standalone mini cluster")
+ MiniClusterProvider.get.start()
+ }
+ }
+
+ override def afterAll(): Unit = {
+ if (!MiniClusterProvider.managed) {
+ LOGGER.info("Will shutdown the default mini cluster")
+ MiniClusterProvider.get.shutDown()
+ }
+ super.afterAll()
+ }
+
+ lazy val cluster = MiniClusterProvider.get
+ lazy val commandLineClient = cluster.commandLineClient
+ lazy val restClient = cluster.restClient
+
+ lazy val wordCountJar = cluster.queryBuiltInExampleJars("wordcount-").head
+ lazy val wordCountName = "wordCount"
+
+ var restartClusterRequired: Boolean = false
+
+ override def beforeEach(td: TestData): Unit = {
+
+ LOGGER.debug(s">### =============================================================")
+ LOGGER.debug(s">###1 Prepare test: ${td.name}\n")
+
+ assert(cluster != null, "Configure MiniCluster properly in suite spec")
+ cluster.isAlive shouldBe true
+ restClient.listRunningApps().isEmpty shouldBe true
+ LOGGER.debug(s">### =============================================================")
+ LOGGER.debug(s">###2 Start test: ${td.name}\n")
+ }
+
+ override def afterEach(td: TestData): Unit = {
+ LOGGER.debug(s"<### =============================================================")
+ LOGGER.debug(s"<###3 End test: ${td.name}\n")
+
+ if (restartClusterRequired || !cluster.isAlive) {
+ restartClusterRequired = false
+ LOGGER.info("Will restart the cluster for next test case")
+ cluster.restart()
+ } else {
+ restClient.listRunningApps().foreach(app => {
+ commandLineClient.killApp(app.appId) shouldBe true
+ })
+ }
+ }
+
+ def expectAppIsRunning(appId: Int, expectedAppName: String): AppMasterData = {
+ val app = restClient.queryApp(appId)
+ app.status shouldEqual MasterToAppMaster.AppMasterActive
+ app.appName shouldEqual expectedAppName
+ app
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
new file mode 100644
index 0000000..eabc684
--- /dev/null
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.checklist
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.integrationtest.TestSpecBase
+
+/**
+ * The test spec checks the command-line usage
+ */
+class CommandLineSpec extends TestSpecBase {
+
+ "use `gear info` to list applications" should {
+ "retrieve 0 application after cluster just started" in {
+ // exercise
+ getRunningAppCount shouldEqual 0
+ }
+
+ "retrieve 1 application after the first application submission" in {
+ // setup
+ val appId = expectSubmitAppSuccess(wordCountJar)
+ expectAppIsRunningByParsingOutput(appId, wordCountName)
+
+ // exercise
+ getRunningAppCount shouldEqual 1
+ }
+ }
+
+ "use `gear app` to submit application" should {
+ "find a running application after submission" in {
+ // exercise
+ val appId = expectSubmitAppSuccess(wordCountJar)
+ expectAppIsRunningByParsingOutput(appId, wordCountName)
+ }
+
+ "reject a repeated submission request while the application is running" in {
+ // setup
+ val appId = expectSubmitAppSuccess(wordCountJar)
+ expectAppIsRunningByParsingOutput(appId, wordCountName)
+
+ // exercise
+ val actualAppId = commandLineClient.submitApp(wordCountJar)
+ actualAppId shouldEqual -1
+ }
+
+ "reject an invalid submission (the jar file path is incorrect)" in {
+ // exercise
+ val actualAppId = commandLineClient.submitApp(wordCountJar + ".missing")
+ actualAppId shouldEqual -1
+ }
+ }
+
+ "use `gear kill` to kill application" should {
+ "a running application should be killed" in {
+ // setup
+ val appId = expectSubmitAppSuccess(wordCountJar)
+
+ // exercise
+ val success = commandLineClient.killApp(appId)
+ success shouldBe true
+ }
+
+ "should fail when attempting to kill a stopped application" in {
+ // setup
+ val appId = expectSubmitAppSuccess(wordCountJar)
+ var success = commandLineClient.killApp(appId)
+ success shouldBe true
+
+ // exercise
+ success = commandLineClient.killApp(appId)
+ success shouldBe false
+ }
+
+ "the EmbededCluster can be used as embedded cluster in process" in {
+ // setup
+ val args = "-debug true -sleep 10"
+ val appId = expectSubmitAppSuccess(wordCountJar, args)
+ var success = commandLineClient.killApp(appId)
+ success shouldBe true
+ }
+
+ "should fail when attempting to kill a non-exist application" in {
+ // setup
+ val freeAppId = getNextAvailableAppId
+
+ // exercise
+ val success = commandLineClient.killApp(freeAppId)
+ success shouldBe false
+ }
+ }
+
+ "use `gear replay` to replay the application from current min clock" should {
+ "todo: description" in {
+ // todo: test code
+ }
+ }
+
+ private def getRunningAppCount: Int = {
+ commandLineClient.listRunningApps().length
+ }
+
+ private def getNextAvailableAppId: Int = {
+ commandLineClient.listApps().length + 1
+ }
+
+ private def expectSubmitAppSuccess(jar: String, args: String = ""): Int = {
+ val appId = commandLineClient.submitApp(jar)
+ appId should not equal -1
+ appId
+ }
+
+ private def expectAppIsRunningByParsingOutput(appId: Int, expectedName: String): Unit = {
+ val actual = commandLineClient.queryApp(appId)
+ actual should include(s"application: $appId, ")
+ actual should include(s"name: $expectedName, ")
+ actual should include(s"status: ${MasterToAppMaster.AppMasterActive}")
+ }
+}
[2/4] incubator-gearpump git commit: fix GEARPUMP-150 correct the
integration test file structure
Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
deleted file mode 100644
index 73413da..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import scala.sys.process._
-
-import org.apache.gearpump.integrationtest.Docker
-
-/**
- * A helper to instantiate the base image for different usage.
- */
-class BaseContainer(val host: String, command: String,
- masterAddrs: List[(String, Int)],
- tunnelPorts: Set[Int] = Set.empty) {
-
- private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher"
- private val DOCKER_IMAGE_GEARPUMP_HOME = "/opt/gearpump"
- private val DOCKER_IMAGE_LOG_HOME = "/var/log/gearpump"
- private val HOST_GEARPUMP_HOME = "pwd".!!.trim + "/output/target/pack"
- private val HOST_LOG_HOME = {
- val dir = "/tmp/gearpump"
- s"mkdir -p $dir".!!
- s"mktemp -p $dir -d".!!.trim
- }
-
- private val CLUSTER_OPTS = {
- masterAddrs.zipWithIndex.map { case (hostPort, index) =>
- s"-Dgearpump.cluster.masters.$index=${hostPort._1}:${hostPort._2}"
- }.mkString(" ")
- }
-
- def createAndStart(): String = {
- Docker.createAndStartContainer(host, IMAGE_NAME, command,
- environ = Map("JAVA_OPTS" -> CLUSTER_OPTS),
- volumes = Map(
- HOST_GEARPUMP_HOME -> DOCKER_IMAGE_GEARPUMP_HOME,
- HOST_LOG_HOME -> DOCKER_IMAGE_LOG_HOME),
- knownHosts = masterAddrs.map(_._1).filter(_ != host).toSet,
- tunnelPorts = tunnelPorts)
- }
-
- def killAndRemove(): Unit = {
- Docker.killAndRemoveContainer(host)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
deleted file mode 100644
index 884a8d1..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.integrationtest.Docker
-
-/**
- * A command-line client to operate a Gearpump cluster
- */
-class CommandLineClient(host: String) {
-
- private val LOG = Logger.getLogger(getClass)
-
- def listApps(): Array[String] = {
- gearCommand(host, "gear info").split("\n").filter(
- _.startsWith("application: ")
- )
- }
-
- def listRunningApps(): Array[String] =
- listApps().filter(
- _.contains(s", status: ${MasterToAppMaster.AppMasterActive}")
- )
-
- def queryApp(appId: Int): String = try {
- listApps().filter(
- _.startsWith(s"application: $appId")
- ).head
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- ""
- }
-
- def submitAppAndCaptureOutput(jar: String, executorNum: Int, args: String = ""): String = {
- gearCommand(host, s"gear app -verbose true -jar $jar -executors $executorNum $args")
- }
-
- def submitApp(jar: String, args: String = ""): Int = {
- LOG.debug(s"|=> Submit Application $jar...")
- submitAppUse("gear app", jar, args)
- }
-
- private def submitAppUse(launcher: String, jar: String, args: String = ""): Int = try {
- gearCommand(host, s"$launcher -jar $jar $args").split("\n")
- .filter(_.contains("The application id is ")).head.split(" ").last.toInt
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- -1
- }
-
- def killApp(appId: Int): Boolean = {
- tryGearCommand(host, s"gear kill -appid $appId")
- }
-
- private def gearCommand(container: String, command: String): String = {
- LOG.debug(s"|=> Gear command $command in container $container...")
- Docker.execute(container, s"/opt/start $command")
- }
-
- private def tryGearCommand(container: String, command: String): Boolean = {
- LOG.debug(s"|=> Gear command $command in container $container...")
- Docker.executeSilently(container, s"/opt/start $command")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
deleted file mode 100644
index 4d439e8..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import java.io.IOException
-import scala.collection.mutable.ListBuffer
-
-import org.apache.log4j.Logger
-
-import org.apache.gearpump.integrationtest.{Docker, Util}
-
-/**
- * This class is a test driver for end-to-end integration test.
- */
-class MiniCluster {
-
- private val LOG = Logger.getLogger(getClass)
- private val SUT_HOME = "/opt/gearpump"
-
- private val REST_SERVICE_PORT = 8090
- private val MASTER_PORT = 3000
- private val MASTER_ADDRS: List[(String, Int)] = {
- (0 to 0).map(index =>
- ("master" + index, MASTER_PORT)
- ).toList
- }
-
- lazy val commandLineClient: CommandLineClient = new CommandLineClient(getMasterHosts.head)
-
- lazy val restClient: RestClient = {
- val client = new RestClient(getMasterHosts.head, REST_SERVICE_PORT)
- client
- }
-
- private var workers: ListBuffer[String] = ListBuffer.empty
-
- def start(workerNum: Int = 2): Unit = {
-
- // Kill master
- MASTER_ADDRS.foreach { case (host, _) =>
- if (Docker.containerExists(host)) {
- Docker.killAndRemoveContainer(host)
- }
- }
-
- // Kill existing workers
- workers ++= (0 until workerNum).map("worker" + _)
- workers.foreach { worker =>
- if (Docker.containerExists(worker)) {
- Docker.killAndRemoveContainer(worker)
- }
- }
-
- // Start Masters
- MASTER_ADDRS.foreach({ case (host, port) =>
- addMasterNode(host, port)
- })
-
- // Start Workers
- workers.foreach { worker =>
- val container = new BaseContainer(worker, "worker", MASTER_ADDRS)
- container.createAndStart()
- }
-
- // Check cluster status
- expectRestClientAuthenticated()
- expectClusterAvailable()
- }
-
- private def addMasterNode(host: String, port: Int): Unit = {
- val container = new BaseContainer(host, s"master -ip $host -port $port", MASTER_ADDRS)
- container.createAndStart()
- }
-
- def addWorkerNode(host: String): Unit = {
- if (workers.find(_ == host).isEmpty) {
- val container = new BaseContainer(host, "worker", MASTER_ADDRS)
- container.createAndStart()
- workers += host
- } else {
- throw new IOException(s"Cannot add new worker $host, " +
- s"as worker with same hostname already exists")
- }
- }
-
- /**
- * @throws RuntimeException if rest client is not authenticated after N attempts
- */
- private def expectRestClientAuthenticated(): Unit = {
- Util.retryUntil(() => {
- restClient.login()
- LOG.info("rest client has been authenticated")
- true
- }, "login successfully")
- }
-
- /**
- * @throws RuntimeException if service is not available after N attempts
- */
- private def expectClusterAvailable(): Unit = {
- Util.retryUntil(() => {
- val response = restClient.queryMaster()
- LOG.info(s"cluster is now available with response: $response.")
- response.aliveFor > 0
- }, "cluster running")
- }
-
- def isAlive: Boolean = {
- getMasterHosts.exists(nodeIsOnline)
- }
-
- def getNetworkGateway: String = {
- Docker.getNetworkGateway(MASTER_ADDRS.head._1)
- }
-
- def shutDown(): Unit = {
- val removalHosts = (getMasterHosts ++ getWorkerHosts).toSet
- .filter(nodeIsOnline).toArray
- if (removalHosts.length > 0) {
- Docker.killAndRemoveContainer(removalHosts)
- }
- workers.clear()
- }
-
- def removeMasterNode(host: String): Unit = {
- Docker.killAndRemoveContainer(host)
- }
-
- def removeWorkerNode(host: String): Unit = {
- workers -= host
- Docker.killAndRemoveContainer(host)
- }
-
- def restart(): Unit = {
- shutDown()
- Util.retryUntil(() => {
- !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists)
- }, "all docker containers killed")
- LOG.info("all docker containers have been killed. restarting...")
- start()
- }
-
- def getMastersAddresses: List[(String, Int)] = {
- MASTER_ADDRS
- }
-
- def getMasterHosts: List[String] = {
- MASTER_ADDRS.map({ case (host, port) => host })
- }
-
- def getWorkerHosts: List[String] = {
- workers.toList
- }
-
- def nodeIsOnline(host: String): Boolean = {
- Docker.containerIsRunning(host)
- }
-
- private def builtInJarsUnder(folder: String): Array[String] = {
- Docker.findJars(getMasterHosts.head, s"$SUT_HOME/$folder")
- }
-
- private def queryBuiltInJars(folder: String, subtext: String): Seq[String] = {
- builtInJarsUnder(folder).filter(_.contains(subtext))
- }
-
- def queryBuiltInExampleJars(subtext: String): Seq[String] = {
- queryBuiltInJars("examples", subtext)
- }
-
- def queryBuiltInITJars(subtext: String): Seq[String] = {
- queryBuiltInJars("integrationtest", subtext)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
deleted file mode 100644
index 1b143af..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.minicluster
-
-import scala.reflect.ClassTag
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.log4j.Logger
-import upickle.Js
-import upickle.default._
-
-import org.apache.gearpump.cluster.AppMasterToMaster.MasterData
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData}
-import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics
-import org.apache.gearpump.cluster.master.MasterSummary
-import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import org.apache.gearpump.cluster.{AppJar, MasterToAppMaster}
-import org.apache.gearpump.integrationtest.{Docker, Util}
-import org.apache.gearpump.services.AppMasterService.Status
-import org.apache.gearpump.services.MasterService.{AppSubmissionResult, BuiltinPartitioners}
-// NOTE: This cannot be removed!!!
-import org.apache.gearpump.services.util.UpickleUtil._
-import org.apache.gearpump.streaming.ProcessorDescription
-import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
-import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationResult, ReplaceProcessor}
-import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary
-import org.apache.gearpump.streaming.executor.Executor.ExecutorSummary
-import org.apache.gearpump.util.{Constants, Graph}
-
-/**
- * A REST client to operate a Gearpump cluster
- */
-class RestClient(host: String, port: Int) {
-
- private val LOG = Logger.getLogger(getClass)
-
- private val cookieFile: String = "cookie.txt"
-
- implicit val graphReader: upickle.default.Reader[Graph[Int, String]] =
- upickle.default.Reader[Graph[Int, String]] {
- case Js.Obj(verties, edges) =>
- val vertexList = upickle.default.readJs[List[Int]](verties._2)
- val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2)
- Graph(vertexList, edgeList)
- }
-
- private def decodeAs[T](
- expr: String)(implicit reader: upickle.default.Reader[T], classTag: ClassTag[T]): T = try {
- read[T](expr)
- } catch {
- case ex: Throwable =>
- LOG.error(s"Failed to decode Rest response to ${classTag.runtimeClass.getSimpleName}")
- throw ex
- }
-
- def queryVersion(): String = {
- curl("version")
- }
-
- def listWorkers(): Array[WorkerSummary] = {
- val resp = callApi("master/workerlist")
- decodeAs[List[WorkerSummary]](resp).toArray
- }
-
- def listRunningWorkers(): Array[WorkerSummary] = {
- listWorkers().filter(_.state == MasterToAppMaster.AppMasterActive)
- }
-
- def listApps(): Array[AppMasterData] = {
- val resp = callApi("master/applist")
- decodeAs[AppMastersData](resp).appMasters.toArray
- }
-
- def listRunningApps(): Array[AppMasterData] = {
- listApps().filter(_.status == MasterToAppMaster.AppMasterActive)
- }
-
- def getNextAvailableAppId(): Int = {
- listApps().length + 1
- }
-
- def submitApp(jar: String, executorNum: Int, args: String = "", config: String = "")
- : Boolean = try {
- var endpoint = "master/submitapp"
-
- var options = Seq(s"jar=@$jar")
- if (config.length > 0) {
- options :+= s"conf=@$config"
- }
-
- options :+= s"executorcount=$executorNum"
-
- if (args != null && !args.isEmpty) {
- options :+= "args=\"" + args + "\""
- }
-
- val resp = callApi(endpoint, options.map("-F " + _).mkString(" "))
- val result = decodeAs[AppSubmissionResult](resp)
- assert(result.success)
- true
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- false
- }
-
- def queryApp(appId: Int): AppMasterData = {
- val resp = callApi(s"appmaster/$appId")
- decodeAs[AppMasterData](resp)
- }
-
- def queryAppMasterConfig(appId: Int): Config = {
- val resp = callApi(s"appmaster/$appId/config")
- ConfigFactory.parseString(resp)
- }
-
- def queryStreamingAppDetail(appId: Int): StreamAppMasterSummary = {
- val resp = callApi(s"appmaster/$appId?detail=true")
- decodeAs[StreamAppMasterSummary](resp)
- }
-
- def queryStreamingAppMetrics(appId: Int, current: Boolean, path: String = "processor*")
- : HistoryMetrics = {
- val args = if (current) "?readLatest=true" else ""
- val resp = callApi(s"appmaster/$appId/metrics/app$appId.$path$args")
- decodeAs[HistoryMetrics](resp)
- }
-
- def queryExecutorSummary(appId: Int, executorId: Int): ExecutorSummary = {
- val resp = callApi(s"appmaster/$appId/executor/$executorId")
- decodeAs[ExecutorSummary](resp)
- }
-
- def queryExecutorBrief(appId: Int): Array[ExecutorBrief] = {
- queryStreamingAppDetail(appId).executors.toArray
- }
-
- def queryExecutorMetrics(appId: Int, current: Boolean): HistoryMetrics = {
- val args = if (current) "?readLatest=true" else ""
- val resp = callApi(s"appmaster/$appId/metrics/app$appId.executor*$args")
- decodeAs[HistoryMetrics](resp)
- }
-
- def queryExecutorConfig(appId: Int, executorId: Int): Config = {
- val resp = callApi(s"appmaster/$appId/executor/$executorId/config")
- ConfigFactory.parseString(resp)
- }
-
- def queryMaster(): MasterSummary = {
- val resp = callApi("master")
- decodeAs[MasterData](resp).masterDescription
- }
-
- def queryMasterMetrics(current: Boolean): HistoryMetrics = {
- val args = if (current) "?readLatest=true" else ""
- val resp = callApi(s"master/metrics/master?$args")
- decodeAs[HistoryMetrics](resp)
- }
-
- def queryMasterConfig(): Config = {
- val resp = callApi("master/config")
- ConfigFactory.parseString(resp)
- }
-
- def queryWorkerMetrics(workerId: WorkerId, current: Boolean): HistoryMetrics = {
- val args = if (current) "?readLatest=true" else ""
- val workerIdStr = WorkerId.render(workerId)
- val resp = callApi(s"worker/$workerIdStr/metrics/worker$workerIdStr?$args")
- decodeAs[HistoryMetrics](resp)
- }
-
- def queryWorkerConfig(workerId: WorkerId): Config = {
- val resp = callApi(s"worker/${WorkerId.render(workerId)}/config")
- ConfigFactory.parseString(resp)
- }
-
- def queryBuiltInPartitioners(): Array[String] = {
- val resp = callApi("master/partitioners")
- decodeAs[BuiltinPartitioners](resp).partitioners
- }
-
- def uploadJar(localFilePath: String): AppJar = {
- val resp = callApi(s"master/uploadjar -F jar=@$localFilePath", CRUD_POST)
- decodeAs[AppJar](resp)
- }
-
- def replaceStreamingAppProcessor(appId: Int, replaceMe: ProcessorDescription): Boolean = {
- replaceStreamingAppProcessor(appId, replaceMe, false)
- }
-
- def replaceStreamingAppProcessor(
- appId: Int, replaceMe: ProcessorDescription, inheritConf: Boolean): Boolean = try {
- val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe, inheritConf)
- val args = upickle.default.write(replaceOperation)
- val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + Util.encodeUriComponent(args),
- CRUD_POST)
- decodeAs[DAGOperationResult](resp)
- true
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- false
- }
-
- def killAppMaster(appId: Int): Boolean = {
- killExecutor(appId, Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
- }
-
- def killExecutor(appId: Int, executorId: Int): Boolean = try {
- val jvmInfo = queryExecutorSummary(appId, executorId).jvmName.split("@")
- val pid = jvmInfo(0).toInt
- val hostname = jvmInfo(1)
- Docker.killProcess(hostname, pid)
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- false
- }
-
- def killApp(appId: Int): Boolean = try {
- val resp = callApi(s"appmaster/$appId", CRUD_DELETE)
- resp.contains("\"status\":\"success\"")
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- false
- }
-
- def restartApp(appId: Int): Boolean = try {
- val resp = callApi(s"appmaster/$appId/restart", CRUD_POST)
- decodeAs[Status](resp).success
- } catch {
- case ex: Throwable =>
- LOG.warn(s"swallowed an exception: $ex")
- false
- }
-
- private val CRUD_POST = "-X POST"
- private val CRUD_DELETE = "-X DELETE"
-
- private def callApi(endpoint: String, option: String = ""): String = {
- curl(s"api/v1.0/$endpoint", Array(option, s"--cookie $cookieFile"))
- }
-
- private def curl(endpoint: String, options: Array[String] = Array.empty[String]): String = {
- Docker.curl(host, s"http://$host:$port/$endpoint", options)
- }
-
- def login(): Unit = {
- curl("login", Array(CRUD_POST, s"--cookie-jar $cookieFile",
- "--data username=admin", "--data password=admin"))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
deleted file mode 100644
index 79adfc4..0000000
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.integrationtest.storm
-
-import scala.util.Random
-
-import backtype.storm.utils.{DRPCClient, Utils}
-
-import org.apache.gearpump.integrationtest.minicluster.{BaseContainer, MiniCluster, RestClient}
-import org.apache.gearpump.integrationtest.{Docker, Util}
-
-class StormClient(cluster: MiniCluster, restClient: RestClient) {
-
- private val masterAddrs: List[(String, Int)] = cluster.getMastersAddresses
- private val CONFIG_FILE = s"/opt/gearpump/storm${new Random().nextInt()}.yaml"
- private val DRPC_HOST = "storm0"
- private val DRPC_PORT = 3772
- private val DRPC_INVOCATIONS_PORT = 3773
- private val STORM_DRPC = "storm-drpc"
- private val NIMBUS_HOST = "storm1"
- private val STORM_NIMBUS = "storm nimbus"
- private val STORM_APP = "/opt/start storm app"
-
- private val drpcContainer = new BaseContainer(DRPC_HOST, STORM_DRPC, masterAddrs,
- tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT))
-
- private val nimbusContainer =
- new BaseContainer(NIMBUS_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE", masterAddrs)
-
- def start(): Unit = {
- nimbusContainer.createAndStart()
- ensureNimbusRunning()
-
- drpcContainer.createAndStart()
- ensureDrpcServerRunning()
- }
-
- private def ensureNimbusRunning(): Unit = {
- Util.retryUntil(() => {
- val response = Docker.execute(NIMBUS_HOST, "grep \"port\" " + CONFIG_FILE)
- // Parse format nimbus.thrift.port: '39322'
- val thriftPort = response.split(" ")(1).replace("'", "").toInt
-
- Docker.executeSilently(NIMBUS_HOST, s"""sh -c "netstat -na | grep $thriftPort" """)
- }, "Nimbus running")
- }
-
- private def ensureDrpcServerRunning(): Unit = {
- Util.retryUntil(() => {
- Docker.executeSilently(DRPC_HOST, s"""sh -c "netstat -na | grep $DRPC_PORT " """)
- }, "DRPC running")
- }
-
- def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = {
- Util.retryUntil(() => {
- Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " +
- s"-jar $jar $mainClass $args")
- restClient.listRunningApps().exists(_.appName == appName)
- }, "app running")
- restClient.listRunningApps().filter(_.appName == appName).head.appId
- }
-
- def getDRPCClient(drpcServerIp: String): DRPCClient = {
- val config = Utils.readDefaultConfig()
- new DRPCClient(config, drpcServerIp, DRPC_PORT)
- }
-
- def shutDown(): Unit = {
-
- // Cleans up the storm.yaml config file
- Docker.executeSilently(NIMBUS_HOST, s"rm $CONFIG_FILE ")
- drpcContainer.killAndRemove()
- nimbusContainer.killAndRemove()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala
new file mode 100644
index 0000000..f315ad3
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Docker.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import org.apache.log4j.Logger
+
+/**
+ * The class is used to execute Docker commands.
+ */
+object Docker {
+
+ private val LOG = Logger.getLogger(getClass)
+
+ /**
+ * @throws RuntimeException in case retval != 0
+ */
+ private def doExecute(container: String, command: String): String = {
+ ShellExec.execAndCaptureOutput(s"docker exec $container $command", s"EXEC $container")
+ }
+
+ private def doExecuteSilently(container: String, command: String): Boolean = {
+ ShellExec.exec(s"docker exec $container $command", s"EXEC $container")
+ }
+
+ /**
+ * @throws RuntimeException in case retval != 0
+ */
+ final def execute(container: String, command: String): String = {
+ trace(container, s"Execute $command") {
+ doExecute(container, command)
+ }
+ }
+
+ final def executeSilently(container: String, command: String): Boolean = {
+ trace(container, s"Execute silently $command") {
+ doExecuteSilently(container, command)
+ }
+ }
+
+ final def listContainers(): Seq[String] = {
+ trace("", s"Listing how many containers...") {
+ ShellExec.execAndCaptureOutput("docker ps -q -a", "LIST")
+ .split("\n").filter(_.nonEmpty)
+ }
+ }
+
+ final def containerIsRunning(name: String): Boolean = {
+ trace(name, s"Check container running or not...") {
+ ShellExec.execAndCaptureOutput(s"docker ps -q --filter name=$name", s"FIND $name").nonEmpty
+ }
+ }
+
+ final def getContainerIPAddr(name: String): String = {
+ trace(name, s"Get Ip Address") {
+ Docker.inspect(name, "--format={{.NetworkSettings.IPAddress}}")
+ }
+ }
+
+ final def containerExists(name: String): Boolean = {
+ trace(name, s"Check container existing or not...") {
+ ShellExec.execAndCaptureOutput(s"docker ps -q -a --filter name=$name", s"FIND $name").nonEmpty
+ }
+ }
+
+ /**
+ * @throws RuntimeException in case particular container is created already
+ */
+ final def createAndStartContainer(name: String, image: String, command: String,
+ environ: Map[String, String] = Map.empty, // key, value
+ volumes: Map[String, String] = Map.empty, // from, to
+ knownHosts: Set[String] = Set.empty,
+ tunnelPorts: Set[Int] = Set.empty): String = {
+
+ if (containerExists(name)) {
+ killAndRemoveContainer(name)
+ }
+
+ trace(name, s"Create and start $name ($image)...") {
+
+ val optsBuilder = new StringBuilder
+ optsBuilder.append("-d") // run in background
+ optsBuilder.append(" -h " + name) // use container name as hostname
+ optsBuilder.append(" -v /etc/localtime:/etc/localtime:ro") // synchronize timezone settings
+
+ environ.foreach { case (key, value) =>
+ optsBuilder.append(s" -e $key=$value")
+ }
+ volumes.foreach { case (from, to) =>
+ optsBuilder.append(s" -v $from:$to")
+ }
+ knownHosts.foreach(host =>
+ optsBuilder.append(" --link " + host)
+ )
+ tunnelPorts.foreach(port =>
+ optsBuilder.append(s" -p $port:$port")
+ )
+ createAndStartContainer(name, optsBuilder.toString(), command, image)
+ }
+ }
+
+ /**
+ * @throws RuntimeException in case particular container is created already
+ */
+ private def createAndStartContainer(
+ name: String, options: String, command: String, image: String): String = {
+ ShellExec.execAndCaptureOutput(s"docker run $options " +
+ s"--name $name $image $command", s"MAKE $name")
+ }
+
+ final def killAndRemoveContainer(name: String): Boolean = {
+ trace(name, s"kill and remove container") {
+ ShellExec.exec(s"docker rm -f $name", s"STOP $name")
+ }
+ }
+
+ final def killAndRemoveContainer(names: Array[String]): Boolean = {
+ assert(names.length > 0)
+ val args = names.mkString(" ")
+ trace(names.mkString(","), s"kill and remove containers") {
+ ShellExec.exec(s"docker rm -f $args", s"STOP $args.")
+ }
+ }
+
+ private def inspect(container: String, option: String): String = {
+ ShellExec.execAndCaptureOutput(s"docker inspect $option $container", s"EXEC $container")
+ }
+
+ final def curl(container: String, url: String, options: Array[String] = Array.empty[String])
+ : String = {
+ trace(container, s"curl $url") {
+ doExecute(container, s"curl -s ${options.mkString(" ")} $url")
+ }
+ }
+
+ final def getHostName(container: String): String = {
+ trace(container, s"Get hostname of container...") {
+ doExecute(container, "hostname")
+ }
+ }
+
+ final def getNetworkGateway(container: String): String = {
+ trace(container, s"Get gateway of container...") {
+ doExecute(container, "ip route").split("\\s+")(2)
+ }
+ }
+ final def killProcess(container: String, pid: Int, signal: String = "SIGKILL"): Boolean = {
+ trace(container, s"Kill process pid: $pid") {
+ doExecuteSilently(container, s"kill -$signal $pid")
+ }
+ }
+
+ final def findJars(container: String, folder: String): Array[String] = {
+ trace(container, s"Find jars under $folder") {
+ doExecute(container, s"find $folder")
+ .split("\n").filter(_.endsWith(".jar"))
+ }
+ }
+
+ private def trace[T](container: String, msg: String)(fun: => T): T = {
+ // scalastyle:off println
+ Console.println() // A empty line to let the output looks better.
+ // scalastyle:on println
+ LOG.debug(s"Container $container====>> $msg")
+ LOG.debug("INPUT==>>")
+ val response = fun
+ LOG.debug("<<==OUTPUT")
+
+ LOG.debug(brief(response))
+
+ LOG.debug(s"<<====Command END. Container $container, $msg \n")
+ response
+ }
+
+ private val PREVIEW_MAX_LENGTH = 1024
+
+ private def brief[T](input: T): String = {
+ val output = input match {
+ case true =>
+ "Success|True"
+ case false =>
+ "Failure|False"
+ case x: Array[Any] =>
+ "Success: [" + x.mkString(",") + "]"
+ case x =>
+ x.toString
+ }
+
+ val preview = if (output.length > PREVIEW_MAX_LENGTH) {
+ output.substring(0, PREVIEW_MAX_LENGTH) + "..."
+ }
+ else {
+ output
+ }
+ preview
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala
new file mode 100644
index 0000000..25d7ee3
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/ShellExec.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.sys.process._
+
+import org.apache.log4j.Logger
+import org.apache.storm.shade.org.eclipse.jetty.util.QuotedStringTokenizer
+
+/**
+ * The class is used to execute command in a shell
+ */
+object ShellExec {
+
+ private val LOG = Logger.getLogger(getClass)
+ private val PROCESS_TIMEOUT = 2.minutes
+
+ /**
+ * The builtin command line parser by ProcessBuilder (implicit sys.process) don't
+ * respect the quote chars (' and ")
+ */
+ private def splitQuotedString(str: String): List[String] = {
+ val splitter = new QuotedStringTokenizer(str, " \t\n\r")
+ splitter.asInstanceOf[java.util.Enumeration[String]].asScala.toList
+ }
+
+ def exec(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT): Boolean = {
+ LOG.debug(s"$sender => `$command`")
+
+ val p = splitQuotedString(command).run()
+ val f = Future(blocking(p.exitValue())) // wrap in Future
+ val retval = {
+ try {
+ Await.result(f, timeout)
+ } catch {
+ case _: TimeoutException =>
+ LOG.error(s"timeout to execute command `$command`")
+ p.destroy()
+ p.exitValue()
+ }
+ }
+ LOG.debug(s"$sender <= exit $retval")
+ retval == 0
+ }
+
+ def execAndCaptureOutput(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT)
+ : String = {
+ LOG.debug(s"$sender => `$command`")
+
+ val buf = new StringBuilder
+ val processLogger = ProcessLogger((o: String) => buf.append(o).append("\n"),
+ (e: String) => buf.append(e).append("\n"))
+ val p = splitQuotedString(command).run(processLogger)
+ val f = Future(blocking(p.exitValue())) // wrap in Future
+ val retval = {
+ try {
+ Await.result(f, timeout)
+ } catch {
+ case _: TimeoutException =>
+ p.destroy()
+ p.exitValue()
+ }
+ }
+ val output = buf.toString().trim
+ val PREVIEW_MAX_LENGTH = 200
+ val preview = if (output.length > PREVIEW_MAX_LENGTH) {
+ output.substring(0, PREVIEW_MAX_LENGTH) + "..."
+ } else {
+ output
+ }
+
+ LOG.debug(s"$sender <= `$preview` exit $retval")
+ if (retval != 0) {
+ throw new RuntimeException(
+ s"exited ($retval) by executing `$command`")
+ }
+ output
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala
new file mode 100644
index 0000000..7e7085d
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/Util.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest
+
+import scala.concurrent.duration._
+import scala.util.{Failure, Success, Try}
+
+import org.apache.log4j.Logger
+
+object Util {
+
+ private val LOG = Logger.getLogger(getClass)
+
+ def encodeUriComponent(s: String): String = {
+ try {
+ java.net.URLEncoder.encode(s, "UTF-8")
+ .replaceAll("\\+", "%20")
+ .replaceAll("\\%21", "!")
+ .replaceAll("\\%27", "'")
+ .replaceAll("\\%28", "(")
+ .replaceAll("\\%29", ")")
+ .replaceAll("\\%7E", "~")
+ } catch {
+ case ex: Throwable => s
+ }
+ }
+
+ def retryUntil(
+ condition: () => Boolean, conditionDescription: String, maxTries: Int = 15,
+ interval: Duration = 10.seconds): Unit = {
+ var met = false
+ var tries = 0
+
+ while (!met && tries < maxTries) {
+
+ met = Try(condition()) match {
+ case Success(true) => true
+ case Success(false) => false
+ case Failure(ex) => false
+ }
+
+ tries += 1
+
+ if (!met) {
+ LOG.error(s"Failed due to (false == $conditionDescription), " +
+ s"retrying for the ${tries} times...")
+ Thread.sleep(interval.toMillis)
+ } else {
+ LOG.info(s"Success ($conditionDescription) after ${tries} retries")
+ }
+ }
+
+ if (!met) {
+ throw new Exception(s"Failed after ${tries} retries, ($conditionDescription) == false")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala
new file mode 100644
index 0000000..f836abd
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/hadoop/HadoopCluster.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.integrationtest.hadoop
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+object HadoopCluster {
+
+ /** Starts a Hadoop cluster */
+ def withHadoopCluster(testCode: HadoopCluster => Unit): Unit = {
+ val hadoopCluster = new HadoopCluster
+ try {
+ hadoopCluster.start()
+ testCode(hadoopCluster)
+ } finally {
+ hadoopCluster.shutDown()
+ }
+ }
+}
+/**
+ * This class maintains a single node Hadoop cluster
+ */
+class HadoopCluster {
+
+ private val LOG = Logger.getLogger(getClass)
+ private val HADOOP_DOCKER_IMAGE = "sequenceiq/hadoop-docker:2.6.0"
+ private val HADOOP_HOST = "hadoop0"
+
+ def start(): Unit = {
+ Docker.createAndStartContainer(HADOOP_HOST, HADOOP_DOCKER_IMAGE, "")
+
+ Util.retryUntil(() => isAlive, "Hadoop cluster is alive")
+ LOG.info("Hadoop cluster is started.")
+ }
+
+ // Checks whether the cluster is alive by listing "/"
+ private def isAlive: Boolean = {
+ Docker.executeSilently(HADOOP_HOST, "/usr/local/hadoop/bin/hadoop fs -ls /")
+ }
+
+ def getDefaultFS: String = {
+ val hostIPAddr = Docker.getContainerIPAddr(HADOOP_HOST)
+ s"hdfs://$hostIPAddr:9000"
+ }
+
+ def shutDown(): Unit = {
+ Docker.killAndRemoveContainer(HADOOP_HOST)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala
new file mode 100644
index 0000000..15ba084
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/KafkaCluster.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.minicluster.MiniCluster
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+object KafkaCluster {
+
+ /** Starts a Kafka cluster */
+ def withKafkaCluster(cluster: MiniCluster)(testCode: KafkaCluster => Unit): Unit = {
+ val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka")
+ try {
+ kafkaCluster.start()
+ testCode(kafkaCluster)
+ } finally {
+ kafkaCluster.shutDown()
+ }
+ }
+
+ def withDataProducer(topic: String, brokerList: String)
+ (testCode: NumericalDataProducer => Unit): Unit = {
+ val producer = new NumericalDataProducer(topic, brokerList)
+ try {
+ producer.start()
+ testCode(producer)
+ } finally {
+ producer.stop()
+ }
+ }
+}
+
+/**
+ * This class maintains a single node Kafka cluster with integrated Zookeeper.
+ */
+class KafkaCluster(val advertisedHost: String, zkChroot: String = "") {
+
+ private val LOG = Logger.getLogger(getClass)
+ private val KAFKA_DOCKER_IMAGE = "spotify/kafka"
+ private val KAFKA_HOST = "kafka0"
+ private val KAFKA_HOME = "/opt/kafka_2.11-0.8.2.1/"
+ private val ZOOKEEPER_PORT = 2181
+ private val BROKER_PORT = 9092
+ val advertisedPort = BROKER_PORT
+
+ def start(): Unit = {
+ Docker.createAndStartContainer(KAFKA_HOST, KAFKA_DOCKER_IMAGE, "",
+ environ = Map(
+ "ADVERTISED_HOST" -> advertisedHost,
+ "ADVERTISED_PORT" -> BROKER_PORT.toString,
+ "ZK_CHROOT" -> zkChroot),
+ tunnelPorts = Set(ZOOKEEPER_PORT, BROKER_PORT)
+ )
+ Util.retryUntil(() => isAlive, "kafka cluster is alive")
+ LOG.debug("kafka cluster is started.")
+ }
+
+ def isAlive: Boolean = {
+ !listTopics().contains("Connection refused")
+ }
+
+ def shutDown(): Unit = {
+ Docker.killAndRemoveContainer(KAFKA_HOST)
+ }
+
+ private lazy val hostIPAddr = Docker.getContainerIPAddr(KAFKA_HOST)
+
+ def listTopics(): String = {
+ kafkaListTopics(KAFKA_HOST, KAFKA_HOME, getZookeeperConnectString)
+ }
+
+ def getZookeeperConnectString: String = {
+ s"$hostIPAddr:$ZOOKEEPER_PORT/$zkChroot"
+ }
+
+ def getBrokerListConnectString: String = {
+ s"$hostIPAddr:$BROKER_PORT"
+ }
+
+ def createTopic(topic: String, partitions: Int = 1): Unit = {
+ LOG.debug(s"|=> Create kafka topic $topic with $partitions partitions")
+
+ Docker.executeSilently(KAFKA_HOST,
+ s"$KAFKA_HOME/bin/kafka-topics.sh" +
+ s" --zookeeper $getZookeeperConnectString" +
+ s" --create --topic $topic --partitions $partitions --replication-factor 1")
+ }
+
+ def produceDataToKafka(topic: String, messageNum: Int): Unit = {
+ Docker.executeSilently(KAFKA_HOST,
+ s"$KAFKA_HOME/bin/kafka-topics.sh" +
+ s" --zookeeper $getZookeeperConnectString" +
+ s" --create --topic $topic --partitions 1 --replication-factor 1")
+
+ Docker.executeSilently(KAFKA_HOST,
+ s"$KAFKA_HOME/bin/kafka-producer-perf-test.sh" +
+ s" --broker-list $getBrokerListConnectString" +
+ s" --topic $topic --messages $messageNum")
+ }
+
+ def getLatestOffset(topic: String): Int = {
+ kafkaFetchLatestOffset(KAFKA_HOST, topic, KAFKA_HOME, getBrokerListConnectString)
+ }
+
+ private def kafkaListTopics(
+ container: String, kafkaHome: String, zookeeperConnectionString: String): String = {
+
+ LOG.debug(s"|=> Kafka list topics...")
+ Docker.execute(container,
+ s"$kafkaHome/bin/kafka-topics.sh" +
+ s" --zookeeper $zookeeperConnectionString -list")
+ }
+
+ private def kafkaFetchLatestOffset(
+ container: String, topic: String, kafkaHome: String, brokersList: String): Int = {
+ LOG.debug(s"|=> Get latest offset of topic $topic...")
+ val output = Docker.execute(container,
+ s"$kafkaHome/bin/kafka-run-class.sh kafka.tools.GetOffsetShell" +
+ s" --broker-list $brokersList " +
+ s" --topic $topic --time -1")
+ output.split(":")(2).toInt
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala
new file mode 100644
index 0000000..1cf3125
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/NumericalDataProducer.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import java.util.Properties
+
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.streaming.serializer.ChillSerializer
+
+class NumericalDataProducer(topic: String, bootstrapServers: String) {
+
+ private val LOG = Logger.getLogger(getClass)
+ private val producer = createProducer
+ private val WRITE_SLEEP_NANOS = 10
+ private val serializer = new ChillSerializer[Int]
+ var lastWriteNum = 0
+
+ def start(): Unit = {
+ produceThread.start()
+ }
+
+ def stop(): Unit = {
+ if (produceThread.isAlive) {
+ produceThread.interrupt()
+ produceThread.join()
+ }
+ producer.close()
+ }
+
+ /** How many message we have written in total */
+ def producedNumbers: Range = {
+ Range(1, lastWriteNum + 1)
+ }
+
+ private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
+ val properties = new Properties()
+ properties.setProperty("bootstrap.servers", bootstrapServers)
+ new KafkaProducer[Array[Byte], Array[Byte]](properties,
+ new ByteArraySerializer, new ByteArraySerializer)
+ }
+
+ private val produceThread = new Thread(new Runnable {
+ override def run(): Unit = {
+ try {
+ while (!Thread.currentThread.isInterrupted) {
+ lastWriteNum += 1
+ val msg = serializer.serialize(lastWriteNum)
+ val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, msg)
+ producer.send(record)
+ Thread.sleep(0, WRITE_SLEEP_NANOS)
+ }
+ } catch {
+ case ex: InterruptedException =>
+ LOG.error("message producing is stopped by an interrupt")
+ }
+ }
+ })
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala
new file mode 100644
index 0000000..1f773d3
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/ResultVerifier.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import scala.collection.mutable
+
+trait ResultVerifier {
+ def onNext(num: Int): Unit
+}
+
+class MessageLossDetector(totalNum: Int) extends ResultVerifier {
+ private val bitSets = new mutable.BitSet(totalNum)
+ var result = List.empty[Int]
+
+ override def onNext(num: Int): Unit = {
+ bitSets.add(num)
+ result :+= num
+ }
+
+ def allReceived: Boolean = {
+ 1.to(totalNum).forall(bitSets)
+ }
+
+ def received(num: Int): Boolean = {
+ bitSets(num)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
new file mode 100644
index 0000000..392ca86
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.kafka
+
+import scala.util.{Failure, Success}
+
+import kafka.api.FetchRequestBuilder
+import kafka.consumer.SimpleConsumer
+import kafka.utils.Utils
+
+import org.apache.gearpump.streaming.serializer.ChillSerializer
+
+class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: Int = 0,
+ host: String, port: Int) {
+
+ private val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "")
+ private val serializer = new ChillSerializer[Int]
+ private var offset = 0L
+
+ def read(): Unit = {
+ val messageSet = consumer.fetch(
+ new FetchRequestBuilder().addFetch(topic, partition, offset, Int.MaxValue).build()
+ ).messageSet(topic, partition)
+
+ for (messageAndOffset <- messageSet) {
+ serializer.deserialize(Utils.readBytes(messageAndOffset.message.payload)) match {
+ case Success(msg) =>
+ offset = messageAndOffset.nextOffset
+ verifier.onNext(msg)
+ case Failure(e) => throw e
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala
new file mode 100644
index 0000000..73413da
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/BaseContainer.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import scala.sys.process._
+
+import org.apache.gearpump.integrationtest.Docker
+
+/**
+ * A helper to instantiate the base image for different usage.
+ */
+class BaseContainer(val host: String, command: String,
+ masterAddrs: List[(String, Int)],
+ tunnelPorts: Set[Int] = Set.empty) {
+
+ private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher"
+ private val DOCKER_IMAGE_GEARPUMP_HOME = "/opt/gearpump"
+ private val DOCKER_IMAGE_LOG_HOME = "/var/log/gearpump"
+ private val HOST_GEARPUMP_HOME = "pwd".!!.trim + "/output/target/pack"
+ private val HOST_LOG_HOME = {
+ val dir = "/tmp/gearpump"
+ s"mkdir -p $dir".!!
+ s"mktemp -p $dir -d".!!.trim
+ }
+
+ private val CLUSTER_OPTS = {
+ masterAddrs.zipWithIndex.map { case (hostPort, index) =>
+ s"-Dgearpump.cluster.masters.$index=${hostPort._1}:${hostPort._2}"
+ }.mkString(" ")
+ }
+
+ def createAndStart(): String = {
+ Docker.createAndStartContainer(host, IMAGE_NAME, command,
+ environ = Map("JAVA_OPTS" -> CLUSTER_OPTS),
+ volumes = Map(
+ HOST_GEARPUMP_HOME -> DOCKER_IMAGE_GEARPUMP_HOME,
+ HOST_LOG_HOME -> DOCKER_IMAGE_LOG_HOME),
+ knownHosts = masterAddrs.map(_._1).filter(_ != host).toSet,
+ tunnelPorts = tunnelPorts)
+ }
+
+ def killAndRemove(): Unit = {
+ Docker.killAndRemoveContainer(host)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
new file mode 100644
index 0000000..884a8d1
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.integrationtest.Docker
+
+/**
+ * A command-line client to operate a Gearpump cluster
+ */
+class CommandLineClient(host: String) {
+
+ private val LOG = Logger.getLogger(getClass)
+
+ def listApps(): Array[String] = {
+ gearCommand(host, "gear info").split("\n").filter(
+ _.startsWith("application: ")
+ )
+ }
+
+ def listRunningApps(): Array[String] =
+ listApps().filter(
+ _.contains(s", status: ${MasterToAppMaster.AppMasterActive}")
+ )
+
+ def queryApp(appId: Int): String = try {
+ listApps().filter(
+ _.startsWith(s"application: $appId")
+ ).head
+ } catch {
+ case ex: Throwable =>
+ LOG.warn(s"swallowed an exception: $ex")
+ ""
+ }
+
+ def submitAppAndCaptureOutput(jar: String, executorNum: Int, args: String = ""): String = {
+ gearCommand(host, s"gear app -verbose true -jar $jar -executors $executorNum $args")
+ }
+
+ def submitApp(jar: String, args: String = ""): Int = {
+ LOG.debug(s"|=> Submit Application $jar...")
+ submitAppUse("gear app", jar, args)
+ }
+
+ private def submitAppUse(launcher: String, jar: String, args: String = ""): Int = try {
+ gearCommand(host, s"$launcher -jar $jar $args").split("\n")
+ .filter(_.contains("The application id is ")).head.split(" ").last.toInt
+ } catch {
+ case ex: Throwable =>
+ LOG.warn(s"swallowed an exception: $ex")
+ -1
+ }
+
+ def killApp(appId: Int): Boolean = {
+ tryGearCommand(host, s"gear kill -appid $appId")
+ }
+
+ private def gearCommand(container: String, command: String): String = {
+ LOG.debug(s"|=> Gear command $command in container $container...")
+ Docker.execute(container, s"/opt/start $command")
+ }
+
+ private def tryGearCommand(container: String, command: String): Boolean = {
+ LOG.debug(s"|=> Gear command $command in container $container...")
+ Docker.executeSilently(container, s"/opt/start $command")
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala
new file mode 100644
index 0000000..4d439e8
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/MiniCluster.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import java.io.IOException
+import scala.collection.mutable.ListBuffer
+
+import org.apache.log4j.Logger
+
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+/**
+ * This class is a test driver for end-to-end integration test.
+ */
+class MiniCluster {
+
+ private val LOG = Logger.getLogger(getClass)
+ private val SUT_HOME = "/opt/gearpump"
+
+ private val REST_SERVICE_PORT = 8090
+ private val MASTER_PORT = 3000
+ private val MASTER_ADDRS: List[(String, Int)] = {
+ (0 to 0).map(index =>
+ ("master" + index, MASTER_PORT)
+ ).toList
+ }
+
+ lazy val commandLineClient: CommandLineClient = new CommandLineClient(getMasterHosts.head)
+
+ lazy val restClient: RestClient = {
+ val client = new RestClient(getMasterHosts.head, REST_SERVICE_PORT)
+ client
+ }
+
+ private var workers: ListBuffer[String] = ListBuffer.empty
+
+ def start(workerNum: Int = 2): Unit = {
+
+ // Kill master
+ MASTER_ADDRS.foreach { case (host, _) =>
+ if (Docker.containerExists(host)) {
+ Docker.killAndRemoveContainer(host)
+ }
+ }
+
+ // Kill existing workers
+ workers ++= (0 until workerNum).map("worker" + _)
+ workers.foreach { worker =>
+ if (Docker.containerExists(worker)) {
+ Docker.killAndRemoveContainer(worker)
+ }
+ }
+
+ // Start Masters
+ MASTER_ADDRS.foreach({ case (host, port) =>
+ addMasterNode(host, port)
+ })
+
+ // Start Workers
+ workers.foreach { worker =>
+ val container = new BaseContainer(worker, "worker", MASTER_ADDRS)
+ container.createAndStart()
+ }
+
+ // Check cluster status
+ expectRestClientAuthenticated()
+ expectClusterAvailable()
+ }
+
+ private def addMasterNode(host: String, port: Int): Unit = {
+ val container = new BaseContainer(host, s"master -ip $host -port $port", MASTER_ADDRS)
+ container.createAndStart()
+ }
+
+ def addWorkerNode(host: String): Unit = {
+ if (workers.find(_ == host).isEmpty) {
+ val container = new BaseContainer(host, "worker", MASTER_ADDRS)
+ container.createAndStart()
+ workers += host
+ } else {
+ throw new IOException(s"Cannot add new worker $host, " +
+ s"as worker with same hostname already exists")
+ }
+ }
+
+ /**
+ * @throws RuntimeException if rest client is not authenticated after N attempts
+ */
+ private def expectRestClientAuthenticated(): Unit = {
+ Util.retryUntil(() => {
+ restClient.login()
+ LOG.info("rest client has been authenticated")
+ true
+ }, "login successfully")
+ }
+
+ /**
+ * @throws RuntimeException if service is not available after N attempts
+ */
+ private def expectClusterAvailable(): Unit = {
+ Util.retryUntil(() => {
+ val response = restClient.queryMaster()
+ LOG.info(s"cluster is now available with response: $response.")
+ response.aliveFor > 0
+ }, "cluster running")
+ }
+
+ def isAlive: Boolean = {
+ getMasterHosts.exists(nodeIsOnline)
+ }
+
+ def getNetworkGateway: String = {
+ Docker.getNetworkGateway(MASTER_ADDRS.head._1)
+ }
+
+ def shutDown(): Unit = {
+ val removalHosts = (getMasterHosts ++ getWorkerHosts).toSet
+ .filter(nodeIsOnline).toArray
+ if (removalHosts.length > 0) {
+ Docker.killAndRemoveContainer(removalHosts)
+ }
+ workers.clear()
+ }
+
+ def removeMasterNode(host: String): Unit = {
+ Docker.killAndRemoveContainer(host)
+ }
+
+ def removeWorkerNode(host: String): Unit = {
+ workers -= host
+ Docker.killAndRemoveContainer(host)
+ }
+
+ def restart(): Unit = {
+ shutDown()
+ Util.retryUntil(() => {
+ !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists)
+ }, "all docker containers killed")
+ LOG.info("all docker containers have been killed. restarting...")
+ start()
+ }
+
+ def getMastersAddresses: List[(String, Int)] = {
+ MASTER_ADDRS
+ }
+
+ def getMasterHosts: List[String] = {
+ MASTER_ADDRS.map({ case (host, port) => host })
+ }
+
+ def getWorkerHosts: List[String] = {
+ workers.toList
+ }
+
+ def nodeIsOnline(host: String): Boolean = {
+ Docker.containerIsRunning(host)
+ }
+
+ private def builtInJarsUnder(folder: String): Array[String] = {
+ Docker.findJars(getMasterHosts.head, s"$SUT_HOME/$folder")
+ }
+
+ private def queryBuiltInJars(folder: String, subtext: String): Seq[String] = {
+ builtInJarsUnder(folder).filter(_.contains(subtext))
+ }
+
+ def queryBuiltInExampleJars(subtext: String): Seq[String] = {
+ queryBuiltInJars("examples", subtext)
+ }
+
+ def queryBuiltInITJars(subtext: String): Seq[String] = {
+ queryBuiltInJars("integrationtest", subtext)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
new file mode 100644
index 0000000..1b143af
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.integrationtest.minicluster
+
+import scala.reflect.ClassTag
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.log4j.Logger
+import upickle.Js
+import upickle.default._
+
+import org.apache.gearpump.cluster.AppMasterToMaster.MasterData
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData}
+import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics
+import org.apache.gearpump.cluster.master.MasterSummary
+import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
+import org.apache.gearpump.cluster.{AppJar, MasterToAppMaster}
+import org.apache.gearpump.integrationtest.{Docker, Util}
+import org.apache.gearpump.services.AppMasterService.Status
+import org.apache.gearpump.services.MasterService.{AppSubmissionResult, BuiltinPartitioners}
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+import org.apache.gearpump.streaming.ProcessorDescription
+import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
+import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationResult, ReplaceProcessor}
+import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary
+import org.apache.gearpump.streaming.executor.Executor.ExecutorSummary
+import org.apache.gearpump.util.{Constants, Graph}
+
+/**
+ * A REST client to operate a Gearpump cluster
+ */
+class RestClient(host: String, port: Int) {
+
+ private val LOG = Logger.getLogger(getClass)
+
+ private val cookieFile: String = "cookie.txt"
+
+ implicit val graphReader: upickle.default.Reader[Graph[Int, String]] =
+ upickle.default.Reader[Graph[Int, String]] {
+ case Js.Obj(verties, edges) =>
+ val vertexList = upickle.default.readJs[List[Int]](verties._2)
+ val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2)
+ Graph(vertexList, edgeList)
+ }
+
+ private def decodeAs[T](
+ expr: String)(implicit reader: upickle.default.Reader[T], classTag: ClassTag[T]): T = try {
+ read[T](expr)
+ } catch {
+ case ex: Throwable =>
+ LOG.error(s"Failed to decode Rest response to ${classTag.runtimeClass.getSimpleName}")
+ throw ex
+ }
+
+ def queryVersion(): String = {
+ curl("version")
+ }
+
+ def listWorkers(): Array[WorkerSummary] = {
+ val resp = callApi("master/workerlist")
+ decodeAs[List[WorkerSummary]](resp).toArray
+ }
+
+ def listRunningWorkers(): Array[WorkerSummary] = {
+ listWorkers().filter(_.state == MasterToAppMaster.AppMasterActive)
+ }
+
+ def listApps(): Array[AppMasterData] = {
+ val resp = callApi("master/applist")
+ decodeAs[AppMastersData](resp).appMasters.toArray
+ }
+
+ def listRunningApps(): Array[AppMasterData] = {
+ listApps().filter(_.status == MasterToAppMaster.AppMasterActive)
+ }
+
+ def getNextAvailableAppId(): Int = {
+ listApps().length + 1
+ }
+
+ def submitApp(jar: String, executorNum: Int, args: String = "", config: String = "")
+ : Boolean = try {
+ var endpoint = "master/submitapp"
+
+ var options = Seq(s"jar=@$jar")
+ if (config.length > 0) {
+ options :+= s"conf=@$config"
+ }
+
+ options :+= s"executorcount=$executorNum"
+
+ if (args != null && !args.isEmpty) {
+ options :+= "args=\"" + args + "\""
+ }
+
+ val resp = callApi(endpoint, options.map("-F " + _).mkString(" "))
+ val result = decodeAs[AppSubmissionResult](resp)
+ assert(result.success)
+ true
+ } catch {
+ case ex: Throwable =>
+ LOG.warn(s"swallowed an exception: $ex")
+ false
+ }
+
+ def queryApp(appId: Int): AppMasterData = {
+ val resp = callApi(s"appmaster/$appId")
+ decodeAs[AppMasterData](resp)
+ }
+
+ def queryAppMasterConfig(appId: Int): Config = {
+ val resp = callApi(s"appmaster/$appId/config")
+ ConfigFactory.parseString(resp)
+ }
+
+ def queryStreamingAppDetail(appId: Int): StreamAppMasterSummary = {
+ val resp = callApi(s"appmaster/$appId?detail=true")
+ decodeAs[StreamAppMasterSummary](resp)
+ }
+
+ def queryStreamingAppMetrics(appId: Int, current: Boolean, path: String = "processor*")
+ : HistoryMetrics = {
+ val args = if (current) "?readLatest=true" else ""
+ val resp = callApi(s"appmaster/$appId/metrics/app$appId.$path$args")
+ decodeAs[HistoryMetrics](resp)
+ }
+
+ def queryExecutorSummary(appId: Int, executorId: Int): ExecutorSummary = {
+ val resp = callApi(s"appmaster/$appId/executor/$executorId")
+ decodeAs[ExecutorSummary](resp)
+ }
+
+ def queryExecutorBrief(appId: Int): Array[ExecutorBrief] = {
+ queryStreamingAppDetail(appId).executors.toArray
+ }
+
+ def queryExecutorMetrics(appId: Int, current: Boolean): HistoryMetrics = {
+ val args = if (current) "?readLatest=true" else ""
+ val resp = callApi(s"appmaster/$appId/metrics/app$appId.executor*$args")
+ decodeAs[HistoryMetrics](resp)
+ }
+
+ def queryExecutorConfig(appId: Int, executorId: Int): Config = {
+ val resp = callApi(s"appmaster/$appId/executor/$executorId/config")
+ ConfigFactory.parseString(resp)
+ }
+
+ def queryMaster(): MasterSummary = {
+ val resp = callApi("master")
+ decodeAs[MasterData](resp).masterDescription
+ }
+
+ def queryMasterMetrics(current: Boolean): HistoryMetrics = {
+ val args = if (current) "?readLatest=true" else ""
+ val resp = callApi(s"master/metrics/master?$args")
+ decodeAs[HistoryMetrics](resp)
+ }
+
+ def queryMasterConfig(): Config = {
+ val resp = callApi("master/config")
+ ConfigFactory.parseString(resp)
+ }
+
+ def queryWorkerMetrics(workerId: WorkerId, current: Boolean): HistoryMetrics = {
+ val args = if (current) "?readLatest=true" else ""
+ val workerIdStr = WorkerId.render(workerId)
+ val resp = callApi(s"worker/$workerIdStr/metrics/worker$workerIdStr?$args")
+ decodeAs[HistoryMetrics](resp)
+ }
+
+ def queryWorkerConfig(workerId: WorkerId): Config = {
+ val resp = callApi(s"worker/${WorkerId.render(workerId)}/config")
+ ConfigFactory.parseString(resp)
+ }
+
+ def queryBuiltInPartitioners(): Array[String] = {
+ val resp = callApi("master/partitioners")
+ decodeAs[BuiltinPartitioners](resp).partitioners
+ }
+
+ def uploadJar(localFilePath: String): AppJar = {
+ val resp = callApi(s"master/uploadjar -F jar=@$localFilePath", CRUD_POST)
+ decodeAs[AppJar](resp)
+ }
+
+ def replaceStreamingAppProcessor(appId: Int, replaceMe: ProcessorDescription): Boolean = {
+ replaceStreamingAppProcessor(appId, replaceMe, false)
+ }
+
+ def replaceStreamingAppProcessor(
+ appId: Int, replaceMe: ProcessorDescription, inheritConf: Boolean): Boolean = try {
+ val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe, inheritConf)
+ val args = upickle.default.write(replaceOperation)
+ val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + Util.encodeUriComponent(args),
+ CRUD_POST)
+ decodeAs[DAGOperationResult](resp)
+ true
+ } catch {
+ case ex: Throwable =>
+ LOG.warn(s"swallowed an exception: $ex")
+ false
+ }
+
+ def killAppMaster(appId: Int): Boolean = {
+ killExecutor(appId, Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
+ }
+
+ def killExecutor(appId: Int, executorId: Int): Boolean = try {
+ val jvmInfo = queryExecutorSummary(appId, executorId).jvmName.split("@")
+ val pid = jvmInfo(0).toInt
+ val hostname = jvmInfo(1)
+ Docker.killProcess(hostname, pid)
+ } catch {
+ case ex: Throwable =>
+ LOG.warn(s"swallowed an exception: $ex")
+ false
+ }
+
+ def killApp(appId: Int): Boolean = try {
+ val resp = callApi(s"appmaster/$appId", CRUD_DELETE)
+ resp.contains("\"status\":\"success\"")
+ } catch {
+ case ex: Throwable =>
+ LOG.warn(s"swallowed an exception: $ex")
+ false
+ }
+
+ def restartApp(appId: Int): Boolean = try {
+ val resp = callApi(s"appmaster/$appId/restart", CRUD_POST)
+ decodeAs[Status](resp).success
+ } catch {
+ case ex: Throwable =>
+ LOG.warn(s"swallowed an exception: $ex")
+ false
+ }
+
+ private val CRUD_POST = "-X POST"
+ private val CRUD_DELETE = "-X DELETE"
+
+ private def callApi(endpoint: String, option: String = ""): String = {
+ curl(s"api/v1.0/$endpoint", Array(option, s"--cookie $cookieFile"))
+ }
+
+ private def curl(endpoint: String, options: Array[String] = Array.empty[String]): String = {
+ Docker.curl(host, s"http://$host:$port/$endpoint", options)
+ }
+
+ def login(): Unit = {
+ curl("login", Array(CRUD_POST, s"--cookie-jar $cookieFile",
+ "--data username=admin", "--data password=admin"))
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
new file mode 100644
index 0000000..79adfc4
--- /dev/null
+++ b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/storm/StormClient.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.integrationtest.storm
+
+import scala.util.Random
+
+import backtype.storm.utils.{DRPCClient, Utils}
+
+import org.apache.gearpump.integrationtest.minicluster.{BaseContainer, MiniCluster, RestClient}
+import org.apache.gearpump.integrationtest.{Docker, Util}
+
+class StormClient(cluster: MiniCluster, restClient: RestClient) {
+
+ private val masterAddrs: List[(String, Int)] = cluster.getMastersAddresses
+ private val CONFIG_FILE = s"/opt/gearpump/storm${new Random().nextInt()}.yaml"
+ private val DRPC_HOST = "storm0"
+ private val DRPC_PORT = 3772
+ private val DRPC_INVOCATIONS_PORT = 3773
+ private val STORM_DRPC = "storm-drpc"
+ private val NIMBUS_HOST = "storm1"
+ private val STORM_NIMBUS = "storm nimbus"
+ private val STORM_APP = "/opt/start storm app"
+
+ private val drpcContainer = new BaseContainer(DRPC_HOST, STORM_DRPC, masterAddrs,
+ tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT))
+
+ private val nimbusContainer =
+ new BaseContainer(NIMBUS_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE", masterAddrs)
+
+ def start(): Unit = {
+ nimbusContainer.createAndStart()
+ ensureNimbusRunning()
+
+ drpcContainer.createAndStart()
+ ensureDrpcServerRunning()
+ }
+
+ private def ensureNimbusRunning(): Unit = {
+ Util.retryUntil(() => {
+ val response = Docker.execute(NIMBUS_HOST, "grep \"port\" " + CONFIG_FILE)
+ // Parse format nimbus.thrift.port: '39322'
+ val thriftPort = response.split(" ")(1).replace("'", "").toInt
+
+ Docker.executeSilently(NIMBUS_HOST, s"""sh -c "netstat -na | grep $thriftPort" """)
+ }, "Nimbus running")
+ }
+
+ private def ensureDrpcServerRunning(): Unit = {
+ Util.retryUntil(() => {
+ Docker.executeSilently(DRPC_HOST, s"""sh -c "netstat -na | grep $DRPC_PORT " """)
+ }, "DRPC running")
+ }
+
+ def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = {
+ Util.retryUntil(() => {
+ Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " +
+ s"-jar $jar $mainClass $args")
+ restClient.listRunningApps().exists(_.appName == appName)
+ }, "app running")
+ restClient.listRunningApps().filter(_.appName == appName).head.appId
+ }
+
+ def getDRPCClient(drpcServerIp: String): DRPCClient = {
+ val config = Utils.readDefaultConfig()
+ new DRPCClient(config, drpcServerIp, DRPC_PORT)
+ }
+
+ def shutDown(): Unit = {
+
+ // Cleans up the storm.yaml config file
+ Docker.executeSilently(NIMBUS_HOST, s"rm $CONFIG_FILE ")
+ drpcContainer.killAndRemove()
+ nimbusContainer.killAndRemove()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
----------------------------------------------------------------------
diff --git a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala b/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
deleted file mode 100644
index 67a2491..0000000
--- a/integrationtest/storm010/src/main/scala/io/gearpump/integrationtest/storm/Adaptor.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.integrationtest.storm
-
-import backtype.storm.topology.base.BaseBasicBolt
-import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
-import backtype.storm.tuple.{Fields, Tuple, Values}
-import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper
-
-class Adaptor extends BaseBasicBolt {
- private var id = 0L
-
- override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit = {
- val bytes = tuple.getBinary(0)
- collector.emit(new Values(s"$id".getBytes, bytes))
- id += 1
- }
-
- override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
- declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY,
- FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE))
- }
-}