You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/08 01:55:29 UTC

[2/3] incubator-beam git commit: Implement key range tracker.

Implement key range tracker.

This will be useful for Bigtable and other key-based sources.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/54545c43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/54545c43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/54545c43

Branch: refs/heads/python-sdk
Commit: 54545c4339f4710adc997895159106e6af570e0f
Parents: 4768227
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Nov 4 11:37:42 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Nov 7 17:53:12 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/range_trackers.py    | 102 +++++++++++++++-
 .../apache_beam/io/range_trackers_test.py       | 121 ++++++++++++++++++-
 2 files changed, 216 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54545c43/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index b42ff1b..4c8f7eb 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -290,8 +290,8 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
   """
   An abstract base class for range trackers whose positions are comparable.
 
-  Subclasses only need to implement the mapping from position ranges to and from the
-  closed interval [0, 1].
+  Subclasses only need to implement the mapping from position ranges
+  to and from the closed interval [0, 1].
   """
 
   UNSTARTED = object()
@@ -322,23 +322,27 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
       if self._stop_position is None or position < self._stop_position:
         self._last_claim = position
         return True
+      else:
+        return False
 
   def position_at_fraction(self, fraction):
     return self.fraction_to_position(
-      fraction, self._start_position, self._stop_position)
+        fraction, self._start_position, self._stop_position)
 
   def try_split(self, position):
     with self._lock:
-      if ((self._stop_position is not None and position > self._stop_position)
+      if ((self._stop_position is not None and position >= self._stop_position)
           or (self._start_position is not None
               and position <= self._start_position)):
         raise ValueError("Split at '%s' not in range %s" % (
             position, [self._start_position, self._stop_position]))
       if self._last_claim is self.UNSTARTED or self._last_claim < position:
         fraction = self.position_to_fraction(
-          position, start=self._start_position, end=self._stop_position)
+            position, start=self._start_position, end=self._stop_position)
         self._stop_position = position
         return position, fraction
+      else:
+        return None
 
   def fraction_consumed(self):
     if self._last_claim is self.UNSTARTED:
@@ -399,3 +403,91 @@ class UnsplittableRangeTracker(iobase.RangeTracker):
 
   def fraction_consumed(self):
     return self._range_tracker.fraction_consumed()
+
+
+class LexicographicKeyRangeTracker(OrderedPositionRangeTracker):
+  """
+  A range tracker that tracks progress through a lexicographically
+  ordered keyspace of strings.
+  """
+
+  @classmethod
+  def fraction_to_position(cls, fraction, start=None, end=None):
+    """
+    Linearly interpolates a key that lies lexicographically the given
+    fraction of the way between start and end.
+    """
+    assert 0 <= fraction <= 1, fraction
+    if start is None:
+      start = ''
+    if fraction == 1:
+      return end
+    elif fraction == 0:
+      return start
+    else:
+      if not end:
+        common_prefix_len = len(start) - len(start.lstrip('\xFF'))
+      else:
+        for ix, (s, e) in enumerate(zip(start, end)):
+          if s != e:
+            common_prefix_len = ix
+            break
+        else:
+          common_prefix_len = min(len(start), len(end))
+      # Convert the relative precision of fraction (~53 bits) to an absolute
+      # precision needed to represent values between start and end distinctly.
+      prec = common_prefix_len + int(-math.log(fraction, 256)) + 7
+      istart = cls._string_to_int(start, prec)
+      iend = cls._string_to_int(end, prec) if end else 1 << (prec * 8)
+      ikey = istart + int((iend - istart) * fraction)
+      # Could be equal due to rounding.
+      # Adjust to ensure we never return the actual start and end
+      # unless fraction is exactly 0 or 1.
+      if ikey == istart:
+        ikey += 1
+      elif ikey == iend:
+        ikey -= 1
+      return cls._string_from_int(ikey, prec).rstrip('\0')
+
+  @classmethod
+  def position_to_fraction(cls, key, start=None, end=None):
+    """
+    Returns the fraction of keys in the range [start, end) that
+    are less than the given key.
+    """
+    if not key:
+      return 0
+    if start is None:
+      start = ''
+    prec = len(start) + 7
+    if key.startswith(start):
+      # Higher absolute precision is needed when key lies only a very
+      # small relative distance past start.
+      prec = max(prec, len(key) - len(key[len(start):].strip('\0')) + 7)
+    istart = cls._string_to_int(start, prec)
+    ikey = cls._string_to_int(key, prec)
+    iend = cls._string_to_int(end, prec) if end else 1 << (prec * 8)
+    return float(ikey - istart) / (iend - istart)
+
+  @staticmethod
+  def _string_to_int(s, prec):
+    """
+    Returns int(256**prec * f) where f is the fraction
+    represented by interpreting '.' + s as a base-256
+    floating point number.
+    """
+    if not s:
+      return 0
+    elif len(s) < prec:
+      s += '\0' * (prec - len(s))
+    else:
+      s = s[:prec]
+    return int(s.encode('hex'), 16)
+
+  @staticmethod
+  def _string_from_int(i, prec):
+    """
+    Inverse of _string_to_int.
+    """
+    h = '%x' % i
+    return ('0' * (2 * prec - len(h)) + h).decode('hex')

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54545c43/sdks/python/apache_beam/io/range_trackers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers_test.py b/sdks/python/apache_beam/io/range_trackers_test.py
index 161103c..b80d1f3 100644
--- a/sdks/python/apache_beam/io/range_trackers_test.py
+++ b/sdks/python/apache_beam/io/range_trackers_test.py
@@ -20,6 +20,7 @@
 import array
 import copy
 import logging
+import math
 import unittest
 
 
@@ -362,7 +363,8 @@ class OrderedPositionRangeTrackerTest(unittest.TestCase):
     self.assertTrue(tracker.try_claim(17))
     # but can't split before claimed 17,
     self.assertIsNone(tracker.try_split(16))
-    # nor claim anything after 18.
+    # nor claim anything at or after 18.
+    self.assertFalse(tracker.try_claim(18))
     self.assertFalse(tracker.try_claim(19))
 
   def test_claim_order(self):
@@ -380,7 +382,7 @@ class OrderedPositionRangeTrackerTest(unittest.TestCase):
     # Can't split before range.
     with self.assertRaises(ValueError):
       tracker.try_split(-5)
-    # Can't split at start position.
+    # Reject useless split at start position.
     with self.assertRaises(ValueError):
       tracker.try_split(10)
     # Can't split after range.
@@ -390,6 +392,10 @@ class OrderedPositionRangeTrackerTest(unittest.TestCase):
     # Can't split after modified range.
     with self.assertRaises(ValueError):
       tracker.try_split(17)
+    # Reject useless split at end position.
+    with self.assertRaises(ValueError):
+      tracker.try_split(15)
+    self.assertTrue(tracker.try_split(14))
 
 
 class UnsplittableRangeTrackerTest(unittest.TestCase):
@@ -416,6 +422,117 @@ class UnsplittableRangeTrackerTest(unittest.TestCase):
     self.assertFalse(copy.copy(tracker).try_split(199))
 
 
+class LexicographicKeyRangeTrackerTest(unittest.TestCase):
+  """
+  Tests of LexicographicKeyRangeTracker.
+  """
+
+  key_to_fraction = (
+      range_trackers.LexicographicKeyRangeTracker.position_to_fraction)
+  fraction_to_key = (
+      range_trackers.LexicographicKeyRangeTracker.fraction_to_position)
+
+  def _check(self, fraction=None, key=None, start=None, end=None, delta=0):
+    assert key is not None or fraction is not None
+    if fraction is None:
+      fraction = self.key_to_fraction(key, start, end)
+    elif key is None:
+      key = self.fraction_to_key(fraction, start, end)
+
+    if key is None and end is None and fraction == 1:
+      # A None key maps back to fraction 0; can't tell it from fraction == 0.
+      computed_fraction = 1
+    else:
+      computed_fraction = self.key_to_fraction(key, start, end)
+    computed_key = self.fraction_to_key(fraction, start, end)
+
+    if delta:
+      self.assertAlmostEqual(computed_fraction, fraction,
+                             delta=delta, places=None, msg=str(locals()))
+    else:
+      self.assertEqual(computed_fraction, fraction, str(locals()))
+    self.assertEqual(computed_key, key, str(locals()))
+
+  def test_key_to_fraction_no_endpoints(self):
+    self._check(key='\x07', fraction=7/256.)
+    self._check(key='\xFF', fraction=255/256.)
+    self._check(key='\x01\x02\x03', fraction=(2**16 + 2**9 + 3) / (2.0**24))
+
+  def test_key_to_fraction(self):
+    self._check(key='\x87', start='\x80', fraction=7/128.)
+    self._check(key='\x07', end='\x10', fraction=7/16.)
+    self._check(key='\x47', start='\x40', end='\x80', fraction=7/64.)
+    self._check(key='\x47\x80', start='\x40', end='\x80', fraction=15/128.)
+
+  def test_key_to_fraction_common_prefix(self):
+    self._check(
+        key='a' * 100 + 'b', start='a' * 100 + 'a', end='a' * 100 + 'c',
+        fraction=0.5)
+    self._check(
+        key='a' * 100 + 'b', start='a' * 100 + 'a', end='a' * 100 + 'e',
+        fraction=0.25)
+    self._check(
+        key='\xFF' * 100 + '\x40', start='\xFF' * 100, end=None, fraction=0.25)
+    self._check(key='foob',
+                start='fooa\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFE',
+                end='foob\x00\x00\x00\x00\x00\x00\x00\x00\x02',
+                fraction=0.5)
+
+  def test_tiny(self):
+    self._check(fraction=.5**20, key='\0\0\x10')
+    self._check(fraction=.5**20, start='a', end='b', key='a\0\0\x10')
+    self._check(fraction=.5**20, start='a', end='c', key='a\0\0\x20')
+    self._check(fraction=.5**20, start='xy_a', end='xy_c', key='xy_a\0\0\x20')
+    self._check(fraction=.5**20, start='\xFF\xFF\x80',
+                key='\xFF\xFF\x80\x00\x08')
+    self._check(fraction=.5**20 / 3,
+                start='xy_a',
+                end='xy_c',
+                key='xy_a\x00\x00\n\xaa\xaa\xaa\xaa\xaa',
+                delta=1e-15)
+    self._check(fraction=.5**100, key='\0' * 12 + '\x10')
+
+  def test_lots(self):
+    for fraction in (0, 1, .5, .75, 7./512, 1 - 7./4096):
+      self._check(fraction)
+      self._check(fraction, start='\x01')
+      self._check(fraction, end='\xF0')
+      self._check(fraction, start='0x75', end='\x76')
+      self._check(fraction, start='0x75', end='\x77')
+      self._check(fraction, start='0x75', end='\x78')
+      self._check(fraction, start='a' * 100 + '\x80', end='a' * 100 + '\x81')
+      self._check(fraction, start='a' * 101 + '\x80', end='a' * 101 + '\x81')
+      self._check(fraction, start='a' * 102 + '\x80', end='a' * 102 + '\x81')
+    for fraction in (.3, 1/3., 1/math.e, .001, 1e-30, .99, .999999):
+      self._check(fraction, delta=1e-14)
+      self._check(fraction, start='\x01', delta=1e-14)
+      self._check(fraction, end='\xF0', delta=1e-14)
+      self._check(fraction, start='0x75', end='\x76', delta=1e-14)
+      self._check(fraction, start='0x75', end='\x77', delta=1e-14)
+      self._check(fraction, start='0x75', end='\x78', delta=1e-14)
+      self._check(fraction, start='a' * 100 + '\x80', end='a' * 100 + '\x81',
+                  delta=1e-14)
+
+  def test_good_prec(self):
+    # There should be about 7 characters (~53 bits) of precision
+    # (beyond the common prefix of start and end).
+    self._check(1 / math.e, start='abc_abc', end='abc_xyz',
+                key='abc_i\xe0\xf4\x84\x86\x99\x96',
+                delta=1e-15)
+    # This remains true even if the start and end keys are given to
+    # high precision.
+    self._check(1 / math.e,
+                start='abcd_abc\0\0\0\0\0_______________abc',
+                end='abcd_xyz\0\0\0\0\0\0_______________abc',
+                key='abcd_i\xe0\xf4\x84\x86\x99\x96',
+                delta=1e-15)
+    # For very small fractions, however, higher precision is used to
+    # accurately represent small increments in the keyspace.
+    self._check(1e-20 / math.e, start='abcd_abc', end='abcd_xyz',
+                key='abcd_abc\x00\x00\x00\x00\x00\x01\x91#\x172N\xbb',
+                delta=1e-35)
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()