You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/05/02 16:10:11 UTC

[beam] branch master updated: Use TFRecord to store intermediate cache results using PCollection's PCoder.

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 45b0325  Use TFRecord to store intermediate cache results using PCollection's PCoder.
     new ce77db1  Merge pull request #8458 from leo-unc/feature/cache_manager_tfrecord
45b0325 is described below

commit 45b0325987371aed81f4860b1c4967c8cecdf468
Author: Hennadiy Leontyev <le...@gmail.com>
AuthorDate: Wed May 1 19:27:05 2019 +0000

    Use TFRecord to store intermediate cache results using PCollection's
    PCoder.
---
 .../runners/interactive/cache_manager.py           | 93 +++++++++++++++-------
 .../runners/interactive/cache_manager_test.py      |  8 +-
 2 files changed, 71 insertions(+), 30 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index e8816fe..8a90891 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -22,22 +22,16 @@ from __future__ import print_function
 import collections
 import datetime
 import os
+import sys
 import tempfile
-import urllib
+import traceback
 
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import filesystems
+from apache_beam.io import tfrecordio
 from apache_beam.transforms import combiners
 
-try:                    # Python 3
-  unquote_to_bytes = urllib.parse.unquote_to_bytes
-  quote = urllib.parse.quote
-except AttributeError:  # Python 2
-  # pylint: disable=deprecated-urllib-function
-  unquote_to_bytes = urllib.unquote
-  quote = urllib.quote
-
 
 class CacheManager(object):
   """Abstract class for caching PCollections.
@@ -81,6 +75,21 @@ class CacheManager(object):
     """Returns a beam.io.Sink that writes the PCollection cache."""
     raise NotImplementedError
 
+  def save_pcoder(self, pcoder, *labels):
+    """Saves pcoder for given PCollection.
+
+    Correct reading of a PCollection from the cache requires its PCoder to be
+    known. This method saves the desired PCoder for a PCollection so that it
+    can subsequently be used by the sink(...), source(...), and, most
+    importantly, read(...) methods. The latter must read, using non-Beam IO,
+    a PCollection that was written by Beam.
+
+    Args:
+      pcoder: A PCoder to be used for reading and writing a PCollection.
+      labels: List of labels for PCollection instance.
+    """
+    raise NotImplementedError
+
   def cleanup(self):
     """Cleans up all the PCollection caches."""
     raise NotImplementedError
@@ -98,6 +107,17 @@ class FileBasedCacheManager(CacheManager):
       self._cache_dir = tempfile.mkdtemp(
           prefix='interactive-temp-', dir=os.environ.get('TEST_TMPDIR', None))
     self._versions = collections.defaultdict(lambda: self._CacheVersion())
+    # Dict of saved PCoders keyed by PCollection path. It is OK to keep this
+    # dict in memory because once a FileBasedCacheManager object is
+    # destroyed/re-created, it loses access to previously written cache
+    # objects anyway, even if cache_dir already exists. In other words,
+    # it is not possible to resume execution of a Beam pipeline from the
+    # saved cache if the FileBasedCacheManager has been reset.
+    #
+    # However, if better cache persistence is to be implemented, care must
+    # be taken to keep each cached PCollection consistent with its PCoder
+    # type.
+    self._saved_pcoders = {}
 
   def exists(self, *labels):
     return bool(self._match(*labels))
@@ -109,29 +129,50 @@ class FileBasedCacheManager(CacheManager):
     result = self._versions["-".join(labels)].get_version(timestamp)
     return result
 
+  def save_pcoder(self, pcoder, *labels):
+    self._saved_pcoders[self._path(*labels)] = pcoder
+
+  def load_pcoder(self, *labels):
+    """Returns previously saved PCoder for reading and writing PCollection."""
+    return self._saved_pcoders[self._path(*labels)]
+
   def read(self, *labels):
     if not self.exists(*labels):
       return [], -1
 
+    pcoder = self.load_pcoder(*labels)
+
     def _read_helper():
-      coder = SafeFastPrimitivesCoder()
       for path in self._match(*labels):
-        for line in filesystems.FileSystems.open(path):
-          yield coder.decode(line.strip())
+        with filesystems.FileSystems.open(path, 'rb') as file_handle:
+          while True:
+            record = tfrecordio._TFRecordUtil.read_record(file_handle)
+            if record is None:
+              return  # Reached EOF
+            else:
+              try:
+                yield pcoder.decode(record)
+              except:
+                traceback.print_tb(sys.exc_info()[2])
+                raise ValueError(
+                    "Could not decode cache file {} with pcoder {} {}".format(
+                        path, str(pcoder), str(sys.exc_info())))
+
     result, version = list(_read_helper()), self._latest_version(*labels)
     return result, version
 
   def source(self, *labels):
-    return beam.io.ReadFromText(self._glob_path(*labels),
-                                coder=SafeFastPrimitivesCoder())._source
+    return beam.io.ReadFromTFRecord(self._glob_path(*labels),
+                                    coder=self.load_pcoder(*labels))._source
 
   def sink(self, *labels):
-    return beam.io.WriteToText(self._path(*labels),
-                               coder=SafeFastPrimitivesCoder())._sink
+    return beam.io.WriteToTFRecord(self._path(*labels),
+                                   coder=self.load_pcoder(*labels))._sink
 
   def cleanup(self):
     if filesystems.FileSystems.exists(self._cache_dir):
       filesystems.FileSystems.delete([self._cache_dir])
+    self._saved_pcoders = {}
 
   def _glob_path(self, *labels):
     return self._path(*labels) + '-*-of-*'
@@ -188,6 +229,14 @@ class WriteCache(beam.PTransform):
 
   def expand(self, pcoll):
     prefix = 'sample' if self._sample else 'full'
+
+    # Save the pcoder required to read the cached PCollection back
+    # correctly. The _cache_manager.sink(...) call below also loads
+    # this saved pcoder, so it must be saved first.
+    self._cache_manager.save_pcoder(
+        coders.registry.get_coder(pcoll.element_type),
+        prefix, self._label)
+
     if self._sample:
       pcoll |= 'Sample' >> (
           combiners.Sample.FixedSizeGlobally(self._sample_size)
@@ -195,15 +244,3 @@ class WriteCache(beam.PTransform):
     # pylint: disable=expression-not-assigned
     return pcoll | 'Write' >> beam.io.Write(
         self._cache_manager.sink(prefix, self._label))
-
-
-class SafeFastPrimitivesCoder(coders.Coder):
-  """This class add an quote/unquote step to escape special characters."""
-  # pylint: disable=deprecated-urllib-function
-
-  def encode(self, value):
-    return quote(coders.coders.FastPrimitivesCoder().encode(value)).encode(
-        'utf-8')
-
-  def decode(self, value):
-    return coders.coders.FastPrimitivesCoder().decode(unquote_to_bytes(value))
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
index 641643f..99f18e0 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
@@ -25,7 +25,9 @@ import tempfile
 import time
 import unittest
 
+from apache_beam import coders
 from apache_beam.io import filesystems
+from apache_beam.io import tfrecordio
 from apache_beam.runners.interactive import cache_manager as cache
 
 
@@ -61,10 +63,12 @@ class FileBasedCacheManagerTest(unittest.TestCase):
     time.sleep(0.1)
 
     cache_file = cache_label + '-1-of-2'
+
+    pcoder = coders.coders.FastPrimitivesCoder()
+    self.cache_manager.save_pcoder(pcoder, prefix, cache_label)
     with open(self.cache_manager._path(prefix, cache_file), 'wb') as f:
       for line in pcoll_list:
-        f.write(cache.SafeFastPrimitivesCoder().encode(line))
-        f.write(b'\n')
+        tfrecordio._TFRecordUtil.write_record(f, pcoder.encode(line))
 
   def test_exists(self):
     """Test that CacheManager can correctly tell if the cache exists or not."""