You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/07/14 00:09:58 UTC

[GitHub] huijunwu closed pull request #2956: add integration test for instance state in stateful processing

huijunwu closed pull request #2956: add integration test for instance state in stateful processing
URL: https://github.com/apache/incubator-heron/pull/2956
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of such a pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/integration_test/src/java/BUILD b/integration_test/src/java/BUILD
index b2a7c8f83a..afbdea3877 100644
--- a/integration_test/src/java/BUILD
+++ b/integration_test/src/java/BUILD
@@ -120,6 +120,23 @@ genrule(
     cmd = "cp $< $@",
 )
 
+java_library(
+    name = "core-topology",
+    srcs = glob(
+        ["org/apache/heron/integration_topology_test/core/**/*.java"],
+    ),
+    deps = [
+        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
+        "//storm-compatibility/src/java:storm-compatibility-java",
+        "//heron/proto:proto_topology_java",
+        "//third_party/java:jackson",
+        "@commons_logging_commons_logging//jar",
+        "@com_google_protobuf//:protobuf_java",
+        "@org_apache_httpcomponents_http_client//jar",
+        "@org_apache_httpcomponents_http_core//jar",
+    ],
+)
 
 java_library(
     name = "common_topology_test",
@@ -133,7 +150,8 @@ java_library(
         "//third_party/java:hadoop-core",
         "//third_party/java:jackson",
         "@commons_cli_commons_cli//jar",
-        ":core"
+        ":core",
+        ":core-topology"
     ],
 )
 
@@ -150,7 +168,8 @@ java_library(
         "@commons_cli_commons_cli//jar",
         ":common_topology_test",
         ":common",
-        ":core"
+        ":core",
+        ":core-topology"
     ],
 )
 
@@ -167,7 +186,8 @@ java_binary(
         "@com_googlecode_json_simple_json_simple//jar",
         ":common_topology_test",
         ":common",
-        ":core"
+        ":core",
+        ":core-topology"
     ],
 )
 
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java b/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java
index 97ce86ea25..5a9364e007 100644
--- a/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java
@@ -33,17 +33,20 @@
 import org.apache.heron.api.exception.AlreadyAliveException;
 import org.apache.heron.api.exception.InvalidTopologyException;
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.integration_topology_test.core.TopologyTestTopologyBuilder;
 
 /**
  * Class to abstract out the common parts of the test framework for submitting topologies.
  * Subclasses can implement {@code buildTopology} and call {@code submit}.
  */
 public abstract class AbstractTestTopology {
+  private static final Integer CHECKPOINT_INTERVAL = 10;
   private static final String TOPOLOGY_OPTION = "topology_name";
-  private static final String STATE_UPDATE_TOKEN = "state_server_update_token";
+  private static final String RESULTS_URL_OPTION = "results_url";
 
   private final CommandLine cmd;
   private final String topologyName;
+  private final String httpServerResultsUrl;
 
   protected AbstractTestTopology(String[] args) throws MalformedURLException {
     CommandLineParser parser = new DefaultParser();
@@ -59,9 +62,21 @@ protected AbstractTestTopology(String[] args) throws MalformedURLException {
     }
 
     this.topologyName = cmd.getOptionValue(TOPOLOGY_OPTION);
+    if (cmd.getOptionValue(RESULTS_URL_OPTION) != null) {
+      this.httpServerResultsUrl =
+          pathAppend(cmd.getOptionValue(RESULTS_URL_OPTION), this.topologyName);
+    } else {
+      this.httpServerResultsUrl = null;
+    }
   }
 
-  protected abstract TopologyBuilder buildTopology(TopologyBuilder builder);
+  protected TopologyBuilder buildTopology(TopologyBuilder builder) {
+    return builder;
+  }
+  protected TopologyTestTopologyBuilder buildStatefulTopology(TopologyTestTopologyBuilder
+                                                                           builder) {
+    return builder;
+  }
 
   protected Config buildConfig(Config config) {
     return config;
@@ -74,6 +89,11 @@ protected Options getArgOptions() {
     topologyNameOption.setRequired(true);
     options.addOption(topologyNameOption);
 
+    Option resultsUrlOption =
+        new Option("r", RESULTS_URL_OPTION, true, "url to post and get instance state");
+    resultsUrlOption.setRequired(false);
+    options.addOption(resultsUrlOption);
+
     return options;
   }
 
@@ -82,12 +102,25 @@ public final void submit() throws AlreadyAliveException, InvalidTopologyExceptio
   }
 
   public final void submit(Config userConf) throws AlreadyAliveException, InvalidTopologyException {
-    TopologyBuilder builder = new TopologyBuilder();
-
     Config conf = buildConfig(new BasicConfig());
     if (userConf != null) {
       conf.putAll(userConf);
     }
-    HeronSubmitter.submitTopology(topologyName, conf, buildTopology(builder).createTopology());
+
+    if (this.httpServerResultsUrl == null) {
+      TopologyBuilder builder = new TopologyBuilder();
+      HeronSubmitter.submitTopology(topologyName, conf, buildTopology(builder).createTopology());
+
+    } else {
+      TopologyTestTopologyBuilder builder = new TopologyTestTopologyBuilder(httpServerResultsUrl);
+      conf.setTopologyReliabilityMode(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE);
+      conf.setTopologyStatefulCheckpointIntervalSecs(CHECKPOINT_INTERVAL);
+      HeronSubmitter.submitTopology(topologyName, conf,
+          buildStatefulTopology(builder).createTopology());
+    }
+  }
+
+  private static String pathAppend(String url, String path) {
+    return String.format("%s/%s", url.replaceAll("/+$", ""), path);  // tolerate trailing '/'
+  }
 }
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/common/bolt/StatefulIdentityBolt.java b/integration_test/src/java/org/apache/heron/integration_topology_test/common/bolt/StatefulIdentityBolt.java
new file mode 100644
index 0000000000..821959fdad
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/common/bolt/StatefulIdentityBolt.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_topology_test.common.bolt;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.integration_topology_test.core.StatefulBolt;
+
+/** Pass-through bolt that counts, in its {@code state}, how often each tuple was seen. */
+public class StatefulIdentityBolt extends StatefulBolt {
+  private static final long serialVersionUID = 1174496806359441609L;
+  private static final Logger LOG = Logger.getLogger(StatefulIdentityBolt.class.getName());
+  private Fields fields;
+
+  public StatefulIdentityBolt() { }
+
+  public StatefulIdentityBolt(Fields fields) {
+    this.fields = fields;
+  }
+
+  @Override
+  public void prepare(Map<String, Object> map,
+                      TopologyContext context,
+                      OutputCollector outputCollector) {
+    super.prepare(map, context, outputCollector);
+  }
+
+  /** Joins the tuple values into one key, bumps that key's counter and forwards the tuple. */
+  @Override
+  public void execute(Tuple input) {
+    StringBuilder joined = new StringBuilder();
+    for (Object value : input.getValues()) {
+      if (joined.length() > 0) {
+        joined.append(",");
+      }
+      joined.append(value.toString());
+    }
+    String key = joined.toString();
+    LOG.info("Receiving and emitting tuple values: " + key);
+    // a key seen for the first time starts at 1, later sightings increment it
+    int seen = state.containsKey(key) ? state.get(key) + 1 : 1;
+    state.put(key, seen);
+    collector.emit(input.getValues());
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(fields);
+  }
+
+  @Override
+  public void initState(State<String, Integer> state) {
+    super.initState(state);
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/common/spout/StatefulABSpout.java b/integration_test/src/java/org/apache/heron/integration_topology_test/common/spout/StatefulABSpout.java
new file mode 100644
index 0000000000..b40671faf4
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/common/spout/StatefulABSpout.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_topology_test.common.spout;
+
+import java.util.Map;
+
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Values;
+import org.apache.heron.integration_topology_test.core.StatefulSpout;
+
+/**
+ * Spout that alternates between "A" and "B", optionally suffixing each word with its
+ * sequence id, and counts every emitted word in {@code state}.
+ */
+public class StatefulABSpout extends StatefulSpout {
+  private static final long serialVersionUID = 7431612805823106868L;
+  private static final String[] TO_SEND = new String[]{"A", "B"};
+
+  private int emitted = 0;
+  private boolean appendSequenceId;
+
+  public StatefulABSpout() {
+    this(false);
+  }
+
+  public StatefulABSpout(boolean appendSequenceId) {
+    this.appendSequenceId = appendSequenceId;
+  }
+
+  @Override
+  public void open(Map<String, Object> conf,
+                   TopologyContext newContext,
+                   SpoutOutputCollector newCollector) {
+    super.open(conf, newContext, newCollector);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("word"));
+  }
+
+  /** Emits the next word of the cycle, then records one more occurrence of it. */
+  @Override
+  public void nextTuple() {
+    String base = TO_SEND[emitted % TO_SEND.length];
+    String word = appendSequenceId ? base + "_" + emitted : base;
+    collector.emit(new Values(word));
+    // a word emitted for the first time starts at 1
+    int occurrences = state.containsKey(word) ? state.get(word) + 1 : 1;
+    state.put(word, occurrences);
+    emitted++;
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/HttpUtils.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/HttpUtils.java
new file mode 100644
index 0000000000..2c19e11544
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/HttpUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.io.IOException;
+import java.text.ParseException;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+/** Helpers for posting JSON to the integration-test HTTP server. */
+public final class HttpUtils {
+  private HttpUtils() { }
+
+  /** POSTs {@code jsonData} as JSON and returns the HTTP status code of the response. */
+  public static int httpJsonPost(String newHttpPostUrl, String jsonData)
+      throws IOException, ParseException {
+    HttpClient client = HttpClientBuilder.create().build();
+    try {
+      HttpPost post = new HttpPost(newHttpPostUrl);
+      post.setEntity(new StringEntity(jsonData, ContentType.APPLICATION_JSON));
+      HttpResponse response = client.execute(post);
+      return response.getStatusLine().getStatusCode();
+    } finally {
+      // Shut the client down; otherwise every call leaks its pooled connection.
+      org.apache.http.client.utils.HttpClientUtils.closeQuietly(client);
+    }
+  }
+
+  /** Posts {@code data}, trying at most twice; throws RuntimeException if no try gets a 200. */
+  public static void postToHttpServer(String postUrl, String data, String dataName) {
+    try {
+      int responseCode = -1;
+      for (int attempts = 0; attempts < 2; attempts++) {
+        responseCode = httpJsonPost(postUrl, data);
+        if (responseCode == 200) {
+          return;
+        }
+      }
+      throw new RuntimeException(
+          String.format("Failed to post %s to %s: %s", dataName, postUrl, responseCode));
+    } catch (IOException | java.text.ParseException e) {
+      throw new RuntimeException(String.format("Posting %s to %s failed", dataName, postUrl), e);
+    }
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulBolt.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulBolt.java
new file mode 100644
index 0000000000..62bf8b9f7c
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulBolt.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.Map;
+
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+
+/** No-op base bolt that keeps hold of the context, collector and state it is given. */
+public class StatefulBolt extends BaseRichBolt implements IStatefulComponent<String, Integer> {
+  private static final long serialVersionUID = 5834931054885658328L;
+
+  protected State<String, Integer> state;
+  protected OutputCollector collector;
+  protected TopologyContext context;
+
+  public StatefulBolt() { }
+
+  /** Stores the collector and context for later use. */
+  @Override
+  public void prepare(Map<String, Object> map,
+                      TopologyContext inputContext,
+                      OutputCollector inputOutputCollector) {
+    collector = inputOutputCollector;
+    context = inputContext;
+  }
+
+  @Override
+  public void execute(Tuple input) { }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) { }
+
+  @Override
+  public void initState(State<String, Integer> inputState) {
+    state = inputState;
+  }
+
+  @Override
+  public void preSave(String checkpointId) { }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestBolt.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestBolt.java
new file mode 100644
index 0000000000..11084c7e1a
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestBolt.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+
+/**
+ * Test wrapper for a {@link StatefulBolt}: posts its state and fails task 0 after checkpoint 1.
+ */
+public class StatefulIntegrationTopologyTestBolt extends StatefulBolt {
+  private static final long serialVersionUID = 6657584414733837761L;
+  private static final Logger LOG =
+      Logger.getLogger(StatefulIntegrationTopologyTestBolt.class.getName());
+  private final StatefulBolt delegateBolt;
+  private boolean checkpointExisted = false;
+  private final String outputLocation;
+
+  public StatefulIntegrationTopologyTestBolt(StatefulBolt delegate, String outputLocation) {
+    this.delegateBolt = delegate;
+    this.outputLocation = outputLocation;
+  }
+
+  /** Prepares the delegate and, if its state is non-empty, posts that state as JSON. */
+  @Override
+  public void prepare(Map<String, Object> map,
+                      TopologyContext context,
+                      OutputCollector outputCollector) {
+    this.delegateBolt.prepare(map, context, outputCollector);
+    // send instance state to http server
+    if (!delegateBolt.state.isEmpty()) {
+      String compId = String.format("%s_%d", delegateBolt.context.getThisComponentId(),
+          delegateBolt.context.getThisTaskId());
+      String dataName = String.format("instance %s state", compId);
+      String stateJsonString = formatJson(compId, delegateBolt.state);
+      LOG.info(String.format("Posting %s to %s", dataName, this.outputLocation));
+      HttpUtils.postToHttpServer(this.outputLocation, stateJsonString, dataName);
+    }
+  }
+
+  @Override
+  public void execute(Tuple tuple) {
+    String streamID = tuple.getSourceStreamId();
+    LOG.info("Received a tuple: " + tuple + " ; from: " + streamID);
+    delegateBolt.execute(tuple);
+  }
+
+  @Override
+  public void cleanup() {
+    delegateBolt.cleanup();
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    delegateBolt.declareOutputFields(outputFieldsDeclarer);
+  }
+
+  @Override
+  public void initState(State<String, Integer> state) {
+    delegateBolt.initState(state);
+  }
+
+  /** Forwards the first checkpoint; on later ones task 0 throws to kill its instance. */
+  @Override
+  public void preSave(String checkpointId) {
+    if (!checkpointExisted) {
+      delegateBolt.preSave(checkpointId);
+      checkpointExisted = true;
+    } else if (delegateBolt.context.getThisTaskIndex() == 0) {
+      // Use the delegate's context: this wrapper's own inherited field is never assigned.
+      throw new RuntimeException("Kill instance " + delegateBolt.context.getThisComponentId());
+    }
+  }
+
+  /** Renders {@code state} as a JSON object nested under the key {@code compId}. */
+  private String formatJson(String compId, Map<String, Integer> state) {
+    StringBuilder stateString = new StringBuilder();
+    for (Map.Entry<String, Integer> entry : state.entrySet()) {
+      if (stateString.length() != 0) {
+        stateString.append(", ");
+      }
+      stateString.append(String.format("\"%s\": %d", entry.getKey(), entry.getValue()));
+    }
+    return String.format("{\"%s\": {%s}}", compId, stateString.toString());
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestSpout.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestSpout.java
new file mode 100644
index 0000000000..7f039fe15b
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestSpout.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+
+/**
+ * Test wrapper around a {@link StatefulSpout}: posts the delegate's state when opened and
+ * lets the delegate emit at most {@code maxExecutions} tuples between two preSave calls.
+ */
+public class StatefulIntegrationTopologyTestSpout extends StatefulSpout {
+  private static final long serialVersionUID = -6920782627142720131L;
+  private static final Logger LOG = Logger
+      .getLogger(StatefulIntegrationTopologyTestSpout.class.getName());
+  private final StatefulSpout delegateSpout;
+  private int maxExecutions;
+  private int curExecutions;
+  private String outputLocation;
+
+  public StatefulIntegrationTopologyTestSpout(StatefulSpout delegateSpout,
+                                              int maxExecutions, String outputLocation) {
+    this.delegateSpout = delegateSpout;
+    this.maxExecutions = maxExecutions;
+    this.curExecutions = maxExecutions;
+    this.outputLocation = outputLocation;
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    delegateSpout.declareOutputFields(outputFieldsDeclarer);
+  }
+
+  @Override
+  public void close() {
+    delegateSpout.close();
+  }
+
+  @Override
+  public void activate() {
+    delegateSpout.activate();
+  }
+
+  @Override
+  public void deactivate() {
+    delegateSpout.deactivate();
+  }
+
+  /** Opens the delegate and, when its state is non-empty, posts that state as JSON. */
+  @Override
+  public void open(Map<String, Object> map,
+                   TopologyContext topologyContext,
+                   SpoutOutputCollector outputCollector) {
+    delegateSpout.open(map, topologyContext, outputCollector);
+    if (delegateSpout.state.isEmpty()) {
+      return;
+    }
+    // send instance state to http server
+    TopologyContext delegateContext = delegateSpout.context;
+    String instanceId = String.format("%s_%d",
+        delegateContext.getThisComponentId(), delegateContext.getThisTaskId());
+    String label = String.format("instance %s state", instanceId);
+    String payload = formatJson(instanceId, delegateSpout.state);
+    LOG.info(String.format("Posting %s to %s", label, this.outputLocation));
+    HttpUtils.postToHttpServer(this.outputLocation, payload, label);
+  }
+
+  /** Lets the delegate emit unless the current execution budget is used up. */
+  @Override
+  public void nextTuple() {
+    if (!doneEmitting()) {
+      curExecutions--;
+      delegateSpout.nextTuple();
+    }
+  }
+
+  protected boolean doneEmitting() {
+    return curExecutions <= 0;
+  }
+
+  @Override
+  public void initState(State<String, Integer> state) {
+    delegateSpout.initState(state);
+  }
+
+  /** Forwards to the delegate, then refills the execution budget to {@code maxExecutions}. */
+  @Override
+  public void preSave(String checkpointId) {
+    delegateSpout.preSave(checkpointId);
+    this.curExecutions = maxExecutions;
+  }
+
+  /** Renders {@code state} as a JSON object nested under the key {@code compId}. */
+  private String formatJson(String compId, Map<String, Integer> state) {
+    StringBuilder entries = new StringBuilder();
+    String separator = "";
+    for (Map.Entry<String, Integer> entry : state.entrySet()) {
+      entries.append(separator)
+          .append(String.format("\"%s\": %d", entry.getKey(), entry.getValue()));
+      separator = ", ";
+    }
+    return String.format("{\"%s\": {%s}}", compId, entries.toString());
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulSpout.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulSpout.java
new file mode 100644
index 0000000000..8783d4115d
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulSpout.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.Map;
+
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+
+/** No-op base spout that keeps hold of the context, collector and state it is given. */
+public class StatefulSpout extends BaseRichSpout implements IStatefulComponent<String, Integer> {
+  private static final long serialVersionUID = 2045875254384424423L;
+  protected SpoutOutputCollector collector;
+  protected State<String, Integer> state;
+  protected TopologyContext context;
+
+  public StatefulSpout() { }
+
+  /** Stores the collector and context for later use. */
+  @Override
+  public void open(Map<String, Object> conf,
+                   TopologyContext newContext,
+                   SpoutOutputCollector newCollector) {
+    collector = newCollector;
+    context = newContext;
+  }
+
+  @Override
+  public void nextTuple() { }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) { }
+
+  @Override
+  public void initState(State<String, Integer> inputState) {
+    state = inputState;
+  }
+
+  @Override
+  public void preSave(String checkpointId) { }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/TopologyTestTopologyBuilder.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/TopologyTestTopologyBuilder.java
new file mode 100644
index 0000000000..5eee2a66b7
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/TopologyTestTopologyBuilder.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.HeronTopology;
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.topology.BoltDeclarer;
+import org.apache.heron.api.topology.SpoutDeclarer;
+import org.apache.heron.api.topology.TopologyBuilder;
+
+/**
+ * Builder that wraps stateful test components and records each component's parallelism hint.
+ */
+public class TopologyTestTopologyBuilder extends TopologyBuilder {
+  private static final int DEFAULT_EXECUTION_COUNT = 10;
+  private final Map<String, Integer> components = new HashMap<>();
+  private final String outputLocation;
+
+  public TopologyTestTopologyBuilder(String outputLocation) {
+    this.outputLocation = outputLocation;
+  }
+
+  public BoltDeclarer setBolt(String id, StatefulBolt bolt, Number parallelismHint) {
+    return setBolt(id, new StatefulIntegrationTopologyTestBolt(bolt, this.outputLocation),
+        parallelismHint);
+  }
+
+  public BoltDeclarer setBolt(String id, StatefulIntegrationTopologyTestBolt bolt,
+                              Number parallelismHint) {
+    // intValue() accepts any Number; a plain (Integer) cast threw for e.g. Long hints
+    components.put(id, parallelismHint == null ? null : parallelismHint.intValue());
+    return super.setBolt(id, bolt, parallelismHint);
+  }
+
+  public SpoutDeclarer setSpout(String id, StatefulSpout spout, Number parallelismHint) {
+    return setSpout(id, spout, parallelismHint, DEFAULT_EXECUTION_COUNT);
+  }
+
+  // A method allows user to define the maxExecutionCount of the spout
+  // To be compatible with earlier Integration Test Framework
+  public SpoutDeclarer setSpout(String id, StatefulSpout spout,
+                                Number parallelismHint, int maxExecutionCount) {
+    StatefulIntegrationTopologyTestSpout wrappedSpout =
+        new StatefulIntegrationTopologyTestSpout(spout, maxExecutionCount, this.outputLocation);
+    return setSpout(id, wrappedSpout, parallelismHint);
+  }
+
+  private SpoutDeclarer setSpout(String id, StatefulIntegrationTopologyTestSpout itSpout,
+                                 Number parallelismHint) {
+    components.put(id, parallelismHint == null ? null : parallelismHint.intValue());
+    return super.setSpout(id, itSpout, parallelismHint);
+  }
+
+  /** Rebuilds the topology with its config, name and state fields cleared. */
+  @Override
+  public HeronTopology createTopology() {
+    TopologyAPI.Topology.Builder topologyBlr =
+        super.createTopology().
+            setConfig(new Config()).
+            setName("").
+            setState(TopologyAPI.TopologyState.RUNNING).
+            getTopology().toBuilder();
+    // Clear unnecessary fields to make the state of TopologyAPI.Topology.Builder clean
+    topologyBlr.clearTopologyConfig().clearName().clearState();
+    // We wrap it to the new topologyBuilder
+    return new HeronTopology(topologyBlr);
+  }
+
+  /** Returns the live map from component id to the recorded parallelism hint. */
+  public Map<String, Integer> getComponentParallelism() {
+    return components;
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTask.java b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTask.java
new file mode 100644
index 0000000000..0e1d901603
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTask.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.topology.stateful_basic_topology_one_task;
+
+import java.net.MalformedURLException;
+
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.integration_topology_test.common.AbstractTestTopology;
+import org.apache.heron.integration_topology_test.common.bolt.StatefulIdentityBolt;
+import org.apache.heron.integration_topology_test.common.spout.StatefulABSpout;
+import org.apache.heron.integration_topology_test.core.TopologyTestTopologyBuilder;
+
+/**
+ * Stateful integration test topology: a "stateful-ab-spout" with parallelism 1 feeding a
+ * "stateful-identity-bolt" with parallelism 3 through shuffle grouping.
+ */
+public final class StatefulBasicTopologyOneTask extends AbstractTestTopology {
+
+  private StatefulBasicTopologyOneTask(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  /**
+   * Declares the spout and the bolt on the given builder and returns that same builder.
+   */
+  @Override
+  protected TopologyTestTopologyBuilder buildStatefulTopology(TopologyTestTopologyBuilder builder) {
+    final String spoutId = "stateful-ab-spout";
+    builder.setSpout(spoutId, new StatefulABSpout(true), 1);
+
+    final StatefulIdentityBolt identityBolt = new StatefulIdentityBolt(new Fields("word"));
+    builder.setBolt("stateful-identity-bolt", identityBolt, 3).shuffleGrouping(spoutId);
+    return builder;
+  }
+
+  /**
+   * Builds the topology from the command line arguments and submits it.
+   */
+  public static void main(String[] args) throws Exception {
+    new StatefulBasicTopologyOneTask(args).submit();
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskState.json b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskState.json
new file mode 100644
index 0000000000..1d111dfbe9
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskState.json
@@ -0,0 +1,4 @@
+[{"stateful-ab-spout": {"A_6": 1, "A_4": 1, "A_2": 1, "A_0": 1, "A_8": 1, "B_9": 1, "B_1": 1, "B_3": 1, "B_5": 1, "B_7": 1}},
+  {"stateful-identity-bolt": {"B_5": 1, "A_2": 1, "A_8": 1}},
+  {"stateful-identity-bolt": {"B_1": 1, "A_4": 1, "B_7": 1}},
+  {"stateful-identity-bolt": {"A_6": 1, "B_3": 1, "A_0": 1, "B_9": 1}}]
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskTopo.json b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskTopo.json
new file mode 100644
index 0000000000..691ea87761
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskTopo.json
@@ -0,0 +1,65 @@
+{
+  "topology": {
+    "id": "IntegrationTopologyTest_StatefulBasicTopologyOneTask",
+    "name": "IntegrationTopologyTest_StatefulBasicTopologyOneTask",
+    "spouts": {
+      "comp": {
+        "name": "stateful-ab-spout",
+        "config": {
+          "kvs": {
+            "key": "topology.component.parallelism",
+            "value": "1"
+          }
+        }
+      }
+    },
+    "bolts": [
+      {
+        "comp": {
+          "name": "stateful-identity-bolt",
+          "config": {
+            "kvs": {
+              "key": "topology.component.parallelism",
+              "value": "3"
+            }
+          }
+        },
+        "inputs": [
+          {
+            "stream": {
+              "id": "default",
+              "component_name": "stateful-ab-spout"
+            }
+          }
+        ]
+      }
+    ],
+    "state": "RUNNING"
+  },
+  "instances": [
+    {
+      "instance_id": "container_1_stateful-ab-spout_1",
+      "info": {
+        "component_name": "stateful-ab-spout"
+      }
+    },
+    {
+      "instance_id": "container_1_stateful-identity-bolt_2",
+      "info": {
+        "component_name": "stateful-identity-bolt"
+      }
+    },
+    {
+      "instance_id": "container_2_stateful-identity-bolt_3",
+      "info": {
+        "component_name": "stateful-identity-bolt"
+      }
+    },
+    {
+      "instance_id": "container_2_stateful-identity-bolt_4",
+      "info": {
+        "component_name": "stateful-identity-bolt"
+      }
+    }
+  ]
+}
diff --git a/integration_test/src/python/http_server/main.py b/integration_test/src/python/http_server/main.py
index 262de3de73..72dc003e73 100644
--- a/integration_test/src/python/http_server/main.py
+++ b/integration_test/src/python/http_server/main.py
@@ -71,6 +71,32 @@ def post(self, key):
     self.state_map[key] = tornado.escape.json_encode(data)
     self.write("Results written to " + tornado.escape.json_encode(self.state_map) + " successfully")
 
+# for instance states in stateful processing
+class StateResultHandler(tornado.web.RequestHandler):
+  def initialize(self, result_map):
+    self.result_map = result_map
+
+  def get(self, key):
+    if key:
+      self.set_header("Content-Type", 'application/json; charset="utf-8"')
+      if key in self.result_map:
+        self.write(tornado.escape.json_encode(self.result_map[key]))
+      else:
+        raise tornado.web.HTTPError(status_code=404, log_message="Key %s not found" % key)
+    else:
+      self.write(tornado.escape.json_encode(self.result_map))
+
+  def post(self, key):
+    data = tornado.escape.json_decode(self.request.body)
+    if key:
+      if key in self.result_map:
+        self.result_map[key].append(data)
+      else:
+        self.result_map[key] = [data]
+      self.write("Results written successfully: topology " + key + ' instance ' + data.keys()[0])
+    else:
+      raise tornado.web.HTTPError(status_code=404, log_message="Invalid key %s" % key)
+
 def main():
   '''
   Runs a tornado http server that listens for any
@@ -83,11 +109,14 @@ def main():
     os.makedirs(RESULTS_DIRECTORY)
 
   state_map = {}
+  # for instance states in stateful processing
+  state_result_map = {}
   application = tornado.web.Application([
       (r"/", MainHandler),
       (r"^/results/([a-zA-Z0-9_-]+$)", FileHandler),
       (r"^/state", MemoryMapGetAllHandler, dict(state_map=state_map)),
       (r"^/state/([a-zA-Z0-9_-]+$)", MemoryMapHandler, dict(state_map=state_map)),
+      (r"^/stateResults/([a-zA-Z0-9_-]+$)", StateResultHandler, dict(result_map=state_result_map)),
   ])
 
   if len(sys.argv) == 1:
diff --git a/integration_test/src/python/topology_test_runner/main.py b/integration_test/src/python/topology_test_runner/main.py
index ec948cc7be..1ed4cf8a12 100644
--- a/integration_test/src/python/topology_test_runner/main.py
+++ b/integration_test/src/python/topology_test_runner/main.py
@@ -8,6 +8,7 @@
 import sys
 import time
 import uuid
+from httplib import HTTPConnection
 
 from ..common import status
 from heron.common.src.python.utils import log
@@ -160,16 +161,48 @@ def check_results(self):
       raise status.TestFailure("The actual topology graph structure does not match the expected one"
                                + " for topology: %s" % self.topology_name)
     # check instance states, get the instance_state_check_result
-    # if both above are isinstanc(status.TestSuccess), return success, else return fail
+    # if both above are isinstance(status.TestSuccess), return success, else return fail
+    expected_result = self.instance_state_expected_result_handler.fetch_results()
 
-  def _compare_state(self, expected_results, actual_results):
-    pass
+    decoder = json.JSONDecoder(strict=False)
+    expected_result = decoder.decode(expected_result)
+    
+    actual_result =[]
+    for _ in range(0, RETRY_ATTEMPTS):
+      actual_result = self.instance_state_actual_result_handler.fetch_results()
+      actual_result = decoder.decode(actual_result)
+      if len(actual_result) == len(expected_result):
+        break
+      else:
+        time.sleep(RETRY_INTERVAL)
+    else:
+      raise status.TestFailure("Fail to get actual results of instance states for topology %s"
+                               % self.topology_name)
 
-  def _parse_state_expected_results(self, expected_results):
-    pass
+    if '_' not in expected_result[0].keys()[0]:
+      actual_result = self._parse_instance_id(actual_result)
 
-  def _parse_state_actual_results(self, actual_results):
-    pass
+    return self._compare_state(sorted(expected_result), sorted(actual_result))
+
+  def _compare_state(self, expected_results, actual_results):
+    if actual_results == expected_results:
+      return status.TestSuccess("Topology %s instance state result matches expected result"
+                                % self.topology_name)
+    else:
+      failure = status.TestFailure("Actual result did not match expected result")
+      # lambda required below to remove the unicode 'u' from the output
+      logging.info("Actual result ---------- \n" + str(map(lambda x: str(x), actual_results)))
+      logging.info("Expected result ---------- \n" + str(map(lambda x: str(x), expected_results)))
+      raise failure
+
+  def _parse_instance_id(self, input):
+    # remove taskId in instaneId
+    output = list()
+    for ele in input:
+      for key in ele:
+        new_key = key.split('_')[0]
+        output.append({new_key: dict(ele[key])})
+    return output
 
 
 class FileBasedExpectedResultsHandler(object):
@@ -247,14 +280,44 @@ class HttpBasedActualResultsHandler(object):
   Get actually loaded instance states
   TODO(yaoli): complete this class when stateful processing is ready
   """
-  pass
+  def __init__(self, server_host_port, topology_name):
+    self.server_host_port = server_host_port
+    self.topology_name = topology_name
 
-class StatefulStorageBasedExpectedResultsHandler(object):
-  """
-  Get expected instance states from checkpoint storage
-  TODO(yaoli): complete this class when stateful processing is ready
-  """
-  pass
+  def fetch_results(self):
+    try:
+      return self.fetch_from_server(self.server_host_port, self.topology_name,
+        'instance_state', '/stateResults/%s' % self.topology_name)
+    except Exception as e:
+      raise status.TestFailure("Fetching instance state failed for %s topology" % self.topology_name, e)
+
+  def fetch_from_server(self, server_host_port, topology_name, data_name, path):
+    ''' Make a http get request to fetch actual results from http server '''
+    for i in range(0, RETRY_ATTEMPTS):
+      logging.info("Fetching %s for topology %s, retry count: %d", data_name, topology_name, i)
+      response = self.get_http_response(server_host_port, path)
+      if response.status == 200:
+        return response.read()
+      elif i != RETRY_ATTEMPTS:
+        logging.info("Fetching %s failed with status: %s; reason: %s; body: %s",
+          data_name, response.status, response.reason, response.read())
+        time.sleep(RETRY_INTERVAL)
+
+    raise status.TestFailure("Failed to fetch %s after %d attempts" % (data_name, RETRY_ATTEMPTS))
+
+  def get_http_response(self, server_host_port, path):
+    ''' get HTTP response '''
+    for _ in range(0, RETRY_ATTEMPTS):
+      try:
+        connection = HTTPConnection(server_host_port)
+        connection.request('GET', path)
+        response = connection.getresponse()
+        return response
+      except Exception:
+        time.sleep(RETRY_INTERVAL)
+        continue
+
+    raise status.TestFailure("Failed to get HTTP Response after %d attempts" % RETRY_ATTEMPTS)
 
 #  Result handlers end
 
@@ -273,10 +336,14 @@ def filter_test_topologies(test_topologies, test_pattern):
 
 
 def run_topology_test(topology_name, classpath, results_checker,
-  params, update_args, deactivate_args, restart_args, extra_topology_args):
+  params, update_args, deactivate_args, restart_args, http_server_host_port, extra_topology_args,
+  check_type):
   try:
-    args = "-t %s %s" % \
-           (topology_name, extra_topology_args)
+    if check_type == 'checkpoint_state':
+      args = "-r http://%s/stateResults -t %s %s" % \
+             (http_server_host_port, topology_name, extra_topology_args)
+    else:
+      args = "-t %s %s" % (topology_name, extra_topology_args)
     submit_topology(params.heron_cli_path, params.cli_config_path, params.cluster, params.role,
       params.env, params.tests_bin_path, classpath,
       params.release_package_uri, args)
@@ -413,7 +480,7 @@ def run_topology_tests(conf, args):
   failures = []
   timestamp = time.strftime('%Y%m%d%H%M%S')
 
-  # http_host_port = "%s:%d" % (args.http_hostname, args.http_port)
+  http_server_host_port = "%s:%d" % (args.http_hostname, args.http_port)
 
   if args.tests_bin_path.endswith("scala-integration-tests.jar"):
     test_topologies = filter_test_topologies(conf["scalaTopologies"], args.test_topology_pattern)
@@ -437,7 +504,7 @@ def run_topology_tests(conf, args):
     update_args = ""
     deactivate_args = ""
     restart_args = ""
-    topology_args = ''
+    topology_args = ""
     if "updateArgs" in topology_conf:
       update_args = topology_conf["updateArgs"]
     if "deactivateArgs" in topology_conf:
@@ -448,22 +515,26 @@ def run_topology_tests(conf, args):
     if "topologyArgs" in topology_conf:
       topology_args = "%s %s" % (topology_args, topology_conf["topologyArgs"])
 
-    expected_result_file_path = \
-      args.topologies_path + "/" + topology_conf["expectedResultRelativePath"]
+    expected_topo_result_file_path = \
+      args.topologies_path + "/" + topology_conf["expectedTopoResultRelativePath"]
+    if "expectedStateResultRelativePath" in topology_conf:
+      expected_state_result_file_path = \
+        args.topologies_path + "/" + topology_conf["expectedStateResultRelativePath"]
+
     check_type = topology_conf["checkType"]
     if check_type == 'topology_structure':
       results_checker = load_result_checker(
         check_type, topology_name,
-        FileBasedExpectedResultsHandler(expected_result_file_path),
+        FileBasedExpectedResultsHandler(expected_topo_result_file_path),
         ZkFileBasedActualResultsHandler(topology_name, args.cluster))
     elif check_type == 'checkpoint_state':
       if processing_type == 'stateful':
         results_checker = load_result_checker(
           check_type, topology_name,
-          FileBasedExpectedResultsHandler(expected_result_file_path),
+          FileBasedExpectedResultsHandler(expected_topo_result_file_path),
           ZkFileBasedActualResultsHandler(topology_name, args.cluster),
-          StatefulStorageBasedExpectedResultsHandler(),
-          HttpBasedActualResultsHandler())
+          FileBasedExpectedResultsHandler(expected_state_result_file_path),
+          HttpBasedActualResultsHandler(http_server_host_port, topology_name))
       elif processing_type == 'non_stateful':
         raise ValueError("Cannot check instance checkpoint state in non_stateful processing. "
                        + "Not running topology: " + topology_name)
@@ -477,7 +548,8 @@ def run_topology_tests(conf, args):
     start_secs = int(time.time())
     try:
       result = run_topology_test(topology_name, classpath, results_checker,
-        args, update_args, deactivate_args, restart_args, topology_args)
+        args, update_args, deactivate_args, restart_args, http_server_host_port, topology_args,
+        check_type)
       test_tuple = (topology_name, int(time.time()) - start_secs)
       if isinstance(result, status.TestSuccess):
         successes += [test_tuple]
diff --git a/integration_test/src/python/topology_test_runner/resources/test.json b/integration_test/src/python/topology_test_runner/resources/test.json
index 0119a48111..3682aa12b7 100644
--- a/integration_test/src/python/topology_test_runner/resources/test.json
+++ b/integration_test/src/python/topology_test_runner/resources/test.json
@@ -4,49 +4,56 @@
   "role" : "dummy_role",
   "env" : "dummy_env",
   "cliConfigPath" : "$HOME/.heron/conf",
-  "processingType" : "non_stateful",
+  "processingType" : "stateful",
   "topologyClasspathPrefix" : "org.apache.heron.integration_topology_test.topology.",
   "javaTopologies": [
     {
       "topologyName" : "IntegrationTopologyTest_FieldsGrouping",
       "classPath"    : "fields_grouping.FieldsGrouping",
-      "expectedResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
+      "expectedTopoResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
       "checkType" : "topology_structure"
     },
     {
       "topologyName" : "IntegrationTopologyTest_BasicTopologyOneTaskScaleUp",
       "classPath"    : "basic_topology_one_task_scale_up.BasicTopologyOneTask",
       "updateArgs"   : "--component-parallelism=ab-spout:2 --component-parallelism=identity-bolt:3",
-      "expectedResultRelativePath" : "basic_topology_one_task_scale_up/BasicTopologyOneTaskResults.json",
+      "expectedTopoResultRelativePath" : "basic_topology_one_task_scale_up/BasicTopologyOneTaskResults.json",
       "checkType" : "topology_structure"
     },
     {
       "topologyName" : "IntegrationTopologyTest_BasicTopologyOneTaskScaleDown",
       "classPath"    : "basic_topology_one_task_scale_down.BasicTopologyOneTask",
       "updateArgs"   : "--component-parallelism=ab-spout:1 --component-parallelism=identity-bolt:2",
-      "expectedResultRelativePath" : "basic_topology_one_task_scale_down/BasicTopologyOneTaskResults.json",
+      "expectedTopoResultRelativePath" : "basic_topology_one_task_scale_down/BasicTopologyOneTaskResults.json",
       "checkType" : "topology_structure"
     },
     {
       "topologyName" : "IntegrationTopologyTest_BasicTopologyOneTaskScaleUpDown",
       "classPath"    : "basic_topology_one_task_scale_up_down.BasicTopologyOneTask",
       "updateArgs"   : "--component-parallelism=ab-spout:2 --component-parallelism=identity-bolt:2",
-      "expectedResultRelativePath" : "basic_topology_one_task_scale_up_down/BasicTopologyOneTaskResults.json",
+      "expectedTopoResultRelativePath" : "basic_topology_one_task_scale_up_down/BasicTopologyOneTaskResults.json",
       "checkType" : "topology_structure"
     },
     {
       "topologyName" : "IntegrationTopologyTest_FieldsGroupingDeactivate",
       "classPath"    : "fields_grouping.FieldsGrouping",
       "deactivateArgs"  : "True",
-      "expectedResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
+      "expectedTopoResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
       "checkType" : "topology_structure"
     },
     {
       "topologyName" : "IntegrationTopologyTest_FieldsGroupingKillContainer",
       "classPath"    : "fields_grouping.FieldsGrouping",
       "restartArgs"  : "True",
-      "expectedResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
+      "expectedTopoResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
       "checkType" : "topology_structure"
+    },
+    {
+      "topologyName" : "IntegrationTopologyTest_StatefulBasicTopologyOneTask",
+      "classPath"    : "stateful_basic_topology_one_task.StatefulBasicTopologyOneTask",
+      "expectedTopoResultRelativePath" : "stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskTopo.json",
+      "expectedStateResultRelativePath" : "stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskState.json",
+      "checkType" : "checkpoint_state"
     }
   ]
 }
diff --git a/scripts/run_integration_topology_test.sh b/scripts/run_integration_topology_test.sh
index cfa81b30a2..b56eda57ee 100644
--- a/scripts/run_integration_topology_test.sh
+++ b/scripts/run_integration_topology_test.sh
@@ -3,6 +3,7 @@
 # Script to locally run the integration topology test.
 #
 
+HTTP_SERVER="./bazel-bin/integration_test/src/python/http_server/http-server"
 TEST_RUNNER="./bazel-bin/integration_test/src/python/topology_test_runner/topology-test-runner.pex"
 
 JAVA_TESTS_DIR="integration_test/src/java/org/apache/heron/integration_topology_test/topology"
@@ -25,9 +26,9 @@ bazel run --config=`platform` -- scripts/packages:heron-install.sh --user
 bazel build --config=`platform` {heron/...,scripts/packages:tarpkgs,integration_test/src/...}
 
 # run the simple http server
-#${HTTP_SERVER} 8080 &
-#http_server_id=$!
-#trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
+${HTTP_SERVER} 8080 &
+http_server_id=$!
+trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
 
 # run the scala integration tests
 #${TEST_RUNNER} \


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services