You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/06/27 17:19:49 UTC

[GitHub] nlu90 closed pull request #2938: add integration topology test

nlu90 closed pull request #2938: add integration topology test
URL: https://github.com/apache/incubator-heron/pull/2938
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/integration_test/src/java/BUILD b/integration_test/src/java/BUILD
index 3792315f0e..b2a7c8f83a 100644
--- a/integration_test/src/java/BUILD
+++ b/integration_test/src/java/BUILD
@@ -4,7 +4,12 @@ package(default_visibility = ["//visibility:public"])
 
 filegroup(
     name = "test-data-files",
-    srcs = glob(["**/*.json"]),
+    srcs = glob(["org/apache/heron/integration_test/topology/**/*.json"]),
+)
+
+filegroup(
+    name = "topology-test-data-files",
+    srcs = glob(["org/apache/heron/integration_topology_test/topology/**/*.json"]),
 )
 
 java_library(
@@ -114,3 +119,61 @@ genrule(
     outs = ["local-integration-tests.jar"],
     cmd = "cp $< $@",
 )
+
+
+java_library(
+    name = "common_topology_test",
+    srcs = glob(
+        ["org/apache/heron/integration_topology_test/common/**/*.java"],
+    ),
+    deps = [
+        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
+        "//storm-compatibility/src/java:storm-compatibility-java",
+        "//third_party/java:hadoop-core",
+        "//third_party/java:jackson",
+        "@commons_cli_commons_cli//jar",
+        ":core"
+    ],
+)
+
+java_library(
+    name = "integration-topology-tests-lib",
+    srcs = glob(
+       ["org/apache/heron/integration_topology_test/topology/**/*.java"],
+    ),
+    deps = [
+        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
+        "//storm-compatibility/src/java:storm-compatibility-java",
+        "@com_googlecode_json_simple_json_simple//jar",
+        "@commons_cli_commons_cli//jar",
+        ":common_topology_test",
+        ":common",
+        ":core"
+    ],
+)
+
+java_binary(
+    name = "integration-topology-tests-unshaded",
+    srcs = glob(
+       ["org/apache/heron/integration_topology_test/topology/**/*.java"],
+    ),
+    deps = [
+        "//heron/api/src/java:api-java",
+        "//heron/api/src/java:api-java-low-level",
+        "//storm-compatibility/src/java:storm-compatibility-java",
+        "@commons_cli_commons_cli//jar",
+        "@com_googlecode_json_simple_json_simple//jar",
+        ":common_topology_test",
+        ":common",
+        ":core"
+    ],
+)
+
+genrule(
+    name = "integration-topology-tests",
+    srcs = [":integration-topology-tests-unshaded_deploy.jar"],
+    outs = ["integration-topology-tests.jar"],
+    cmd = "cp $< $@",
+)
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java b/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java
new file mode 100644
index 0000000000..97ce86ea25
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/common/AbstractTestTopology.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_topology_test.common;
+
+import java.net.MalformedURLException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.heron.api.Config;
+import org.apache.heron.api.HeronSubmitter;
+import org.apache.heron.api.exception.AlreadyAliveException;
+import org.apache.heron.api.exception.InvalidTopologyException;
+import org.apache.heron.api.topology.TopologyBuilder;
+
+/**
+ * Class to abstract out the common parts of the test framework for submitting topologies.
+ * Subclasses can implement {@code buildTopology} and call {@code submit}.
+ */
+public abstract class AbstractTestTopology {
+  private static final String TOPOLOGY_OPTION = "topology_name";
+  private static final String STATE_UPDATE_TOKEN = "state_server_update_token";
+
+  private final CommandLine cmd;
+  private final String topologyName;
+
+  protected AbstractTestTopology(String[] args) throws MalformedURLException {
+    CommandLineParser parser = new DefaultParser();
+    HelpFormatter formatter = new HelpFormatter();
+    Options options = getArgOptions();
+
+    try {
+      cmd = parser.parse(options, args);
+    } catch (ParseException e) {
+      formatter.setWidth(120);
+      formatter.printHelp("java " + getClass().getCanonicalName(), options, true);
+      throw new RuntimeException(e);
+    }
+
+    this.topologyName = cmd.getOptionValue(TOPOLOGY_OPTION);
+  }
+
+  protected abstract TopologyBuilder buildTopology(TopologyBuilder builder);
+
+  protected Config buildConfig(Config config) {
+    return config;
+  }
+
+  protected Options getArgOptions() {
+    Options options = new Options();
+
+    Option topologyNameOption = new Option("t", TOPOLOGY_OPTION, true, "topology name");
+    topologyNameOption.setRequired(true);
+    options.addOption(topologyNameOption);
+
+    return options;
+  }
+
+  public final void submit() throws AlreadyAliveException, InvalidTopologyException {
+    this.submit(null);
+  }
+
+  public final void submit(Config userConf) throws AlreadyAliveException, InvalidTopologyException {
+    TopologyBuilder builder = new TopologyBuilder();
+
+    Config conf = buildConfig(new BasicConfig());
+    if (userConf != null) {
+      conf.putAll(userConf);
+    }
+    HeronSubmitter.submitTopology(topologyName, conf, buildTopology(builder).createTopology());
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/common/BasicConfig.java b/integration_test/src/java/org/apache/heron/integration_topology_test/common/BasicConfig.java
new file mode 100644
index 0000000000..739ef1666a
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/common/BasicConfig.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_topology_test.common;
+
+import org.apache.heron.api.Config;
+
+/**
+ * A basic configuration for heron topology
+ */
+public class BasicConfig extends Config {
+  private static final long serialVersionUID = -2743309166660326071L;
+  private static final int DEFAULT_NUM_STMGRS = 1;
+
+
+  public BasicConfig() {
+    this(true, DEFAULT_NUM_STMGRS);
+  }
+
+  public BasicConfig(boolean isDebug, int numStmgrs) {
+    super();
+    super.setTeamEmail("streaming-compute@twitter.com");
+    super.setTeamName("stream-computing");
+    super.setTopologyProjectName("heron-integration-test");
+    super.setDebug(isDebug);
+    super.setNumStmgrs(numStmgrs);
+    super.setTopologyReliabilityMode(Config.TopologyReliabilityMode.ATLEAST_ONCE);
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_down/BasicTopologyOneTask.java b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_down/BasicTopologyOneTask.java
new file mode 100644
index 0000000000..a797ae43dd
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_down/BasicTopologyOneTask.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_topology_test.topology.basic_topology_one_task_scale_down;
+
+import java.net.MalformedURLException;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.integration_test.common.bolt.IdentityBolt;
+import org.apache.heron.integration_test.common.spout.ABSpout;
+import org.apache.heron.integration_topology_test.common.AbstractTestTopology;
+
+/**
+ * Topology for the scale-down test: 2 ab-spout tasks shuffle-grouped into 3 identity-bolt tasks
+ */
+public final class BasicTopologyOneTask extends AbstractTestTopology {
+
+  private BasicTopologyOneTask(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  @Override
+  protected TopologyBuilder buildTopology(TopologyBuilder builder) {
+    builder.setSpout("ab-spout", new ABSpout(true), 2);
+    builder.setBolt("identity-bolt", new IdentityBolt(new Fields("word")), 3)
+        .shuffleGrouping("ab-spout");
+    return builder;
+  }
+
+  public static void main(String[] args) throws Exception {
+    BasicTopologyOneTask topology = new BasicTopologyOneTask(args);
+    topology.submit();
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_down/BasicTopologyOneTaskResults.json b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_down/BasicTopologyOneTaskResults.json
new file mode 100644
index 0000000000..48b8c49d7a
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_down/BasicTopologyOneTaskResults.json
@@ -0,0 +1,59 @@
+{
+  "topology": {
+    "id": "IntegrationTopologyTest_BasicTopologyOneTaskScaleDown",
+    "name": "IntegrationTopologyTest_BasicTopologyOneTaskScaleDown",
+    "spouts": {
+      "comp": {
+        "name": "ab-spout",
+        "config": {
+          "kvs": {
+            "key": "topology.component.parallelism",
+            "value": "1"
+          }
+        }
+      }
+    },
+    "bolts": [
+      {
+        "comp": {
+          "name": "identity-bolt",
+          "config": {
+            "kvs": {
+              "key": "topology.component.parallelism",
+              "value": "2"
+            }
+          }
+        },
+        "inputs": [
+          {
+            "stream": {
+              "id": "default",
+              "component_name": "ab-spout"
+            }
+          }
+        ]
+      }
+    ],
+    "state": "RUNNING"
+  },
+  "instances": [
+    {
+      "instance_id": "container_1_ab-spout_1",
+      "info": {
+        "component_name": "ab-spout"
+      }
+    },
+    {
+      "instance_id": "container_1_identity-bolt_2",
+      "info": {
+        "component_name": "identity-bolt"
+      }
+    },
+    {
+      "instance_id": "container_1_identity-bolt_3",
+      "info": {
+        "component_name": "identity-bolt"
+      }
+    }
+  ]
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_up/BasicTopologyOneTask.java b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_up/BasicTopologyOneTask.java
new file mode 100644
index 0000000000..82326d1196
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_up/BasicTopologyOneTask.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_topology_test.topology.basic_topology_one_task_scale_up;
+
+import java.net.MalformedURLException;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.integration_test.common.bolt.IdentityBolt;
+import org.apache.heron.integration_test.common.spout.ABSpout;
+import org.apache.heron.integration_topology_test.common.AbstractTestTopology;
+
+/**
+ * Topology for the scale-up test: 1 ab-spout task shuffle-grouped into 2 identity-bolt tasks
+ */
+public final class BasicTopologyOneTask extends AbstractTestTopology {
+
+  private BasicTopologyOneTask(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  @Override
+  protected TopologyBuilder buildTopology(TopologyBuilder builder) {
+    builder.setSpout("ab-spout", new ABSpout(true), 1);
+    builder.setBolt("identity-bolt", new IdentityBolt(new Fields("word")), 2)
+        .shuffleGrouping("ab-spout");
+    return builder;
+  }
+
+  public static void main(String[] args) throws Exception {
+    BasicTopologyOneTask topology = new BasicTopologyOneTask(args);
+    topology.submit();
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_up/BasicTopologyOneTaskResults.json b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_up/BasicTopologyOneTaskResults.json
new file mode 100644
index 0000000000..ed0c9d5436
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_up/BasicTopologyOneTaskResults.json
@@ -0,0 +1,71 @@
+{
+  "topology": {
+    "id": "IntegrationTopologyTest_BasicTopologyOneTaskScaleUp",
+    "name": "IntegrationTopologyTest_BasicTopologyOneTaskScaleUp",
+    "spouts": {
+      "comp": {
+        "name": "ab-spout",
+        "config": {
+          "kvs": {
+            "key": "topology.component.parallelism",
+            "value": "2"
+          }
+        }
+      }
+    },
+    "bolts": [
+      {
+        "comp": {
+          "name": "identity-bolt",
+          "config": {
+            "kvs": {
+              "key": "topology.component.parallelism",
+              "value": "3"
+            }
+          }
+        },
+        "inputs": [
+          {
+            "stream": {
+              "id": "default",
+              "component_name": "ab-spout"
+            }
+          }
+        ]
+      }
+    ],
+    "state": "RUNNING"
+  },
+  "instances": [
+    {
+      "instance_id": "container_1_ab-spout_1",
+      "info": {
+        "component_name": "ab-spout"
+      }
+    },
+    {
+      "instance_id": "container_1_identity-bolt_3",
+      "info": {
+        "component_name": "identity-bolt"
+      }
+    },
+    {
+      "instance_id": "container_1_identity-bolt_5",
+      "info": {
+        "component_name": "identity-bolt"
+      }
+    },
+    {
+      "instance_id": "container_2_ab-spout_2",
+      "info": {
+        "component_name": "ab-spout"
+      }
+    },
+    {
+      "instance_id": "container_2_identity-bolt_4",
+      "info": {
+        "component_name": "identity-bolt"
+      }
+    }
+  ]
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_up_down/BasicTopologyOneTask.java b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_up_down/BasicTopologyOneTask.java
new file mode 100644
index 0000000000..1fee7a823c
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_up_down/BasicTopologyOneTask.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_topology_test.topology.basic_topology_one_task_scale_up_down;
+
+import java.net.MalformedURLException;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.integration_test.common.bolt.IdentityBolt;
+import org.apache.heron.integration_test.common.spout.ABSpout;
+import org.apache.heron.integration_topology_test.common.AbstractTestTopology;
+
+/**
+ * Topology for the scale-up-down test: 1 ab-spout task shuffle-grouped into 3 identity-bolt tasks
+ */
+public final class BasicTopologyOneTask extends AbstractTestTopology {
+
+  private BasicTopologyOneTask(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  @Override
+  protected TopologyBuilder buildTopology(TopologyBuilder builder) {
+    builder.setSpout("ab-spout", new ABSpout(true), 1);
+    builder.setBolt("identity-bolt", new IdentityBolt(new Fields("word")), 3)
+        .shuffleGrouping("ab-spout");
+    return builder;
+  }
+
+  public static void main(String[] args) throws Exception {
+    BasicTopologyOneTask topology = new BasicTopologyOneTask(args);
+    topology.submit();
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_up_down/BasicTopologyOneTaskResults.json b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_up_down/BasicTopologyOneTaskResults.json
new file mode 100644
index 0000000000..135decd6c4
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/basic_topology_one_task_scale_up_down/BasicTopologyOneTaskResults.json
@@ -0,0 +1,65 @@
+{
+  "topology": {
+    "id": "IntegrationTopologyTest_BasicTopologyOneTaskScaleUpDown",
+    "name": "IntegrationTopologyTest_BasicTopologyOneTaskScaleUpDown",
+    "spouts": {
+      "comp": {
+        "name": "ab-spout",
+        "config": {
+          "kvs": {
+            "key": "topology.component.parallelism",
+            "value": "2"
+          }
+        }
+      }
+    },
+    "bolts": [
+      {
+        "comp": {
+          "name": "identity-bolt",
+          "config": {
+            "kvs": {
+              "key": "topology.component.parallelism",
+              "value": "2"
+            }
+          }
+        },
+        "inputs": [
+          {
+            "stream": {
+              "id": "default",
+              "component_name": "ab-spout"
+            }
+          }
+        ]
+      }
+    ],
+    "state": "RUNNING"
+  },
+  "instances": [
+    {
+      "instance_id": "container_1_ab-spout_1",
+      "info": {
+        "component_name": "ab-spout"
+      }
+    },
+    {
+      "instance_id": "container_1_identity-bolt_3",
+      "info": {
+        "component_name": "identity-bolt"
+      }
+    },
+    {
+      "instance_id": "container_2_ab-spout_2",
+      "info": {
+        "component_name": "ab-spout"
+      }
+    },
+    {
+      "instance_id": "container_2_identity-bolt_4",
+      "info": {
+        "component_name": "identity-bolt"
+      }
+    }
+  ]
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/fields_grouping/FieldsGrouping.java b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/fields_grouping/FieldsGrouping.java
new file mode 100644
index 0000000000..2615cc1a98
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/fields_grouping/FieldsGrouping.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.integration_topology_test.topology.fields_grouping;
+
+import java.net.MalformedURLException;
+
+import org.apache.heron.api.topology.TopologyBuilder;
+import org.apache.heron.api.tuple.Fields;
+import org.apache.heron.integration_test.common.bolt.CountAggregatorBolt;
+import org.apache.heron.integration_test.common.bolt.WordCountBolt;
+import org.apache.heron.integration_test.common.spout.ABSpout;
+import org.apache.heron.integration_topology_test.common.AbstractTestTopology;
+
+/**
+ * Topology to test fields grouping
+ */
+public final class FieldsGrouping extends AbstractTestTopology {
+
+  private FieldsGrouping(String[] args) throws MalformedURLException {
+    super(args);
+  }
+
+  @Override
+  protected TopologyBuilder buildTopology(TopologyBuilder builder) {
+    builder.setSpout("ab-spout", new ABSpout(), 1);
+    builder.setBolt("count-bolt", new WordCountBolt(), 2)
+        .fieldsGrouping("ab-spout", new Fields("word"));
+    builder.setBolt("sum-bolt", new CountAggregatorBolt(), 1)
+        .noneGrouping("count-bolt");
+    return builder;
+  }
+
+  public static void main(String[] args) throws Exception {
+    FieldsGrouping topology = new FieldsGrouping(args);
+    topology.submit();
+  }
+}
diff --git a/integration_test/src/java/org/apache/heron/integration_topology_test/topology/fields_grouping/FieldsGroupingResults.json b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/fields_grouping/FieldsGroupingResults.json
new file mode 100644
index 0000000000..14f68e9ca8
--- /dev/null
+++ b/integration_test/src/java/org/apache/heron/integration_topology_test/topology/fields_grouping/FieldsGroupingResults.json
@@ -0,0 +1,84 @@
+{
+  "topology": {
+    "id": "IntegrationTopologyTest_FieldsGrouping",
+    "name": "IntegrationTopologyTest_FieldsGrouping",
+    "spouts": {
+      "comp": {
+        "name": "ab-spout",
+        "config": {
+          "kvs": {
+            "key": "topology.component.parallelism",
+            "value": "1"
+          }
+        }
+      }
+    },
+    "bolts": [
+      {
+        "comp": {
+          "name": "count-bolt",
+          "config": {
+            "kvs": {
+              "key": "topology.component.parallelism",
+              "value": "2"
+            }
+          }
+        },
+        "inputs": [
+          {
+            "stream": {
+              "id": "default",
+              "component_name": "ab-spout"
+            }
+          }
+        ]
+      },
+      {
+        "comp": {
+          "name": "sum-bolt",
+          "config": {
+            "kvs": {
+              "key": "topology.component.parallelism",
+              "value": "1"
+            }
+          }
+        },
+        "inputs": [
+          {
+            "stream": {
+              "id": "default",
+              "component_name": "count-bolt"
+            }
+          }
+        ]
+      }
+    ],
+    "state": "RUNNING"
+  },
+  "instances": [
+    {
+      "instance_id": "container_1_ab-spout_1",
+      "info": {
+        "component_name": "ab-spout"
+      }
+    },
+    {
+      "instance_id": "container_1_count-bolt_3",
+      "info": {
+        "component_name": "count-bolt"
+      }
+    },
+    {
+      "instance_id": "container_1_count-bolt_4",
+      "info": {
+        "component_name": "count-bolt"
+      }
+    },
+    {
+      "instance_id": "container_2_sum-bolt_6",
+      "info": {
+        "component_name": "sum-bolt"
+      }
+    }
+  ]
+}
diff --git a/integration_test/src/python/topology_test_runner/BUILD b/integration_test/src/python/topology_test_runner/BUILD
new file mode 100644
index 0000000000..20c412b944
--- /dev/null
+++ b/integration_test/src/python/topology_test_runner/BUILD
@@ -0,0 +1,19 @@
+package(default_visibility = ["//visibility:public"])
+
+pex_binary(
+    name = "topology-test-runner",
+    srcs = [
+        "main.py"
+    ],
+    main = "main.py",
+    resources = [
+        "resources/test.json",
+    ],
+    reqs = ["argparse==1.4.0"],
+    deps = [
+        "//heron/common/src/python:common-py",
+        "//heronpy/proto:proto-py",
+        "//integration_test/src/python/common",
+        "//heron/statemgrs/src/python:statemgr-py",
+    ],
+)
diff --git a/integration_test/src/python/topology_test_runner/main.py b/integration_test/src/python/topology_test_runner/main.py
new file mode 100644
index 0000000000..ec948cc7be
--- /dev/null
+++ b/integration_test/src/python/topology_test_runner/main.py
@@ -0,0 +1,568 @@
+''' main '''
+import argparse
+import json
+import logging
+import os
+import pkgutil
+import re
+import sys
+import time
+import uuid
+
+from ..common import status
+from heron.common.src.python.utils import log
+from heron.statemgrs.src.python import configloader
+from heron.statemgrs.src.python.zkstatemanager import ZkStateManager
+from heron.statemgrs.src.python.filestatemanager import FileStateManager
+
+# The location of default configure file
+DEFAULT_TEST_CONF_FILE = "integration_test/src/python/topology_test_runner/resources/test.json"
+
+# RETRY_INTERVAL and WAIT_FOR_DEACTIVATION are in seconds; RETRY_ATTEMPTS is a count
+RETRY_ATTEMPTS = 15
+RETRY_INTERVAL = 10
+WAIT_FOR_DEACTIVATION = 5
+
+
+class TopologyStructureResultChecker(object):
+  """
+  Validate topology graph structure (per-component instance counts and bolt input links)
+  """
+  def __init__(self, topology_name,
+    topology_structure_expected_results_handler,
+    topology_structure_actual_results_handler):
+    self.topology_name = topology_name
+    self.topology_structure_expected_results_handler = topology_structure_expected_results_handler
+    self.topology_structure_actual_results_handler = topology_structure_actual_results_handler
+
+  def check_results(self):
+    """
+    Compare the fetched physical plan with the expected results; always stops the state manager
+    """
+    try:
+      expected_result = self.topology_structure_expected_results_handler.fetch_results()
+      actual_result = self.topology_structure_actual_results_handler.fetch_cur_pplan()
+    finally:
+      self.topology_structure_actual_results_handler.stop_state_mgr()
+    decoder = json.JSONDecoder(strict=False)
+    expected_results = decoder.decode(expected_result)
+
+    return self._compare(expected_results, actual_result)
+
+  def _compare(self, expected_results, actual_results):
+    """
+    Return status.TestSuccess if node counts and links match exactly, else raise TestFailure
+    """
+    expected_nodes, expected_links = self._parse_expected_results(expected_results)
+    actual_nodes, actual_links = self._parse_actual_results(actual_results)
+    correct_topology = True
+    if correct_topology:
+      for key_expected in expected_nodes:
+        if key_expected not in actual_nodes:
+          correct_topology = False
+          break
+        if expected_nodes[key_expected] != actual_nodes[key_expected]:
+          correct_topology = False
+          break
+    if correct_topology:
+      for key_actual in actual_nodes:
+        if key_actual not in expected_nodes:
+          correct_topology = False
+          break
+        if expected_nodes[key_actual] != actual_nodes[key_actual]:
+          correct_topology = False
+          break
+    if correct_topology:
+      for key_expected in expected_links:
+        if key_expected not in actual_links:
+          correct_topology = False
+          break
+        if expected_links[key_expected] != actual_links[key_expected]:
+          correct_topology = False
+          break
+    if correct_topology:
+      for key_actual in actual_links:
+        if key_actual not in expected_links:
+          correct_topology = False
+          break
+        if expected_links[key_actual] != actual_links[key_actual]:
+          correct_topology = False
+          break
+
+    if correct_topology:
+      return status.TestSuccess(
+        "Topology %s result matches expected result" % self.topology_name)
+    else:
+      raise status.TestFailure("Actual result did not match expected result")
+
+  def _parse_expected_results(self, expected_results):
+    """
+    Parse decoded JSON into {component: instance count} and {bolt: set of input components}
+    """
+    expected_nodes = dict()
+    expected_links = dict()
+    for bolt in expected_results["topology"]["bolts"]:
+      name = bolt["comp"]["name"]
+      if name not in expected_links:
+        expected_links[name] = set()
+      for bolt_input in bolt["inputs"]:
+        expected_links[name].add(bolt_input["stream"]["component_name"])
+    for instance in expected_results["instances"]:
+      name = instance["info"]["component_name"]
+      if name not in expected_nodes:
+        expected_nodes[name] = 1  # first instance seen for this component
+      else:
+        expected_nodes[name] += 1
+
+    return expected_nodes, expected_links
+
+  def _parse_actual_results(self, actual_results):
+    """
+    Parse the physical plan message into the same node-count and link maps as the expected side
+    """
+    actual_nodes = dict()
+    actual_links = dict()
+    for bolt in actual_results.topology.bolts:
+      name = bolt.comp.name
+      if name not in actual_links:
+        actual_links[name] = set()
+      for bolt_input in bolt.inputs:
+        actual_links[name].add(bolt_input.stream.component_name)
+    for instance in actual_results.instances:
+      name = instance.info.component_name
+      if name not in actual_nodes:
+        actual_nodes[name] = 1  # first instance seen for this component
+      else:
+        actual_nodes[name] += 1
+
+    return actual_nodes, actual_links
+
+
+class InstanceStateResultChecker(TopologyStructureResultChecker):
+  """
+  Validating instance states after checkpoint rollback
+  TODO(yaoli): complete this class when stateful processing is ready
+  """
+  def __init__(self, topology_name,
+    topology_structure_expected_results_handler,
+    topology_structure_actual_results_handler,
+    instance_state_expected_result_handler,
+    instance_state_actual_result_handler):
+    TopologyStructureResultChecker.__init__(self,topology_name,
+      topology_structure_expected_results_handler,
+      topology_structure_actual_results_handler)
+    self.instance_state_expected_result_handler = instance_state_expected_result_handler
+    self.instance_state_actual_result_handler = instance_state_actual_result_handler
+
+  def check_results(self):
+    topology_structure_check_result = TopologyStructureResultChecker.check_results(self)
+    if isinstance(topology_structure_check_result, status.TestFailure):
+      raise status.TestFailure("The actual topology graph structure does not match the expected one"
+                               + " for topology: %s" % self.topology_name)
+    # TODO: check instance states here and obtain instance_state_check_result.
+    # If both results are status.TestSuccess return success, else fail; for now this returns None.
+
+  def _compare_state(self, expected_results, actual_results):
+    pass
+
+  def _parse_state_expected_results(self, expected_results):
+    pass
+
+  def _parse_state_actual_results(self, actual_results):
+    pass
+
+
+class FileBasedExpectedResultsHandler(object):
+  """
+  Get expected topology graph structure result from local file
+  """
+  def __init__(self, file_path):
+    self.file_path = file_path
+
+  def fetch_results(self):
+    """
+    Read the expected result file and return its text with trailing whitespace stripped
+    """
+    # Check existence outside the try so this specific failure is not re-wrapped below.
+    if not os.path.exists(self.file_path):
+      raise status.TestFailure("Expected results file %s does not exist" % self.file_path)
+    try:
+      with open(self.file_path, "r") as expected_result_file:
+        return expected_result_file.read().rstrip()
+    except Exception as e:
+      raise status.TestFailure("Failed to read expected result file %s" % self.file_path, e)
+
+
+class ZkFileBasedActualResultsHandler(object):
+  """
+  Get actual topology graph structure result from the zk or local file state manager
+  """
+  def __init__(self, topology_name, cluster):
+    self.topology_name = topology_name
+    self.state_mgr = self._load_state_mgr(cluster)
+    self.state_mgr.start()
+
+  def _load_state_mgr(self, cluster):
+    state_mgr_config = configloader.load_state_manager_locations(cluster, os.getenv("HOME")
+                                                                 +'/.heron/conf/'+cluster
+                                                                 + '/statemgr.yaml')
+    if state_mgr_config[0]["type"] == 'file':  # only the first configured location is used
+      return FileStateManager(self.topology_name, os.getenv("HOME")
+                                                  +'/.herondata/repository/state/local')
+    elif state_mgr_config[0]["type"] == 'zookeeper':
+      host_port = state_mgr_config[0]["hostport"].split(':')
+      return ZkStateManager(state_mgr_config[0]["type"],
+        [(host_port[0], int(host_port[1]))],
+        state_mgr_config[0]["rootpath"],
+        state_mgr_config[0]["tunnelhost"])
+    else:
+      raise status.TestFailure("Unrecognized state manager type: %s"
+                               % state_mgr_config[0]["type"])
+
+  def fetch_cur_pplan(self):
+    try:
+      for i in range(0, RETRY_ATTEMPTS):
+        logging.info("Fetching physical plan of topology %s, retry count: %d", self.topology_name, i)
+        try:
+          pplan_string = self.state_mgr.get_pplan(self.topology_name)
+        except IOError:
+          pplan_string = None
+        if pplan_string is not None and pplan_string.topology.state == 1: # RUNNING = 1
+          break
+        time.sleep(RETRY_INTERVAL)
+      else:  # loop ended without break: no plan in RUNNING state within RETRY_ATTEMPTS tries
+        raise status.TestFailure("Fetching physical plan failed for %s topology"
+                                 % self.topology_name)
+      return pplan_string
+    except Exception as e:
+      raise status.TestFailure("Fetching physical plan failed for %s topology"
+                               % self.topology_name, e)
+
+  def stop_state_mgr(self):
+    self.state_mgr.stop()
+
+
+class HttpBasedActualResultsHandler(object):
+  """
+  Get actually loaded instance states
+  TODO(yaoli): complete this class when stateful processing is ready
+  """
+  pass
+
+class StatefulStorageBasedExpectedResultsHandler(object):
+  """
+  Get expected instance states from checkpoint storage
+  TODO(yaoli): complete this class when stateful processing is ready
+  """
+  pass
+
+#  Result handlers end
+
+
+def filter_test_topologies(test_topologies, test_pattern):
+  """Return topologies whose name matches test_pattern (all if it is empty); exit 1 if none"""
+  matched = test_topologies
+  if test_pattern:
+    pattern = re.compile(test_pattern)
+    matched = [t for t in test_topologies if pattern.match(t['topologyName'])]  # list, so len() works
+  if len(matched) == 0:
+    logging.error("Test filter '%s' did not match any configured test names:\n%s",
+      test_pattern, '\n'.join(map(lambda x: x['topologyName'], test_topologies)))
+    sys.exit(1)
+  return matched
+
+
+def run_topology_test(topology_name, classpath, results_checker,
+  params, update_args, deactivate_args, restart_args, extra_topology_args):
+  try:
+    args = "-t %s %s" % \
+           (topology_name, extra_topology_args)
+    submit_topology(params.heron_cli_path, params.cli_config_path, params.cluster, params.role,
+      params.env, params.tests_bin_path, classpath,
+      params.release_package_uri, args)
+  except Exception as e:
+    raise status.TestFailure("Failed to submit %s topology" % topology_name, e)
+
+  logging.info("Successfully submitted %s topology", topology_name)
+
+  try:
+    if update_args:
+      # check if pplan is already available
+      results_checker.topology_structure_actual_results_handler.fetch_cur_pplan()
+      logging.info("Verified topology successfully started, proceeding to update it")
+      update_topology(params.heron_cli_path, params.cli_config_path, params.cluster,
+        params.role, params.env, topology_name, update_args)
+    elif deactivate_args:
+      results_checker.topology_structure_actual_results_handler.fetch_cur_pplan()
+      logging.info("Verified topology successfully started, proceeding "
+                    + "to deactivate and activate it")
+      deactivate_topology(params.heron_cli_path, params.cli_config_path, params.cluster,
+        params.role, params.env, topology_name, True)
+      time.sleep(WAIT_FOR_DEACTIVATION)
+      deactivate_topology(params.heron_cli_path, params.cli_config_path, params.cluster,
+        params.role, params.env, topology_name, False)
+    elif restart_args:
+      results_checker.topology_structure_actual_results_handler.fetch_cur_pplan()
+      logging.info("Verified topology successfully started, proceeding to kill a container")
+      restart_topology(params.heron_cli_path, params.cli_config_path, params.cluster,
+        params.role, params.env, topology_name, 1)
+
+    return results_checker.check_results()
+
+  except Exception as e:
+    raise status.TestFailure("Checking result failed for %s topology" % topology_name, e)
+  finally:
+    kill_topology(params.heron_cli_path, params.cli_config_path, params.cluster,
+      params.role, params.env, topology_name)
+    pass
+
+
+# Topology operations
+
+def submit_topology(heron_cli_path, cli_config_path, cluster, role,
+  env, jar_path, classpath, pkg_uri, args=None):
+  """
+  Submit topology using heron-cli
+  """
+  cmd = "%s submit --config-path=%s %s %s %s %s" % \
+        (heron_cli_path, cli_config_path, cluster_token(cluster, role, env),
+        jar_path, classpath, args)
+
+  if pkg_uri is not None:
+    cmd = "%s --config-property heron.package.core.uri='%s'" %(cmd, pkg_uri)
+
+  logging.info("Submitting topology: %s", cmd)
+
+  if os.system(cmd) != 0:
+    raise status.TestFailure("Unable to submit the topology")
+
+
+def update_topology(heron_cli_path, cli_config_path, cluster,
+  role, env, topology_name, update_args):
+  cmd = "%s update --config-path=%s %s %s %s --verbose" % \
+        (heron_cli_path, cli_config_path,
+        cluster_token(cluster, role, env), update_args, topology_name)
+
+  logging.info("Update topology: %s", cmd)
+  if os.system(cmd) != 0:
+    raise status.TestFailure("Failed to update topology %s" % topology_name)
+
+  logging.info("Successfully updated topology %s", topology_name)
+
+
+def deactivate_topology(heron_cli_path, cli_config_path, cluster,
+    role, env, topology_name, deactivate):
+  if deactivate:
+    cmd = "%s deactivate --config-path=%s %s %s" % \
+          (heron_cli_path, cli_config_path,
+          cluster_token(cluster, role, env), topology_name)
+    logging.info("deactivate topology: %s", cmd)
+    if os.system(cmd) != 0:
+      raise status.TestFailure("Failed to deactivate topology %s" % topology_name)
+    logging.info("Successfully deactivate topology %s", topology_name)
+  else:
+    cmd = "%s activate --config-path=%s %s %s" % \
+          (heron_cli_path, cli_config_path,
+          cluster_token(cluster, role, env), topology_name)
+    logging.info("activate topology: %s", cmd)
+    if os.system(cmd) != 0:
+      raise status.TestFailure("Failed to activate topology %s" % topology_name)
+    logging.info("Successfully activate topology %s", topology_name)
+
+
+def restart_topology(heron_cli_path, cli_config_path, cluster,
+    role, env, topology_name, container_id):
+  cmd = "%s restart --config-path=%s %s %s %s" % \
+        (heron_cli_path, cli_config_path,
+        cluster_token(cluster, role, env), topology_name, str(container_id))
+
+  logging.info("Kill container %s", cmd)
+  if os.system(cmd) != 0:
+    raise status.TestFailure("Failed to kill container %s" % str(container_id))
+
+  logging.info("Successfully kill container %s", str(container_id))
+
+
+def kill_topology(heron_cli_path, cli_config_path, cluster, role, env, topology_name):
+  """
+  Kill a topology using heron-cli
+  """
+  cmd = "%s kill --config-path=%s %s %s" % \
+        (heron_cli_path, cli_config_path, cluster_token(cluster, role, env), topology_name)
+
+  logging.info("Killing topology: %s", cmd)
+  if os.system(cmd) != 0:
+    raise status.TestFailure("Failed to kill topology %s" % topology_name)
+
+  logging.info("Successfully killed topology %s", topology_name)
+
+
+def cluster_token(cluster, role, env):
+  if cluster == "local" or cluster == "localzk":
+    return cluster
+  return "%s/%s/%s" % (cluster, role, env)
+
+#  Topology manipulations end
+
+
+def run_topology_tests(conf, args):
+  """
+  Run the test for each topology specified in the conf file
+  """
+  successes = []
+  failures = []
+  timestamp = time.strftime('%Y%m%d%H%M%S')
+
+  # http_host_port = "%s:%d" % (args.http_hostname, args.http_port)
+
+  if args.tests_bin_path.endswith("scala-integration-tests.jar"):
+    test_topologies = filter_test_topologies(conf["scalaTopologies"], args.test_topology_pattern)
+    topology_classpath_prefix = conf["topologyClasspathPrefix"]
+  elif args.tests_bin_path.endswith("integration-topology-tests.jar"):
+    test_topologies = filter_test_topologies(conf["javaTopologies"], args.test_topology_pattern)
+    topology_classpath_prefix = conf["topologyClasspathPrefix"]
+  elif args.tests_bin_path.endswith("heron_integ_topology.pex"):
+    test_topologies = filter_test_topologies(conf["pythonTopologies"], args.test_topology_pattern)
+    topology_classpath_prefix = ""
+  else:
+    raise ValueError("Unrecognized binary file type: %s" % args.tests_bin_path)
+
+  processing_type = conf["processingType"]
+
+  current = 1
+  for topology_conf in test_topologies:
+    topology_name = ("%s_%s_%s") % (timestamp, topology_conf["topologyName"], str(uuid.uuid4()))
+    classpath = topology_classpath_prefix + topology_conf["classPath"]
+
+    update_args = ""
+    deactivate_args = ""
+    restart_args = ""
+    topology_args = ''
+    if "updateArgs" in topology_conf:
+      update_args = topology_conf["updateArgs"]
+    if "deactivateArgs" in topology_conf:
+      deactivate_args = True
+    if "restartArgs" in topology_conf:
+      restart_args = True
+
+    if "topologyArgs" in topology_conf:
+      topology_args = "%s %s" % (topology_args, topology_conf["topologyArgs"])
+
+    expected_result_file_path = \
+      args.topologies_path + "/" + topology_conf["expectedResultRelativePath"]
+    check_type = topology_conf["checkType"]
+    if check_type == 'topology_structure':
+      results_checker = load_result_checker(
+        check_type, topology_name,
+        FileBasedExpectedResultsHandler(expected_result_file_path),
+        ZkFileBasedActualResultsHandler(topology_name, args.cluster))
+    elif check_type == 'checkpoint_state':
+      if processing_type == 'stateful':
+        results_checker = load_result_checker(
+          check_type, topology_name,
+          FileBasedExpectedResultsHandler(expected_result_file_path),
+          ZkFileBasedActualResultsHandler(topology_name, args.cluster),
+          StatefulStorageBasedExpectedResultsHandler(),
+          HttpBasedActualResultsHandler())
+      elif processing_type == 'non_stateful':
+        raise ValueError("Cannot check instance checkpoint state in non_stateful processing. "
+                       + "Not running topology: " + topology_name)
+      else:
+        raise ValueError("Unrecognized processing type for topology: " + topology_name)
+    else:
+      raise ValueError("Unrecognized check type for topology: " + topology_name)
+
+    logging.info("==== Starting test %s of %s: %s ====",
+      current, len(test_topologies), topology_name)
+    start_secs = int(time.time())
+    try:
+      result = run_topology_test(topology_name, classpath, results_checker,
+        args, update_args, deactivate_args, restart_args, topology_args)
+      test_tuple = (topology_name, int(time.time()) - start_secs)
+      if isinstance(result, status.TestSuccess):
+        successes += [test_tuple]
+      elif isinstance(result, status.TestFailure):
+        failures += [test_tuple]
+      else:
+        logging.error("Unrecognized test response returned for test %s: %s",
+          topology_name, str(result))
+        failures += [test_tuple]
+    except status.TestFailure:
+      test_tuple = (topology_name, int(time.time()) - start_secs)
+      failures += [test_tuple]
+
+    current += 1
+  return successes, failures
+
+
+def load_result_checker(check_type, topology_name,
+  expected_topology_structure_result_handler,
+  actual_topology_structure_result_handler,
+  expected_instance_state_result_handler = None,
+  actual_instance_state_result_handler = None):
+  """Build the result checker for check_type; raise status.TestFailure if it is unrecognized"""
+  if check_type == "topology_structure":
+    return TopologyStructureResultChecker(topology_name,
+      expected_topology_structure_result_handler,
+      actual_topology_structure_result_handler)
+  elif check_type == "checkpoint_state":
+    return InstanceStateResultChecker(topology_name,
+      expected_topology_structure_result_handler,
+      actual_topology_structure_result_handler,
+      expected_instance_state_result_handler,
+      actual_instance_state_result_handler)
+  else:
+    raise status.TestFailure("Unrecognized check type : %s" % check_type)
+
+
+def main():
+  """
+  main
+  """
+  log.configure(level=logging.DEBUG)
+  conf_file = DEFAULT_TEST_CONF_FILE
+  # Read the configuration file from package
+  conf_string = pkgutil.get_data(__name__, conf_file)
+  decoder = json.JSONDecoder(strict=False)
+  # Convert the conf file to a json format
+  conf = decoder.decode(conf_string)
+
+  # Parse the arguments passed via command line
+  parser = argparse.ArgumentParser(description='This is the heron integration test framework')
+
+  parser.add_argument('-hc', '--heron-cli-path', dest='heron_cli_path',
+    default=conf['heronCliPath'])
+  parser.add_argument('-tb', '--tests-bin-path', dest='tests_bin_path')
+  parser.add_argument('-cl', '--cluster', dest='cluster', default=conf['cluster'])
+  parser.add_argument('-ev', '--env', dest='env', default=conf['env'])
+  parser.add_argument('-rl', '--role', dest='role', default=conf['role'])
+  parser.add_argument('-rh', '--http-hostname', dest='http_hostname', default='localhost')
+  parser.add_argument('-rp', '--http-port', dest='http_port', type=int,
+    default='8080')
+  parser.add_argument('-tp', '--topologies-path', dest='topologies_path')
+  parser.add_argument('-ts', '--test-topology-pattern', dest='test_topology_pattern', default=None)
+  parser.add_argument('-pi', '--release-package-uri', dest='release_package_uri', default=None)
+  parser.add_argument('-cd', '--cli-config-path', dest='cli_config_path',
+    default=conf['cliConfigPath'])
+
+  args, unknown_args = parser.parse_known_args()
+  if unknown_args:
+    logging.error('Unknown argument passed to %s: %s', sys.argv[0], unknown_args[0])
+    sys.exit(1)
+
+  (successes, failures) = run_topology_tests(conf, args)
+  total = len(failures) + len(successes)
+
+  if not failures:
+    logging.info("SUCCESS: %s (all) tests passed:", len(successes))
+    for test in successes:
+      logging.info("  - %s: %s", ("[%ss]" % test[1]).ljust(8), test[0])
+    sys.exit(0)
+  else:
+    logging.error("FAILURE: %s/%s tests failed:", len(failures), total)
+    for test in failures:
+      logging.error("  - %s: %s", ("[%ss]" % test[1]).ljust(8), test[0])
+    sys.exit(1)
+
+if __name__ == '__main__':
+  main()
diff --git a/integration_test/src/python/topology_test_runner/resources/test.json b/integration_test/src/python/topology_test_runner/resources/test.json
new file mode 100644
index 0000000000..0119a48111
--- /dev/null
+++ b/integration_test/src/python/topology_test_runner/resources/test.json
@@ -0,0 +1,52 @@
+{
+  "heronCliPath" : "dummy_cli",
+  "cluster" : "dummy_cluster",
+  "role" : "dummy_role",
+  "env" : "dummy_env",
+  "cliConfigPath" : "$HOME/.heron/conf",
+  "processingType" : "non_stateful",
+  "topologyClasspathPrefix" : "org.apache.heron.integration_topology_test.topology.",
+  "javaTopologies": [
+    {
+      "topologyName" : "IntegrationTopologyTest_FieldsGrouping",
+      "classPath"    : "fields_grouping.FieldsGrouping",
+      "expectedResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
+      "checkType" : "topology_structure"
+    },
+    {
+      "topologyName" : "IntegrationTopologyTest_BasicTopologyOneTaskScaleUp",
+      "classPath"    : "basic_topology_one_task_scale_up.BasicTopologyOneTask",
+      "updateArgs"   : "--component-parallelism=ab-spout:2 --component-parallelism=identity-bolt:3",
+      "expectedResultRelativePath" : "basic_topology_one_task_scale_up/BasicTopologyOneTaskResults.json",
+      "checkType" : "topology_structure"
+    },
+    {
+      "topologyName" : "IntegrationTopologyTest_BasicTopologyOneTaskScaleDown",
+      "classPath"    : "basic_topology_one_task_scale_down.BasicTopologyOneTask",
+      "updateArgs"   : "--component-parallelism=ab-spout:1 --component-parallelism=identity-bolt:2",
+      "expectedResultRelativePath" : "basic_topology_one_task_scale_down/BasicTopologyOneTaskResults.json",
+      "checkType" : "topology_structure"
+    },
+    {
+      "topologyName" : "IntegrationTopologyTest_BasicTopologyOneTaskScaleUpDown",
+      "classPath"    : "basic_topology_one_task_scale_up_down.BasicTopologyOneTask",
+      "updateArgs"   : "--component-parallelism=ab-spout:2 --component-parallelism=identity-bolt:2",
+      "expectedResultRelativePath" : "basic_topology_one_task_scale_up_down/BasicTopologyOneTaskResults.json",
+      "checkType" : "topology_structure"
+    },
+    {
+      "topologyName" : "IntegrationTopologyTest_FieldsGroupingDeactivate",
+      "classPath"    : "fields_grouping.FieldsGrouping",
+      "deactivateArgs"  : "True",
+      "expectedResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
+      "checkType" : "topology_structure"
+    },
+    {
+      "topologyName" : "IntegrationTopologyTest_FieldsGroupingKillContainer",
+      "classPath"    : "fields_grouping.FieldsGrouping",
+      "restartArgs"  : "True",
+      "expectedResultRelativePath" : "fields_grouping/FieldsGroupingResults.json",
+      "checkType" : "topology_structure"
+    }
+  ]
+}
diff --git a/scripts/run_integration_topology_test.sh b/scripts/run_integration_topology_test.sh
new file mode 100644
index 0000000000..cfa81b30a2
--- /dev/null
+++ b/scripts/run_integration_topology_test.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+#
+# Script to locally run the integration topology test.
+#
+
+TEST_RUNNER="./bazel-bin/integration_test/src/python/topology_test_runner/topology-test-runner.pex"
+
+JAVA_TESTS_DIR="integration_test/src/java/org/apache/heron/integration_topology_test/topology"
+PYTHON_TESTS_DIR="integration_test/src/python/integration_test/topology"
+SCALA_TESTS_DIR="integration_test/src/scala/org/apache/heron/integration_test/topology"
+
+# integration test binaries have to be specified as absolute path
+JAVA_INTEGRATION_TESTS_BIN="${PWD}/bazel-genfiles/integration_test/src/java/integration-topology-tests.jar"
+PYTHON_INTEGRATION_TESTS_BIN="${PWD}/bazel-bin/integration_test/src/python/integration_test/topology/heron_integ_topology.pex"
+SCALA_INTEGRATION_TESTS_BIN="${PWD}/bazel-genfiles/integration_test/src/scala/scala-integration-tests.jar"
+
+CORE_PKG="file://${PWD}/bazel-bin/scripts/packages/heron-core.tar.gz"
+
+set -e
+
+# building tar packages
+DIR=`dirname $0`
+source ${DIR}/detect_os_type.sh
+bazel run --config=`platform` -- scripts/packages:heron-install.sh --user
+bazel build --config=`platform` {heron/...,scripts/packages:tarpkgs,integration_test/src/...}
+
+# run the simple http server
+#${HTTP_SERVER} 8080 &
+#http_server_id=$!
+#trap "kill -9 $http_server_id" SIGINT SIGTERM EXIT
+
+# run the scala integration tests
+#${TEST_RUNNER} \
+#  -hc ~/.heron/bin/heron -tb ${SCALA_INTEGRATION_TESTS_BIN} \
+#  -rh localhost -rp 8080 \
+#  -tp ${SCALA_TESTS_DIR} \
+#  -cl local -rl heron-staging -ev devel -pi ${CORE_PKG}
+
+# run the java integration tests
+${TEST_RUNNER} \
+  -hc ~/.heron/bin/heron -tb ${JAVA_INTEGRATION_TESTS_BIN} \
+  -rh localhost -rp 8080 \
+  -tp ${JAVA_TESTS_DIR} \
+  -cl local -rl heron-staging -ev devel -pi ${CORE_PKG}
+
+# run the python integration tests
+#${TEST_RUNNER} \
+#  -hc ~/.heron/bin/heron -tb ${PYTHON_INTEGRATION_TESTS_BIN} \
+#  -rh localhost -rp 8080 \
+#  -tp ${PYTHON_TESTS_DIR} \
+#  -cl local -rl heron-staging -ev devel -pi ${CORE_PKG}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services