You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/27 17:39:56 UTC
[5/6] incubator-beam git commit: Moving files. Using DisplayDataItem to enable dictionaries to be used as display data
Moving files. Using DisplayDataItem to enable dictionaries to be used as display data
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d864d968
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d864d968
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d864d968
Branch: refs/heads/python-sdk
Commit: d864d968eefb0fdc0088ec65fee0cc54955ae7b2
Parents: 1102201
Author: Pablo <pa...@google.com>
Authored: Mon Oct 17 16:56:40 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Oct 27 10:39:32 2016 -0700
----------------------------------------------------------------------
.../apache_beam/runners/dataflow_runner.py | 28 +---
sdks/python/apache_beam/runners/runner_test.py | 3 +-
sdks/python/apache_beam/transforms/__init__.py | 1 -
sdks/python/apache_beam/transforms/display.py | 152 +++++++++++++++++++
.../apache_beam/transforms/display/__init__.py | 1 -
.../transforms/display/display_data.py | 151 ------------------
.../transforms/display/display_data_test.py | 119 ---------------
.../apache_beam/transforms/display_test.py | 119 +++++++++++++++
8 files changed, 277 insertions(+), 297 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 226b460..c543d2f 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -263,6 +263,10 @@ class DataflowPipelineRunner(PipelineRunner):
# cache always contain the tag.
for tag in side_tags:
self._cache.cache_output(transform_node, tag, step)
+
+ step.add_property(
+ PropertyNames.DISPLAY_DATA,
+ DisplayData.create_from(transform_node.transform).output())
return step
def run_Create(self, transform_node):
@@ -287,9 +291,6 @@ class DataflowPipelineRunner(PipelineRunner):
'%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
PropertyNames.ENCODING: step.encoding,
PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
- step.add_property(
- PropertyNames.DISPLAY_DATA,
- DisplayData.create_from(transform).output())
def run_CreatePCollectionView(self, transform_node):
step = self._add_step(TransformNames.COLLECTION_TO_SINGLETON,
@@ -308,9 +309,6 @@ class DataflowPipelineRunner(PipelineRunner):
'%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
PropertyNames.ENCODING: step.encoding,
PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
- step.add_property(
- PropertyNames.DISPLAY_DATA,
- DisplayData.create_from(transform_node.transform).output())
def run_Flatten(self, transform_node):
step = self._add_step(TransformNames.FLATTEN,
@@ -330,9 +328,6 @@ class DataflowPipelineRunner(PipelineRunner):
'%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
PropertyNames.ENCODING: step.encoding,
PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
- step.add_property(
- PropertyNames.DISPLAY_DATA,
- DisplayData.create_from(transform_node.transform).output())
def apply_GroupByKey(self, transform, pcoll):
# Infer coder of parent.
@@ -374,9 +369,6 @@ class DataflowPipelineRunner(PipelineRunner):
windowing = transform_node.transform.get_windowing(
transform_node.inputs)
step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(windowing))
- step.add_property(
- PropertyNames.DISPLAY_DATA,
- DisplayData.create_from(transform_node.transform).output())
def run_ParDo(self, transform_node):
transform = transform_node.transform
@@ -437,9 +429,6 @@ class DataflowPipelineRunner(PipelineRunner):
'%s_%s' % (PropertyNames.OUT, side_tag))})
step.add_property(PropertyNames.OUTPUT_INFO, outputs)
- step.add_property(
- PropertyNames.DISPLAY_DATA,
- DisplayData.create_from(transform).output())
@staticmethod
def _pardo_fn_data(transform_node, get_label):
@@ -488,9 +477,6 @@ class DataflowPipelineRunner(PipelineRunner):
PropertyNames.ENCODING: step.encoding,
PropertyNames.OUTPUT_NAME: PropertyNames.OUT})
step.add_property(PropertyNames.OUTPUT_INFO, outputs)
- step.add_property(
- PropertyNames.DISPLAY_DATA,
- DisplayData.create_from(transform).output())
def run_Read(self, transform_node):
transform = transform_node.transform
@@ -565,9 +551,6 @@ class DataflowPipelineRunner(PipelineRunner):
'%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
PropertyNames.ENCODING: step.encoding,
PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
- step.add_property(
- PropertyNames.DISPLAY_DATA,
- DisplayData.create_from(transform).output())
def run__NativeWrite(self, transform_node):
transform = transform_node.transform
@@ -637,9 +620,6 @@ class DataflowPipelineRunner(PipelineRunner):
{'@type': 'OutputReference',
PropertyNames.STEP_NAME: input_step.proto.name,
PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
- step.add_property(
- PropertyNames.DISPLAY_DATA,
- DisplayData.create_from(transform).output())
class DataflowPipelineResult(PipelineResult):
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 8663a15..19160c3 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -112,7 +112,8 @@ class RunnerTest(unittest.TestCase):
'value': DisplayDataItem._format_value(now, 'TIMESTAMP'),
'key': 'a_time'},
{'type': 'JAVA_CLASS', 'namespace': nspace+'SpecialParDo',
- 'value': nspace+'SpecialParDo', 'key': 'a_class'},
+ 'value': nspace+'SpecialParDo', 'key': 'a_class',
+ 'shortValue': 'SpecialParDo'},
{'type': 'INTEGER', 'namespace': nspace+'SpecialDoFn',
'value': 42, 'key': 'dofn_value'}]
expected_data = sorted(expected_data, key=lambda x: x['namespace']+x['key'])
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index 3cfe60b..db8e193 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -24,4 +24,3 @@ from apache_beam.transforms.core import *
from apache_beam.transforms.ptransform import *
from apache_beam.transforms.timeutil import *
from apache_beam.transforms.util import *
-from apache_beam.transforms.display import *
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
new file mode 100644
index 0000000..87d3046
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -0,0 +1,152 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+DisplayData, its classes, interfaces and methods.
+"""
+
+from __future__ import absolute_import
+
+import calendar
+from datetime import datetime, timedelta
+import inspect
+import json
+
+__all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData']
+
+
+class HasDisplayData(object):
+ """ Basic interface for elements that contain display data.
+
+ It contains only the display_data method and a namespace method.
+ """
+ def display_data(self):
+ return {}
+
+ def _namespace(self):
+ return '{}.{}'.format(self.__module__, self.__class__.__name__)
+
+
+class DisplayData(object):
+ def __init__(self, namespace, display_data_dict):
+ self.namespace = namespace
+ self.items = []
+ self.populate_items(display_data_dict)
+
+ def populate_items(self, display_data_dict):
+ for key, element in display_data_dict.items():
+ if isinstance(element, HasDisplayData):
+ subcomponent_display_data = DisplayData(element._namespace(),
+ element.display_data())
+ self.items += subcomponent_display_data.items
+ continue
+
+ if isinstance(element, DisplayDataItem):
+ element.key = key
+ element.namespace = self.namespace
+ self.items.append(element)
+ continue
+
+ # If it's neither a HasDisplayData element nor a DisplayDataItem,
+ # then it's a simple value; wrap it in a DisplayDataItem.
+ self.items.append(
+ DisplayDataItem(element,
+ namespace=self.namespace,
+ key=key))
+
+ def output(self):
+ return [item.get_dict() for item in self.items]
+
+ @classmethod
+ def create_from(cls, has_display_data):
+ if not isinstance(has_display_data, HasDisplayData):
+ raise ValueError('Element of class {}.{} does not subclass HasDisplayData'
+ .format(has_display_data.__module__,
+ has_display_data.__class__.__name__))
+ return cls(has_display_data._namespace(), has_display_data.display_data())
+
+
+class DisplayDataItem(object):
+ typeDict = {str:'STRING',
+ int:'INTEGER',
+ float:'FLOAT',
+ timedelta:'DURATION',
+ datetime:'TIMESTAMP'}
+
+ def __init__(self, value, url=None, label=None,
+ namespace=None, key=None, shortValue=None):
+ self.namespace = namespace
+ self.key = key
+ self.type = self._get_value_type(value)
+ self.shortValue = (shortValue if shortValue is not None else
+ self._get_short_value(value, self.type))
+ self.value = value
+ self.url = url
+ self.label = label
+
+ def is_valid(self):
+ if self.key is None:
+ raise ValueError('Key must not be None')
+ if self.namespace is None:
+ raise ValueError('Namespace must not be None')
+ if self.value is None:
+ raise ValueError('Value must not be None')
+ if self.type is None:
+ raise ValueError('Value {} is of an unsupported type.'.format(self.value))
+
+ def get_dict(self):
+ self.is_valid()
+
+ res = {'key': self.key,
+ 'namespace': self.namespace,
+ 'type': self.type}
+
+ if self.url is not None:
+ res['url'] = self.url
+ if self.shortValue is not None:
+ res['shortValue'] = self.shortValue
+ if self.label is not None:
+ res['label'] = self.label
+ res['value'] = self._format_value(self.value, self.type)
+ return res
+
+ def __repr__(self):
+ return 'DisplayDataItem({})'.format(json.dumps(self.get_dict()))
+
+ @classmethod
+ def _format_value(cls, value, type_):
+ res = value
+ if type_ == 'JAVA_CLASS':
+ res = '{}.{}'.format(value.__module__, value.__name__)
+ if type_ == 'DURATION':
+ res = value.total_seconds()*1000
+ if type_ == 'TIMESTAMP':
+ res = calendar.timegm(value.timetuple())*1000 + value.microsecond//1000
+ return res
+
+ @classmethod
+ def _get_short_value(cls, value, type_):
+ if type_ == 'JAVA_CLASS':
+ return value.__name__
+ return None
+
+ @classmethod
+ def _get_value_type(cls, value):
+ type_ = cls.typeDict.get(type(value))
+ if type_ is None:
+ type_ = 'JAVA_CLASS' if inspect.isclass(value) else None
+ return type_
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display/__init__.py b/sdks/python/apache_beam/transforms/display/__init__.py
deleted file mode 100644
index c946ac3..0000000
--- a/sdks/python/apache_beam/transforms/display/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-from apache_beam.transforms.display.display_data import *
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display/display_data.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display/display_data.py b/sdks/python/apache_beam/transforms/display/display_data.py
deleted file mode 100644
index cee7e74..0000000
--- a/sdks/python/apache_beam/transforms/display/display_data.py
+++ /dev/null
@@ -1,151 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-DisplayData, its classes, interfaces and methods.
-"""
-
-from __future__ import absolute_import
-
-import calendar
-from datetime import datetime, timedelta
-import inspect
-import json
-
-__all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData']
-
-
-class HasDisplayData(object):
- """ Basic interface for elements that contain display data.
-
- It contains only the display_data method.
- """
- def __init__(self, *args, **kwargs):
- super(HasDisplayData, self).__init__(*args, **kwargs)
-
- def display_data(self):
- return {}
-
- def _namespace(self):
- return '{}.{}'.format(self.__module__, self.__class__.__name__)
-
-
-class DisplayData(object):
- def __init__(self, namespace='__main__'):
- self.namespace = namespace
- self.items = []
-
- def populate_items(self, display_data_dict):
- for key, element in display_data_dict.items():
- if isinstance(element, HasDisplayData):
- subcomponent_display_data = DisplayData(element._namespace())
- subcomponent_display_data.populate_items(element.display_data())
- self.items += subcomponent_display_data.items
- continue
-
- if isinstance(element, dict):
- self.items.append(
- DisplayDataItem(self.namespace,
- key,
- DisplayDataItem._get_value_type(element['value']),
- element['value'],
- shortValue=element.get('shortValue'),
- url=element.get('url'),
- label=element.get('label')))
- continue
-
- # If it's not a HasDisplayData element,
- # nor a dictionary, then it's a simple value
- self.items.append(
- DisplayDataItem(self.namespace,
- key,
- DisplayDataItem._get_value_type(element),
- element))
-
- def output(self):
- return [item.get_dict() for item in self.items]
-
- @classmethod
- def create_from(cls, has_display_data):
- if not isinstance(has_display_data, HasDisplayData):
- raise ValueError('Element of class {}.{} does not subclass HasDisplayData'
- .format(has_display_data.__module__,
- has_display_data.__class__.__name__))
- display_data = cls(has_display_data._namespace())
- display_data.populate_items(has_display_data.display_data())
- return display_data
-
-
-class DisplayDataItem(object):
- typeDict = {str:'STRING',
- int:'INTEGER',
- float:'FLOAT',
- timedelta:'DURATION',
- datetime:'TIMESTAMP'}
-
- def __init__(self, namespace, key, type_, value,
- shortValue=None, url=None, label=None):
- if key is None:
- raise ValueError('Key must not be None')
- if value is None:
- raise ValueError('Value must not be None')
- if type_ is None:
- raise ValueError('Value {} is of an unsupported type.'.format(value))
-
- self.namespace = namespace
- self.key = key
- self.type = type_
- self.value = value
- self.shortValue = shortValue
- self.url = url
- self.label = label
-
- def get_dict(self):
- res = {'key': self.key,
- 'namespace': self.namespace,
- 'type': self.type}
-
- if self.url is not None:
- res['url'] = self.url
- # TODO: What to do about shortValue? No special processing?
- if self.shortValue is not None:
- res['shortValue'] = self.shortValue
- if self.label is not None:
- res['label'] = self.label
- res['value'] = self._format_value(self.value, self.type)
- return res
-
- def __repr__(self):
- return 'DisplayDataItem({})'.format(json.dumps(self.get_dict()))
-
- @classmethod
- def _format_value(cls, value, type_):
- res = value
- if type_ == 'JAVA_CLASS':
- res = '{}.{}'.format(value.__module__, value.__name__)
- if type_ == 'DURATION':
- res = value.total_seconds()*1000
- if type_ == 'TIMESTAMP':
- res = calendar.timegm(value.timetuple())*1000 + value.microsecond//1000
- return res
-
- @classmethod
- def _get_value_type(cls, value):
- type_ = cls.typeDict.get(type(value))
- if type_ is None:
- type_ = 'JAVA_CLASS' if inspect.isclass(value) else None
- return type_
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display/display_data_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display/display_data_test.py b/sdks/python/apache_beam/transforms/display/display_data_test.py
deleted file mode 100644
index 345e137..0000000
--- a/sdks/python/apache_beam/transforms/display/display_data_test.py
+++ /dev/null
@@ -1,119 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the DisplayData API."""
-
-from __future__ import absolute_import
-
-from datetime import datetime
-import unittest
-
-import apache_beam as beam
-from apache_beam.transforms.display import HasDisplayData, DisplayData, DisplayDataItem
-
-
-class DisplayDataTest(unittest.TestCase):
-
- def test_inheritance_ptransform(self):
- class MyTransform(beam.PTransform):
- pass
-
- display_pt = MyTransform()
- # PTransform inherits from HasDisplayData.
- self.assertTrue(isinstance(display_pt, HasDisplayData))
- self.assertEqual(display_pt.display_data(), {})
-
- def test_inheritance_dofn(self):
- class MyDoFn(beam.DoFn):
- pass
-
- display_dofn = MyDoFn()
- self.assertTrue(isinstance(display_dofn, HasDisplayData))
- self.assertEqual(display_dofn.display_data(), {})
-
- def test_base_cases(self):
- """ Tests basic display data cases (key:value, key:dict)
- It does not test subcomponent inclusion
- """
- class MyDoFn(beam.DoFn):
- def __init__(self, *args, **kwargs):
- self.my_display_data = kwargs.get('display_data', None)
-
- def process(self, context):
- yield context.element + 1
-
- def display_data(self):
- return {'static_integer': 120,
- 'static_string': 'static me!',
- 'complex_url': {'value': 'github.com',
- 'url': 'http://github.com',
- 'label': 'The URL'},
- 'python_class': HasDisplayData,
- 'my_dd': self.my_display_data}
-
- now = datetime.now()
- fn = MyDoFn(display_data=now)
- dd = DisplayData.create_from(fn)
- dd_dicts = sorted([item.get_dict() for item in dd.items],
- key=lambda x: x['namespace']+x['key'])
-
- nspace = '{}.{}'.format(fn.__module__, fn.__class__.__name__)
- expected_items = [
- {'url': 'http://github.com', 'namespace': nspace,
- 'value': 'github.com', 'label': 'The URL',
- 'key': 'complex_url', 'type': 'STRING'},
- {'type': 'TIMESTAMP', 'namespace': nspace, 'key': 'my_dd',
- 'value': DisplayDataItem._format_value(now, 'TIMESTAMP')},
- {'type': 'JAVA_CLASS', 'namespace': nspace,
- 'value': 'apache_beam.transforms.display.display_data.HasDisplayData',
- 'key': 'python_class'},
- {'type': 'INTEGER', 'namespace': nspace,
- 'value': 120, 'key': 'static_integer'},
- {'type': 'STRING', 'namespace': nspace,
- 'value': 'static me!', 'key': 'static_string'}]
- expected_items = sorted(expected_items,
- key=lambda x: x['namespace']+x['key'])
-
- self.assertEqual(dd_dicts, expected_items)
-
- def test_subcomponent(self):
- class SpecialParDo(beam.ParDo):
- def __init__(self, fn):
- self.fn = fn
-
- def display_data(self):
- return {'asubcomponent': self.fn}
-
- class SpecialDoFn(beam.DoFn):
- def display_data(self):
- return {'dofn_value': 42}
-
- dofn = SpecialDoFn()
- pardo = SpecialParDo(dofn)
- dd = DisplayData.create_from(pardo)
- nspace = '{}.{}'.format(dofn.__module__, dofn.__class__.__name__)
- self.assertEqual(dd.items[0].get_dict(),
- {"type": "INTEGER",
- "namespace": nspace,
- "value": 42,
- "key": "dofn_value"})
-
-
-# TODO: Test __repr__ function
-# TODO: Test PATH when added by swegner@
-if __name__ == '__main__':
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py
new file mode 100644
index 0000000..227f3bc
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/display_test.py
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the DisplayData API."""
+
+from __future__ import absolute_import
+
+from datetime import datetime
+import unittest
+
+import apache_beam as beam
+from apache_beam.transforms.display import HasDisplayData, DisplayData, DisplayDataItem
+
+
+class DisplayDataTest(unittest.TestCase):
+
+ def test_inheritance_ptransform(self):
+ class MyTransform(beam.PTransform):
+ pass
+
+ display_pt = MyTransform()
+ # PTransform inherits from HasDisplayData.
+ self.assertTrue(isinstance(display_pt, HasDisplayData))
+ self.assertEqual(display_pt.display_data(), {})
+
+ def test_inheritance_dofn(self):
+ class MyDoFn(beam.DoFn):
+ pass
+
+ display_dofn = MyDoFn()
+ self.assertTrue(isinstance(display_dofn, HasDisplayData))
+ self.assertEqual(display_dofn.display_data(), {})
+
+ def test_base_cases(self):
+ """ Tests basic display data cases (key:value, key:dict)
+ It does not test subcomponent inclusion
+ """
+ class MyDoFn(beam.DoFn):
+ def __init__(self, *args, **kwargs):
+ self.my_display_data = kwargs.get('display_data', None)
+
+ def process(self, context):
+ yield context.element + 1
+
+ def display_data(self):
+ return {'static_integer': 120,
+ 'static_string': 'static me!',
+ 'complex_url': DisplayDataItem('github.com',
+ url='http://github.com',
+ label='The URL'),
+ 'python_class': HasDisplayData,
+ 'my_dd': self.my_display_data}
+
+ now = datetime.now()
+ fn = MyDoFn(display_data=now)
+ dd = DisplayData.create_from(fn)
+ dd_dicts = sorted([item.get_dict() for item in dd.items],
+ key=lambda x: x['namespace']+x['key'])
+
+ nspace = '{}.{}'.format(fn.__module__, fn.__class__.__name__)
+ expected_items = [
+ {'url': 'http://github.com', 'namespace': nspace,
+ 'value': 'github.com', 'label': 'The URL',
+ 'key': 'complex_url', 'type': 'STRING'},
+ {'type': 'TIMESTAMP', 'namespace': nspace, 'key': 'my_dd',
+ 'value': DisplayDataItem._format_value(now, 'TIMESTAMP')},
+ {'type': 'JAVA_CLASS', 'namespace': nspace,
+ 'shortValue': 'HasDisplayData', 'key': 'python_class',
+ 'value': 'apache_beam.transforms.display.HasDisplayData'},
+ {'type': 'INTEGER', 'namespace': nspace,
+ 'value': 120, 'key': 'static_integer'},
+ {'type': 'STRING', 'namespace': nspace,
+ 'value': 'static me!', 'key': 'static_string'}]
+ expected_items = sorted(expected_items,
+ key=lambda x: x['namespace']+x['key'])
+
+ self.assertEqual(dd_dicts, expected_items)
+
+ def test_subcomponent(self):
+ class SpecialParDo(beam.ParDo):
+ def __init__(self, fn):
+ self.fn = fn
+
+ def display_data(self):
+ return {'asubcomponent': self.fn}
+
+ class SpecialDoFn(beam.DoFn):
+ def display_data(self):
+ return {'dofn_value': 42}
+
+ dofn = SpecialDoFn()
+ pardo = SpecialParDo(dofn)
+ dd = DisplayData.create_from(pardo)
+ nspace = '{}.{}'.format(dofn.__module__, dofn.__class__.__name__)
+ self.assertEqual(dd.items[0].get_dict(),
+ {"type": "INTEGER",
+ "namespace": nspace,
+ "value": 42,
+ "key": "dofn_value"})
+
+
+# TODO: Test __repr__ function
+# TODO: Test PATH when added by swegner@
+if __name__ == '__main__':
+ unittest.main()