You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/05/02 16:10:11 UTC
[beam] branch master updated: Use TFRecord to store intermediate
cache results using PCollection's PCoder.
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 45b0325 Use TFRecord to store intermediate cache results using PCollection's PCoder.
new ce77db1 Merge pull request #8458 from leo-unc/feature/cache_manager_tfrecord
45b0325 is described below
commit 45b0325987371aed81f4860b1c4967c8cecdf468
Author: Hennadiy Leontyev <le...@gmail.com>
AuthorDate: Wed May 1 19:27:05 2019 +0000
Use TFRecord to store intermediate cache results using PCollection's
PCoder.
---
.../runners/interactive/cache_manager.py | 93 +++++++++++++++-------
.../runners/interactive/cache_manager_test.py | 8 +-
2 files changed, 71 insertions(+), 30 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py
index e8816fe..8a90891 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py
@@ -22,22 +22,16 @@ from __future__ import print_function
import collections
import datetime
import os
+import sys
import tempfile
-import urllib
+import traceback
import apache_beam as beam
from apache_beam import coders
from apache_beam.io import filesystems
+from apache_beam.io import tfrecordio
from apache_beam.transforms import combiners
-try: # Python 3
- unquote_to_bytes = urllib.parse.unquote_to_bytes
- quote = urllib.parse.quote
-except AttributeError: # Python 2
- # pylint: disable=deprecated-urllib-function
- unquote_to_bytes = urllib.unquote
- quote = urllib.quote
-
class CacheManager(object):
"""Abstract class for caching PCollections.
@@ -81,6 +75,21 @@ class CacheManager(object):
"""Returns a beam.io.Sink that writes the PCollection cache."""
raise NotImplementedError
+ def save_pcoder(self, pcoder, *labels):
+ """Saves pcoder for given PCollection.
+
+ Correct reading of a PCollection from the cache requires its PCoder to
+ be known. This method saves the desired PCoder for a PCollection that
+ will subsequently be used by the sink(...), source(...), and, most
+ importantly, read(...) methods. The latter must be able to read a
+ PCollection written by Beam using non-Beam IO.
+
+ Args:
+ pcoder: A PCoder to be used for reading and writing a PCollection.
+ labels: List of labels for PCollection instance.
+ """
+ raise NotImplementedError
+
def cleanup(self):
"""Cleans up all the PCollection caches."""
raise NotImplementedError
@@ -98,6 +107,17 @@ class FileBasedCacheManager(CacheManager):
self._cache_dir = tempfile.mkdtemp(
prefix='interactive-temp-', dir=os.environ.get('TEST_TMPDIR', None))
self._versions = collections.defaultdict(lambda: self._CacheVersion())
+ # Map of saved pcoders keyed by PCollection path. It is OK to keep this
+ # map in memory because once the FileBasedCacheManager object is
+ # destroyed/re-created it loses access to previously written cache
+ # objects anyway, even if cache_dir already exists. In other words,
+ # it is not possible to resume execution of a Beam pipeline from the
+ # saved cache if the FileBasedCacheManager has been reset.
+ #
+ # However, if we are to implement better cache persistence, we need
+ # to take care of keeping consistency between the cached PCollection
+ # and its PCoder type.
+ self._saved_pcoders = {}
def exists(self, *labels):
return bool(self._match(*labels))
@@ -109,29 +129,50 @@ class FileBasedCacheManager(CacheManager):
result = self._versions["-".join(labels)].get_version(timestamp)
return result
+ def save_pcoder(self, pcoder, *labels):
+ self._saved_pcoders[self._path(*labels)] = pcoder
+
+ def load_pcoder(self, *labels):
+ """Returns previously saved PCoder for reading and writing PCollection."""
+ return self._saved_pcoders[self._path(*labels)]
+
def read(self, *labels):
if not self.exists(*labels):
return [], -1
+ pcoder = self.load_pcoder(*labels)
+
def _read_helper():
- coder = SafeFastPrimitivesCoder()
for path in self._match(*labels):
- for line in filesystems.FileSystems.open(path):
- yield coder.decode(line.strip())
+ with filesystems.FileSystems.open(path, 'rb') as file_handle:
+ while True:
+ record = tfrecordio._TFRecordUtil.read_record(file_handle)
+ if record is None:
+ return # Reached EOF
+ else:
+ try:
+ yield pcoder.decode(record)
+ except:
+ traceback.print_tb(sys.exc_info()[2])
+ raise ValueError(
+ "Could not decode cache file {} with pcoder {} {}".format(
+ path, str(pcoder), str(sys.exc_info())))
+
result, version = list(_read_helper()), self._latest_version(*labels)
return result, version
def source(self, *labels):
- return beam.io.ReadFromText(self._glob_path(*labels),
- coder=SafeFastPrimitivesCoder())._source
+ return beam.io.ReadFromTFRecord(self._glob_path(*labels),
+ coder=self.load_pcoder(*labels))._source
def sink(self, *labels):
- return beam.io.WriteToText(self._path(*labels),
- coder=SafeFastPrimitivesCoder())._sink
+ return beam.io.WriteToTFRecord(self._path(*labels),
+ coder=self.load_pcoder(*labels))._sink
def cleanup(self):
if filesystems.FileSystems.exists(self._cache_dir):
filesystems.FileSystems.delete([self._cache_dir])
+ self._saved_pcoders = {}
def _glob_path(self, *labels):
return self._path(*labels) + '-*-of-*'
@@ -188,6 +229,14 @@ class WriteCache(beam.PTransform):
def expand(self, pcoll):
prefix = 'sample' if self._sample else 'full'
+
+ # We save the pcoder that is necessary for proper reading of the
+ # cached PCollection. The _cache_manager.sink(...) call below
+ # should use this saved pcoder.
+ self._cache_manager.save_pcoder(
+ coders.registry.get_coder(pcoll.element_type),
+ prefix, self._label)
+
if self._sample:
pcoll |= 'Sample' >> (
combiners.Sample.FixedSizeGlobally(self._sample_size)
@@ -195,15 +244,3 @@ class WriteCache(beam.PTransform):
# pylint: disable=expression-not-assigned
return pcoll | 'Write' >> beam.io.Write(
self._cache_manager.sink(prefix, self._label))
-
-
-class SafeFastPrimitivesCoder(coders.Coder):
- """This class add an quote/unquote step to escape special characters."""
- # pylint: disable=deprecated-urllib-function
-
- def encode(self, value):
- return quote(coders.coders.FastPrimitivesCoder().encode(value)).encode(
- 'utf-8')
-
- def decode(self, value):
- return coders.coders.FastPrimitivesCoder().decode(unquote_to_bytes(value))
diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
index 641643f..99f18e0 100644
--- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py
@@ -25,7 +25,9 @@ import tempfile
import time
import unittest
+from apache_beam import coders
from apache_beam.io import filesystems
+from apache_beam.io import tfrecordio
from apache_beam.runners.interactive import cache_manager as cache
@@ -61,10 +63,12 @@ class FileBasedCacheManagerTest(unittest.TestCase):
time.sleep(0.1)
cache_file = cache_label + '-1-of-2'
+
+ pcoder = coders.coders.FastPrimitivesCoder()
+ self.cache_manager.save_pcoder(pcoder, prefix, cache_label)
with open(self.cache_manager._path(prefix, cache_file), 'wb') as f:
for line in pcoll_list:
- f.write(cache.SafeFastPrimitivesCoder().encode(line))
- f.write(b'\n')
+ tfrecordio._TFRecordUtil.write_record(f, pcoder.encode(line))
def test_exists(self):
"""Test that CacheManager can correctly tell if the cache exists or not."""