Posted to commits@beam.apache.org by al...@apache.org on 2019/05/07 21:37:17 UTC

[beam] branch master updated: Revert "Use TFRecord to store intermediate cache results using PCollection's"

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 65f2db0  Revert "Use TFRecord to store intermediate cache results using PCollection's"
     new df0543c  Merge pull request #8519 from leo-unc/feature/cache_manager_tfrecord
65f2db0 is described below

commit 65f2db0f0a1c61df5d6667f5aa5eae736aa05f95
Author: Hennadiy Leontyev <le...@gmail.com>
AuthorDate: Tue May 7 19:52:34 2019 +0000

    Revert "Use TFRecord to store intermediate cache results using PCollection's"
    
    This reverts commit 45b0325987371aed81f4860b1c4967c8cecdf468.
---
 .../runners/interactive/cache_manager.py           | 93 +++++++---------------
 .../runners/interactive/cache_manager_test.py      |  8 +-
 2 files changed, 30 insertions(+), 71 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index 8a90891..e8816fe 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -22,16 +22,22 @@ from __future__ import print_function
 import collections
 import datetime
 import os
-import sys
 import tempfile
-import traceback
+import urllib
 
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import filesystems
-from apache_beam.io import tfrecordio
 from apache_beam.transforms import combiners
 
+try:                    # Python 3
+  unquote_to_bytes = urllib.parse.unquote_to_bytes
+  quote = urllib.parse.quote
+except AttributeError:  # Python 2
+  # pylint: disable=deprecated-urllib-function
+  unquote_to_bytes = urllib.unquote
+  quote = urllib.quote
+
 
 class CacheManager(object):
   """Abstract class for caching PCollections.
@@ -75,21 +81,6 @@ class CacheManager(object):
     """Returns a beam.io.Sink that writes the PCollection cache."""
     raise NotImplementedError
 
-  def save_pcoder(self, pcoder, *labels):
-    """Saves pcoder for given PCollection.
-
-    Correct reading of PCollection from Cache requires PCoder to be known.
-    This method saves desired PCoder for PCollection that will subsequently
-    be used by sink(...), source(...), and, most importantly, read(...) method.
-    The latter must be able to read a PCollection written by Beam using
-    non-Beam IO.
-
-    Args:
-      pcoder: A PCoder to be used for reading and writing a PCollection.
-      labels: List of labels for PCollection instance.
-    """
-    raise NotImplementedError
-
   def cleanup(self):
     """Cleans up all the PCollection caches."""
     raise NotImplementedError
@@ -107,17 +98,6 @@ class FileBasedCacheManager(CacheManager):
       self._cache_dir = tempfile.mkdtemp(
           prefix='interactive-temp-', dir=os.environ.get('TEST_TMPDIR', None))
     self._versions = collections.defaultdict(lambda: self._CacheVersion())
-    # List of saved pcoders keyed by PCollection path. It is OK to keep this
-    # list in memory because once FileBasedCacheManager object is
-    # destroyed/re-created it loses the access to previously written cache
-    # objects anyways even if cache_dir already exists. In other words,
-    # it is not possible to resume execution of Beam pipeline from the
-    # saved cache if FileBasedCacheManager has been reset.
-    #
-    # However, if we are to implement better cache persistence, one needs
-    # to take care of keeping consistency between the cached PCollection
-    # and its PCoder type.
-    self._saved_pcoders = {}
 
   def exists(self, *labels):
     return bool(self._match(*labels))
@@ -129,50 +109,29 @@ class FileBasedCacheManager(CacheManager):
     result = self._versions["-".join(labels)].get_version(timestamp)
     return result
 
-  def save_pcoder(self, pcoder, *labels):
-    self._saved_pcoders[self._path(*labels)] = pcoder
-
-  def load_pcoder(self, *labels):
-    """Returns previously saved PCoder for reading and writing PCollection."""
-    return self._saved_pcoders[self._path(*labels)]
-
   def read(self, *labels):
     if not self.exists(*labels):
       return [], -1
 
-    pcoder = self.load_pcoder(*labels)
-
     def _read_helper():
+      coder = SafeFastPrimitivesCoder()
       for path in self._match(*labels):
-        with filesystems.FileSystems.open(path, 'rb') as file_handle:
-          while True:
-            record = tfrecordio._TFRecordUtil.read_record(file_handle)
-            if record is None:
-              return  # Reached EOF
-            else:
-              try:
-                yield pcoder.decode(record)
-              except:
-                traceback.print_tb(sys.exc_info()[2])
-                raise ValueError(
-                    "Could not decode cache file {} with pcoder {} {}".format(
-                        path, str(pcoder), str(sys.exc_info())))
-
+        for line in filesystems.FileSystems.open(path):
+          yield coder.decode(line.strip())
     result, version = list(_read_helper()), self._latest_version(*labels)
     return result, version
 
   def source(self, *labels):
-    return beam.io.ReadFromTFRecord(self._glob_path(*labels),
-                                    coder=self.load_pcoder(*labels))._source
+    return beam.io.ReadFromText(self._glob_path(*labels),
+                                coder=SafeFastPrimitivesCoder())._source
 
   def sink(self, *labels):
-    return beam.io.WriteToTFRecord(self._path(*labels),
-                                   coder=self.load_pcoder(*labels))._sink
+    return beam.io.WriteToText(self._path(*labels),
+                               coder=SafeFastPrimitivesCoder())._sink
 
   def cleanup(self):
     if filesystems.FileSystems.exists(self._cache_dir):
       filesystems.FileSystems.delete([self._cache_dir])
-    self._saved_pcoders = {}
 
   def _glob_path(self, *labels):
     return self._path(*labels) + '-*-of-*'
@@ -229,14 +188,6 @@ class WriteCache(beam.PTransform):
 
   def expand(self, pcoll):
     prefix = 'sample' if self._sample else 'full'
-
-    # We save pcoder that is necessary for proper reading of
-    # cached PCollection. _cache_manager.sink(...) call below
-    # should be using this saved pcoder.
-    self._cache_manager.save_pcoder(
-        coders.registry.get_coder(pcoll.element_type),
-        prefix, self._label)
-
     if self._sample:
       pcoll |= 'Sample' >> (
           combiners.Sample.FixedSizeGlobally(self._sample_size)
@@ -244,3 +195,15 @@ class WriteCache(beam.PTransform):
     # pylint: disable=expression-not-assigned
     return pcoll | 'Write' >> beam.io.Write(
         self._cache_manager.sink(prefix, self._label))
+
+
+class SafeFastPrimitivesCoder(coders.Coder):
+  """This class add an quote/unquote step to escape special characters."""
+  # pylint: disable=deprecated-urllib-function
+
+  def encode(self, value):
+    return quote(coders.coders.FastPrimitivesCoder().encode(value)).encode(
+        'utf-8')
+
+  def decode(self, value):
+    return coders.coders.FastPrimitivesCoder().decode(unquote_to_bytes(value))
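
For context on the SafeFastPrimitivesCoder added above: percent-escaping the encoded payload keeps every record on a single ASCII line, so embedded newlines cannot break the newline-delimited text cache format. Below is a minimal standalone sketch of that escaping idea only, using the standard library; pickle stands in for FastPrimitivesCoder, and encode_record/decode_record are hypothetical helpers, not part of this commit.

import pickle
from urllib.parse import quote, unquote_to_bytes

def encode_record(value):
  # pickle stands in here for FastPrimitivesCoder().encode(value).
  raw = pickle.dumps(value)
  # Percent-escape the payload (including any b'\n') into one ASCII line.
  return quote(raw).encode('ascii')

def decode_record(line):
  # Reverse the escaping, then decode.
  return pickle.loads(unquote_to_bytes(line))

assert decode_record(encode_record({'key': 'multi\nline value'})) == {
    'key': 'multi\nline value'}
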
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
index 99f18e0..641643f 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
@@ -25,9 +25,7 @@ import tempfile
 import time
 import unittest
 
-from apache_beam import coders
 from apache_beam.io import filesystems
-from apache_beam.io import tfrecordio
 from apache_beam.runners.interactive import cache_manager as cache
 
 
@@ -63,12 +61,10 @@ class FileBasedCacheManagerTest(unittest.TestCase):
     time.sleep(0.1)
 
     cache_file = cache_label + '-1-of-2'
-
-    pcoder = coders.coders.FastPrimitivesCoder()
-    self.cache_manager.save_pcoder(pcoder, prefix, cache_label)
     with open(self.cache_manager._path(prefix, cache_file), 'wb') as f:
       for line in pcoll_list:
-        tfrecordio._TFRecordUtil.write_record(f, pcoder.encode(line))
+        f.write(cache.SafeFastPrimitivesCoder().encode(line))
+        f.write(b'\n')
 
   def test_exists(self):
     """Test that CacheManager can correctly tell if the cache exists or not."""