You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/01 05:14:46 UTC

[2/3] incubator-beam git commit: Remove fake Timestamp and WindowedValue hacks.

Remove fake Timestamp and WindowedValue hacks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/066b7c68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/066b7c68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/066b7c68

Branch: refs/heads/python-sdk
Commit: 066b7c684d876666e47301271771dca69662945b
Parents: 8fd651f
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon Oct 31 17:26:52 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Oct 31 22:14:18 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py       | 17 +++--------------
 sdks/python/apache_beam/coders/coders.py           |  5 +----
 .../apache_beam/coders/coders_test_common.py       | 11 ++++++-----
 sdks/python/apache_beam/transforms/timeutil.py     |  8 +++++---
 sdks/python/apache_beam/utils/timestamp.py         |  4 ----
 5 files changed, 15 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/066b7c68/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index c73dd31..1ff3a44 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -25,22 +25,14 @@ This module may be optionally compiled with Cython, using the corresponding
 coder_impl.pxd file for type hints.
 """
 
-import collections
 from types import NoneType
 
 from apache_beam.coders import observable
-
+from apache_beam.utils.timestamp import Timestamp
+from apache_beam.utils.windowed_value import WindowedValue
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
-  # Don't depend on the full dataflow sdk to test coders.
-  from apache_beam.transforms.window import WindowedValue
-except ImportError:
-  WindowedValue = collections.namedtuple(
-      'WindowedValue', ('value', 'timestamp', 'windows'))
-
-
-try:
   from stream import InputStream as create_InputStream
   from stream import OutputStream as create_OutputStream
   from stream import ByteCountingOutputStream
@@ -326,14 +318,11 @@ class FloatCoderImpl(StreamCoderImpl):
 
 class TimestampCoderImpl(StreamCoderImpl):
 
-  def __init__(self, timestamp_class):
-    self.timestamp_class = timestamp_class
-
   def encode_to_stream(self, value, out, nested):
     out.write_bigendian_int64(value.micros)
 
   def decode_from_stream(self, in_stream, nested):
-    return self.timestamp_class(micros=in_stream.read_bigendian_int64())
+    return Timestamp(micros=in_stream.read_bigendian_int64())
 
   def estimate_size(self, unused_value, nested=False):
     # A Timestamp is encoded as a 64-bit integer in 8 bytes, regardless of

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/066b7c68/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index da06db1..fbbc325 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -18,7 +18,6 @@
 """Collection of useful coders."""
 
 import base64
-import collections
 import cPickle as pickle
 
 from apache_beam.coders import coder_impl
@@ -30,12 +29,10 @@ try:
   # Import dill from the pickler module to make sure our monkey-patching of dill
   # occurs.
   from apache_beam.internal.pickler import dill
-  from apache_beam.transforms.timeutil import Timestamp
 except ImportError:
   # We fall back to using the stock dill library in tests that don't use the
   # full Python SDK.
   import dill
-  Timestamp = collections.namedtuple('Timestamp', 'micros')
 
 
 def serialize_coder(coder):
@@ -265,7 +262,7 @@ class TimestampCoder(FastCoder):
   """A coder used for timeutil.Timestamp values."""
 
   def _create_impl(self):
-    return coder_impl.TimestampCoderImpl(Timestamp)
+    return coder_impl.TimestampCoderImpl()
 
   def is_deterministic(self):
     return True

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/066b7c68/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index a906437..e7780e4 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -25,6 +25,7 @@ import dill
 
 import coders
 import observable
+from apache_beam.utils import timestamp
 
 
 # Defined out of line for picklability.
@@ -160,13 +161,13 @@ class CodersTest(unittest.TestCase):
 
   def test_timestamp_coder(self):
     self.check_coder(coders.TimestampCoder(),
-                     *[coders.Timestamp(micros=x) for x in range(-100, 100)])
+                     *[timestamp.Timestamp(micros=x) for x in range(-100, 100)])
     self.check_coder(coders.TimestampCoder(),
-                     coders.Timestamp(micros=-1234567890),
-                     coders.Timestamp(micros=1234567890))
+                     timestamp.Timestamp(micros=-1234567890),
+                     timestamp.Timestamp(micros=1234567890))
     self.check_coder(coders.TimestampCoder(),
-                     coders.Timestamp(micros=-1234567890123456789),
-                     coders.Timestamp(micros=1234567890123456789))
+                     timestamp.Timestamp(micros=-1234567890123456789),
+                     timestamp.Timestamp(micros=1234567890123456789))
 
   def test_tuple_coder(self):
     self.check_coder(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/066b7c68/sdks/python/apache_beam/transforms/timeutil.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 749cda1..5453b20 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -22,14 +22,16 @@ from __future__ import absolute_import
 from abc import ABCMeta
 from abc import abstractmethod
 
-import datetime
 
-
-# For backwards compatibility
+# For backwards compatibility.
+# TODO(robertwb): Remove.
+# pylint: disable=unused-import
 from apache_beam.utils.timestamp import Duration
 from apache_beam.utils.timestamp import MAX_TIMESTAMP
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 from apache_beam.utils.timestamp import Timestamp
+# pylint: enable=unused-import
+
 
 class TimeDomain(object):
   """Time domain for streaming timers."""

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/066b7c68/sdks/python/apache_beam/utils/timestamp.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py
index 1a019c3..cfabb77 100644
--- a/sdks/python/apache_beam/utils/timestamp.py
+++ b/sdks/python/apache_beam/utils/timestamp.py
@@ -19,9 +19,6 @@
 
 from __future__ import absolute_import
 
-from abc import ABCMeta
-from abc import abstractmethod
-
 import datetime
 
 
@@ -214,4 +211,3 @@ class Duration(object):
   def __mod__(self, other):
     other = Duration.of(other)
     return Duration(micros=self.micros % other.micros)
-