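You are viewing a plain-text version of this content. The canonical link to it appeared as a hyperlink labeled "here" in the original page; that URL is not preserved in this plain-text rendering.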
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/27 17:39:52 UTC

[1/6] incubator-beam git commit: Addressing comments

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 57f03f793 -> 03662da9d


Addressing comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1d478dd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1d478dd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1d478dd4

Branch: refs/heads/python-sdk
Commit: 1d478dd43dc1c54c1ef0e70e4aa6c2a548c387fa
Parents: bfcff01
Author: Pablo <pa...@google.com>
Authored: Thu Oct 20 13:15:00 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Oct 27 10:39:32 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/dataflow_runner.py      |  6 ++++-
 sdks/python/apache_beam/runners/runner_test.py  |  6 +++--
 sdks/python/apache_beam/transforms/display.py   | 24 +++++++++++---------
 .../apache_beam/transforms/display_test.py      | 13 +++++------
 4 files changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1d478dd4/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index c543d2f..a387332 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -264,9 +264,13 @@ class DataflowPipelineRunner(PipelineRunner):
     for tag in side_tags:
       self._cache.cache_output(transform_node, tag, step)
 
+    # Finally, we add the display data items to the pipeline step.
+    # If the transform contains no display data then an empty list is added.
     step.add_property(
         PropertyNames.DISPLAY_DATA,
-        DisplayData.create_from(transform_node.transform).output())
+        [item.get_dict() for item in
+         DisplayData.create_from(transform_node.transform).items])
+
     return step
 
   def run_Create(self, transform_node):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1d478dd4/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 891f9dc..0ba42d3 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -74,9 +74,11 @@ class RunnerTest(unittest.TestCase):
     p = Pipeline(remote_runner,
                  options=PipelineOptions(self.default_properties))
 
+    # TODO: Should not subclass ParDo. Switch to PTransform as soon as
+    # composite transforms support display data.
     class SpecialParDo(beam.ParDo):
-      def __init__(self, fn, now, *args, **kwargs):
-        super(SpecialParDo, self).__init__(fn, *args, **kwargs)
+      def __init__(self, fn, now):
+        super(SpecialParDo, self).__init__(fn)
         self.fn = fn
         self.now = now
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1d478dd4/sdks/python/apache_beam/transforms/display.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index 372cbf5..71fd34e 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -80,9 +80,9 @@ class DisplayData(object):
   def __init__(self, namespace, display_data_dict):
     self.namespace = namespace
     self.items = []
-    self.populate_items(display_data_dict)
+    self._populate_items(display_data_dict)
 
-  def populate_items(self, display_data_dict):
+  def _populate_items(self, display_data_dict):
     """ Populates the list of display data items.
     """
     for key, element in display_data_dict.items():
@@ -105,11 +105,6 @@ class DisplayData(object):
                           namespace=self.namespace,
                           key=key))
 
-  def output(self):
-    """ Returns the JSON-API list of display data items to send to the runner.
-    """
-    return [item.get_dict() for item in self.items]
-
   @classmethod
   def create_from(cls, has_display_data):
     """ Creates DisplayData from a HasDisplayData instance.
@@ -159,13 +154,15 @@ class DisplayDataItem(object):
       ValueError: If the item does not have a key, namespace, value or type.
     """
     if self.key is None:
-      raise ValueError('Key must not be None')
+      raise ValueError('Invalid DisplayDataItem. Key must not be None')
     if self.namespace is None:
-      raise ValueError('Namespace must not be None')
+      raise ValueError('Invalid DisplayDataItem. Namespace must not be None')
     if self.value is None:
-      raise ValueError('Value must not be None')
+      raise ValueError('Invalid DisplayDataItem. Value must not be None')
     if self.type is None:
-      raise ValueError('Value {} is of an unsupported type.'.format(self.value))
+      raise ValueError(
+          'Invalid DisplayDataItem. Value {} is of an unsupported type.'
+          .format(self.value))
 
   def get_dict(self):
     """ Returns the internal-API dictionary representing the DisplayDataItem.
@@ -197,6 +194,11 @@ class DisplayDataItem(object):
   def __repr__(self):
     return 'DisplayDataItem({})'.format(json.dumps(self.get_dict()))
 
+  def __eq__(self, other):
+    if isinstance(other, self.__class__):
+      return self.get_dict() == other.get_dict()
+    return False
+
   @classmethod
   def _format_value(cls, value, type_):
     """ Returns the API representation of a value given its type.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1d478dd4/sdks/python/apache_beam/transforms/display_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py
index 906bb8f..013172f 100644
--- a/sdks/python/apache_beam/transforms/display_test.py
+++ b/sdks/python/apache_beam/transforms/display_test.py
@@ -50,8 +50,8 @@ class DisplayDataTest(unittest.TestCase):
     It does not test subcomponent inclusion
     """
     class MyDoFn(beam.DoFn):
-      def __init__(self, *args, **kwargs):
-        self.my_display_data = kwargs.get('display_data', None)
+      def __init__(self, my_display_data=None):
+        self.my_display_data = my_display_data
 
       def process(self, context):
         yield context.element + 1
@@ -66,13 +66,13 @@ class DisplayDataTest(unittest.TestCase):
                 'my_dd': self.my_display_data}
 
     now = datetime.now()
-    fn = MyDoFn(display_data=now)
+    fn = MyDoFn(my_display_data=now)
     dd = DisplayData.create_from(fn)
     dd_dicts = sorted([item.get_dict() for item in dd.items],
                       key=lambda x: x['namespace']+x['key'])
 
     nspace = '{}.{}'.format(fn.__module__, fn.__class__.__name__)
-    expected_items = [
+    expected_items = sorted([
         {'url': 'http://github.com', 'namespace': nspace,
          'value': 'github.com', 'label': 'The URL',
          'key': 'complex_url', 'type': 'STRING'},
@@ -84,14 +84,13 @@ class DisplayDataTest(unittest.TestCase):
         {'type': 'INTEGER', 'namespace': nspace,
          'value': 120, 'key': 'static_integer'},
         {'type': 'STRING', 'namespace': nspace,
-         'value': 'static me!', 'key': 'static_string'}]
-    expected_items = sorted(expected_items,
+         'value': 'static me!', 'key': 'static_string'}],
                             key=lambda x: x['namespace']+x['key'])
 
     self.assertEqual(dd_dicts, expected_items)
 
   def test_subcomponent(self):
-    class SpecialParDo(beam.ParDo):
+    class SpecialParDo(beam.PTransform):
       def __init__(self, fn):
         self.fn = fn
 


[4/6] incubator-beam git commit: Adding license text to all files. Fixing one lint issue. Refactoring runner_test

Posted by ro...@apache.org.
Adding license text to all files. Fixing one lint issue. Refactoring runner_test


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/11022016
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/11022016
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/11022016

Branch: refs/heads/python-sdk
Commit: 110220168d58330f54a3e8279fbb63af036dd7df
Parents: 07885c8
Author: Pablo <pa...@google.com>
Authored: Mon Oct 17 15:10:59 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Oct 27 10:39:32 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/runner_test.py  | 25 +++++++-------------
 .../transforms/display/display_data.py          | 18 ++++++++++++++
 .../transforms/display/display_data_test.py     | 19 +++++++++++++++
 3 files changed, 46 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11022016/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 1f73a36..8663a15 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -39,6 +39,13 @@ from apache_beam.utils.options import PipelineOptions
 
 
 class RunnerTest(unittest.TestCase):
+  default_properties = [
+      '--dataflow_endpoint=ignored',
+      '--job_name=test-job',
+      '--project=test-project',
+      '--staging_location=ignored',
+      '--temp_location=/dev/null',
+      '--no_auth=True']
 
   def test_create_runner(self):
     self.assertTrue(
@@ -54,14 +61,7 @@ class RunnerTest(unittest.TestCase):
   def test_remote_runner_translation(self):
     remote_runner = DataflowPipelineRunner()
     p = Pipeline(remote_runner,
-                 options=PipelineOptions([
-                     '--dataflow_endpoint=ignored',
-                     '--job_name=test-job',
-                     '--project=test-project',
-                     '--staging_location=ignored',
-                     '--temp_location=/dev/null',
-                     '--no_auth=True'
-                 ]))
+                 options=PipelineOptions(self.default_properties))
 
     (p | 'create' >> ptransform.Create([1, 2, 3])  # pylint: disable=expression-not-assigned
      | 'do' >> ptransform.FlatMap(lambda x: [(x, x)])
@@ -72,14 +72,7 @@ class RunnerTest(unittest.TestCase):
   def test_remote_runner_display_data(self):
     remote_runner = DataflowPipelineRunner()
     p = Pipeline(remote_runner,
-                 options=PipelineOptions([
-                     '--dataflow_endpoint=ignored',
-                     '--job_name=test-job',
-                     '--project=test-project',
-                     '--staging_location=ignored',
-                     '--temp_location=/dev/null',
-                     '--no_auth=True'
-                 ]))
+                 options=PipelineOptions(self.default_properties))
 
     class SpecialParDo(beam.ParDo):
       def __init__(self, fn, now, *args, **kwargs):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11022016/sdks/python/apache_beam/transforms/display/display_data.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display/display_data.py b/sdks/python/apache_beam/transforms/display/display_data.py
index 44909ad..cee7e74 100644
--- a/sdks/python/apache_beam/transforms/display/display_data.py
+++ b/sdks/python/apache_beam/transforms/display/display_data.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 """
 DisplayData, its classes, interfaces and methods.
 """
@@ -11,6 +28,7 @@ import json
 
 __all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData']
 
+
 class HasDisplayData(object):
   """ Basic interface for elements that contain display data.
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/11022016/sdks/python/apache_beam/transforms/display/display_data_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display/display_data_test.py b/sdks/python/apache_beam/transforms/display/display_data_test.py
index a107334..345e137 100644
--- a/sdks/python/apache_beam/transforms/display/display_data_test.py
+++ b/sdks/python/apache_beam/transforms/display/display_data_test.py
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the DisplayData API."""
+
 from __future__ import absolute_import
 
 from datetime import datetime


[6/6] incubator-beam git commit: Closes #1112

Posted by ro...@apache.org.
Closes #1112


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/03662da9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/03662da9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/03662da9

Branch: refs/heads/python-sdk
Commit: 03662da9d0bd1d3bfd5bf5bfa3d0c1e1290ab576
Parents: 57f03f7 1d478dd
Author: Robert Bradshaw <ro...@google.com>
Authored: Thu Oct 27 10:39:33 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Oct 27 10:39:33 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/dataflow_runner.py      |  10 +
 sdks/python/apache_beam/runners/runner_test.py  |  72 +++++-
 sdks/python/apache_beam/transforms/core.py      |   3 +-
 sdks/python/apache_beam/transforms/display.py   | 255 +++++++++++++++++++
 .../apache_beam/transforms/display_test.py      | 118 +++++++++
 .../python/apache_beam/transforms/ptransform.py |   3 +-
 sdks/python/apache_beam/utils/names.py          |   1 +
 7 files changed, 452 insertions(+), 10 deletions(-)
----------------------------------------------------------------------



[5/6] incubator-beam git commit: Moving files. Using DisplayDataItem to enable dictionaries to be used as display data

Posted by ro...@apache.org.
Moving files. Using DisplayDataItem to enable dictionaries to be used as display data


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d864d968
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d864d968
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d864d968

Branch: refs/heads/python-sdk
Commit: d864d968eefb0fdc0088ec65fee0cc54955ae7b2
Parents: 1102201
Author: Pablo <pa...@google.com>
Authored: Mon Oct 17 16:56:40 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Oct 27 10:39:32 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/dataflow_runner.py      |  28 +---
 sdks/python/apache_beam/runners/runner_test.py  |   3 +-
 sdks/python/apache_beam/transforms/__init__.py  |   1 -
 sdks/python/apache_beam/transforms/display.py   | 152 +++++++++++++++++++
 .../apache_beam/transforms/display/__init__.py  |   1 -
 .../transforms/display/display_data.py          | 151 ------------------
 .../transforms/display/display_data_test.py     | 119 ---------------
 .../apache_beam/transforms/display_test.py      | 119 +++++++++++++++
 8 files changed, 277 insertions(+), 297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 226b460..c543d2f 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -263,6 +263,10 @@ class DataflowPipelineRunner(PipelineRunner):
     # cache always contain the tag.
     for tag in side_tags:
       self._cache.cache_output(transform_node, tag, step)
+
+    step.add_property(
+        PropertyNames.DISPLAY_DATA,
+        DisplayData.create_from(transform_node.transform).output())
     return step
 
   def run_Create(self, transform_node):
@@ -287,9 +291,6 @@ class DataflowPipelineRunner(PipelineRunner):
             '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
-    step.add_property(
-        PropertyNames.DISPLAY_DATA,
-        DisplayData.create_from(transform).output())
 
   def run_CreatePCollectionView(self, transform_node):
     step = self._add_step(TransformNames.COLLECTION_TO_SINGLETON,
@@ -308,9 +309,6 @@ class DataflowPipelineRunner(PipelineRunner):
             '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
-    step.add_property(
-        PropertyNames.DISPLAY_DATA,
-        DisplayData.create_from(transform_node.transform).output())
 
   def run_Flatten(self, transform_node):
     step = self._add_step(TransformNames.FLATTEN,
@@ -330,9 +328,6 @@ class DataflowPipelineRunner(PipelineRunner):
             '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
-    step.add_property(
-        PropertyNames.DISPLAY_DATA,
-        DisplayData.create_from(transform_node.transform).output())
 
   def apply_GroupByKey(self, transform, pcoll):
     # Infer coder of parent.
@@ -374,9 +369,6 @@ class DataflowPipelineRunner(PipelineRunner):
     windowing = transform_node.transform.get_windowing(
         transform_node.inputs)
     step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(windowing))
-    step.add_property(
-        PropertyNames.DISPLAY_DATA,
-        DisplayData.create_from(transform_node.transform).output())
 
   def run_ParDo(self, transform_node):
     transform = transform_node.transform
@@ -437,9 +429,6 @@ class DataflowPipelineRunner(PipelineRunner):
                '%s_%s' % (PropertyNames.OUT, side_tag))})
 
     step.add_property(PropertyNames.OUTPUT_INFO, outputs)
-    step.add_property(
-        PropertyNames.DISPLAY_DATA,
-        DisplayData.create_from(transform).output())
 
   @staticmethod
   def _pardo_fn_data(transform_node, get_label):
@@ -488,9 +477,6 @@ class DataflowPipelineRunner(PipelineRunner):
          PropertyNames.ENCODING: step.encoding,
          PropertyNames.OUTPUT_NAME: PropertyNames.OUT})
     step.add_property(PropertyNames.OUTPUT_INFO, outputs)
-    step.add_property(
-        PropertyNames.DISPLAY_DATA,
-        DisplayData.create_from(transform).output())
 
   def run_Read(self, transform_node):
     transform = transform_node.transform
@@ -565,9 +551,6 @@ class DataflowPipelineRunner(PipelineRunner):
             '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
-    step.add_property(
-        PropertyNames.DISPLAY_DATA,
-        DisplayData.create_from(transform).output())
 
   def run__NativeWrite(self, transform_node):
     transform = transform_node.transform
@@ -637,9 +620,6 @@ class DataflowPipelineRunner(PipelineRunner):
         {'@type': 'OutputReference',
          PropertyNames.STEP_NAME: input_step.proto.name,
          PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
-    step.add_property(
-        PropertyNames.DISPLAY_DATA,
-        DisplayData.create_from(transform).output())
 
 
 class DataflowPipelineResult(PipelineResult):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 8663a15..19160c3 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -112,7 +112,8 @@ class RunnerTest(unittest.TestCase):
                       'value': DisplayDataItem._format_value(now, 'TIMESTAMP'),
                       'key': 'a_time'},
                      {'type': 'JAVA_CLASS', 'namespace': nspace+'SpecialParDo',
-                      'value': nspace+'SpecialParDo', 'key': 'a_class'},
+                      'value': nspace+'SpecialParDo', 'key': 'a_class',
+                      'shortValue': 'SpecialParDo'},
                      {'type': 'INTEGER', 'namespace': nspace+'SpecialDoFn',
                       'value': 42, 'key': 'dofn_value'}]
     expected_data = sorted(expected_data, key=lambda x: x['namespace']+x['key'])

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index 3cfe60b..db8e193 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -24,4 +24,3 @@ from apache_beam.transforms.core import *
 from apache_beam.transforms.ptransform import *
 from apache_beam.transforms.timeutil import *
 from apache_beam.transforms.util import *
-from apache_beam.transforms.display import *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
new file mode 100644
index 0000000..87d3046
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -0,0 +1,152 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+DisplayData, its classes, interfaces and methods.
+"""
+
+from __future__ import absolute_import
+
+import calendar
+from datetime import datetime, timedelta
+import inspect
+import json
+
+__all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData']
+
+
+class HasDisplayData(object):
+  """ Basic interface for elements that contain display data.
+
+  It contains only the display_data method and a namespace method.
+  """
+  def display_data(self):
+    return {}
+
+  def _namespace(self):
+    return '{}.{}'.format(self.__module__, self.__class__.__name__)
+
+
+class DisplayData(object):
+  def __init__(self, namespace, display_data_dict):
+    self.namespace = namespace
+    self.items = []
+    self.populate_items(display_data_dict)
+
+  def populate_items(self, display_data_dict):
+    for key, element in display_data_dict.items():
+      if isinstance(element, HasDisplayData):
+        subcomponent_display_data = DisplayData(element._namespace(),
+                                                element.display_data())
+        self.items += subcomponent_display_data.items
+        continue
+
+      if isinstance(element, DisplayDataItem):
+        element.key = key
+        element.namespace = self.namespace
+        self.items.append(element)
+        continue
+
+      # If it's not a HasDisplayData element,
+      # nor a dictionary, then it's a simple value
+      self.items.append(
+          DisplayDataItem(element,
+                          namespace=self.namespace,
+                          key=key))
+
+  def output(self):
+    return [item.get_dict() for item in self.items]
+
+  @classmethod
+  def create_from(cls, has_display_data):
+    if not isinstance(has_display_data, HasDisplayData):
+      raise ValueError('Element of class {}.{} does not subclass HasDisplayData'
+                       .format(has_display_data.__module__,
+                               has_display_data.__class__.__name__))
+    return cls(has_display_data._namespace(), has_display_data.display_data())
+
+
+class DisplayDataItem(object):
+  typeDict = {str:'STRING',
+              int:'INTEGER',
+              float:'FLOAT',
+              timedelta:'DURATION',
+              datetime:'TIMESTAMP'}
+
+  def __init__(self, value, url=None, label=None,
+               namespace=None, key=None, shortValue=None):
+    self.namespace = namespace
+    self.key = key
+    self.type = self._get_value_type(value)
+    self.shortValue = (shortValue if shortValue is not None else
+                       self._get_short_value(value, self.type))
+    self.value = value
+    self.url = url
+    self.label = label
+
+  def is_valid(self):
+    if self.key is None:
+      raise ValueError('Key must not be None')
+    if self.namespace is None:
+      raise ValueError('Namespace must not be None')
+    if self.value is None:
+      raise ValueError('Value must not be None')
+    if self.type is None:
+      raise ValueError('Value {} is of an unsupported type.'.format(self.value))
+
+  def get_dict(self):
+    self.is_valid()
+
+    res = {'key': self.key,
+           'namespace': self.namespace,
+           'type': self.type}
+
+    if self.url is not None:
+      res['url'] = self.url
+    if self.shortValue is not None:
+      res['shortValue'] = self.shortValue
+    if self.label is not None:
+      res['label'] = self.label
+    res['value'] = self._format_value(self.value, self.type)
+    return res
+
+  def __repr__(self):
+    return 'DisplayDataItem({})'.format(json.dumps(self.get_dict()))
+
+  @classmethod
+  def _format_value(cls, value, type_):
+    res = value
+    if type_ == 'JAVA_CLASS':
+      res = '{}.{}'.format(value.__module__, value.__name__)
+    if type_ == 'DURATION':
+      res = value.total_seconds()*1000
+    if type_ == 'TIMESTAMP':
+      res = calendar.timegm(value.timetuple())*1000 + value.microsecond//1000
+    return res
+
+  @classmethod
+  def _get_short_value(cls, value, type_):
+    if type_ == 'JAVA_CLASS':
+      return value.__name__
+    return None
+
+  @classmethod
+  def _get_value_type(cls, value):
+    type_ = cls.typeDict.get(type(value))
+    if type_ is None:
+      type_ = 'JAVA_CLASS' if inspect.isclass(value) else None
+    return type_

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display/__init__.py b/sdks/python/apache_beam/transforms/display/__init__.py
deleted file mode 100644
index c946ac3..0000000
--- a/sdks/python/apache_beam/transforms/display/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-from apache_beam.transforms.display.display_data import *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display/display_data.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display/display_data.py b/sdks/python/apache_beam/transforms/display/display_data.py
deleted file mode 100644
index cee7e74..0000000
--- a/sdks/python/apache_beam/transforms/display/display_data.py
+++ /dev/null
@@ -1,151 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-DisplayData, its classes, interfaces and methods.
-"""
-
-from __future__ import absolute_import
-
-import calendar
-from datetime import datetime, timedelta
-import inspect
-import json
-
-__all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData']
-
-
-class HasDisplayData(object):
-  """ Basic interface for elements that contain display data.
-
-  It contains only the display_data method.
-  """
-  def __init__(self, *args, **kwargs):
-    super(HasDisplayData, self).__init__(*args, **kwargs)
-
-  def display_data(self):
-    return {}
-
-  def _namespace(self):
-    return '{}.{}'.format(self.__module__, self.__class__.__name__)
-
-
-class DisplayData(object):
-  def __init__(self, namespace='__main__'):
-    self.namespace = namespace
-    self.items = []
-
-  def populate_items(self, display_data_dict):
-    for key, element in display_data_dict.items():
-      if isinstance(element, HasDisplayData):
-        subcomponent_display_data = DisplayData(element._namespace())
-        subcomponent_display_data.populate_items(element.display_data())
-        self.items += subcomponent_display_data.items
-        continue
-
-      if isinstance(element, dict):
-        self.items.append(
-            DisplayDataItem(self.namespace,
-                            key,
-                            DisplayDataItem._get_value_type(element['value']),
-                            element['value'],
-                            shortValue=element.get('shortValue'),
-                            url=element.get('url'),
-                            label=element.get('label')))
-        continue
-
-      # If it's not a HasDisplayData element,
-      # nor a dictionary, then it's a simple value
-      self.items.append(
-          DisplayDataItem(self.namespace,
-                          key,
-                          DisplayDataItem._get_value_type(element),
-                          element))
-
-  def output(self):
-    return [item.get_dict() for item in self.items]
-
-  @classmethod
-  def create_from(cls, has_display_data):
-    if not isinstance(has_display_data, HasDisplayData):
-      raise ValueError('Element of class {}.{} does not subclass HasDisplayData'
-                       .format(has_display_data.__module__,
-                               has_display_data.__class__.__name__))
-    display_data = cls(has_display_data._namespace())
-    display_data.populate_items(has_display_data.display_data())
-    return display_data
-
-
-class DisplayDataItem(object):
-  typeDict = {str:'STRING',
-              int:'INTEGER',
-              float:'FLOAT',
-              timedelta:'DURATION',
-              datetime:'TIMESTAMP'}
-
-  def __init__(self, namespace, key, type_, value,
-               shortValue=None, url=None, label=None):
-    if key is None:
-      raise ValueError('Key must not be None')
-    if value is None:
-      raise ValueError('Value must not be None')
-    if type_ is None:
-      raise ValueError('Value {} is of an unsupported type.'.format(value))
-
-    self.namespace = namespace
-    self.key = key
-    self.type = type_
-    self.value = value
-    self.shortValue = shortValue
-    self.url = url
-    self.label = label
-
-  def get_dict(self):
-    res = {'key': self.key,
-           'namespace': self.namespace,
-           'type': self.type}
-
-    if self.url is not None:
-      res['url'] = self.url
-    # TODO: What to do about shortValue? No special processing?
-    if self.shortValue is not None:
-      res['shortValue'] = self.shortValue
-    if self.label is not None:
-      res['label'] = self.label
-    res['value'] = self._format_value(self.value, self.type)
-    return res
-
-  def __repr__(self):
-    return 'DisplayDataItem({})'.format(json.dumps(self.get_dict()))
-
-  @classmethod
-  def _format_value(cls, value, type_):
-    res = value
-    if type_ == 'JAVA_CLASS':
-      res = '{}.{}'.format(value.__module__, value.__name__)
-    if type_ == 'DURATION':
-      res = value.total_seconds()*1000
-    if type_ == 'TIMESTAMP':
-      res = calendar.timegm(value.timetuple())*1000 + value.microsecond//1000
-    return res
-
-  @classmethod
-  def _get_value_type(cls, value):
-    type_ = cls.typeDict.get(type(value))
-    if type_ is None:
-      type_ = 'JAVA_CLASS' if inspect.isclass(value) else None
-    return type_

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display/display_data_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display/display_data_test.py b/sdks/python/apache_beam/transforms/display/display_data_test.py
deleted file mode 100644
index 345e137..0000000
--- a/sdks/python/apache_beam/transforms/display/display_data_test.py
+++ /dev/null
@@ -1,119 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the DisplayData API."""
-
-from __future__ import absolute_import
-
-from datetime import datetime
-import unittest
-
-import apache_beam as beam
-from apache_beam.transforms.display import HasDisplayData, DisplayData, DisplayDataItem
-
-
-class DisplayDataTest(unittest.TestCase):
-
-  def test_inheritance_ptransform(self):
-    class MyTransform(beam.PTransform):
-      pass
-
-    display_pt = MyTransform()
-    # PTransform inherits from HasDisplayData.
-    self.assertTrue(isinstance(display_pt, HasDisplayData))
-    self.assertEqual(display_pt.display_data(), {})
-
-  def test_inheritance_dofn(self):
-    class MyDoFn(beam.DoFn):
-      pass
-
-    display_dofn = MyDoFn()
-    self.assertTrue(isinstance(display_dofn, HasDisplayData))
-    self.assertEqual(display_dofn.display_data(), {})
-
-  def test_base_cases(self):
-    """ Tests basic display data cases (key:value, key:dict)
-    It does not test subcomponent inclusion
-    """
-    class MyDoFn(beam.DoFn):
-      def __init__(self, *args, **kwargs):
-        self.my_display_data = kwargs.get('display_data', None)
-
-      def process(self, context):
-        yield context.element + 1
-
-      def display_data(self):
-        return {'static_integer': 120,
-                'static_string': 'static me!',
-                'complex_url': {'value': 'github.com',
-                                'url': 'http://github.com',
-                                'label': 'The URL'},
-                'python_class': HasDisplayData,
-                'my_dd': self.my_display_data}
-
-    now = datetime.now()
-    fn = MyDoFn(display_data=now)
-    dd = DisplayData.create_from(fn)
-    dd_dicts = sorted([item.get_dict() for item in dd.items],
-                      key=lambda x: x['namespace']+x['key'])
-
-    nspace = '{}.{}'.format(fn.__module__, fn.__class__.__name__)
-    expected_items = [
-        {'url': 'http://github.com', 'namespace': nspace,
-         'value': 'github.com', 'label': 'The URL',
-         'key': 'complex_url', 'type': 'STRING'},
-        {'type': 'TIMESTAMP', 'namespace': nspace, 'key': 'my_dd',
-         'value': DisplayDataItem._format_value(now, 'TIMESTAMP')},
-        {'type': 'JAVA_CLASS', 'namespace': nspace,
-         'value': 'apache_beam.transforms.display.display_data.HasDisplayData',
-         'key': 'python_class'},
-        {'type': 'INTEGER', 'namespace': nspace,
-         'value': 120, 'key': 'static_integer'},
-        {'type': 'STRING', 'namespace': nspace,
-         'value': 'static me!', 'key': 'static_string'}]
-    expected_items = sorted(expected_items,
-                            key=lambda x: x['namespace']+x['key'])
-
-    self.assertEqual(dd_dicts, expected_items)
-
-  def test_subcomponent(self):
-    class SpecialParDo(beam.ParDo):
-      def __init__(self, fn):
-        self.fn = fn
-
-      def display_data(self):
-        return {'asubcomponent': self.fn}
-
-    class SpecialDoFn(beam.DoFn):
-      def display_data(self):
-        return {'dofn_value': 42}
-
-    dofn = SpecialDoFn()
-    pardo = SpecialParDo(dofn)
-    dd = DisplayData.create_from(pardo)
-    nspace = '{}.{}'.format(dofn.__module__, dofn.__class__.__name__)
-    self.assertEqual(dd.items[0].get_dict(),
-                     {"type": "INTEGER",
-                      "namespace": nspace,
-                      "value": 42,
-                      "key": "dofn_value"})
-
-
-# TODO: Test __repr__ function
-# TODO: Test PATH when added by swegner@
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d864d968/sdks/python/apache_beam/transforms/display_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py
new file mode 100644
index 0000000..227f3bc
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/display_test.py
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the DisplayData API."""
+
+from __future__ import absolute_import
+
+from datetime import datetime
+import unittest
+
+import apache_beam as beam
+from apache_beam.transforms.display import HasDisplayData, DisplayData, DisplayDataItem
+
+
+class DisplayDataTest(unittest.TestCase):
+
+  def test_inheritance_ptransform(self):
+    class MyTransform(beam.PTransform):
+      pass
+
+    display_pt = MyTransform()
+    # PTransform inherits from HasDisplayData.
+    self.assertTrue(isinstance(display_pt, HasDisplayData))
+    self.assertEqual(display_pt.display_data(), {})
+
+  def test_inheritance_dofn(self):
+    class MyDoFn(beam.DoFn):
+      pass
+
+    display_dofn = MyDoFn()
+    self.assertTrue(isinstance(display_dofn, HasDisplayData))
+    self.assertEqual(display_dofn.display_data(), {})
+
+  def test_base_cases(self):
+    """ Tests basic display data cases (key:value, key:dict)
+    It does not test subcomponent inclusion
+    """
+    class MyDoFn(beam.DoFn):
+      def __init__(self, *args, **kwargs):
+        self.my_display_data = kwargs.get('display_data', None)
+
+      def process(self, context):
+        yield context.element + 1
+
+      def display_data(self):
+        return {'static_integer': 120,
+                'static_string': 'static me!',
+                'complex_url': DisplayDataItem('github.com',
+                                               url='http://github.com',
+                                               label='The URL'),
+                'python_class': HasDisplayData,
+                'my_dd': self.my_display_data}
+
+    now = datetime.now()
+    fn = MyDoFn(display_data=now)
+    dd = DisplayData.create_from(fn)
+    dd_dicts = sorted([item.get_dict() for item in dd.items],
+                      key=lambda x: x['namespace']+x['key'])
+
+    nspace = '{}.{}'.format(fn.__module__, fn.__class__.__name__)
+    expected_items = [
+        {'url': 'http://github.com', 'namespace': nspace,
+         'value': 'github.com', 'label': 'The URL',
+         'key': 'complex_url', 'type': 'STRING'},
+        {'type': 'TIMESTAMP', 'namespace': nspace, 'key': 'my_dd',
+         'value': DisplayDataItem._format_value(now, 'TIMESTAMP')},
+        {'type': 'JAVA_CLASS', 'namespace': nspace,
+         'shortValue': 'HasDisplayData', 'key': 'python_class',
+         'value': 'apache_beam.transforms.display.HasDisplayData'},
+        {'type': 'INTEGER', 'namespace': nspace,
+         'value': 120, 'key': 'static_integer'},
+        {'type': 'STRING', 'namespace': nspace,
+         'value': 'static me!', 'key': 'static_string'}]
+    expected_items = sorted(expected_items,
+                            key=lambda x: x['namespace']+x['key'])
+
+    self.assertEqual(dd_dicts, expected_items)
+
+  def test_subcomponent(self):
+    class SpecialParDo(beam.ParDo):
+      def __init__(self, fn):
+        self.fn = fn
+
+      def display_data(self):
+        return {'asubcomponent': self.fn}
+
+    class SpecialDoFn(beam.DoFn):
+      def display_data(self):
+        return {'dofn_value': 42}
+
+    dofn = SpecialDoFn()
+    pardo = SpecialParDo(dofn)
+    dd = DisplayData.create_from(pardo)
+    nspace = '{}.{}'.format(dofn.__module__, dofn.__class__.__name__)
+    self.assertEqual(dd.items[0].get_dict(),
+                     {"type": "INTEGER",
+                      "namespace": nspace,
+                      "value": 42,
+                      "key": "dofn_value"})
+
+
+# TODO: Test __repr__ function
+# TODO: Test PATH when added by swegner@
+if __name__ == '__main__':
+  unittest.main()


[3/6] incubator-beam git commit: Laying down infrastructure for static display data

Posted by ro...@apache.org.
Laying down infrastructure for static display data


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/07885c86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/07885c86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/07885c86

Branch: refs/heads/python-sdk
Commit: 07885c86c91a9fca65973489765bf3e9ba3ed461
Parents: 57f03f7
Author: Pablo <pa...@google.com>
Authored: Fri Oct 14 11:44:47 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Oct 27 10:39:32 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/dataflow_runner.py      |  26 ++++
 sdks/python/apache_beam/runners/runner_test.py  |  60 +++++++++
 sdks/python/apache_beam/transforms/__init__.py  |   1 +
 sdks/python/apache_beam/transforms/core.py      |   3 +-
 .../apache_beam/transforms/display/__init__.py  |   1 +
 .../transforms/display/display_data.py          | 133 +++++++++++++++++++
 .../transforms/display/display_data_test.py     | 100 ++++++++++++++
 .../python/apache_beam/transforms/ptransform.py |   3 +-
 sdks/python/apache_beam/utils/names.py          |   1 +
 9 files changed, 326 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index f794c8b..226b460 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -35,6 +35,7 @@ from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PValueCache
+from apache_beam.transforms.display import DisplayData
 from apache_beam.typehints import typehints
 from apache_beam.utils import names
 from apache_beam.utils.names import PropertyNames
@@ -286,6 +287,9 @@ class DataflowPipelineRunner(PipelineRunner):
             '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+    step.add_property(
+        PropertyNames.DISPLAY_DATA,
+        DisplayData.create_from(transform).output())
 
   def run_CreatePCollectionView(self, transform_node):
     step = self._add_step(TransformNames.COLLECTION_TO_SINGLETON,
@@ -304,6 +308,9 @@ class DataflowPipelineRunner(PipelineRunner):
             '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+    step.add_property(
+        PropertyNames.DISPLAY_DATA,
+        DisplayData.create_from(transform_node.transform).output())
 
   def run_Flatten(self, transform_node):
     step = self._add_step(TransformNames.FLATTEN,
@@ -323,6 +330,9 @@ class DataflowPipelineRunner(PipelineRunner):
             '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+    step.add_property(
+        PropertyNames.DISPLAY_DATA,
+        DisplayData.create_from(transform_node.transform).output())
 
   def apply_GroupByKey(self, transform, pcoll):
     # Infer coder of parent.
@@ -364,6 +374,9 @@ class DataflowPipelineRunner(PipelineRunner):
     windowing = transform_node.transform.get_windowing(
         transform_node.inputs)
     step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(windowing))
+    step.add_property(
+        PropertyNames.DISPLAY_DATA,
+        DisplayData.create_from(transform_node.transform).output())
 
   def run_ParDo(self, transform_node):
     transform = transform_node.transform
@@ -422,7 +435,11 @@ class DataflowPipelineRunner(PipelineRunner):
            PropertyNames.ENCODING: step.encoding,
            PropertyNames.OUTPUT_NAME: (
                '%s_%s' % (PropertyNames.OUT, side_tag))})
+
     step.add_property(PropertyNames.OUTPUT_INFO, outputs)
+    step.add_property(
+        PropertyNames.DISPLAY_DATA,
+        DisplayData.create_from(transform).output())
 
   @staticmethod
   def _pardo_fn_data(transform_node, get_label):
@@ -471,6 +488,9 @@ class DataflowPipelineRunner(PipelineRunner):
          PropertyNames.ENCODING: step.encoding,
          PropertyNames.OUTPUT_NAME: PropertyNames.OUT})
     step.add_property(PropertyNames.OUTPUT_INFO, outputs)
+    step.add_property(
+        PropertyNames.DISPLAY_DATA,
+        DisplayData.create_from(transform).output())
 
   def run_Read(self, transform_node):
     transform = transform_node.transform
@@ -545,6 +565,9 @@ class DataflowPipelineRunner(PipelineRunner):
             '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+    step.add_property(
+        PropertyNames.DISPLAY_DATA,
+        DisplayData.create_from(transform).output())
 
   def run__NativeWrite(self, transform_node):
     transform = transform_node.transform
@@ -614,6 +637,9 @@ class DataflowPipelineRunner(PipelineRunner):
         {'@type': 'OutputReference',
          PropertyNames.STEP_NAME: input_step.proto.name,
          PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
+    step.add_property(
+        PropertyNames.DISPLAY_DATA,
+        DisplayData.create_from(transform).output())
 
 
 class DataflowPipelineResult(PipelineResult):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 04de7fb..1f73a36 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -22,6 +22,8 @@ the other unit tests. In this file we choose to test only aspects related to
 caching and clearing values that are not tested elsewhere.
 """
 
+from datetime import datetime
+import json
 import unittest
 
 import apache_beam as beam
@@ -32,6 +34,7 @@ from apache_beam.runners import create_runner
 from apache_beam.runners import DataflowPipelineRunner
 from apache_beam.runners import DirectPipelineRunner
 import apache_beam.transforms as ptransform
+from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils.options import PipelineOptions
 
 
@@ -66,6 +69,63 @@ class RunnerTest(unittest.TestCase):
     remote_runner.job = apiclient.Job(p.options)
     super(DataflowPipelineRunner, remote_runner).run(p)
 
+  def test_remote_runner_display_data(self):
+    remote_runner = DataflowPipelineRunner()
+    p = Pipeline(remote_runner,
+                 options=PipelineOptions([
+                     '--dataflow_endpoint=ignored',
+                     '--job_name=test-job',
+                     '--project=test-project',
+                     '--staging_location=ignored',
+                     '--temp_location=/dev/null',
+                     '--no_auth=True'
+                 ]))
+
+    class SpecialParDo(beam.ParDo):
+      def __init__(self, fn, now, *args, **kwargs):
+        super(SpecialParDo, self).__init__(fn, *args, **kwargs)
+        self.fn = fn
+        self.now = now
+
+      # Make this a list to be accessible within closure
+      def display_data(self):
+        return {'asubcomponent': self.fn,
+                'a_class': SpecialParDo,
+                'a_time': self.now}
+
+    class SpecialDoFn(beam.DoFn):
+      def display_data(self):
+        return {'dofn_value': 42}
+
+      def process(self, context):
+        pass
+
+    now = datetime.now()
+    # pylint: disable=expression-not-assigned
+    (p | 'create' >> ptransform.Create([1, 2, 3, 4, 5])
+     | 'do' >> SpecialParDo(SpecialDoFn(), now))
+
+    remote_runner.job = apiclient.Job(p.options)
+    super(DataflowPipelineRunner, remote_runner).run(p)
+    job_dict = json.loads(str(remote_runner.job))
+    steps = [step
+             for step in job_dict['steps']
+             if len(step['properties'].get('display_data', [])) > 0]
+    step = steps[0]
+    disp_data = step['properties']['display_data']
+    disp_data = sorted(disp_data, key=lambda x: x['namespace']+x['key'])
+    nspace = SpecialParDo.__module__+ '.'
+    expected_data = [{'type': 'TIMESTAMP', 'namespace': nspace+'SpecialParDo',
+                      'value': DisplayDataItem._format_value(now, 'TIMESTAMP'),
+                      'key': 'a_time'},
+                     {'type': 'JAVA_CLASS', 'namespace': nspace+'SpecialParDo',
+                      'value': nspace+'SpecialParDo', 'key': 'a_class'},
+                     {'type': 'INTEGER', 'namespace': nspace+'SpecialDoFn',
+                      'value': 42, 'key': 'dofn_value'}]
+    expected_data = sorted(expected_data, key=lambda x: x['namespace']+x['key'])
+    self.assertEqual(len(disp_data), 3)
+    self.assertEqual(disp_data, expected_data)
+
   def test_no_group_by_key_directly_after_bigquery(self):
     remote_runner = DataflowPipelineRunner()
     p = Pipeline(remote_runner,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/transforms/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index db8e193..3cfe60b 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -24,3 +24,4 @@ from apache_beam.transforms.core import *
 from apache_beam.transforms.ptransform import *
 from apache_beam.transforms.timeutil import *
 from apache_beam.transforms.util import *
+from apache_beam.transforms.display import *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index da26205..3b5816a 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -29,6 +29,7 @@ from apache_beam.coders import typecoders
 from apache_beam.internal import util
 from apache_beam.transforms import ptransform
 from apache_beam.transforms import window
+from apache_beam.transforms.display import HasDisplayData
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.ptransform import PTransformWithSideInputs
 from apache_beam.transforms.window import MIN_TIMESTAMP
@@ -118,7 +119,7 @@ class DoFnProcessContext(DoFnContext):
     self.state.counter_for(aggregator).update(input_value)
 
 
-class DoFn(WithTypeHints):
+class DoFn(WithTypeHints, HasDisplayData):
   """A function object used by a transform with custom processing.
 
   The ParDo transform is such a transform. The ParDo.apply

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/transforms/display/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display/__init__.py b/sdks/python/apache_beam/transforms/display/__init__.py
new file mode 100644
index 0000000..c946ac3
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/display/__init__.py
@@ -0,0 +1 @@
+from apache_beam.transforms.display.display_data import *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/transforms/display/display_data.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display/display_data.py b/sdks/python/apache_beam/transforms/display/display_data.py
new file mode 100644
index 0000000..44909ad
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/display/display_data.py
@@ -0,0 +1,133 @@
+"""
+DisplayData, its classes, interfaces and methods.
+"""
+
+from __future__ import absolute_import
+
+import calendar
+from datetime import datetime, timedelta
+import inspect
+import json
+
+__all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData']
+
+class HasDisplayData(object):
+  """ Basic interface for elements that contain display data.
+
+  It contains only the display_data method.
+  """
+  def __init__(self, *args, **kwargs):
+    super(HasDisplayData, self).__init__(*args, **kwargs)
+
+  def display_data(self):
+    return {}
+
+  def _namespace(self):
+    return '{}.{}'.format(self.__module__, self.__class__.__name__)
+
+
+class DisplayData(object):
+  def __init__(self, namespace='__main__'):
+    self.namespace = namespace
+    self.items = []
+
+  def populate_items(self, display_data_dict):
+    for key, element in display_data_dict.items():
+      if isinstance(element, HasDisplayData):
+        subcomponent_display_data = DisplayData(element._namespace())
+        subcomponent_display_data.populate_items(element.display_data())
+        self.items += subcomponent_display_data.items
+        continue
+
+      if isinstance(element, dict):
+        self.items.append(
+            DisplayDataItem(self.namespace,
+                            key,
+                            DisplayDataItem._get_value_type(element['value']),
+                            element['value'],
+                            shortValue=element.get('shortValue'),
+                            url=element.get('url'),
+                            label=element.get('label')))
+        continue
+
+      # If it's not a HasDisplayData element,
+      # nor a dictionary, then it's a simple value
+      self.items.append(
+          DisplayDataItem(self.namespace,
+                          key,
+                          DisplayDataItem._get_value_type(element),
+                          element))
+
+  def output(self):
+    return [item.get_dict() for item in self.items]
+
+  @classmethod
+  def create_from(cls, has_display_data):
+    if not isinstance(has_display_data, HasDisplayData):
+      raise ValueError('Element of class {}.{} does not subclass HasDisplayData'
+                       .format(has_display_data.__module__,
+                               has_display_data.__class__.__name__))
+    display_data = cls(has_display_data._namespace())
+    display_data.populate_items(has_display_data.display_data())
+    return display_data
+
+
+class DisplayDataItem(object):
+  typeDict = {str:'STRING',
+              int:'INTEGER',
+              float:'FLOAT',
+              timedelta:'DURATION',
+              datetime:'TIMESTAMP'}
+
+  def __init__(self, namespace, key, type_, value,
+               shortValue=None, url=None, label=None):
+    if key is None:
+      raise ValueError('Key must not be None')
+    if value is None:
+      raise ValueError('Value must not be None')
+    if type_ is None:
+      raise ValueError('Value {} is of an unsupported type.'.format(value))
+
+    self.namespace = namespace
+    self.key = key
+    self.type = type_
+    self.value = value
+    self.shortValue = shortValue
+    self.url = url
+    self.label = label
+
+  def get_dict(self):
+    res = {'key': self.key,
+           'namespace': self.namespace,
+           'type': self.type}
+
+    if self.url is not None:
+      res['url'] = self.url
+    # TODO: What to do about shortValue? No special processing?
+    if self.shortValue is not None:
+      res['shortValue'] = self.shortValue
+    if self.label is not None:
+      res['label'] = self.label
+    res['value'] = self._format_value(self.value, self.type)
+    return res
+
+  def __repr__(self):
+    return 'DisplayDataItem({})'.format(json.dumps(self.get_dict()))
+
+  @classmethod
+  def _format_value(cls, value, type_):
+    res = value
+    if type_ == 'JAVA_CLASS':
+      res = '{}.{}'.format(value.__module__, value.__name__)
+    if type_ == 'DURATION':
+      res = value.total_seconds()*1000
+    if type_ == 'TIMESTAMP':
+      res = calendar.timegm(value.timetuple())*1000 + value.microsecond//1000
+    return res
+
+  @classmethod
+  def _get_value_type(cls, value):
+    type_ = cls.typeDict.get(type(value))
+    if type_ is None:
+      type_ = 'JAVA_CLASS' if inspect.isclass(value) else None
+    return type_

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/transforms/display/display_data_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display/display_data_test.py b/sdks/python/apache_beam/transforms/display/display_data_test.py
new file mode 100644
index 0000000..a107334
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/display/display_data_test.py
@@ -0,0 +1,100 @@
+from __future__ import absolute_import
+
+from datetime import datetime
+import unittest
+
+import apache_beam as beam
+from apache_beam.transforms.display import HasDisplayData, DisplayData, DisplayDataItem
+
+
+class DisplayDataTest(unittest.TestCase):
+
+  def test_inheritance_ptransform(self):
+    class MyTransform(beam.PTransform):
+      pass
+
+    display_pt = MyTransform()
+    # PTransform inherits from HasDisplayData.
+    self.assertTrue(isinstance(display_pt, HasDisplayData))
+    self.assertEqual(display_pt.display_data(), {})
+
+  def test_inheritance_dofn(self):
+    class MyDoFn(beam.DoFn):
+      pass
+
+    display_dofn = MyDoFn()
+    self.assertTrue(isinstance(display_dofn, HasDisplayData))
+    self.assertEqual(display_dofn.display_data(), {})
+
+  def test_base_cases(self):
+    """ Tests basic display data cases (key:value, key:dict)
+    It does not test subcomponent inclusion
+    """
+    class MyDoFn(beam.DoFn):
+      def __init__(self, *args, **kwargs):
+        self.my_display_data = kwargs.get('display_data', None)
+
+      def process(self, context):
+        yield context.element + 1
+
+      def display_data(self):
+        return {'static_integer': 120,
+                'static_string': 'static me!',
+                'complex_url': {'value': 'github.com',
+                                'url': 'http://github.com',
+                                'label': 'The URL'},
+                'python_class': HasDisplayData,
+                'my_dd': self.my_display_data}
+
+    now = datetime.now()
+    fn = MyDoFn(display_data=now)
+    dd = DisplayData.create_from(fn)
+    dd_dicts = sorted([item.get_dict() for item in dd.items],
+                      key=lambda x: x['namespace']+x['key'])
+
+    nspace = '{}.{}'.format(fn.__module__, fn.__class__.__name__)
+    expected_items = [
+        {'url': 'http://github.com', 'namespace': nspace,
+         'value': 'github.com', 'label': 'The URL',
+         'key': 'complex_url', 'type': 'STRING'},
+        {'type': 'TIMESTAMP', 'namespace': nspace, 'key': 'my_dd',
+         'value': DisplayDataItem._format_value(now, 'TIMESTAMP')},
+        {'type': 'JAVA_CLASS', 'namespace': nspace,
+         'value': 'apache_beam.transforms.display.display_data.HasDisplayData',
+         'key': 'python_class'},
+        {'type': 'INTEGER', 'namespace': nspace,
+         'value': 120, 'key': 'static_integer'},
+        {'type': 'STRING', 'namespace': nspace,
+         'value': 'static me!', 'key': 'static_string'}]
+    expected_items = sorted(expected_items,
+                            key=lambda x: x['namespace']+x['key'])
+
+    self.assertEqual(dd_dicts, expected_items)
+
+  def test_subcomponent(self):
+    class SpecialParDo(beam.ParDo):
+      def __init__(self, fn):
+        self.fn = fn
+
+      def display_data(self):
+        return {'asubcomponent': self.fn}
+
+    class SpecialDoFn(beam.DoFn):
+      def display_data(self):
+        return {'dofn_value': 42}
+
+    dofn = SpecialDoFn()
+    pardo = SpecialParDo(dofn)
+    dd = DisplayData.create_from(pardo)
+    nspace = '{}.{}'.format(dofn.__module__, dofn.__class__.__name__)
+    self.assertEqual(dd.items[0].get_dict(),
+                     {"type": "INTEGER",
+                      "namespace": nspace,
+                      "value": 42,
+                      "key": "dofn_value"})
+
+
+# TODO: Test __repr__ function
+# TODO: Test PATH when added by swegner@
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 0713c59..0885f55 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -47,6 +47,7 @@ from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.internal import pickler
 from apache_beam.internal import util
+from apache_beam.transforms.display import HasDisplayData
 from apache_beam.typehints import getcallargs_forhints
 from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints import validate_composite_type_param
@@ -167,7 +168,7 @@ class ZipPValues(_PValueishTransform):
         self.visit(p, sibling, pairs, context)
 
 
-class PTransform(WithTypeHints):
+class PTransform(WithTypeHints, HasDisplayData):
   """A transform object used to modify one or more PCollections.
 
   Subclasses must define an apply() method that will be used when the transform

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/07885c86/sdks/python/apache_beam/utils/names.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/names.py b/sdks/python/apache_beam/utils/names.py
index 49f0402..41b5b43 100644
--- a/sdks/python/apache_beam/utils/names.py
+++ b/sdks/python/apache_beam/utils/names.py
@@ -49,6 +49,7 @@ class PropertyNames(object):
   BIGQUERY_PROJECT = 'project'
   BIGQUERY_SCHEMA = 'schema'
   BIGQUERY_WRITE_DISPOSITION = 'write_disposition'
+  DISPLAY_DATA = 'display_data'
   ELEMENT = 'element'
   ELEMENTS = 'elements'
   ENCODING = 'encoding'


[2/6] incubator-beam git commit: Adding documentation. Setting Python classes as STRING types.

Posted by ro...@apache.org.
Adding documentation. Setting Python classes as STRING types.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bfcff012
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bfcff012
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bfcff012

Branch: refs/heads/python-sdk
Commit: bfcff012694b3049b895f1c9464ce9324c3758d1
Parents: d864d96
Author: Pablo <pa...@google.com>
Authored: Tue Oct 18 14:49:33 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Oct 27 10:39:32 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/runner_test.py  |   2 +-
 sdks/python/apache_beam/transforms/display.py   | 113 ++++++++++++++++++-
 .../apache_beam/transforms/display_test.py      |   2 +-
 3 files changed, 109 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfcff012/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 19160c3..891f9dc 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -111,7 +111,7 @@ class RunnerTest(unittest.TestCase):
     expected_data = [{'type': 'TIMESTAMP', 'namespace': nspace+'SpecialParDo',
                       'value': DisplayDataItem._format_value(now, 'TIMESTAMP'),
                       'key': 'a_time'},
-                     {'type': 'JAVA_CLASS', 'namespace': nspace+'SpecialParDo',
+                     {'type': 'STRING', 'namespace': nspace+'SpecialParDo',
                       'value': nspace+'SpecialParDo', 'key': 'a_class',
                       'shortValue': 'SpecialParDo'},
                      {'type': 'INTEGER', 'namespace': nspace+'SpecialDoFn',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfcff012/sdks/python/apache_beam/transforms/display.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index 87d3046..372cbf5 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -17,6 +17,19 @@
 
 """
 DisplayData, its classes, interfaces and methods.
+
+The classes in this module allow users and transform developers to define
+static display data to be displayed when a pipeline runs. PTransforms, DoFns
+and other pipeline components are subclasses of the HasDisplayData mixin. To
+add static display data to a component, you can override the display_data
+method of the HasDisplayData class.
+
+Available classes:
+- HasDisplayData - Components that inherit from this class can have static
+    display data shown in the UI.
+- DisplayDataItem - This class represents static display data elements.
+- DisplayData - Internal class that is used to create display data and
+    communicate it to the API.
 """
 
 from __future__ import absolute_import
@@ -30,11 +43,30 @@ __all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData']
 
 
 class HasDisplayData(object):
-  """ Basic interface for elements that contain display data.
+  """ Basic mixin for elements that contain display data.
 
-  It contains only the display_data method and a namespace method.
+  It implements only the display_data method and a _namespace method.
   """
+
   def display_data(self):
+    """ Returns the display data associated with a pipeline component.
+
+    It should be reimplemented in pipeline components that wish to have
+    static display data.
+
+    Returns:
+      A dictionary containing key:value pairs. The value might be an
+      integer, float or string value; a DisplayDataItem for values that
+      have more data (e.g. short value, label, url); or a HasDisplayData
+      instance that has more display data that should be picked up. For
+      example:
+
+      { 'key1': 'string_value',
+        'key2': 1234,
+        'key3': 3.14159265,
+        'key4': DisplayDataItem('apache.org', url='http://apache.org'),
+        'key5': subComponent }
+    """
     return {}
 
   def _namespace(self):
@@ -42,12 +74,17 @@ class HasDisplayData(object):
 
 
 class DisplayData(object):
+  """ Static display data associated with a pipeline component.
+  """
+
   def __init__(self, namespace, display_data_dict):
     self.namespace = namespace
     self.items = []
     self.populate_items(display_data_dict)
 
   def populate_items(self, display_data_dict):
+    """ Populates the list of display data items.
+    """
     for key, element in display_data_dict.items():
       if isinstance(element, HasDisplayData):
         subcomponent_display_data = DisplayData(element._namespace(),
@@ -69,10 +106,21 @@ class DisplayData(object):
                           key=key))
 
   def output(self):
+    """ Returns the JSON-API list of display data items to send to the runner.
+    """
     return [item.get_dict() for item in self.items]
 
   @classmethod
   def create_from(cls, has_display_data):
+    """ Creates DisplayData from a HasDisplayData instance.
+
+    Returns:
+      A DisplayData instance with populated items.
+
+    Raises:
+      ValueError: If the has_display_data argument is not an instance of
+        HasDisplayData.
+    """
     if not isinstance(has_display_data, HasDisplayData):
       raise ValueError('Element of class {}.{} does not subclass HasDisplayData'
                        .format(has_display_data.__module__,
@@ -81,6 +129,11 @@ class DisplayData(object):
 
 
 class DisplayDataItem(object):
+  """ A DisplayDataItem represents a unit of static display data.
+
+  Each item is identified by a key and the namespace of the component the
+  display item belongs to.
+  """
   typeDict = {str:'STRING',
               int:'INTEGER',
               float:'FLOAT',
@@ -99,6 +152,12 @@ class DisplayDataItem(object):
     self.label = label
 
   def is_valid(self):
+    """ Checks that all the necessary fields of the DisplayDataItem are
+    filled in. It checks that none of key, namespace, value or type is None.
+
+    Raises:
+      ValueError: If the item does not have a key, namespace, value or type.
+    """
     if self.key is None:
       raise ValueError('Key must not be None')
     if self.namespace is None:
@@ -109,11 +168,22 @@ class DisplayDataItem(object):
       raise ValueError('Value {} is of an unsupported type.'.format(self.value))
 
   def get_dict(self):
+    """ Returns the internal-API dictionary representing the DisplayDataItem.
+
+    Returns:
+      A dictionary: the internal-API representation of the
+      DisplayDataItem.
+
+    Raises:
+      ValueError: If the item is not valid.
+    """
     self.is_valid()
 
     res = {'key': self.key,
            'namespace': self.namespace,
-           'type': self.type}
+           'type': self.type if self.type != 'CLASS' else 'STRING'}
+    # TODO: Python Class types should not be special-cased once
+    # the Fn API is in.
 
     if self.url is not None:
       res['url'] = self.url
@@ -129,8 +199,17 @@ class DisplayDataItem(object):
 
   @classmethod
   def _format_value(cls, value, type_):
+    """ Returns the API representation of a value given its type.
+
+    Args:
+      value: The value of the item that needs to be formatted.
+      type_(string): The type of the value.
+
+    Returns:
+      A formatted value in the form of a float, int, or string.
+    """
     res = value
-    if type_ == 'JAVA_CLASS':
+    if type_ == 'CLASS':
       res = '{}.{}'.format(value.__module__, value.__name__)
     if type_ == 'DURATION':
       res = value.total_seconds()*1000
@@ -140,13 +219,35 @@ class DisplayDataItem(object):
 
   @classmethod
   def _get_short_value(cls, value, type_):
-    if type_ == 'JAVA_CLASS':
+    """ Calculates the short value for an item.
+
+    Args:
+      value: The value of the item that needs to be shortened.
+      type_(string): The type of the value.
+
+    Returns:
+      The unqualified name of a class if type_ is 'CLASS'. None otherwise.
+    """
+    if type_ == 'CLASS':
       return value.__name__
     return None
 
   @classmethod
   def _get_value_type(cls, value):
+    """ Infers the type of a given value.
+
+    Args:
+      value: The value whose type needs to be inferred. For 'DURATION' and
+        'TIMESTAMP', the corresponding Python type is datetime.timedelta and
+        datetime.datetime respectively. For Python classes, the API type is
+        just 'STRING' at the moment.
+
+    Returns:
+      One of 'STRING', 'INTEGER', 'FLOAT', 'CLASS', 'DURATION', or
+      'TIMESTAMP', depending on the type of the value.
+    """
+    # TODO: Fix the Args: documentation once the handling of Python classes changes.
     type_ = cls.typeDict.get(type(value))
     if type_ is None:
-      type_ = 'JAVA_CLASS' if inspect.isclass(value) else None
+      type_ = 'CLASS' if inspect.isclass(value) else None
     return type_

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bfcff012/sdks/python/apache_beam/transforms/display_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py
index 227f3bc..906bb8f 100644
--- a/sdks/python/apache_beam/transforms/display_test.py
+++ b/sdks/python/apache_beam/transforms/display_test.py
@@ -78,7 +78,7 @@ class DisplayDataTest(unittest.TestCase):
          'key': 'complex_url', 'type': 'STRING'},
         {'type': 'TIMESTAMP', 'namespace': nspace, 'key': 'my_dd',
          'value': DisplayDataItem._format_value(now, 'TIMESTAMP')},
-        {'type': 'JAVA_CLASS', 'namespace': nspace,
+        {'type': 'STRING', 'namespace': nspace,
          'shortValue': 'HasDisplayData', 'key': 'python_class',
          'value': 'apache_beam.transforms.display.HasDisplayData'},
         {'type': 'INTEGER', 'namespace': nspace,