You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by ka...@apache.org on 2018/06/26 14:38:03 UTC
[incubator-heron] branch master updated: Make TopologyBuilder less permissive (#2929)
This is an automated email from the ASF dual-hosted git repository.
karthikz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 5ee2490 Make TopologyBuilder less permissive (#2929)
5ee2490 is described below
commit 5ee2490c025c7684c097962cf6d25f54ead73cf4
Author: Oliver Bristow <ev...@gmail.com>
AuthorDate: Tue Jun 26 16:37:58 2018 +0200
Make TopologyBuilder less permissive (#2929)
* Raise issues in TopologyBuilder when introduced
* Do not allow malformed HERON_OPTION parts
* Convert TopologyBuilder._specs to dict
---
heronpy/api/topology.py | 45 +++++++++++++++++++++------------------------
1 file changed, 21 insertions(+), 24 deletions(-)
diff --git a/heronpy/api/topology.py b/heronpy/api/topology.py
index 5af5873..a4c8f94 100644
--- a/heronpy/api/topology.py
+++ b/heronpy/api/topology.py
@@ -153,7 +153,7 @@ class TopologyType(type):
return
heron_options = TopologyType.get_heron_options_from_env()
initial_state = heron_options.get("cmdline.topology.initial.state", "RUNNING")
- tmp_directory = heron_options.get("cmdline.topologydefn.tmpdirectory", None)
+ tmp_directory = heron_options.get("cmdline.topologydefn.tmpdirectory")
if tmp_directory is None:
raise RuntimeError("Topology definition temp directory not specified")
@@ -194,24 +194,25 @@ class TopologyType(type):
Currently supports the following options natively:
- - `cmdline.topologydefn.tmpdirectory`: the directory to which this
+ - `cmdline.topologydefn.tmpdirectory`: (required) the directory to which this
topology's defn file is written
- - `cmdline.topology.initial.state`: the initial state of the topology
- - `cmdline.topology.name`: topology name on deployment
+ - `cmdline.topology.initial.state`: (default: "RUNNING") the initial state of the topology
+ - `cmdline.topology.name`: (default: class name) topology name on deployment
Returns: map mapping from key to value
"""
- heron_options_raw = os.environ.get("HERON_OPTIONS", None)
+ heron_options_raw = os.environ.get("HERON_OPTIONS")
if heron_options_raw is None:
raise RuntimeError("HERON_OPTIONS environment variable not found")
- ret = {}
- heron_opt_list = heron_options_raw.replace("%%%%", " ").split(',')
- for opt_raw in heron_opt_list:
- opt = opt_raw.split("=")
- if len(opt) == 2:
- ret[opt[0]] = opt[1]
- return ret
+ options = {}
+ for option_line in heron_options_raw.replace("%%%%", " ").split(','):
+ key, sep, value = option_line.partition("=")
+ if sep:
+ options[key] = value
+ else:
+ raise ValueError("Invalid HERON_OPTIONS part %r" % option_line)
+ return options
@classmethod
def add_bolts_and_spouts(mcs, topology, class_dict):
@@ -336,7 +337,7 @@ class TopologyBuilder(object):
self.topology_name = name
- self._specs = []
+ self._specs = {}
self._topology_config = {}
def add_spec(self, *specs):
@@ -351,7 +352,12 @@ class TopologyBuilder(object):
% str(spec))
if spec.name is None:
raise ValueError("TopologyBuilder cannot take a spec without name")
- self._specs.append(spec)
+ if spec.name == "config":
+ raise ValueError("config is a reserved name")
+ if spec.name in self._specs:
+ raise ValueError("Attempting to add duplicate spec name: %r %r" % (spec.name, spec))
+
+ self._specs[spec.name] = spec
def add_spout(self, name, spout_cls, par, config=None, optional_outputs=None):
"""Add a spout to the topology"""
@@ -378,16 +384,7 @@ class TopologyBuilder(object):
self._topology_config = config
def _construct_topo_class_dict(self):
- class_dict = {}
-
- # specs
- for spec in self._specs:
- name = spec.name
- if name in class_dict:
- raise ValueError("Duplicate spec names: %s" % name)
- class_dict[name] = spec
-
- # config
+ class_dict = self._specs.copy()
class_dict["config"] = self._topology_config
return class_dict