You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by al...@apache.org on 2019/05/07 21:37:17 UTC
[beam] branch master updated: Revert "Use TFRecord to store intermediate cache results using PCollection's"
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 65f2db0 Revert "Use TFRecord to store intermediate cache results using PCollection's"
new df0543c Merge pull request #8519 from leo-unc/feature/cache_manager_tfrecord
65f2db0 is described below
commit 65f2db0f0a1c61df5d6667f5aa5eae736aa05f95
Author: Hennadiy Leontyev <le...@gmail.com>
AuthorDate: Tue May 7 19:52:34 2019 +0000
Revert "Use TFRecord to store intermediate cache results using PCollection's"
This reverts commit 45b0325987371aed81f4860b1c4967c8cecdf468.
---
.../runners/interactive/cache_manager.py | 93 +++++++---------------
.../runners/interactive/cache_manager_test.py | 8 +-
2 files changed, 30 insertions(+), 71 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index 8a90891..e8816fe 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -22,16 +22,22 @@ from __future__ import print_function
import collections
import datetime
import os
-import sys
import tempfile
-import traceback
+import urllib
import apache_beam as beam
from apache_beam import coders
from apache_beam.io import filesystems
-from apache_beam.io import tfrecordio
from apache_beam.transforms import combiners
+try: # Python 3
+ unquote_to_bytes = urllib.parse.unquote_to_bytes
+ quote = urllib.parse.quote
+except AttributeError: # Python 2
+ # pylint: disable=deprecated-urllib-function
+ unquote_to_bytes = urllib.unquote
+ quote = urllib.quote
+
class CacheManager(object):
"""Abstract class for caching PCollections.
@@ -75,21 +81,6 @@ class CacheManager(object):
"""Returns a beam.io.Sink that writes the PCollection cache."""
raise NotImplementedError
- def save_pcoder(self, pcoder, *labels):
- """Saves pcoder for given PCollection.
-
- Correct reading of PCollection from Cache requires PCoder to be known.
- This method saves desired PCoder for PCollection that will subsequently
- be used by sink(...), source(...), and, most importantly, read(...) method.
- The latter must be able to read a PCollection written by Beam using
- non-Beam IO.
-
- Args:
- pcoder: A PCoder to be used for reading and writing a PCollection.
- labels: List of labels for PCollection instance.
- """
- raise NotImplementedError
-
def cleanup(self):
"""Cleans up all the PCollection caches."""
raise NotImplementedError
@@ -107,17 +98,6 @@ class FileBasedCacheManager(CacheManager):
self._cache_dir = tempfile.mkdtemp(
prefix='interactive-temp-', dir=os.environ.get('TEST_TMPDIR', None))
self._versions = collections.defaultdict(lambda: self._CacheVersion())
- # List of saved pcoders keyed by PCollection path. It is OK to keep this
- # list in memory because once FileBasedCacheManager object is
- # destroyed/re-created it loses the access to previously written cache
- # objects anyways even if cache_dir already exists. In other words,
- # it is not possible to resume execution of Beam pipeline from the
- # saved cache if FileBasedCacheManager has been reset.
- #
- # However, if we are to implement better cache persistence, one needs
- # to take care of keeping consistency between the cached PCollection
- # and its PCoder type.
- self._saved_pcoders = {}
def exists(self, *labels):
return bool(self._match(*labels))
@@ -129,50 +109,29 @@ class FileBasedCacheManager(CacheManager):
result = self._versions["-".join(labels)].get_version(timestamp)
return result
- def save_pcoder(self, pcoder, *labels):
- self._saved_pcoders[self._path(*labels)] = pcoder
-
- def load_pcoder(self, *labels):
- """Returns previously saved PCoder for reading and writing PCollection."""
- return self._saved_pcoders[self._path(*labels)]
-
def read(self, *labels):
if not self.exists(*labels):
return [], -1
- pcoder = self.load_pcoder(*labels)
-
def _read_helper():
+ coder = SafeFastPrimitivesCoder()
for path in self._match(*labels):
- with filesystems.FileSystems.open(path, 'rb') as file_handle:
- while True:
- record = tfrecordio._TFRecordUtil.read_record(file_handle)
- if record is None:
- return # Reached EOF
- else:
- try:
- yield pcoder.decode(record)
- except:
- traceback.print_tb(sys.exc_info()[2])
- raise ValueError(
- "Could not decode cache file {} with pcoder {} {}".format(
- path, str(pcoder), str(sys.exc_info())))
-
+ for line in filesystems.FileSystems.open(path):
+ yield coder.decode(line.strip())
result, version = list(_read_helper()), self._latest_version(*labels)
return result, version
def source(self, *labels):
- return beam.io.ReadFromTFRecord(self._glob_path(*labels),
- coder=self.load_pcoder(*labels))._source
+ return beam.io.ReadFromText(self._glob_path(*labels),
+ coder=SafeFastPrimitivesCoder())._source
def sink(self, *labels):
- return beam.io.WriteToTFRecord(self._path(*labels),
- coder=self.load_pcoder(*labels))._sink
+ return beam.io.WriteToText(self._path(*labels),
+ coder=SafeFastPrimitivesCoder())._sink
def cleanup(self):
if filesystems.FileSystems.exists(self._cache_dir):
filesystems.FileSystems.delete([self._cache_dir])
- self._saved_pcoders = {}
def _glob_path(self, *labels):
return self._path(*labels) + '-*-of-*'
@@ -229,14 +188,6 @@ class WriteCache(beam.PTransform):
def expand(self, pcoll):
prefix = 'sample' if self._sample else 'full'
-
- # We save pcoder that is necessary for proper reading of
- # cached PCollection. _cache_manager.sink(...) call below
- # should be using this saved pcoder.
- self._cache_manager.save_pcoder(
- coders.registry.get_coder(pcoll.element_type),
- prefix, self._label)
-
if self._sample:
pcoll |= 'Sample' >> (
combiners.Sample.FixedSizeGlobally(self._sample_size)
@@ -244,3 +195,15 @@ class WriteCache(beam.PTransform):
# pylint: disable=expression-not-assigned
return pcoll | 'Write' >> beam.io.Write(
self._cache_manager.sink(prefix, self._label))
+
+
+class SafeFastPrimitivesCoder(coders.Coder):
+ """This class add an quote/unquote step to escape special characters."""
+ # pylint: disable=deprecated-urllib-function
+
+ def encode(self, value):
+ return quote(coders.coders.FastPrimitivesCoder().encode(value)).encode(
+ 'utf-8')
+
+ def decode(self, value):
+ return coders.coders.FastPrimitivesCoder().decode(unquote_to_bytes(value))
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
index 99f18e0..641643f 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
@@ -25,9 +25,7 @@ import tempfile
import time
import unittest
-from apache_beam import coders
from apache_beam.io import filesystems
-from apache_beam.io import tfrecordio
from apache_beam.runners.interactive import cache_manager as cache
@@ -63,12 +61,10 @@ class FileBasedCacheManagerTest(unittest.TestCase):
time.sleep(0.1)
cache_file = cache_label + '-1-of-2'
-
- pcoder = coders.coders.FastPrimitivesCoder()
- self.cache_manager.save_pcoder(pcoder, prefix, cache_label)
with open(self.cache_manager._path(prefix, cache_file), 'wb') as f:
for line in pcoll_list:
- tfrecordio._TFRecordUtil.write_record(f, pcoder.encode(line))
+ f.write(cache.SafeFastPrimitivesCoder().encode(line))
+ f.write(b'\n')
def test_exists(self):
"""Test that CacheManager can correctly tell if the cache exists or not."""