You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/08 01:55:29 UTC
[2/3] incubator-beam git commit: Implement key range tracker.
Implement key range tracker.
This will be useful for Bigtable and other key-based sources.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/54545c43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/54545c43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/54545c43
Branch: refs/heads/python-sdk
Commit: 54545c4339f4710adc997895159106e6af570e0f
Parents: 4768227
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Nov 4 11:37:42 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Nov 7 17:53:12 2016 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/io/range_trackers.py | 102 +++++++++++++++-
.../apache_beam/io/range_trackers_test.py | 121 ++++++++++++++++++-
2 files changed, 216 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54545c43/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index b42ff1b..4c8f7eb 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -290,8 +290,8 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
"""
An abstract base class for range trackers whose positions are comparable.
- Subclasses only need to implement the mapping from position ranges to and from the
- closed interval [0, 1].
+ Subclasses only need to implement the mapping from position ranges
+ to and from the closed interval [0, 1].
"""
UNSTARTED = object()
@@ -322,23 +322,27 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
if self._stop_position is None or position < self._stop_position:
self._last_claim = position
return True
+ else:
+ return False
def position_at_fraction(self, fraction):
return self.fraction_to_position(
- fraction, self._start_position, self._stop_position)
+ fraction, self._start_position, self._stop_position)
def try_split(self, position):
with self._lock:
- if ((self._stop_position is not None and position > self._stop_position)
+ if ((self._stop_position is not None and position >= self._stop_position)
or (self._start_position is not None
and position <= self._start_position)):
raise ValueError("Split at '%s' not in range %s" % (
position, [self._start_position, self._stop_position]))
if self._last_claim is self.UNSTARTED or self._last_claim < position:
fraction = self.position_to_fraction(
- position, start=self._start_position, end=self._stop_position)
+ position, start=self._start_position, end=self._stop_position)
self._stop_position = position
return position, fraction
+ else:
+ return None
def fraction_consumed(self):
if self._last_claim is self.UNSTARTED:
@@ -399,3 +403,91 @@ class UnsplittableRangeTracker(iobase.RangeTracker):
def fraction_consumed(self):
return self._range_tracker.fraction_consumed()
+
+
+class LexicographicKeyRangeTracker(OrderedPositionRangeTracker):
+ """
+ A range tracker that tracks progress through a lexicographically
+ ordered keyspace of strings.
+ """
+
+ @classmethod
+ def fraction_to_position(cls, fraction, start=None, end=None):
+ """
+ Linearly interpolates a key that lies lexicographically
+ `fraction` of the way between start and end.
+ """
+ assert 0 <= fraction <= 1, fraction
+ if start is None:
+ start = ''
+ if fraction == 1:
+ return end
+ elif fraction == 0:
+ return start
+ else:
+ if not end:
+ common_prefix_len = len(start) - len(start.lstrip('\xFF'))
+ else:
+ for ix, (s, e) in enumerate(zip(start, end)):
+ if s != e:
+ common_prefix_len = ix
+ break
+ else:
+ common_prefix_len = min(len(start), len(end))
+ # Convert the relative precision of fraction (~53 bits) to an absolute
+ # precision needed to represent values between start and end distinctly.
+ prec = common_prefix_len + int(-math.log(fraction, 256)) + 7
+ istart = cls._string_to_int(start, prec)
+ iend = cls._string_to_int(end, prec) if end else 1 << (prec * 8)
+ ikey = istart + int((iend - istart) * fraction)
+ # Could be equal due to rounding.
+ # Adjust to ensure we never return the actual start and end
+ # unless fraction is exactly 0 or 1.
+ if ikey == istart:
+ ikey += 1
+ elif ikey == iend:
+ ikey -= 1
+ return cls._string_from_int(ikey, prec).rstrip('\0')
+
+ @classmethod
+ def position_to_fraction(cls, key, start=None, end=None):
+ """
+ Returns the fraction of keys in the range [start, end) that
+ are less than the given key.
+ """
+ if not key:
+ return 0
+ if start is None:
+ start = ''
+ prec = len(start) + 7
+ if key.startswith(start):
+ # Higher absolute precision needed for very small values of fixed
+ # relative position.
+ prec = max(prec, len(key) - len(key[len(start):].strip('\0')) + 7)
+ istart = cls._string_to_int(start, prec)
+ ikey = cls._string_to_int(key, prec)
+ iend = cls._string_to_int(end, prec) if end else 1 << (prec * 8)
+ return float(ikey - istart) / (iend - istart)
+
+ @staticmethod
+ def _string_to_int(s, prec):
+ """
+ Returns int(256**prec * f) where f is the fraction
+ represented by interpreting '.' + s as a base-256
+ floating point number.
+ """
+ if not s:
+ return 0
+ elif len(s) < prec:
+ s += '\0' * (prec - len(s))
+ else:
+ s = s[:prec]
+ return int(s.encode('hex'), 16)
+
+ @staticmethod
+ def _string_from_int(i, prec):
+ """
+ Inverse of _string_to_int.
+ """
+ h = '%x' % i
+ return ('0' * (2 * prec - len(h)) + h).decode('hex')
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54545c43/sdks/python/apache_beam/io/range_trackers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers_test.py b/sdks/python/apache_beam/io/range_trackers_test.py
index 161103c..b80d1f3 100644
--- a/sdks/python/apache_beam/io/range_trackers_test.py
+++ b/sdks/python/apache_beam/io/range_trackers_test.py
@@ -20,6 +20,7 @@
import array
import copy
import logging
+import math
import unittest
@@ -362,7 +363,8 @@ class OrderedPositionRangeTrackerTest(unittest.TestCase):
self.assertTrue(tracker.try_claim(17))
# but can't split before claimed 17,
self.assertIsNone(tracker.try_split(16))
- # nor claim anything after 18.
+ # nor claim anything at or after 18.
+ self.assertFalse(tracker.try_claim(18))
self.assertFalse(tracker.try_claim(19))
def test_claim_order(self):
@@ -380,7 +382,7 @@ class OrderedPositionRangeTrackerTest(unittest.TestCase):
# Can't split before range.
with self.assertRaises(ValueError):
tracker.try_split(-5)
- # Can't split at start position.
+ # Reject useless split at start position.
with self.assertRaises(ValueError):
tracker.try_split(10)
# Can't split after range.
@@ -390,6 +392,10 @@ class OrderedPositionRangeTrackerTest(unittest.TestCase):
# Can't split after modified range.
with self.assertRaises(ValueError):
tracker.try_split(17)
+ # Reject useless split at end position.
+ with self.assertRaises(ValueError):
+ tracker.try_split(15)
+ self.assertTrue(tracker.try_split(14))
class UnsplittableRangeTrackerTest(unittest.TestCase):
@@ -416,6 +422,117 @@ class UnsplittableRangeTrackerTest(unittest.TestCase):
self.assertFalse(copy.copy(tracker).try_split(199))
+class LexicographicKeyRangeTrackerTest(unittest.TestCase):
+ """
+ Tests of LexicographicKeyRangeTracker.
+ """
+
+ key_to_fraction = (
+ range_trackers.LexicographicKeyRangeTracker.position_to_fraction)
+ fraction_to_key = (
+ range_trackers.LexicographicKeyRangeTracker.fraction_to_position)
+
+ def _check(self, fraction=None, key=None, start=None, end=None, delta=0):
+ assert key is not None or fraction is not None
+ if fraction is None:
+ fraction = self.key_to_fraction(key, start, end)
+ elif key is None:
+ key = self.fraction_to_key(fraction, start, end)
+
+ if key is None and end is None and fraction == 1:
+ # No way to distinguish from fraction == 0.
+ computed_fraction = 1
+ else:
+ computed_fraction = self.key_to_fraction(key, start, end)
+ computed_key = self.fraction_to_key(fraction, start, end)
+
+ if delta:
+ self.assertAlmostEqual(computed_fraction, fraction,
+ delta=delta, places=None, msg=str(locals()))
+ else:
+ self.assertEqual(computed_fraction, fraction, str(locals()))
+ self.assertEqual(computed_key, key, str(locals()))
+
+ def test_key_to_fraction_no_endpoints(self):
+ self._check(key='\x07', fraction=7/256.)
+ self._check(key='\xFF', fraction=255/256.)
+ self._check(key='\x01\x02\x03', fraction=(2**16 + 2**9 + 3) / (2.0**24))
+
+ def test_key_to_fraction(self):
+ self._check(key='\x87', start='\x80', fraction=7/128.)
+ self._check(key='\x07', end='\x10', fraction=7/16.)
+ self._check(key='\x47', start='\x40', end='\x80', fraction=7/64.)
+ self._check(key='\x47\x80', start='\x40', end='\x80', fraction=15/128.)
+
+ def test_key_to_fraction_common_prefix(self):
+ self._check(
+ key='a' * 100 + 'b', start='a' * 100 + 'a', end='a' * 100 + 'c',
+ fraction=0.5)
+ self._check(
+ key='a' * 100 + 'b', start='a' * 100 + 'a', end='a' * 100 + 'e',
+ fraction=0.25)
+ self._check(
+ key='\xFF' * 100 + '\x40', start='\xFF' * 100, end=None, fraction=0.25)
+ self._check(key='foob',
+ start='fooa\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFE',
+ end='foob\x00\x00\x00\x00\x00\x00\x00\x00\x02',
+ fraction=0.5)
+
+ def test_tiny(self):
+ self._check(fraction=.5**20, key='\0\0\x10')
+ self._check(fraction=.5**20, start='a', end='b', key='a\0\0\x10')
+ self._check(fraction=.5**20, start='a', end='c', key='a\0\0\x20')
+ self._check(fraction=.5**20, start='xy_a', end='xy_c', key='xy_a\0\0\x20')
+ self._check(fraction=.5**20, start='\xFF\xFF\x80',
+ key='\xFF\xFF\x80\x00\x08')
+ self._check(fraction=.5**20 / 3,
+ start='xy_a',
+ end='xy_c',
+ key='xy_a\x00\x00\n\xaa\xaa\xaa\xaa\xaa',
+ delta=1e-15)
+ self._check(fraction=.5**100, key='\0' * 12 + '\x10')
+
+ def test_lots(self):
+ for fraction in (0, 1, .5, .75, 7./512, 1 - 7./4096):
+ self._check(fraction)
+ self._check(fraction, start='\x01')
+ self._check(fraction, end='\xF0')
+ self._check(fraction, start='0x75', end='\x76')
+ self._check(fraction, start='0x75', end='\x77')
+ self._check(fraction, start='0x75', end='\x78')
+ self._check(fraction, start='a' * 100 + '\x80', end='a' * 100 + '\x81')
+ self._check(fraction, start='a' * 101 + '\x80', end='a' * 101 + '\x81')
+ self._check(fraction, start='a' * 102 + '\x80', end='a' * 102 + '\x81')
+ for fraction in (.3, 1/3., 1/math.e, .001, 1e-30, .99, .999999):
+ self._check(fraction, delta=1e-14)
+ self._check(fraction, start='\x01', delta=1e-14)
+ self._check(fraction, end='\xF0', delta=1e-14)
+ self._check(fraction, start='0x75', end='\x76', delta=1e-14)
+ self._check(fraction, start='0x75', end='\x77', delta=1e-14)
+ self._check(fraction, start='0x75', end='\x78', delta=1e-14)
+ self._check(fraction, start='a' * 100 + '\x80', end='a' * 100 + '\x81',
+ delta=1e-14)
+
+ def test_good_prec(self):
+ # There should be about 7 characters (~53 bits) of precision
+ # (beyond the common prefix of start and end).
+ self._check(1 / math.e, start='abc_abc', end='abc_xyz',
+ key='abc_i\xe0\xf4\x84\x86\x99\x96',
+ delta=1e-15)
+ # This remains true even if the start and end keys are given to
+ # high precision.
+ self._check(1 / math.e,
+ start='abcd_abc\0\0\0\0\0_______________abc',
+ end='abcd_xyz\0\0\0\0\0\0_______________abc',
+ key='abcd_i\xe0\xf4\x84\x86\x99\x96',
+ delta=1e-15)
+ # For very small fractions, however, higher precision is used to
+ # accurately represent small increments in the keyspace.
+ self._check(1e-20 / math.e, start='abcd_abc', end='abcd_xyz',
+ key='abcd_abc\x00\x00\x00\x00\x00\x01\x91#\x172N\xbb',
+ delta=1e-35)
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()