You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/07/14 00:09:59 UTC
[incubator-heron] branch master updated: add integration test for
instance state in stateful processing (#2956)
This is an automated email from the ASF dual-hosted git repository.
huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 279fdec add integration test for instance state in stateful processing (#2956)
279fdec is described below
commit 279fdec164f04f4edcdf418de4bbb05f3890ef22
Author: Yao Li <cl...@gmail.com>
AuthorDate: Fri Jul 13 17:09:56 2018 -0700
add integration test for instance state in stateful processing (#2956)
* add integration test for instance state in stateful processing
* merge http servers
* remove the I prefix
---
integration_test/src/java/BUILD | 26 ++++-
.../common/AbstractTestTopology.java | 43 ++++++-
.../common/bolt/StatefulIdentityBolt.java | 79 +++++++++++++
.../common/spout/StatefulABSpout.java | 73 ++++++++++++
.../integration_topology_test/core/HttpUtils.java | 66 +++++++++++
.../core/StatefulBolt.java | 63 +++++++++++
.../core/StatefulIntegrationTopologyTestBolt.java | 108 ++++++++++++++++++
.../core/StatefulIntegrationTopologyTestSpout.java | 125 +++++++++++++++++++++
.../core/StatefulSpout.java | 61 ++++++++++
.../core/TopologyTestTopologyBuilder.java | 93 +++++++++++++++
.../StatefulBasicTopologyOneTask.java | 48 ++++++++
.../StatefulBasicTopologyOneTaskState.json | 4 +
.../StatefulBasicTopologyOneTaskTopo.json | 65 +++++++++++
integration_test/src/python/http_server/main.py | 29 +++++
.../src/python/topology_test_runner/main.py | 124 +++++++++++++++-----
.../topology_test_runner/resources/test.json | 21 ++--
scripts/run_integration_topology_test.sh | 7 +-
17 files changed, 991 insertions(+), 44 deletions(-)
diff --git a/integration_test/src/java/BUILD b/integration_test/src/java/BUILD
index b2a7c8f..afbdea3 100644
--- a/integration_test/src/java/BUILD
+++ b/integration_test/src/java/BUILD
@@ -120,6 +120,23 @@ genrule(
cmd = "cp $< $@",
)
+java_library(
+ name = "core-topology",
+ srcs = glob(
+ ["org/apache/heron/integration_topology_test/core/**/*.java"],
+ ),
+ deps = [
+ "//heron/api/src/java:api-java",
+ "//heron/api/src/java:api-java-low-level",
+ "//storm-compatibility/src/java:storm-compatibility-java",
+ "//heron/proto:proto_topology_java",
+ "//third_party/java:jackson",
+ "@commons_logging_commons_logging//jar",
+ "@com_google_protobuf//:protobuf_java",
+ "@org_apache_httpcomponents_http_client//jar",
+ "@org_apache_httpcomponents_http_core//jar",
+ ],
+)
java_library(
name = "common_topology_test",
@@ -133,7 +150,8 @@ java_library(
"//third_party/java:hadoop-core",
"//third_party/java:jackson",
"@commons_cli_commons_cli//jar",
- ":core"
+ ":core",
+ ":core-topology"
],
)
@@ -150,7 +168,8 @@ java_library(
"@commons_cli_commons_cli//jar",
":common_topology_test",
":common",
- ":core"
+ ":core",
+ ":core-topology"
],
)
@@ -167,7 +186,8 @@ java_binary(
"@com_googlecode_json_simple_json_simple//jar",
":common_topology_test",
":common",
- ":core"
+ ":core",
+ ":core-topology"
],
)
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java b/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java
index 97ce86e..5a9364e 100644
--- a/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java
@@ -33,17 +33,20 @@ import org.apache.heron.api.HeronSubmitter;
import org.apache.heron.api.exception.AlreadyAliveException;
import org.apache.heron.api.exception.InvalidTopologyException;
import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.integration_topology_test.core.TopologyTestTopologyBuilder;
/**
* Class to abstract out the common parts of the test framework for submitting topologies.
* Subclasses can implement {@code buildTopology} and call {@code submit}.
*/
public abstract class AbstractTestTopology {
+ private static final Integer CHECKPOINT_INTERVAL = 10;
private static final String TOPOLOGY_OPTION = "topology_name";
- private static final String STATE_UPDATE_TOKEN = "state_server_update_token";
+ private static final String RESULTS_URL_OPTION = "results_url";
private final CommandLine cmd;
private final String topologyName;
+ private final String httpServerResultsUrl;
protected AbstractTestTopology(String[] args) throws MalformedURLException {
CommandLineParser parser = new DefaultParser();
@@ -59,9 +62,21 @@ public abstract class AbstractTestTopology {
}
this.topologyName = cmd.getOptionValue(TOPOLOGY_OPTION);
+ if (cmd.getOptionValue(RESULTS_URL_OPTION) != null) {
+ this.httpServerResultsUrl =
+ pathAppend(cmd.getOptionValue(RESULTS_URL_OPTION), this.topologyName);
+ } else {
+ this.httpServerResultsUrl = null;
+ }
}
- protected abstract TopologyBuilder buildTopology(TopologyBuilder builder);
+ protected TopologyBuilder buildTopology(TopologyBuilder builder) {
+ return builder;
+ }
+ protected TopologyTestTopologyBuilder buildStatefulTopology(TopologyTestTopologyBuilder
+ builder) {
+ return builder;
+ }
protected Config buildConfig(Config config) {
return config;
@@ -74,6 +89,11 @@ public abstract class AbstractTestTopology {
topologyNameOption.setRequired(true);
options.addOption(topologyNameOption);
+ Option resultsUrlOption =
+ new Option("r", RESULTS_URL_OPTION, true, "url to post and get instance state");
+ resultsUrlOption.setRequired(false);
+ options.addOption(resultsUrlOption);
+
return options;
}
@@ -82,12 +102,25 @@ public abstract class AbstractTestTopology {
}
public final void submit(Config userConf) throws AlreadyAliveException, InvalidTopologyException {
- TopologyBuilder builder = new TopologyBuilder();
-
Config conf = buildConfig(new BasicConfig());
if (userConf != null) {
conf.putAll(userConf);
}
- HeronSubmitter.submitTopology(topologyName, conf, buildTopology(builder).createTopology());
+
+ if (this.httpServerResultsUrl == null) {
+ TopologyBuilder builder = new TopologyBuilder();
+ HeronSubmitter.submitTopology(topologyName, conf, buildTopology(builder).createTopology());
+
+ } else {
+ TopologyTestTopologyBuilder builder = new TopologyTestTopologyBuilder(httpServerResultsUrl);
+ conf.setTopologyReliabilityMode(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE);
+ conf.setTopologyStatefulCheckpointIntervalSecs(CHECKPOINT_INTERVAL);
+ HeronSubmitter.submitTopology(topologyName, conf,
+ buildStatefulTopology(builder).createTopology());
+ }
+ }
+
+ private static String pathAppend(String url, String path) {
+ return String.format("%s/%s", url, path);
}
}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/common/bolt/StatefulIdentityBolt.java b/integration_test/src/java/org/apache/heron/integration_topology_test/common/bolt/StatefulIdentityBolt.java
new file mode 100644
index 0000000..821959f
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/common/bolt/StatefulIdentityBolt.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_topology_test.common.bolt;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.integration_topology_test.core.StatefulBolt;
+
+public class StatefulIdentityBolt extends StatefulBolt {
+ private static final long serialVersionUID = 1174496806359441609L;
+ private static final Logger LOG = Logger.getLogger(StatefulIdentityBolt.class.getName());
+ private Fields fields;
+
+ public StatefulIdentityBolt() { }
+
+ public StatefulIdentityBolt(Fields fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public void prepare(Map<String, Object> map,
+ TopologyContext context,
+ OutputCollector outputCollector) {
+ super.prepare(map, context, outputCollector);
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ StringBuilder sb = new StringBuilder();
+ for (Object o : input.getValues()) {
+ if (sb.length() > 0) {
+ sb.append(",");
+ }
+ sb.append(o.toString());
+ }
+ String word = sb.toString();
+ LOG.info("Receiving and emitting tuple values: " + word);
+ if (!state.containsKey(word)) {
+ state.put(word, 1);
+ } else {
+ state.put(word, state.get(word) + 1);
+ }
+ collector.emit(input.getValues());
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(fields);
+ }
+
+ @Override
+ public void initState(State<String, Integer> state) {
+ super.initState(state);
+ }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/common/spout/StatefulABSpout.java b/integration_test/src/java/org/apache/heron/integration_topology_test/common/spout/StatefulABSpout.java
new file mode 100644
index 0000000..b40671f
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/common/spout/StatefulABSpout.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_topology_test.common.spout;
+
+import java.util.Map;
+
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Values;
+import org.apache.heron.integration_topology_test.core.StatefulSpout;
+
+public class StatefulABSpout extends StatefulSpout {
+
+ private static final long serialVersionUID = 7431612805823106868L;
+ private static final String[] TO_SEND = new String[]{"A", "B"};
+
+ private int emitted = 0;
+ private boolean appendSequenceId;
+
+ public StatefulABSpout() {
+ this(false);
+ }
+
+ public StatefulABSpout(boolean appendSequenceId) {
+ this.appendSequenceId = appendSequenceId;
+ }
+
+ @Override
+ public void open(Map<String, Object> conf,
+ TopologyContext newContext,
+ SpoutOutputCollector newCollector) {
+ super.open(conf, newContext, newCollector);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+ @Override
+ public void nextTuple() {
+ String word = TO_SEND[emitted % TO_SEND.length];
+ if (appendSequenceId) {
+ word = word + "_" + emitted;
+ }
+ collector.emit(new Values(word));
+ if (!state.containsKey(word)) {
+ state.put(word, 1);
+ } else {
+ state.put(word, state.get(word) + 1);
+ }
+ emitted++;
+ }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/HttpUtils.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/HttpUtils.java
new file mode 100644
index 0000000..2c19e11
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/HttpUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.io.IOException;
+import java.text.ParseException;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+public final class HttpUtils {
+
+ private HttpUtils() { }
+
+ public static int httpJsonPost(String newHttpPostUrl, String jsonData)
+ throws IOException, ParseException {
+ HttpClient client = HttpClientBuilder.create().build();
+ HttpPost post = new HttpPost(newHttpPostUrl);
+
+ StringEntity requestEntity = new StringEntity(jsonData, ContentType.APPLICATION_JSON);
+
+ post.setEntity(requestEntity);
+ HttpResponse response = client.execute(post);
+
+ return response.getStatusLine().getStatusCode();
+ }
+
+ public static void postToHttpServer(String postUrl, String data, String dataName)
+ throws RuntimeException {
+ try {
+ int responseCode = -1;
+ for (int attempts = 0; attempts < 2; attempts++) {
+ responseCode = httpJsonPost(postUrl, data);
+ if (responseCode == 200) {
+ return;
+ }
+ }
+ throw new RuntimeException(
+ String.format("Failed to post %s to %s: %s",
+ dataName, postUrl, responseCode));
+ } catch (IOException | java.text.ParseException e) {
+ throw new RuntimeException(String.format("Posting %s to %s failed",
+ dataName, postUrl), e);
+ }
+ }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulBolt.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulBolt.java
new file mode 100644
index 0000000..62bf8b9
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulBolt.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.Map;
+
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+
+public class StatefulBolt extends BaseRichBolt implements IStatefulComponent<String, Integer> {
+
+ private static final long serialVersionUID = 5834931054885658328L;
+
+ protected State<String, Integer> state;
+ protected OutputCollector collector;
+ protected TopologyContext context;
+
+ public StatefulBolt() { }
+
+ @Override
+ public void prepare(Map<String, Object> map,
+ TopologyContext inputContext,
+ OutputCollector inputOutputCollector) {
+ this.context = inputContext;
+ this.collector = inputOutputCollector;
+ }
+
+ @Override
+ public void execute(Tuple input) { }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) { }
+
+ @Override
+ public void initState(State<String, Integer> inputState) {
+ this.state = inputState;
+ }
+
+ @Override
+ public void preSave(String checkpointId) {
+ }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestBolt.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestBolt.java
new file mode 100644
index 0000000..11084c7
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestBolt.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+
+public class StatefulIntegrationTopologyTestBolt extends StatefulBolt {
+
+ private static final long serialVersionUID = 6657584414733837761L;
+ private static final Logger LOG =
+ Logger.getLogger(StatefulIntegrationTopologyTestBolt.class.getName());
+ private final StatefulBolt delegateBolt;
+ private boolean checkpointExisted = false;
+ private String outputLocation;
+
+ public StatefulIntegrationTopologyTestBolt(StatefulBolt delegate, String outputLocation) {
+ this.delegateBolt = delegate;
+ this.outputLocation = outputLocation;
+ }
+
+ @Override
+ public void prepare(Map<String, Object> map,
+ TopologyContext context,
+ OutputCollector outputCollector) {
+
+ this.delegateBolt.prepare(map, context, outputCollector);
+
+ // send instance state to http server
+ if (!delegateBolt.state.isEmpty()) {
+ String compId = String.format("%s_%d", delegateBolt.context.getThisComponentId(),
+ delegateBolt.context.getThisTaskId());
+ String dataName = String.format("instance %s state", compId);
+ String stateJsonString = formatJson(compId, delegateBolt.state);
+
+ LOG.info(String.format("Posting %s to %s", dataName, this.outputLocation));
+ HttpUtils.postToHttpServer(this.outputLocation, stateJsonString, dataName);
+ }
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ String streamID = tuple.getSourceStreamId();
+ LOG.info("Received a tuple: " + tuple + " ; from: " + streamID);
+ delegateBolt.execute(tuple);
+ }
+
+ @Override
+ public void cleanup() {
+ delegateBolt.cleanup();
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ delegateBolt.declareOutputFields(outputFieldsDeclarer);
+ }
+
+ @Override
+ public void initState(State<String, Integer> state) {
+ delegateBolt.initState(state);
+ }
+
+ @Override
+ public void preSave(String checkpointId) {
+ if (!checkpointExisted) {
+ delegateBolt.preSave(checkpointId);
+ checkpointExisted = true;
+ } else {
+ if (delegateBolt.context.getThisTaskIndex() == 0) {
+ throw new RuntimeException("Kill instance " + context.getThisComponentId());
+ }
+ }
+ }
+
+ private String formatJson(String compId, Map<String, Integer> state) {
+ StringBuilder stateString = new StringBuilder();
+ for (Map.Entry<String, Integer> entry : state.entrySet()) {
+ if (stateString.length() != 0) {
+ stateString.append(", ");
+ }
+ stateString.append(String.format("\"%s\": %d", entry.getKey(), entry.getValue()));
+ }
+ return String.format("{\"%s\": {%s}}", compId, stateString.toString());
+ }
+
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestSpout.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestSpout.java
new file mode 100644
index 0000000..7f039fe
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestSpout.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+
+public class StatefulIntegrationTopologyTestSpout extends StatefulSpout {
+
+ private static final long serialVersionUID = -6920782627142720131L;
+ private static final Logger LOG = Logger
+ .getLogger(StatefulIntegrationTopologyTestSpout.class.getName());
+
+ private final StatefulSpout delegateSpout;
+ private int maxExecutions;
+ private int curExecutions;
+ private String outputLocation;
+
+ public StatefulIntegrationTopologyTestSpout(StatefulSpout delegateSpout,
+ int maxExecutions, String outputLocation) {
+ this.delegateSpout = delegateSpout;
+ this.maxExecutions = maxExecutions;
+ this.curExecutions = maxExecutions;
+ this.outputLocation = outputLocation;
+ }
+
+ private void resetMaxExecutions(int resetExecutions) {
+ this.curExecutions = resetExecutions;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ delegateSpout.declareOutputFields(outputFieldsDeclarer);
+ }
+
+ @Override
+ public void close() {
+ delegateSpout.close();
+ }
+
+ @Override
+ public void activate() {
+ delegateSpout.activate();
+ }
+
+ @Override
+ public void deactivate() {
+ delegateSpout.deactivate();
+ }
+
+ @Override
+ public void open(Map<String, Object> map,
+ TopologyContext topologyContext,
+ SpoutOutputCollector outputCollector) {
+
+ delegateSpout.open(map, topologyContext, outputCollector);
+
+ // send instance state to http server
+ if (!delegateSpout.state.isEmpty()) {
+ String compId = String.format("%s_%d", delegateSpout.context.getThisComponentId(),
+ delegateSpout.context.getThisTaskId());
+ String dataName = String.format("instance %s state", compId);
+ String stateJsonString = formatJson(compId, delegateSpout.state);
+
+ LOG.info(String.format("Posting %s to %s", dataName, this.outputLocation));
+ HttpUtils.postToHttpServer(this.outputLocation, stateJsonString, dataName);
+ }
+ }
+
+ @Override
+ public void nextTuple() {
+ if (doneEmitting()) {
+ return;
+ }
+ curExecutions--;
+ delegateSpout.nextTuple();
+ }
+
+ protected boolean doneEmitting() {
+ return curExecutions <= 0;
+ }
+
+ @Override
+ public void initState(State<String, Integer> state) {
+ delegateSpout.initState(state);
+ }
+
+ @Override
+ public void preSave(String checkpointId) {
+ delegateSpout.preSave(checkpointId);
+ resetMaxExecutions(maxExecutions);
+ }
+
+ private String formatJson(String compId, Map<String, Integer> state) {
+ StringBuilder stateString = new StringBuilder();
+ for (Map.Entry<String, Integer> entry : state.entrySet()) {
+ if (stateString.length() != 0) {
+ stateString.append(", ");
+ }
+ stateString.append(String.format("\"%s\": %d", entry.getKey(), entry.getValue()));
+ }
+ return String.format("{\"%s\": {%s}}", compId, stateString.toString());
+ }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulSpout.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulSpout.java
new file mode 100644
index 0000000..8783d41
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulSpout.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.Map;
+
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+
+public class StatefulSpout extends BaseRichSpout implements IStatefulComponent<String, Integer> {
+
+ private static final long serialVersionUID = 2045875254384424423L;
+ protected SpoutOutputCollector collector;
+ protected State<String, Integer> state;
+ protected TopologyContext context;
+
+ public StatefulSpout() { }
+
+ @Override
+ public void open(Map<String, Object> conf,
+ TopologyContext newContext,
+ SpoutOutputCollector newCollector) {
+ this.context = newContext;
+ this.collector = newCollector;
+ }
+
+ @Override
+ public void nextTuple() { }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) { }
+
+ @Override
+ public void initState(State<String, Integer> inputState) {
+ this.state = inputState;
+ }
+
+ @Override
+ public void preSave(String checkpointId) {
+ }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/TopologyTestTopologyBuilder.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/TopologyTestTopologyBuilder.java
new file mode 100644
index 0000000..5eee2a6
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/TopologyTestTopologyBuilder.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.HeronTopology;
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.topology.BoltDeclarer;
+import org.apache.heron.api.topology.SpoutDeclarer;
+import org.apache.heron.api.topology.TopologyBuilder;
+
+public class TopologyTestTopologyBuilder extends TopologyBuilder {
+
+ private static final int DEFAULT_EXECUTION_COUNT = 10;
+ private final Map<String, Integer> components = new HashMap<>();
+ private final String outputLocation;
+
+ public TopologyTestTopologyBuilder(String outputLocation) {
+ this.outputLocation = outputLocation;
+ }
+
+ public BoltDeclarer setBolt(String id, StatefulBolt bolt, Number parallelismHint) {
+ return setBolt(id, new StatefulIntegrationTopologyTestBolt(bolt, this.outputLocation),
+ parallelismHint);
+ }
+
+ public BoltDeclarer setBolt(String id, StatefulIntegrationTopologyTestBolt bolt,
+ Number parallelismHint) {
+ components.put(id, (Integer) parallelismHint);
+ return super.setBolt(id, bolt, parallelismHint);
+ }
+
+ public SpoutDeclarer setSpout(String id, StatefulSpout spout, Number parallelismHint) {
+ return setSpout(id, spout, parallelismHint, DEFAULT_EXECUTION_COUNT);
+ }
+
+ // A method allows user to define the maxExecutionCount of the spout
+ // To be compatible with earlier Integration Test Framework
+ public SpoutDeclarer setSpout(String id, StatefulSpout spout,
+ Number parallelismHint, int maxExecutionCount) {
+
+ StatefulIntegrationTopologyTestSpout wrappedSpout =
+ new StatefulIntegrationTopologyTestSpout(spout, maxExecutionCount, this.outputLocation);
+
+ return setSpout(id, wrappedSpout, parallelismHint);
+ }
+
+ private SpoutDeclarer setSpout(String id, StatefulIntegrationTopologyTestSpout itSpout,
+ Number parallelismHint) {
+ components.put(id, (Integer) parallelismHint);
+ return super.setSpout(id, itSpout, parallelismHint);
+ }
+
+ @Override
+ public HeronTopology createTopology() {
+
+ TopologyAPI.Topology.Builder topologyBlr =
+ super.createTopology().
+ setConfig(new Config()).
+ setName("").
+ setState(TopologyAPI.TopologyState.RUNNING).
+ getTopology().toBuilder();
+
+ // Clear unnecessary fields to make the state of TopologyAPI.Topology.Builder clean
+ topologyBlr.clearTopologyConfig().clearName().clearState();
+
+ // We wrap it to the new topologyBuilder
+ return new HeronTopology(topologyBlr);
+ }
+
+ public Map<String, Integer> getComponentParallelism() {
+ return components;
+ }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTask.java b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTask.java
new file mode 100644
index 0000000..0e1d901
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTask.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.topology.stateful_basic_topology_one_task;
+
+import java.net.MalformedURLException;
+
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.integration_topology_test.common.AbstractTestTopology;
+import org.apache.heron.integration_topology_test.common.bolt.StatefulIdentityBolt;
+import org.apache.heron.integration_topology_test.common.spout.StatefulABSpout;
+import org.apache.heron.integration_topology_test.core.TopologyTestTopologyBuilder;
+
+public final class StatefulBasicTopologyOneTask extends AbstractTestTopology {
+
+ private StatefulBasicTopologyOneTask(String[] args) throws MalformedURLException {
+ super(args);
+ }
+
+ @Override
+ protected TopologyTestTopologyBuilder buildStatefulTopology(TopologyTestTopologyBuilder builder) {
+ builder.setSpout("stateful-ab-spout", new StatefulABSpout(true), 1);
+ builder.setBolt("stateful-identity-bolt",
+ new StatefulIdentityBolt(new Fields("word")), 3)
+ .shuffleGrouping("stateful-ab-spout");
+ return builder;
+ }
+
+ public static void main(String[] args) throws Exception {
+ StatefulBasicTopologyOneTask topology = new StatefulBasicTopologyOneTask(args);
+ topology.submit();
+ }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskState.json b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskState.json
new file mode 100644
index 0000000..1d111df
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskState.json
@@ -0,0 +1,4 @@
+[{"stateful-ab-spout": {"A_6": 1, "A_4": 1, "A_2": 1, "A_0": 1, "A_8": 1, "B_9": 1, "B_1": 1, "B_3": 1, "B_5": 1, "B_7": 1}},
+ {"stateful-identity-bolt": {"B_5": 1, "A_2": 1, "A_8": 1}},
+ {"stateful-identity-bolt": {"B_1": 1, "A_4": 1, "B_7": 1}},
+ {"stateful-identity-bolt": {"A_6": 1, "B_3": 1, "A_0": 1, "B_9": 1}}]
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskTopo.json b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskTopo.json
new file mode 100644
index 0000000..691ea87
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskTopo.json
@@ -0,0 +1,65 @@
+{
+ "topology": {
+ "id": "IntegrationTopologyTest_StatefulBasicTopologyOneTask",
+ "name": "IntegrationTopologyTest_StatefulBasicTopologyOneTask",
+ "spouts": {
+ "comp": {
+ "name": "stateful-ab-spout",
+ "config": {
+ "kvs": {
+ "key": "topology.component.parallelism",
+ "value": "1"
+ }
+ }
+ }
+ },
+ "bolts": [
+ {
+ "comp": {
+ "name": "stateful-identity-bolt",
+ "config": {
+ "kvs": {
+ "key": "topology.component.parallelism",
+ "value": "3"
+ }
+ }
+ },
+ "inputs": [
+ {
+ "stream": {
+ "id": "default",
+ "component_name": "stateful-ab-spout"
+ }
+ }
+ ]
+ }
+ ],
+ "state": "RUNNING"
+ },
+ "instances": [
+ {
+ "instance_id": "container_1_stateful-ab-spout_1",
+ "info": {
+ "component_name": "stateful-ab-spout"
+ }
+ },
+ {
+ "instance_id": "container_1_stateful-identity-bolt_2",
+ "info": {
+ "component_name": "stateful-identity-bolt"
+ }
+ },
+ {
+ "instance_id": "container_2_stateful-identity-bolt_3",
+ "info": {
+ "component_name": "stateful-identity-bolt"
+ }
+ },
+ {
+ "instance_id": "container_2_stateful-identity-bolt_4",
+ "info": {
+ "component_name": "stateful-identity-bolt"
+ }
+ }
+ ]
+}
diff --git a/integration_test/src/python/http_server/main.py b/integration_test/src/python/http_server/main.py
index 262de3d..72dc003 100644
--- a/integration_test/src/python/http_server/main.py
+++ b/integration_test/src/python/http_server/main.py
@@ -71,6 +71,32 @@ class MemoryMapHandler(tornado.web.RequestHandler):
self.state_map[key] = tornado.escape.json_encode(data)
self.write("Results written to " + tornado.escape.json_encode(self.state_map) + " successfully")
+# for instance states in stateful processing
+class StateResultHandler(tornado.web.RequestHandler):
+ def initialize(self, result_map):
+ self.result_map = result_map
+
+ def get(self, key):
+ if key:
+ self.set_header("Content-Type", 'application/json; charset="utf-8"')
+ if key in self.result_map:
+ self.write(tornado.escape.json_encode(self.result_map[key]))
+ else:
+ raise tornado.web.HTTPError(status_code=404, log_message="Key %s not found" % key)
+ else:
+ self.write(tornado.escape.json_encode(self.result_map))
+
+ def post(self, key):
+ data = tornado.escape.json_decode(self.request.body)
+ if key:
+ if key in self.result_map:
+ self.result_map[key].append(data)
+ else:
+ self.result_map[key] = [data]
+ self.write("Results written successfully: topology " + key + ' instance ' + data.keys()[0])
+ else:
+ raise tornado.web.HTTPError(status_code=404, log_message="Invalid key %s" % key)
+
def main():
'''
Runs a tornado http server that listens for any
@@ -83,11 +109,14 @@ def main():
os.makedirs(RESULTS_DIRECTORY)
state_map = {}
+ # for instance states in stateful processing
+ state_result_map = {}
application = tornado.web.Application([
(r"/", MainHandler),
(r"^/results/([a-zA-Z0-9_-]+$)", FileHandler),
(r"^/state", MemoryMapGetAllHandler, dict(state_map=state_map)),
(r"^/state/([a-zA-Z0-9_-]+$)", MemoryMapHandler, dict(state_map=state_map)),
+ (r"^/stateResults/([a-zA-Z0-9_-]+$)", StateResultHandler, dict(result_map=state_result_map)),
])
if len(sys.argv) == 1:
diff --git a/integration_test/src/python/topology_test_runner/main.py b/integration_test/src/python/topology_test_runner/main.py
index ec948cc..1ed4cf8 100644
--- a/integration_test/src/python/topology_test_runner/main.py
+++ b/integration_test/src/python/topology_test_runner/main.py
@@ -8,6 +8,7 @@ import re
import sys
import time
import uuid
+from httplib import HTTPConnection
from ..common import status
from heron.common.src.python.utils import log
@@ -160,16 +161,48 @@ class InstanceStateResultChecker(TopologyStructureResultChecker):
raise status.TestFailure("The actual topology graph structure does not match the expected one"
+ " for topology: %s" % self.topology_name)
# check instance states, get the instance_state_check_result
- # if both above are isinstanc(status.TestSuccess), return success, else return fail
+ # if both above are isinstance(status.TestSuccess), return success, else return fail
+ expected_result = self.instance_state_expected_result_handler.fetch_results()
- def _compare_state(self, expected_results, actual_results):
- pass
+ decoder = json.JSONDecoder(strict=False)
+ expected_result = decoder.decode(expected_result)
+
+ actual_result =[]
+ for _ in range(0, RETRY_ATTEMPTS):
+ actual_result = self.instance_state_actual_result_handler.fetch_results()
+ actual_result = decoder.decode(actual_result)
+ if len(actual_result) == len(expected_result):
+ break
+ else:
+ time.sleep(RETRY_INTERVAL)
+ else:
+ raise status.TestFailure("Fail to get actual results of instance states for topology %s"
+ % self.topology_name)
- def _parse_state_expected_results(self, expected_results):
- pass
+ if '_' not in expected_result[0].keys()[0]:
+ actual_result = self._parse_instance_id(actual_result)
- def _parse_state_actual_results(self, actual_results):
- pass
+ return self._compare_state(sorted(expected_result), sorted(actual_result))
+
+ def _compare_state(self, expected_results, actual_results):
+ if actual_results == expected_results:
+ return status.TestSuccess("Topology %s instance state result matches expected result"
+ % self.topology_name)
+ else:
+ failure = status.TestFailure("Actual result did not match expected result")
+ # lambda required below to remove the unicode 'u' from the output
+ logging.info("Actual result ---------- \n" + str(map(lambda x: str(x), actual_results)))
+ logging.info("Expected result ---------- \n" + str(map(lambda x: str(x), expected_results)))
+ raise failure
+
+ def _parse_instance_id(self, input):
+ # remove taskId in instaneId
+ output = list()
+ for ele in input:
+ for key in ele:
+ new_key = key.split('_')[0]
+ output.append({new_key: dict(ele[key])})
+ return output
class FileBasedExpectedResultsHandler(object):
@@ -247,14 +280,44 @@ class HttpBasedActualResultsHandler(object):
Get actually loaded instance states
TODO(yaoli): complete this class when stateful processing is ready
"""
- pass
+ def __init__(self, server_host_port, topology_name):
+ self.server_host_port = server_host_port
+ self.topology_name = topology_name
-class StatefulStorageBasedExpectedResultsHandler(object):
- """
- Get expected instance states from checkpoint storage
- TODO(yaoli): complete this class when stateful processing is ready
- """
- pass
+ def fetch_results(self):
+ try:
+ return self.fetch_from_server(self.server_host_port, self.topology_name,
+ 'instance_state', '/stateResults/%s' % self.topology_name)
+ except Exception as e:
+ raise status.TestFailure("Fetching instance state failed for %s topology" % self.topology_name, e)
+
+ def fetch_from_server(self, server_host_port, topology_name, data_name, path):
+ ''' Make a http get request to fetch actual results from http server '''
+ for i in range(0, RETRY_ATTEMPTS):
+ logging.info("Fetching %s for topology %s, retry count: %d", data_name, topology_name, i)
+ response = self.get_http_response(server_host_port, path)
+ if response.status == 200:
+ return response.read()
+ elif i != RETRY_ATTEMPTS:
+ logging.info("Fetching %s failed with status: %s; reason: %s; body: %s",
+ data_name, response.status, response.reason, response.read())
+ time.sleep(RETRY_INTERVAL)
+
+ raise status.TestFailure("Failed to fetch %s after %d attempts" % (data_name, RETRY_ATTEMPTS))
+
+ def get_http_response(self, server_host_port, path):
+ ''' get HTTP response '''
+ for _ in range(0, RETRY_ATTEMPTS):
+ try:
+ connection = HTTPConnection(server_host_port)
+ connection.request('GET', path)
+ response = connection.getresponse()
+ return response
+ except Exception:
+ time.sleep(RETRY_INTERVAL)
+ continue
+
+ raise status.TestFailure("Failed to get HTTP Response after %d attempts" % RETRY_ATTEMPTS)
# Result handlers end
@@ -273,10 +336,14 @@ def filter_test_topologies(test_topologies, test_pattern):
def run_topology_test(topology_name, classpath, results_checker,
- params, update_args, deactivate_args, restart_args, extra_topology_args):
+ params, update_args, deactivate_args, restart_args, http_server_host_port, extra_topology_args,
+ check_type):
try:
- args = "-t %s %s" % \
- (topology_name, extra_topology_args)
+ if check_type == 'checkpoint_state':
+ args = "-r http://%s/stateResults -t %s %s" % \
+ (http_server_host_port, topology_name, extra_topology_args)
+ else:
+ args = "-t %s %s" % (topology_name, extra_topology_args)
submit_topology(params.heron_cli_path, params.cli_config_path, params.cluster, params.role,
params.env, params.tests_bin_path, classpath,
params.release_package_uri, args)
@@ -413,7 +480,7 @@ def run_topology_tests(conf, args):
failures = []
timestamp = time.strftime('%Y%m%d%H%M%S')
- # http_host_port = "%s:%d" % (args.http_hostname, args.http_port)
+ http_server_host_port = "%s:%d" % (args.http_hostname, args.http_port)
if args.tests_bin_path.endswith("scala-integration-tests.jar"):
test_topologies = filter_test_topologies(conf["scalaTopologies"], args.test_topology_pattern)
@@ -437,7 +504,7 @@ def run_topology_tests(conf, args):
update_args = ""
deactivate_args = ""
restart_args = ""
- topology_args = ''
+ topology_args = ""
if "updateArgs" in topology_conf:
update_args = topology_conf["updateArgs"]
if "deactivateArgs" in topology_conf:
@@ -448,22 +515,26 @@ def run_topology_tests(conf, args):
if "topologyArgs" in topology_conf:
topology_args = "%s %s" % (topology_args, topology_conf["topologyArgs"])
- expected_result_file_path = \
- args.topologies_path + "/" + topology_conf["expectedResultRelativePath"]
+ expected_topo_result_file_path = \
+ args.topologies_path + "/" + topology_conf["expectedTopoResultRelativePath"]
+ if "expectedStateResultRelativePath" in topology_conf:
+ expected_state_result_file_path = \
+ args.topologies_path + "/" + topology_conf["expectedStateResultRelativePath"]
+
check_type = topology_conf["checkType"]
if check_type == 'topology_structure':
results_checker = load_result_checker(
check_type, topology_name,
- FileBasedExpectedResultsHandler(expected_result_file_path),
+ FileBasedExpectedResultsHandler(expected_topo_result_file_path),
ZkFileBasedActualResultsHandler(topology_name, args.cluster))
elif check_type == 'checkpoint_state':
if processing_type == 'stateful':
results_checker = load_result_checker(
check_type, topology_name,
- FileBasedExpectedResultsHandler(expected_result_file_path),
+ FileBasedExpectedResultsHandler(expected_topo_result_file_path),
ZkFileBasedActualResultsHandler(topology_name, args.cluster),
- StatefulStorageBasedExpectedResultsHandler(),
- HttpBasedActualResultsHandler())
+ FileBasedExpectedResultsHandler(expected_state_result_file_path),
+ HttpBasedActualResultsHandler(http_server_host_port, topology_name))
elif processing_type == 'non_stateful':
raise ValueError("Cannot check instance checkpoint state in non_stateful processing. "
+ "Not running topology: " + topology_name)
@@ -477,7 +548,8 @@ def run_topology_tests(conf, args):
start_secs = int(time.time())
try:
result = run_topology_test(topology_name, classpath, results_checker,
- args, update_args, deactivate_args, restart_args, topology_args)
+ args, update_args, deactivate_args, restart_args, http_server_host_port, topology_args,
+ check_type)
test_tuple = (topology_name, int(time.time()) - start_secs)
if isinstance(result, status.TestSuccess):
successes += [test_tuple]
diff --git a/integration_test/src/python/topology_test_runner/resources/test.json b/integration_test/src/python/topology_test_runner/resources/test.json
index 0119a48..3682aa1 100644
--- a/integration_test/src/python/topology_test_runner/resources/test.json
+++ b/integration_test/src/python/topology_test_runner/resources/test.json
@@ -4,49 +4,56 @@
"role" : "dummy_role",
"env" : "dummy_env",
"cliConfigPath" : "$HOME/.heron/conf",
- "processingType" : "non_stateful",
+ "processingType" : "stateful",
"topologyClasspathPrefix" : "org.apache.heron.integration_topology_test.topology.",
"javaTopologies": [
{
"topologyName" : "IntegrationTopologyTest_FieldsGrouping",
"classPath" : "fields_grouping.FieldsGrouping",
- "expectedResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
+ "expectedTopoResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
"checkType" : "topology_structure"
},
{
"topologyName" : "IntegrationTopologyTest_BasicTopologyOneTaskScaleUp",
"classPath" : "basic_topology_one_task_scale_up.BasicTopologyOneTask",
"updateArgs" : "--component-parallelism=ab-spout:2 --component-parallelism=identity-bolt:3",
- "expectedResultRelativePath" : "basic_topology_one_task_scale_up/BasicTopologyOneTaskResults.json",
+ "expectedTopoResultRelativePath" : "basic_topology_one_task_scale_up/BasicTopologyOneTaskResults.json",
"checkType" : "topology_structure"
},
{
"topologyName" : "IntegrationTopologyTest_BasicTopologyOneTaskScaleDown",
"classPath" : "basic_topology_one_task_scale_down.BasicTopologyOneTask",
"updateArgs" : "--component-parallelism=ab-spout:1 --component-parallelism=identity-bolt:2",
- "expectedResultRelativePath" : "basic_topology_one_task_scale_down/BasicTopologyOneTaskResults.json",
+ "expectedTopoResultRelativePath" : "basic_topology_one_task_scale_down/BasicTopologyOneTaskResults.json",
"checkType" : "topology_structure"
},
{
"topologyName" : "IntegrationTopologyTest_BasicTopologyOneTaskScaleUpDown",
"classPath" : "basic_topology_one_task_scale_up_down.BasicTopologyOneTask",
"updateArgs" : "--component-parallelism=ab-spout:2 --component-parallelism=identity-bolt:2",
- "expectedResultRelativePath" : "basic_topology_one_task_scale_up_down/BasicTopologyOneTaskResults.json",
+ "expectedTopoResultRelativePath" : "basic_topology_one_task_scale_up_down/BasicTopologyOneTaskResults.json",
"checkType" : "topology_structure"
},
{
"topologyName" : "IntegrationTopologyTest_FieldsGroupingDeactivate",
"classPath" : "fields_grouping.FieldsGrouping",
"deactivateArgs" : "True",
- "expectedResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
+ "expectedTopoResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
"checkType" : "topology_structure"
},
{
"topologyName" : "IntegrationTopologyTest_FieldsGroupingKillContainer",
"classPath" : "fields_grouping.FieldsGrouping",
"restartArgs" : "True",
- "expectedResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
+ "expectedTopoResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
"checkType" : "topology_structure"
+ },
+ {
+ "topologyName" : "IntegrationTopologyTest_StatefulBasicTopologyOneTask",
+ "classPath" : "stateful_basic_topology_one_task.StatefulBasicTopologyOneTask",
+ "expectedTopoResultRelativePath" : "stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskTopo.json",
+ "expectedStateResultRelativePath" : "stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskState.json",
+ "checkType" : "checkpoint_state"
}
]
}
diff --git a/scripts/run_integration_topology_test.sh b/scripts/run_integration_topology_test.sh
index cfa81b3..b56eda5 100644
--- a/scripts/run_integration_topology_test.sh
+++ b/scripts/run_integration_topology_test.sh
@@ -3,6 +3,7 @@
# Script to locally run the integration topology test.
#
+HTTP_SERVER="./bazel-bin/integration_test/src/python/http_server/http-server"
TEST_RUNNER="./bazel-bin/integration_test/src/python/topology_test_runner/topology-test-runner.pex"
JAVA_TESTS_DIR="integration_test/src/java/org/apache/heron/integration_topology_test/topology"
@@ -25,9 +26,9 @@ bazel run --config=`platform` -- scripts/packages:heron-install.sh --user
bazel build --config=`platform` {heron/...,scripts/packages:tarpkgs,integration_test/src/...}
# run the simple http server
-#${HTTP_SERVER} 8080 &
-#http_server_id=$!
-#trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
+${HTTP_SERVER} 8080 &
+http_server_id=$!
+trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
# run the scala integration tests
#${TEST_RUNNER} \