You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by hu...@apache.org on 2018/07/14 00:09:59 UTC

[incubator-heron] branch master updated: add integration test for instance state in stateful processing (#2956)

This is an automated email from the ASF dual-hosted git repository.

huijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 279fdec  add integration test for instance state in stateful processing (#2956)
279fdec is described below

commit 279fdec164f04f4edcdf418de4bbb05f3890ef22
Author: Yao Li <cl...@gmail.com>
AuthorDate: Fri Jul 13 17:09:56 2018 -0700

    add integration test for instance state in stateful processing (#2956)
    
    * add integration test for instance state in stateful processing
    
    * merge http servers
    
    * remove the I prefix
---
 integration_test/src/java/BUILD                    |  26 ++++-
 .../common/AbstractTestTopology.java               |  43 ++++++-
 .../common/bolt/StatefulIdentityBolt.java          |  79 +++++++++++++
 .../common/spout/StatefulABSpout.java              |  73 ++++++++++++
 .../integration_topology_test/core/HttpUtils.java  |  66 +++++++++++
 .../core/StatefulBolt.java                         |  63 +++++++++++
 .../core/StatefulIntegrationTopologyTestBolt.java  | 108 ++++++++++++++++++
 .../core/StatefulIntegrationTopologyTestSpout.java | 125 +++++++++++++++++++++
 .../core/StatefulSpout.java                        |  61 ++++++++++
 .../core/TopologyTestTopologyBuilder.java          |  93 +++++++++++++++
 .../StatefulBasicTopologyOneTask.java              |  48 ++++++++
 .../StatefulBasicTopologyOneTaskState.json         |   4 +
 .../StatefulBasicTopologyOneTaskTopo.json          |  65 +++++++++++
 integration_test/src/python/http_server/main.py    |  29 +++++
 .../src/python/topology_test_runner/main.py        | 124 +++++++++++++++-----
 .../topology_test_runner/resources/test.json       |  21 ++--
 scripts/run_integration_topology_test.sh           |   7 +-
 17 files changed, 991 insertions(+), 44 deletions(-)

diff --git a/integration_test/src/java/BUILD b/integration_test/src/java/BUILD
index b2a7c8f..afbdea3 100644
--- a/integration_test/src/java/BUILD
+++ b/integration_test/src/java/BUILD
@@ -120,6 +120,23 @@ genrule(
     cmd = "cp $< $@",
 )
 
+java_library(
+    name = "core-topology",
+    srcs = glob(
+        ["org/apache/heron/integration_topology_test/core/**/*.java"],
+    ),
+    deps = [
+        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
+        "//storm-compatibility/src/java:storm-compatibility-java",
+        "//heron/proto:proto_topology_java",
+        "//third_party/java:jackson",
+        "@commons_logging_commons_logging//jar",
+        "@com_google_protobuf//:protobuf_java",
+        "@org_apache_httpcomponents_http_client//jar",
+        "@org_apache_httpcomponents_http_core//jar",
+    ],
+)
 
 java_library(
     name = "common_topology_test",
@@ -133,7 +150,8 @@ java_library(
         "//third_party/java:hadoop-core",
         "//third_party/java:jackson",
         "@commons_cli_commons_cli//jar",
-        ":core"
+        ":core",
+        ":core-topology"
     ],
 )
 
@@ -150,7 +168,8 @@ java_library(
         "@commons_cli_commons_cli//jar",
         ":common_topology_test",
         ":common",
-        ":core"
+        ":core",
+        ":core-topology"
     ],
 )
 
@@ -167,7 +186,8 @@ java_binary(
         "@com_googlecode_json_simple_json_simple//jar",
         ":common_topology_test",
         ":common",
-        ":core"
+        ":core",
+        ":core-topology"
     ],
 )
 
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java b/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java
index 97ce86e..5a9364e 100644
--- a/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java
@@ -33,17 +33,20 @@ import org.apache.heron.api.HeronSubmitter;
 import org.apache.heron.api.exception.AlreadyAliveException;
 import org.apache.heron.api.exception.InvalidTopologyException;
 import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.integration_topology_test.core.TopologyTestTopologyBuilder;
 
 /**
  * Class to abstract out the common parts of the test framework for submitting topologies.
  * Subclasses can implement {@code buildTopology} and call {@code submit}.
  */
 public abstract class AbstractTestTopology {
+  private static final Integer CHECKPOINT_INTERVAL = 10;
   private static final String TOPOLOGY_OPTION = "topology_name";
-  private static final String STATE_UPDATE_TOKEN = "state_server_update_token";
+  private static final String RESULTS_URL_OPTION = "results_url";
 
   private final CommandLine cmd;
   private final String topologyName;
+  private final String httpServerResultsUrl;
 
   protected AbstractTestTopology(String[] args) throws MalformedURLException {
     CommandLineParser parser = new DefaultParser();
@@ -59,9 +62,21 @@ public abstract class AbstractTestTopology {
     }
 
     this.topologyName = cmd.getOptionValue(TOPOLOGY_OPTION);
+    if (cmd.getOptionValue(RESULTS_URL_OPTION) != null) {
+      this.httpServerResultsUrl =
+          pathAppend(cmd.getOptionValue(RESULTS_URL_OPTION), this.topologyName);
+    } else {
+      this.httpServerResultsUrl = null;
+    }
   }
 
-  protected abstract TopologyBuilder buildTopology(TopologyBuilder builder);
+  protected TopologyBuilder buildTopology(TopologyBuilder builder) {
+    return builder;
+  }
+  protected TopologyTestTopologyBuilder buildStatefulTopology(TopologyTestTopologyBuilder
+                                                                           builder) {
+    return builder;
+  }
 
   protected Config buildConfig(Config config) {
     return config;
@@ -74,6 +89,11 @@ public abstract class AbstractTestTopology {
     topologyNameOption.setRequired(true);
     options.addOption(topologyNameOption);
 
+    Option resultsUrlOption =
+        new Option("r", RESULTS_URL_OPTION, true, "url to post and get instance state");
+    resultsUrlOption.setRequired(false);
+    options.addOption(resultsUrlOption);
+
     return options;
   }
 
@@ -82,12 +102,25 @@ public abstract class AbstractTestTopology {
   }
 
   public final void submit(Config userConf) throws AlreadyAliveException, InvalidTopologyException {
-    TopologyBuilder builder = new TopologyBuilder();
-
     Config conf = buildConfig(new BasicConfig());
     if (userConf != null) {
       conf.putAll(userConf);
     }
-    HeronSubmitter.submitTopology(topologyName, conf, buildTopology(builder).createTopology());
+
+    if (this.httpServerResultsUrl == null) {
+      TopologyBuilder builder = new TopologyBuilder();
+      HeronSubmitter.submitTopology(topologyName, conf, buildTopology(builder).createTopology());
+
+    } else {
+      TopologyTestTopologyBuilder builder = new TopologyTestTopologyBuilder(httpServerResultsUrl);
+      conf.setTopologyReliabilityMode(Config.TopologyReliabilityMode.EFFECTIVELY_ONCE);
+      conf.setTopologyStatefulCheckpointIntervalSecs(CHECKPOINT_INTERVAL);
+      HeronSubmitter.submitTopology(topologyName, conf,
+          buildStatefulTopology(builder).createTopology());
+    }
+  }
+
+  private static String pathAppend(String url, String path) {
+    return String.format("%s/%s", url, path);
   }
 }
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/common/bolt/StatefulIdentityBolt.java b/integration_test/src/java/org/apache/heron/integration_topology_test/common/bolt/StatefulIdentityBolt.java
new file mode 100644
index 0000000..821959f
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/common/bolt/StatefulIdentityBolt.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_topology_test.common.bolt;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.heron.integration_topology_test.core.StatefulBolt;
+
+public class StatefulIdentityBolt extends StatefulBolt {
+  private static final long serialVersionUID = 1174496806359441609L;
+  private static final Logger LOG = Logger.getLogger(StatefulIdentityBolt.class.getName());
+  private Fields fields;
+
+  public StatefulIdentityBolt() { }
+
+  public StatefulIdentityBolt(Fields fields) {
+    this.fields = fields;
+  }
+
+  @Override
+  public void prepare(Map<String, Object> map,
+                      TopologyContext context,
+                      OutputCollector outputCollector) {
+    super.prepare(map, context, outputCollector);
+  }
+
+  @Override
+  public void execute(Tuple input) {
+    StringBuilder sb = new StringBuilder();
+    for (Object o : input.getValues()) {
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      sb.append(o.toString());
+    }
+    String word = sb.toString();
+    LOG.info("Receiving and emitting tuple values: " + word);
+    if (!state.containsKey(word)) {
+      state.put(word, 1);
+    } else {
+      state.put(word, state.get(word) + 1);
+    }
+    collector.emit(input.getValues());
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(fields);
+  }
+
+  @Override
+  public void initState(State<String, Integer> state) {
+    super.initState(state);
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/common/spout/StatefulABSpout.java b/integration_test/src/java/org/apache/heron/integration_topology_test/common/spout/StatefulABSpout.java
new file mode 100644
index 0000000..b40671f
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/common/spout/StatefulABSpout.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_topology_test.common.spout;
+
+import java.util.Map;
+
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.api.tuple.Values;
+import org.apache.heron.integration_topology_test.core.StatefulSpout;
+
+public class StatefulABSpout extends StatefulSpout {
+
+  private static final long serialVersionUID = 7431612805823106868L;
+  private static final String[] TO_SEND = new String[]{"A", "B"};
+
+  private int emitted = 0;
+  private boolean appendSequenceId;
+
+  public StatefulABSpout() {
+    this(false);
+  }
+
+  public StatefulABSpout(boolean appendSequenceId) {
+    this.appendSequenceId = appendSequenceId;
+  }
+
+  @Override
+  public void open(Map<String, Object> conf,
+                   TopologyContext newContext,
+                   SpoutOutputCollector newCollector) {
+    super.open(conf, newContext, newCollector);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("word"));
+  }
+
+  @Override
+  public void nextTuple() {
+    String word = TO_SEND[emitted % TO_SEND.length];
+    if (appendSequenceId) {
+      word = word + "_" + emitted;
+    }
+    collector.emit(new Values(word));
+    if (!state.containsKey(word)) {
+      state.put(word, 1);
+    } else {
+      state.put(word, state.get(word) + 1);
+    }
+    emitted++;
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/HttpUtils.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/HttpUtils.java
new file mode 100644
index 0000000..2c19e11
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/HttpUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.io.IOException;
+import java.text.ParseException;
+
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClientBuilder;
+
+public final class HttpUtils {
+
+  private HttpUtils() { }
+
+  public static int httpJsonPost(String newHttpPostUrl, String jsonData)
+      throws IOException, ParseException {
+    HttpClient client = HttpClientBuilder.create().build();
+    HttpPost post = new HttpPost(newHttpPostUrl);
+
+    StringEntity requestEntity = new StringEntity(jsonData, ContentType.APPLICATION_JSON);
+
+    post.setEntity(requestEntity);
+    HttpResponse response = client.execute(post);
+
+    return response.getStatusLine().getStatusCode();
+  }
+
+  public static void postToHttpServer(String postUrl, String data, String dataName)
+    throws RuntimeException {
+    try {
+      int responseCode = -1;
+      for (int attempts = 0; attempts < 2; attempts++) {
+        responseCode = httpJsonPost(postUrl, data);
+        if (responseCode == 200) {
+          return;
+        }
+      }
+      throw new RuntimeException(
+          String.format("Failed to post %s to %s: %s",
+              dataName, postUrl, responseCode));
+    } catch (IOException | java.text.ParseException e) {
+      throw new RuntimeException(String.format("Posting %s to %s failed",
+          dataName, postUrl), e);
+    }
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulBolt.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulBolt.java
new file mode 100644
index 0000000..62bf8b9
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulBolt.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.Map;
+
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+
+public class StatefulBolt extends BaseRichBolt implements IStatefulComponent<String, Integer> {
+
+  private static final long serialVersionUID = 5834931054885658328L;
+
+  protected State<String, Integer> state;
+  protected OutputCollector collector;
+  protected TopologyContext context;
+
+  public StatefulBolt() { }
+
+  @Override
+  public void prepare(Map<String, Object> map,
+                      TopologyContext inputContext,
+                      OutputCollector inputOutputCollector) {
+    this.context = inputContext;
+    this.collector = inputOutputCollector;
+  }
+
+  @Override
+  public void execute(Tuple input) { }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) { }
+
+  @Override
+  public void initState(State<String, Integer> inputState) {
+    this.state = inputState;
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestBolt.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestBolt.java
new file mode 100644
index 0000000..11084c7
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestBolt.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+
+public class StatefulIntegrationTopologyTestBolt extends StatefulBolt {
+
+  private static final long serialVersionUID = 6657584414733837761L;
+  private static final Logger LOG =
+      Logger.getLogger(StatefulIntegrationTopologyTestBolt.class.getName());
+  private final StatefulBolt delegateBolt;
+  private boolean checkpointExisted = false;
+  private String outputLocation;
+
+  public StatefulIntegrationTopologyTestBolt(StatefulBolt delegate, String outputLocation) {
+    this.delegateBolt = delegate;
+    this.outputLocation = outputLocation;
+  }
+
+  @Override
+  public void prepare(Map<String, Object> map,
+                      TopologyContext context,
+                      OutputCollector outputCollector) {
+
+    this.delegateBolt.prepare(map, context, outputCollector);
+
+    // send instance state to http server
+    if (!delegateBolt.state.isEmpty()) {
+      String compId = String.format("%s_%d", delegateBolt.context.getThisComponentId(),
+          delegateBolt.context.getThisTaskId());
+      String dataName = String.format("instance %s state", compId);
+      String stateJsonString = formatJson(compId, delegateBolt.state);
+
+      LOG.info(String.format("Posting %s to %s", dataName, this.outputLocation));
+      HttpUtils.postToHttpServer(this.outputLocation, stateJsonString, dataName);
+    }
+  }
+
+  @Override
+  public void execute(Tuple tuple) {
+    String streamID = tuple.getSourceStreamId();
+    LOG.info("Received a tuple: " + tuple + " ; from: " + streamID);
+    delegateBolt.execute(tuple);
+  }
+
+  @Override
+  public void cleanup() {
+    delegateBolt.cleanup();
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    delegateBolt.declareOutputFields(outputFieldsDeclarer);
+  }
+
+  @Override
+  public void initState(State<String, Integer> state) {
+    delegateBolt.initState(state);
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+    // The first checkpoint goes to the delegate; afterwards task index 0 throws on purpose.
+    if (!checkpointExisted) {
+      delegateBolt.preSave(checkpointId);
+      checkpointExisted = true;
+    } else if (delegateBolt.context.getThisTaskIndex() == 0) {
+      // Only the delegate's context is set in prepare(); this wrapper's own field stays null.
+      throw new RuntimeException("Kill instance " + delegateBolt.context.getThisComponentId());
+    }
+  }
+
+  private String formatJson(String compId, Map<String, Integer> state) {
+    StringBuilder stateString = new StringBuilder();
+    for (Map.Entry<String, Integer> entry : state.entrySet()) {
+      if (stateString.length() != 0) {
+        stateString.append(", ");
+      }
+      stateString.append(String.format("\"%s\": %d", entry.getKey(), entry.getValue()));
+    }
+    return String.format("{\"%s\": {%s}}", compId, stateString.toString());
+  }
+
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestSpout.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestSpout.java
new file mode 100644
index 0000000..7f039fe
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulIntegrationTopologyTestSpout.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+
+public class StatefulIntegrationTopologyTestSpout extends StatefulSpout {
+
+  private static final long serialVersionUID = -6920782627142720131L;
+  private static final Logger LOG = Logger
+      .getLogger(StatefulIntegrationTopologyTestSpout.class.getName());
+
+  private final StatefulSpout delegateSpout;
+  private int maxExecutions;
+  private int curExecutions;
+  private String outputLocation;
+
+  public StatefulIntegrationTopologyTestSpout(StatefulSpout delegateSpout,
+                                              int maxExecutions, String outputLocation) {
+    this.delegateSpout = delegateSpout;
+    this.maxExecutions = maxExecutions;
+    this.curExecutions = maxExecutions;
+    this.outputLocation = outputLocation;
+  }
+
+  private void resetMaxExecutions(int resetExecutions) {
+    this.curExecutions = resetExecutions;
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+    delegateSpout.declareOutputFields(outputFieldsDeclarer);
+  }
+
+  @Override
+  public void close() {
+    delegateSpout.close();
+  }
+
+  @Override
+  public void activate() {
+    delegateSpout.activate();
+  }
+
+  @Override
+  public void deactivate() {
+    delegateSpout.deactivate();
+  }
+
+  @Override
+  public void open(Map<String, Object> map,
+                   TopologyContext topologyContext,
+                   SpoutOutputCollector outputCollector) {
+
+    delegateSpout.open(map, topologyContext, outputCollector);
+
+    // send instance state to http server
+    if (!delegateSpout.state.isEmpty()) {
+      String compId = String.format("%s_%d", delegateSpout.context.getThisComponentId(),
+          delegateSpout.context.getThisTaskId());
+      String dataName = String.format("instance %s state", compId);
+      String stateJsonString = formatJson(compId, delegateSpout.state);
+
+      LOG.info(String.format("Posting %s to %s", dataName, this.outputLocation));
+      HttpUtils.postToHttpServer(this.outputLocation, stateJsonString, dataName);
+    }
+  }
+
+  @Override
+  public void nextTuple() {
+    if (doneEmitting()) {
+      return;
+    }
+    curExecutions--;
+    delegateSpout.nextTuple();
+  }
+
+  protected boolean doneEmitting() {
+    return curExecutions <= 0;
+  }
+
+  @Override
+  public void initState(State<String, Integer> state) {
+    delegateSpout.initState(state);
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+    delegateSpout.preSave(checkpointId);
+    resetMaxExecutions(maxExecutions);
+  }
+
+  private String formatJson(String compId, Map<String, Integer> state) {
+    StringBuilder stateString = new StringBuilder();
+    for (Map.Entry<String, Integer> entry : state.entrySet()) {
+      if (stateString.length() != 0) {
+        stateString.append(", ");
+      }
+      stateString.append(String.format("\"%s\": %d", entry.getKey(), entry.getValue()));
+    }
+    return String.format("{\"%s\": {%s}}", compId, stateString.toString());
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulSpout.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulSpout.java
new file mode 100644
index 0000000..8783d41
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/StatefulSpout.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.Map;
+
+import org.apache.heron.api.spout.BaseRichSpout;
+import org.apache.heron.api.spout.SpoutOutputCollector;
+import org.apache.heron.api.state.State;
+import org.apache.heron.api.topology.IStatefulComponent;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+
+public class StatefulSpout extends BaseRichSpout implements IStatefulComponent<String, Integer> {
+
+  private static final long serialVersionUID = 2045875254384424423L;
+  protected SpoutOutputCollector collector;
+  protected State<String, Integer> state;
+  protected TopologyContext context;
+
+  public StatefulSpout() { }
+
+  @Override
+  public void open(Map<String, Object> conf,
+                   TopologyContext newContext,
+                   SpoutOutputCollector newCollector) {
+    this.context = newContext;
+    this.collector = newCollector;
+  }
+
+  @Override
+  public void nextTuple() { }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) { }
+
+  @Override
+  public void initState(State<String, Integer> inputState) {
+    this.state = inputState;
+  }
+
+  @Override
+  public void preSave(String checkpointId) {
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/core/TopologyTestTopologyBuilder.java b/integration_test/src/java/org/apache/heron/integration_topology_test/core/TopologyTestTopologyBuilder.java
new file mode 100644
index 0000000..5eee2a6
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/core/TopologyTestTopologyBuilder.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.core;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.HeronTopology;
+import org.apache.heron.api.generated.TopologyAPI;
+import org.apache.heron.api.topology.BoltDeclarer;
+import org.apache.heron.api.topology.SpoutDeclarer;
+import org.apache.heron.api.topology.TopologyBuilder;
+
+/**
+ * A {@link TopologyBuilder} for the topology integration tests. Stateful spouts and bolts
+ * registered through it are wrapped in their integration-test counterparts, and the
+ * parallelism hint given for every component id is recorded.
+ */
+public class TopologyTestTopologyBuilder extends TopologyBuilder {
+
+  private static final int DEFAULT_EXECUTION_COUNT = 10;
+  // Component id -> parallelism hint the component was registered with.
+  private final Map<String, Integer> components = new HashMap<>();
+  private final String outputLocation;
+
+  /**
+   * @param outputLocation value passed on to every wrapper spout and bolt this builder creates
+   */
+  public TopologyTestTopologyBuilder(String outputLocation) {
+    this.outputLocation = outputLocation;
+  }
+
+  /**
+   * Wraps {@code bolt} in a {@link StatefulIntegrationTopologyTestBolt} and registers it.
+   *
+   * @param id component id of the bolt
+   * @param bolt the bolt to wrap
+   * @param parallelismHint parallelism hint for the bolt
+   * @return the declarer of the registered wrapper bolt
+   */
+  public BoltDeclarer setBolt(String id, StatefulBolt bolt, Number parallelismHint) {
+    return setBolt(id, new StatefulIntegrationTopologyTestBolt(bolt, this.outputLocation),
+        parallelismHint);
+  }
+
+  /**
+   * Registers an already wrapped bolt and records its parallelism hint.
+   *
+   * @param id component id of the bolt
+   * @param bolt the wrapper bolt to register
+   * @param parallelismHint parallelism hint for the bolt
+   * @return the declarer returned by {@link TopologyBuilder}
+   */
+  public BoltDeclarer setBolt(String id, StatefulIntegrationTopologyTestBolt bolt,
+                              Number parallelismHint) {
+    components.put(id, toParallelism(parallelismHint));
+    return super.setBolt(id, bolt, parallelismHint);
+  }
+
+  /**
+   * Registers {@code spout} with the default maximum execution count.
+   *
+   * @param id component id of the spout
+   * @param spout the spout to wrap
+   * @param parallelismHint parallelism hint for the spout
+   * @return the declarer of the registered wrapper spout
+   */
+  public SpoutDeclarer setSpout(String id, StatefulSpout spout, Number parallelismHint) {
+    return setSpout(id, spout, parallelismHint, DEFAULT_EXECUTION_COUNT);
+  }
+
+  /**
+   * Wraps {@code spout} in a {@link StatefulIntegrationTopologyTestSpout} and registers it.
+   * This overload lets the caller choose the maxExecutionCount of the spout, to stay
+   * compatible with the earlier Integration Test Framework.
+   *
+   * @param id component id of the spout
+   * @param spout the spout to wrap
+   * @param parallelismHint parallelism hint for the spout
+   * @param maxExecutionCount value passed to the wrapper spout
+   * @return the declarer of the registered wrapper spout
+   */
+  public SpoutDeclarer setSpout(String id, StatefulSpout spout,
+                                Number parallelismHint, int maxExecutionCount) {
+
+    StatefulIntegrationTopologyTestSpout wrappedSpout =
+        new StatefulIntegrationTopologyTestSpout(spout, maxExecutionCount, this.outputLocation);
+
+    return setSpout(id, wrappedSpout, parallelismHint);
+  }
+
+  // Registers an already wrapped spout and records its parallelism hint.
+  private SpoutDeclarer setSpout(String id, StatefulIntegrationTopologyTestSpout itSpout,
+                                 Number parallelismHint) {
+    components.put(id, toParallelism(parallelismHint));
+    return super.setSpout(id, itSpout, parallelismHint);
+  }
+
+  // Converts a parallelism hint to the Integer kept in the components map. The hint is only
+  // typed as Number, so an (Integer) cast would throw ClassCastException for e.g. a Long;
+  // intValue() works for every Number. A null hint is recorded as null.
+  private static Integer toParallelism(Number parallelismHint) {
+    return parallelismHint == null ? null : Integer.valueOf(parallelismHint.intValue());
+  }
+
+  /**
+   * Builds the topology through {@link TopologyBuilder#createTopology()} and re-wraps its
+   * protobuf builder after clearing the config, name and state fields.
+   *
+   * @return a {@link HeronTopology} whose builder has no topology config, name or state set
+   */
+  @Override
+  public HeronTopology createTopology() {
+
+    // Config, name and state are set here only so that getTopology() can be called; they
+    // are cleared again right below.
+    TopologyAPI.Topology.Builder topologyBlr =
+        super.createTopology().
+            setConfig(new Config()).
+            setName("").
+            setState(TopologyAPI.TopologyState.RUNNING).
+            getTopology().toBuilder();
+
+    // Clear unnecessary fields to make the state of TopologyAPI.Topology.Builder clean
+    topologyBlr.clearTopologyConfig().clearName().clearState();
+
+    // Wrap the cleaned builder in a new HeronTopology
+    return new HeronTopology(topologyBlr);
+  }
+
+  /**
+   * @return the map of component id to recorded parallelism hint; this is the builder's own
+   *     map, not a copy
+   */
+  public Map<String, Integer> getComponentParallelism() {
+    return components;
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTask.java b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTask.java
new file mode 100644
index 0000000..0e1d901
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTask.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.heron.integration_topology_test.topology.stateful_basic_topology_one_task;
+
+import java.net.MalformedURLException;
+
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.integration_topology_test.common.AbstractTestTopology;
+import org.apache.heron.integration_topology_test.common.bolt.StatefulIdentityBolt;
+import org.apache.heron.integration_topology_test.common.spout.StatefulABSpout;
+import org.apache.heron.integration_topology_test.core.TopologyTestTopologyBuilder;
+
+/**
+ * Integration test topology made of one {@link StatefulABSpout} component and one
+ * {@link StatefulIdentityBolt} component connected by shuffle grouping.
+ */
+public final class StatefulBasicTopologyOneTask extends AbstractTestTopology {
+
+  private static final String SPOUT_ID = "stateful-ab-spout";
+  private static final String BOLT_ID = "stateful-identity-bolt";
+
+  private StatefulBasicTopologyOneTask(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  /**
+   * Registers the spout with parallelism 1 and the bolt with parallelism 3, the bolt
+   * reading from the spout through shuffle grouping.
+   *
+   * @param builder the builder to add the components to
+   * @return the same builder
+   */
+  @Override
+  protected TopologyTestTopologyBuilder buildStatefulTopology(TopologyTestTopologyBuilder builder) {
+    StatefulABSpout spout = new StatefulABSpout(true);
+    builder.setSpout(SPOUT_ID, spout, 1);
+
+    StatefulIdentityBolt bolt = new StatefulIdentityBolt(new Fields("word"));
+    builder.setBolt(BOLT_ID, bolt, 3).shuffleGrouping(SPOUT_ID);
+
+    return builder;
+  }
+
+  /** Creates the topology from the command line arguments and submits it. */
+  public static void main(String[] args) throws Exception {
+    new StatefulBasicTopologyOneTask(args).submit();
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskState.json b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskState.json
new file mode 100644
index 0000000..1d111df
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskState.json
@@ -0,0 +1,4 @@
+[{"stateful-ab-spout": {"A_6": 1, "A_4": 1, "A_2": 1, "A_0": 1, "A_8": 1, "B_9": 1, "B_1": 1, "B_3": 1, "B_5": 1, "B_7": 1}},
+  {"stateful-identity-bolt": {"B_5": 1, "A_2": 1, "A_8": 1}},
+  {"stateful-identity-bolt": {"B_1": 1, "A_4": 1, "B_7": 1}},
+  {"stateful-identity-bolt": {"A_6": 1, "B_3": 1, "A_0": 1, "B_9": 1}}]
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskTopo.json b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskTopo.json
new file mode 100644
index 0000000..691ea87
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskTopo.json
@@ -0,0 +1,65 @@
+{
+  "topology": {
+    "id": "IntegrationTopologyTest_StatefulBasicTopologyOneTask",
+    "name": "IntegrationTopologyTest_StatefulBasicTopologyOneTask",
+    "spouts": {
+      "comp": {
+        "name": "stateful-ab-spout",
+        "config": {
+          "kvs": {
+            "key": "topology.component.parallelism",
+            "value": "1"
+          }
+        }
+      }
+    },
+    "bolts": [
+      {
+        "comp": {
+          "name": "stateful-identity-bolt",
+          "config": {
+            "kvs": {
+              "key": "topology.component.parallelism",
+              "value": "3"
+            }
+          }
+        },
+        "inputs": [
+          {
+            "stream": {
+              "id": "default",
+              "component_name": "stateful-ab-spout"
+            }
+          }
+        ]
+      }
+    ],
+    "state": "RUNNING"
+  },
+  "instances": [
+    {
+      "instance_id": "container_1_stateful-ab-spout_1",
+      "info": {
+        "component_name": "stateful-ab-spout"
+      }
+    },
+    {
+      "instance_id": "container_1_stateful-identity-bolt_2",
+      "info": {
+        "component_name": "stateful-identity-bolt"
+      }
+    },
+    {
+      "instance_id": "container_2_stateful-identity-bolt_3",
+      "info": {
+        "component_name": "stateful-identity-bolt"
+      }
+    },
+    {
+      "instance_id": "container_2_stateful-identity-bolt_4",
+      "info": {
+        "component_name": "stateful-identity-bolt"
+      }
+    }
+  ]
+}
diff --git a/integration_test/src/python/http_server/main.py b/integration_test/src/python/http_server/main.py
index 262de3d..72dc003 100644
--- a/integration_test/src/python/http_server/main.py
+++ b/integration_test/src/python/http_server/main.py
@@ -71,6 +71,32 @@ class MemoryMapHandler(tornado.web.RequestHandler):
     self.state_map[key] = tornado.escape.json_encode(data)
     self.write("Results written to " + tornado.escape.json_encode(self.state_map) + " successfully")
 
+# for instance states in stateful processing
+class StateResultHandler(tornado.web.RequestHandler):
+  def initialize(self, result_map):
+    self.result_map = result_map
+
+  def get(self, key):
+    if key:
+      self.set_header("Content-Type", 'application/json; charset="utf-8"')
+      if key in self.result_map:
+        self.write(tornado.escape.json_encode(self.result_map[key]))
+      else:
+        raise tornado.web.HTTPError(status_code=404, log_message="Key %s not found" % key)
+    else:
+      self.write(tornado.escape.json_encode(self.result_map))
+
+  def post(self, key):
+    data = tornado.escape.json_decode(self.request.body)
+    if key:
+      if key in self.result_map:
+        self.result_map[key].append(data)
+      else:
+        self.result_map[key] = [data]
+      self.write("Results written successfully: topology " + key + ' instance ' + data.keys()[0])
+    else:
+      raise tornado.web.HTTPError(status_code=404, log_message="Invalid key %s" % key)
+
 def main():
   '''
   Runs a tornado http server that listens for any
@@ -83,11 +109,14 @@ def main():
     os.makedirs(RESULTS_DIRECTORY)
 
   state_map = {}
+  # for instance states in stateful processing
+  state_result_map = {}
   application = tornado.web.Application([
       (r"/", MainHandler),
       (r"^/results/([a-zA-Z0-9_-]+$)", FileHandler),
       (r"^/state", MemoryMapGetAllHandler, dict(state_map=state_map)),
       (r"^/state/([a-zA-Z0-9_-]+$)", MemoryMapHandler, dict(state_map=state_map)),
+      (r"^/stateResults/([a-zA-Z0-9_-]+$)", StateResultHandler, dict(result_map=state_result_map)),
   ])
 
   if len(sys.argv) == 1:
diff --git a/integration_test/src/python/topology_test_runner/main.py b/integration_test/src/python/topology_test_runner/main.py
index ec948cc..1ed4cf8 100644
--- a/integration_test/src/python/topology_test_runner/main.py
+++ b/integration_test/src/python/topology_test_runner/main.py
@@ -8,6 +8,7 @@ import re
 import sys
 import time
 import uuid
+from httplib import HTTPConnection
 
 from ..common import status
 from heron.common.src.python.utils import log
@@ -160,16 +161,48 @@ class InstanceStateResultChecker(TopologyStructureResultChecker):
       raise status.TestFailure("The actual topology graph structure does not match the expected one"
                                + " for topology: %s" % self.topology_name)
     # check instance states, get the instance_state_check_result
-    # if both above are isinstanc(status.TestSuccess), return success, else return fail
+    # if both above are isinstance(status.TestSuccess), return success, else return fail
+    expected_result = self.instance_state_expected_result_handler.fetch_results()
 
-  def _compare_state(self, expected_results, actual_results):
-    pass
+    decoder = json.JSONDecoder(strict=False)
+    expected_result = decoder.decode(expected_result)
+    
+    actual_result =[]
+    for _ in range(0, RETRY_ATTEMPTS):
+      actual_result = self.instance_state_actual_result_handler.fetch_results()
+      actual_result = decoder.decode(actual_result)
+      if len(actual_result) == len(expected_result):
+        break
+      else:
+        time.sleep(RETRY_INTERVAL)
+    else:
+      raise status.TestFailure("Fail to get actual results of instance states for topology %s"
+                               % self.topology_name)
 
-  def _parse_state_expected_results(self, expected_results):
-    pass
+    if '_' not in expected_result[0].keys()[0]:
+      actual_result = self._parse_instance_id(actual_result)
 
-  def _parse_state_actual_results(self, actual_results):
-    pass
+    return self._compare_state(sorted(expected_result), sorted(actual_result))
+
+  def _compare_state(self, expected_results, actual_results):
+    if actual_results == expected_results:
+      return status.TestSuccess("Topology %s instance state result matches expected result"
+                                % self.topology_name)
+    else:
+      failure = status.TestFailure("Actual result did not match expected result")
+      # lambda required below to remove the unicode 'u' from the output
+      logging.info("Actual result ---------- \n" + str(map(lambda x: str(x), actual_results)))
+      logging.info("Expected result ---------- \n" + str(map(lambda x: str(x), expected_results)))
+      raise failure
+
+  def _parse_instance_id(self, input):
+    # remove taskId in instaneId
+    output = list()
+    for ele in input:
+      for key in ele:
+        new_key = key.split('_')[0]
+        output.append({new_key: dict(ele[key])})
+    return output
 
 
 class FileBasedExpectedResultsHandler(object):
@@ -247,14 +280,44 @@ class HttpBasedActualResultsHandler(object):
   Get actually loaded instance states
   TODO(yaoli): complete this class when stateful processing is ready
   """
-  pass
+  def __init__(self, server_host_port, topology_name):
+    self.server_host_port = server_host_port
+    self.topology_name = topology_name
 
-class StatefulStorageBasedExpectedResultsHandler(object):
-  """
-  Get expected instance states from checkpoint storage
-  TODO(yaoli): complete this class when stateful processing is ready
-  """
-  pass
+  def fetch_results(self):
+    try:
+      return self.fetch_from_server(self.server_host_port, self.topology_name,
+        'instance_state', '/stateResults/%s' % self.topology_name)
+    except Exception as e:
+      raise status.TestFailure("Fetching instance state failed for %s topology" % self.topology_name, e)
+
+  def fetch_from_server(self, server_host_port, topology_name, data_name, path):
+    ''' Make a http get request to fetch actual results from http server '''
+    for i in range(0, RETRY_ATTEMPTS):
+      logging.info("Fetching %s for topology %s, retry count: %d", data_name, topology_name, i)
+      response = self.get_http_response(server_host_port, path)
+      if response.status == 200:
+        return response.read()
+      elif i != RETRY_ATTEMPTS:
+        logging.info("Fetching %s failed with status: %s; reason: %s; body: %s",
+          data_name, response.status, response.reason, response.read())
+        time.sleep(RETRY_INTERVAL)
+
+    raise status.TestFailure("Failed to fetch %s after %d attempts" % (data_name, RETRY_ATTEMPTS))
+
+  def get_http_response(self, server_host_port, path):
+    ''' get HTTP response '''
+    for _ in range(0, RETRY_ATTEMPTS):
+      try:
+        connection = HTTPConnection(server_host_port)
+        connection.request('GET', path)
+        response = connection.getresponse()
+        return response
+      except Exception:
+        time.sleep(RETRY_INTERVAL)
+        continue
+
+    raise status.TestFailure("Failed to get HTTP Response after %d attempts" % RETRY_ATTEMPTS)
 
 #  Result handlers end
 
@@ -273,10 +336,14 @@ def filter_test_topologies(test_topologies, test_pattern):
 
 
 def run_topology_test(topology_name, classpath, results_checker,
-  params, update_args, deactivate_args, restart_args, extra_topology_args):
+  params, update_args, deactivate_args, restart_args, http_server_host_port, extra_topology_args,
+  check_type):
   try:
-    args = "-t %s %s" % \
-           (topology_name, extra_topology_args)
+    if check_type == 'checkpoint_state':
+      args = "-r http://%s/stateResults -t %s %s" % \
+             (http_server_host_port, topology_name, extra_topology_args)
+    else:
+      args = "-t %s %s" % (topology_name, extra_topology_args)
     submit_topology(params.heron_cli_path, params.cli_config_path, params.cluster, params.role,
       params.env, params.tests_bin_path, classpath,
       params.release_package_uri, args)
@@ -413,7 +480,7 @@ def run_topology_tests(conf, args):
   failures = []
   timestamp = time.strftime('%Y%m%d%H%M%S')
 
-  # http_host_port = "%s:%d" % (args.http_hostname, args.http_port)
+  http_server_host_port = "%s:%d" % (args.http_hostname, args.http_port)
 
   if args.tests_bin_path.endswith("scala-integration-tests.jar"):
     test_topologies = filter_test_topologies(conf["scalaTopologies"], args.test_topology_pattern)
@@ -437,7 +504,7 @@ def run_topology_tests(conf, args):
     update_args = ""
     deactivate_args = ""
     restart_args = ""
-    topology_args = ''
+    topology_args = ""
     if "updateArgs" in topology_conf:
       update_args = topology_conf["updateArgs"]
     if "deactivateArgs" in topology_conf:
@@ -448,22 +515,26 @@ def run_topology_tests(conf, args):
     if "topologyArgs" in topology_conf:
       topology_args = "%s %s" % (topology_args, topology_conf["topologyArgs"])
 
-    expected_result_file_path = \
-      args.topologies_path + "/" + topology_conf["expectedResultRelativePath"]
+    expected_topo_result_file_path = \
+      args.topologies_path + "/" + topology_conf["expectedTopoResultRelativePath"]
+    if "expectedStateResultRelativePath" in topology_conf:
+      expected_state_result_file_path = \
+        args.topologies_path + "/" + topology_conf["expectedStateResultRelativePath"]
+
     check_type = topology_conf["checkType"]
     if check_type == 'topology_structure':
       results_checker = load_result_checker(
         check_type, topology_name,
-        FileBasedExpectedResultsHandler(expected_result_file_path),
+        FileBasedExpectedResultsHandler(expected_topo_result_file_path),
         ZkFileBasedActualResultsHandler(topology_name, args.cluster))
     elif check_type == 'checkpoint_state':
       if processing_type == 'stateful':
         results_checker = load_result_checker(
           check_type, topology_name,
-          FileBasedExpectedResultsHandler(expected_result_file_path),
+          FileBasedExpectedResultsHandler(expected_topo_result_file_path),
           ZkFileBasedActualResultsHandler(topology_name, args.cluster),
-          StatefulStorageBasedExpectedResultsHandler(),
-          HttpBasedActualResultsHandler())
+          FileBasedExpectedResultsHandler(expected_state_result_file_path),
+          HttpBasedActualResultsHandler(http_server_host_port, topology_name))
       elif processing_type == 'non_stateful':
         raise ValueError("Cannot check instance checkpoint state in non_stateful processing. "
                        + "Not running topology: " + topology_name)
@@ -477,7 +548,8 @@ def run_topology_tests(conf, args):
     start_secs = int(time.time())
     try:
       result = run_topology_test(topology_name, classpath, results_checker,
-        args, update_args, deactivate_args, restart_args, topology_args)
+        args, update_args, deactivate_args, restart_args, http_server_host_port, topology_args,
+        check_type)
       test_tuple = (topology_name, int(time.time()) - start_secs)
       if isinstance(result, status.TestSuccess):
         successes += [test_tuple]
diff --git a/integration_test/src/python/topology_test_runner/resources/test.json b/integration_test/src/python/topology_test_runner/resources/test.json
index 0119a48..3682aa1 100644
--- a/integration_test/src/python/topology_test_runner/resources/test.json
+++ b/integration_test/src/python/topology_test_runner/resources/test.json
@@ -4,49 +4,56 @@
   "role" : "dummy_role",
   "env" : "dummy_env",
   "cliConfigPath" : "$HOME/.heron/conf",
-  "processingType" : "non_stateful",
+  "processingType" : "stateful",
   "topologyClasspathPrefix" : "org.apache.heron.integration_topology_test.topology.",
   "javaTopologies": [
     {
       "topologyName" : "IntegrationTopologyTest_FieldsGrouping",
       "classPath"    : "fields_grouping.FieldsGrouping",
-      "expectedResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
+      "expectedTopoResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
       "checkType" : "topology_structure"
     },
     {
       "topologyName" : "IntegrationTopologyTest_BasicTopologyOneTaskScaleUp",
       "classPath"    : "basic_topology_one_task_scale_up.BasicTopologyOneTask",
       "updateArgs"   : "--component-parallelism=ab-spout:2 --component-parallelism=identity-bolt:3",
-      "expectedResultRelativePath" : "basic_topology_one_task_scale_up/BasicTopologyOneTaskResults.json",
+      "expectedTopoResultRelativePath" : "basic_topology_one_task_scale_up/BasicTopologyOneTaskResults.json",
       "checkType" : "topology_structure"
     },
     {
       "topologyName" : "IntegrationTopologyTest_BasicTopologyOneTaskScaleDown",
       "classPath"    : "basic_topology_one_task_scale_down.BasicTopologyOneTask",
       "updateArgs"   : "--component-parallelism=ab-spout:1 --component-parallelism=identity-bolt:2",
-      "expectedResultRelativePath" : "basic_topology_one_task_scale_down/BasicTopologyOneTaskResults.json",
+      "expectedTopoResultRelativePath" : "basic_topology_one_task_scale_down/BasicTopologyOneTaskResults.json",
       "checkType" : "topology_structure"
     },
     {
       "topologyName" : "IntegrationTopologyTest_BasicTopologyOneTaskScaleUpDown",
       "classPath"    : "basic_topology_one_task_scale_up_down.BasicTopologyOneTask",
       "updateArgs"   : "--component-parallelism=ab-spout:2 --component-parallelism=identity-bolt:2",
-      "expectedResultRelativePath" : "basic_topology_one_task_scale_up_down/BasicTopologyOneTaskResults.json",
+      "expectedTopoResultRelativePath" : "basic_topology_one_task_scale_up_down/BasicTopologyOneTaskResults.json",
       "checkType" : "topology_structure"
     },
     {
       "topologyName" : "IntegrationTopologyTest_FieldsGroupingDeactivate",
       "classPath"    : "fields_grouping.FieldsGrouping",
       "deactivateArgs"  : "True",
-      "expectedResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
+      "expectedTopoResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
       "checkType" : "topology_structure"
     },
     {
       "topologyName" : "IntegrationTopologyTest_FieldsGroupingKillContainer",
       "classPath"    : "fields_grouping.FieldsGrouping",
       "restartArgs"  : "True",
-      "expectedResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
+      "expectedTopoResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
       "checkType" : "topology_structure"
+    },
+    {
+      "topologyName" : "IntegrationTopologyTest_StatefulBasicTopologyOneTask",
+      "classPath"    : "stateful_basic_topology_one_task.StatefulBasicTopologyOneTask",
+      "expectedTopoResultRelativePath" : "stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskTopo.json",
+      "expectedStateResultRelativePath" : "stateful_basic_topology_one_task/StatefulBasicTopologyOneTaskState.json",
+      "checkType" : "checkpoint_state"
     }
   ]
 }
diff --git a/scripts/run_integration_topology_test.sh b/scripts/run_integration_topology_test.sh
index cfa81b3..b56eda5 100644
--- a/scripts/run_integration_topology_test.sh
+++ b/scripts/run_integration_topology_test.sh
@@ -3,6 +3,7 @@
 # Script to locally run the integration topology test.
 #
 
+HTTP_SERVER="./bazel-bin/integration_test/src/python/http_server/http-server"
 TEST_RUNNER="./bazel-bin/integration_test/src/python/topology_test_runner/topology-test-runner.pex"
 
 JAVA_TESTS_DIR="integration_test/src/java/org/apache/heron/integration_topology_test/topology"
@@ -25,9 +26,9 @@ bazel run --config=`platform` -- scripts/packages:heron-install.sh --user
 bazel build --config=`platform` {heron/...,scripts/packages:tarpkgs,integration_test/src/...}
 
 # run the simple http server
-#${HTTP_SERVER} 8080 &
-#http_server_id=$!
-#trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
+${HTTP_SERVER} 8080 &
+http_server_id=$!
+trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
 
 # run the scala integration tests
 #${TEST_RUNNER} \