You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/01 05:14:46 UTC
[2/3] incubator-beam git commit: Remove fake Timestamp and WindowedValue hacks.
Remove fake Timestamp and WindowedValue hacks.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/066b7c68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/066b7c68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/066b7c68
Branch: refs/heads/python-sdk
Commit: 066b7c684d876666e47301271771dca69662945b
Parents: 8fd651f
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Mon Oct 31 17:26:52 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Mon Oct 31 22:14:18 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/coders/coder_impl.py | 17 +++--------------
sdks/python/apache_beam/coders/coders.py | 5 +----
.../apache_beam/coders/coders_test_common.py | 11 ++++++-----
sdks/python/apache_beam/transforms/timeutil.py | 8 +++++---
sdks/python/apache_beam/utils/timestamp.py | 4 ----
5 files changed, 15 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/066b7c68/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index c73dd31..1ff3a44 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -25,22 +25,14 @@ This module may be optionally compiled with Cython, using the corresponding
coder_impl.pxd file for type hints.
"""
-import collections
from types import NoneType
from apache_beam.coders import observable
-
+from apache_beam.utils.timestamp import Timestamp
+from apache_beam.utils.windowed_value import WindowedValue
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
- # Don't depend on the full dataflow sdk to test coders.
- from apache_beam.transforms.window import WindowedValue
-except ImportError:
- WindowedValue = collections.namedtuple(
- 'WindowedValue', ('value', 'timestamp', 'windows'))
-
-
-try:
from stream import InputStream as create_InputStream
from stream import OutputStream as create_OutputStream
from stream import ByteCountingOutputStream
@@ -326,14 +318,11 @@ class FloatCoderImpl(StreamCoderImpl):
class TimestampCoderImpl(StreamCoderImpl):
- def __init__(self, timestamp_class):
- self.timestamp_class = timestamp_class
-
def encode_to_stream(self, value, out, nested):
out.write_bigendian_int64(value.micros)
def decode_from_stream(self, in_stream, nested):
- return self.timestamp_class(micros=in_stream.read_bigendian_int64())
+ return Timestamp(micros=in_stream.read_bigendian_int64())
def estimate_size(self, unused_value, nested=False):
# A Timestamp is encoded as a 64-bit integer in 8 bytes, regardless of
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/066b7c68/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index da06db1..fbbc325 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -18,7 +18,6 @@
"""Collection of useful coders."""
import base64
-import collections
import cPickle as pickle
from apache_beam.coders import coder_impl
@@ -30,12 +29,10 @@ try:
# Import dill from the pickler module to make sure our monkey-patching of dill
# occurs.
from apache_beam.internal.pickler import dill
- from apache_beam.transforms.timeutil import Timestamp
except ImportError:
# We fall back to using the stock dill library in tests that don't use the
# full Python SDK.
import dill
- Timestamp = collections.namedtuple('Timestamp', 'micros')
def serialize_coder(coder):
@@ -265,7 +262,7 @@ class TimestampCoder(FastCoder):
"""A coder used for timeutil.Timestamp values."""
def _create_impl(self):
- return coder_impl.TimestampCoderImpl(Timestamp)
+ return coder_impl.TimestampCoderImpl()
def is_deterministic(self):
return True
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/066b7c68/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index a906437..e7780e4 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -25,6 +25,7 @@ import dill
import coders
import observable
+from apache_beam.utils import timestamp
# Defined out of line for picklability.
@@ -160,13 +161,13 @@ class CodersTest(unittest.TestCase):
def test_timestamp_coder(self):
self.check_coder(coders.TimestampCoder(),
- *[coders.Timestamp(micros=x) for x in range(-100, 100)])
+ *[timestamp.Timestamp(micros=x) for x in range(-100, 100)])
self.check_coder(coders.TimestampCoder(),
- coders.Timestamp(micros=-1234567890),
- coders.Timestamp(micros=1234567890))
+ timestamp.Timestamp(micros=-1234567890),
+ timestamp.Timestamp(micros=1234567890))
self.check_coder(coders.TimestampCoder(),
- coders.Timestamp(micros=-1234567890123456789),
- coders.Timestamp(micros=1234567890123456789))
+ timestamp.Timestamp(micros=-1234567890123456789),
+ timestamp.Timestamp(micros=1234567890123456789))
def test_tuple_coder(self):
self.check_coder(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/066b7c68/sdks/python/apache_beam/transforms/timeutil.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 749cda1..5453b20 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -22,14 +22,16 @@ from __future__ import absolute_import
from abc import ABCMeta
from abc import abstractmethod
-import datetime
-
-# For backwards compatibility
+# For backwards compatibility.
+# TODO(robertwb): Remove.
+# pylint: disable=unused-import
from apache_beam.utils.timestamp import Duration
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.timestamp import Timestamp
+# pylint: enable=unused-import
+
class TimeDomain(object):
"""Time domain for streaming timers."""
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/066b7c68/sdks/python/apache_beam/utils/timestamp.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py
index 1a019c3..cfabb77 100644
--- a/sdks/python/apache_beam/utils/timestamp.py
+++ b/sdks/python/apache_beam/utils/timestamp.py
@@ -19,9 +19,6 @@
from __future__ import absolute_import
-from abc import ABCMeta
-from abc import abstractmethod
-
import datetime
@@ -214,4 +211,3 @@ class Duration(object):
def __mod__(self, other):
other = Duration.of(other)
return Duration(micros=self.micros % other.micros)
-