Posted to commits@beam.apache.org by da...@apache.org on 2017/01/30 23:03:24 UTC

[17/50] [abbrv] beam git commit: Removes Dataflow native text source and sink from Beam SDK.

Removes Dataflow native text source and sink from Beam SDK.

Users should use the Beam text source and sink available in the 'textio.py' module instead (see the usage sketch below).

Also removes the Dataflow native file source/sink that is only used by the native text source/sink.
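
For reference, a minimal sketch of the replacement path using the ReadFromText and WriteToText transforms from 'textio.py'; the bucket paths, step labels, and the upper-casing step are illustrative placeholders, not part of this commit:

    import apache_beam as beam
    from apache_beam.io.textio import ReadFromText, WriteToText

    # Read newline-delimited text, transform each line, and write sharded
    # text output; paths and labels here are placeholders.
    p = beam.Pipeline()
    lines = p | 'read' >> ReadFromText('gs://my-bucket/input*.txt')
    upper = lines | 'upper' >> beam.Map(lambda line: line.upper())
    upper | 'write' >> WriteToText('gs://my-bucket/output',
                                   file_name_suffix='.txt')
    p.run()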


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52fc95dd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52fc95dd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52fc95dd

Branch: refs/heads/master
Commit: 52fc95ddebceaaf27897c4f6d5b97d08bd4b3a1e
Parents: f983123
Author: Chamikara Jayalath <ch...@google.com>
Authored: Mon Jan 23 13:23:45 2017 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Jan 24 16:31:02 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py            | 542 +-------------
 sdks/python/apache_beam/io/fileio_test.py       | 729 +------------------
 .../runners/direct/transform_evaluator.py       |   5 -
 3 files changed, 3 insertions(+), 1273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/52fc95dd/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index ebc4fed..52f31c6 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -34,12 +34,10 @@ import weakref
 from apache_beam import coders
 from apache_beam.io import gcsio
 from apache_beam.io import iobase
-from apache_beam.io import range_trackers
-from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms.display import DisplayDataItem
 
 
-__all__ = ['TextFileSource', 'TextFileSink']
+__all__ = ['TextFileSink']
 
 DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
 
@@ -111,326 +109,6 @@ class CompressionTypes(object):
     return cls.UNCOMPRESSED
 
 
-class NativeFileSource(dataflow_io.NativeSource):
-  """A source implemented by Dataflow service from a GCS or local file or files.
-
-  This class is to be only inherited by sources natively implemented by Cloud
-  Dataflow service, hence should not be sub-classed by users.
-  """
-
-  def __init__(self,
-               file_path,
-               start_offset=None,
-               end_offset=None,
-               coder=coders.BytesCoder(),
-               compression_type=CompressionTypes.AUTO,
-               mime_type='application/octet-stream'):
-    """Initialize a NativeFileSource.
-
-    Args:
-      file_path: The file path to read from as a local file path or a GCS
-        gs:// path. The path can contain glob characters (*, ?, and [...]
-        sets).
-      start_offset: The byte offset in the source file that the reader
-        should start reading. By default it is 0 (beginning of file).
-      end_offset: The byte offset in the file that the reader should stop
-        reading. By default it is the end of the file.
-      compression_type: Used to handle compressed input files. Typical value
-          is CompressionTypes.AUTO, in which case the file_path's extension will
-          be used to detect the compression.
-      coder: Coder used to decode each record.
-
-    Raises:
-      TypeError: if file_path is not a string.
-
-    If the file_path contains glob characters then the start_offset and
-    end_offset must not be specified.
-
-    The 'start_offset' and 'end_offset' pair provide a mechanism to divide the
-    file into multiple pieces for individual sources. Because the offset
-    is measured by bytes, some complication arises when the offset splits in
-    the middle of a record. To avoid the scenario where two adjacent sources
-    each get a fraction of a line we adopt the following rules:
-
-    If start_offset falls inside a record (any character except the first one)
-    then the source will skip the record and start with the next one.
-
-    If end_offset falls inside a record (any character except the first one)
-    then the source will contain that entire record.
-    """
-    if not isinstance(file_path, basestring):
-      raise TypeError('%s: file_path must be a string;  got %r instead' %
-                      (self.__class__.__name__, file_path))
-
-    self.file_path = file_path
-    self.start_offset = start_offset
-    self.end_offset = end_offset
-    self.compression_type = compression_type
-    self.coder = coder
-    self.mime_type = mime_type
-
-  def display_data(self):
-    return {'file_pattern': DisplayDataItem(self.file_path,
-                                            label="File Pattern"),
-            'compression': DisplayDataItem(str(self.compression_type),
-                                           label='Compression')}
-
-  def __eq__(self, other):
-    return (self.file_path == other.file_path and
-            self.start_offset == other.start_offset and
-            self.end_offset == other.end_offset and
-            self.compression_type == other.compression_type and
-            self.coder == other.coder and self.mime_type == other.mime_type)
-
-  @property
-  def path(self):
-    return self.file_path
-
-  def reader(self):
-    return NativeFileSourceReader(self)
-
-
-class NativeFileSourceReader(dataflow_io.NativeSourceReader,
-                             coders.observable.ObservableMixin):
-  """The source reader for a NativeFileSource.
-
-  This class is to be only inherited by source readers natively implemented by
-  Cloud Dataflow service, hence should not be sub-classed by users.
-  """
-
-  def __init__(self, source):
-    super(NativeFileSourceReader, self).__init__()
-    self.source = source
-    self.start_offset = self.source.start_offset or 0
-    self.end_offset = self.source.end_offset
-    self.current_offset = self.start_offset
-
-  def __enter__(self):
-    self.file = ChannelFactory.open(
-        self.source.file_path,
-        'rb',
-        mime_type=self.source.mime_type,
-        compression_type=self.source.compression_type)
-
-    # Determine the real end_offset.
-    #
-    # If not specified or if the source is not splittable it will be the length
-    # of the file (or infinity for compressed files) as appropriate.
-    if ChannelFactory.is_compressed(self.file):
-      if not isinstance(self.source, TextFileSource):
-        raise ValueError('Unexpected compressed file for a non-TextFileSource.')
-      self.end_offset = range_trackers.OffsetRangeTracker.OFFSET_INFINITY
-
-    elif self.end_offset is None:
-      self.file.seek(0, os.SEEK_END)
-      self.end_offset = self.file.tell()
-      self.file.seek(self.start_offset)
-
-    # Initializing range tracker after self.end_offset is finalized.
-    self.range_tracker = range_trackers.OffsetRangeTracker(self.start_offset,
-                                                           self.end_offset)
-
-    # Position to the appropriate start_offset.
-    if self.start_offset > 0 and ChannelFactory.is_compressed(self.file):
-      raise ValueError(
-          'Unexpected positive start_offset (%s) for a compressed source: %s',
-          self.start_offset, self.source)
-    self.seek_to_true_start_offset()
-
-    return self
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    self.file.close()
-
-  def __iter__(self):
-    if self.current_offset > 0 and ChannelFactory.is_compressed(self.file):
-      # When compression is enabled both initial and dynamic splitting should
-      # not be allowed.
-      raise ValueError(
-          'Unexpected split starting at (%s) for compressed source: %s',
-          self.current_offset, self.source)
-
-    while True:
-      if not self.range_tracker.try_claim(record_start=self.current_offset):
-        # Reader has completed reading the set of records in its range. Note
-        # that the end offset of the range may be smaller than the original
-        # end offset defined when creating the reader due to reader accepting
-        # a dynamic split request from the service.
-        return
-
-      # Note that for compressed sources, delta_offsets are virtual and don't
-      # actually correspond to byte offsets in the underlying file. They
-      # nonetheless correspond to unique virtual position locations.
-      for eof, record, delta_offset in self.read_records():
-        if eof:
-          # Can't read from this source anymore and the record and delta_offset
-          # are non-sensical; hence we are done.
-          return
-        else:
-          self.notify_observers(record, is_encoded=False)
-          self.current_offset += delta_offset
-          yield record
-
-  def seek_to_true_start_offset(self):
-    """Seeks the underlying file to the appropriate start_offset that is
-       compatible with range-tracking and position models and updates
-       self.current_offset accordingly.
-    """
-    raise NotImplementedError
-
-  def read_records(self):
-    """
-      Yields information about (possibly multiple) records corresponding to
-      self.current_offset
-
-      If a read_records() invocation returns multiple results, the first record
-      must be at a split point and other records should not be at split points.
-      The first record is assumed to be at self.current_offset and the caller
-      should use the yielded delta_offsets to update self.current_offset
-      accordingly.
-
-      The yielded value is a triplet of the form:
-        eof, record, delta_offset
-      eof: A boolean indicating whether eof has been reached, in which case
-        the contents of record and delta_offset cannot be trusted or used.
-      record: The (possibly decoded) record (ie payload) read from the
-        underlying source.
-      delta_offset: The delta_offset (from self.current_offset) in bytes, that
-        has been consumed from the underlying source, to the starting position
-        of the next record (or EOF if no record exists).
-    """
-    raise NotImplementedError
-
-  def get_progress(self):
-    return dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition(
-        byte_offset=self.range_tracker.last_record_start))
-
-  def request_dynamic_split(self, dynamic_split_request):
-    if ChannelFactory.is_compressed(self.file):
-      # When compression is enabled both initial and dynamic splitting should be
-      # prevented. Here we prevent dynamic splitting by ignoring all dynamic
-      # split requests at the reader.
-      return
-
-    assert dynamic_split_request is not None
-    progress = dynamic_split_request.progress
-    split_position = progress.position
-    if split_position is None:
-      percent_complete = progress.percent_complete
-      if percent_complete is not None:
-        if percent_complete <= 0 or percent_complete >= 1:
-          logging.warning(
-              'FileBasedReader cannot be split since the provided percentage '
-              'of work to be completed is out of the valid range (0, '
-              '1). Requested: %r', dynamic_split_request)
-          return
-        split_position = dataflow_io.ReaderPosition()
-        split_position.byte_offset = (
-            self.range_tracker.position_at_fraction(percent_complete))
-      else:
-        logging.warning(
-            'FileBasedReader requires either a position or a percentage of '
-            'work to be complete to perform a dynamic split request. '
-            'Requested: %r', dynamic_split_request)
-        return
-
-    if self.range_tracker.try_split(split_position.byte_offset):
-      return dataflow_io.DynamicSplitResultWithPosition(split_position)
-    else:
-      return
-
-# -----------------------------------------------------------------------------
-# TextFileSource, TextFileSink.
-
-
-class TextFileSource(NativeFileSource):
-  """A source for a GCS or local text file.
-
-  Parses a text file as newline-delimited elements, by default assuming
-  UTF-8 encoding.
-
-  This implementation has only been tested to read text encoded using UTF-8 or
-  ASCII. This has not been tested for other encodings such as UTF-16 or UTF-32.
-  """
-
-  def __init__(self,
-               file_path,
-               start_offset=None,
-               end_offset=None,
-               compression_type=CompressionTypes.AUTO,
-               strip_trailing_newlines=True,
-               coder=coders.StrUtf8Coder(),
-               mime_type='text/plain'):
-    """Initialize a TextSource.
-
-    Args:
-      file_path: The file path to read from as a local file path or a GCS
-        gs:// path. The path can contain glob characters (*, ?, and [...]
-        sets).
-      start_offset: The byte offset in the source text file that the reader
-        should start reading. By default it is 0 (beginning of file).
-      end_offset: The byte offset in the file that the reader should stop
-        reading. By default it is the end of the file.
-      compression_type: Used to handle compressed input files. Typical value
-          is CompressionTypes.AUTO, in which case the file_path's extension will
-          be used to detect the compression.
-      strip_trailing_newlines: Indicates whether this source should remove
-          the newline char in each line it reads before decoding that line.
-          This feature only works for ASCII and UTF-8 encoded input.
-      coder: Coder used to decode each line.
-
-    Raises:
-      TypeError: if file_path is not a string.
-
-    If the file_path contains glob characters then the start_offset and
-    end_offset must not be specified.
-
-    The 'start_offset' and 'end_offset' pair provide a mechanism to divide the
-    text file into multiple pieces for individual sources. Because the offset
-    is measured by bytes, some complication arises when the offset splits in
-    the middle of a text line. To avoid the scenario where two adjacent sources
-    each get a fraction of a line we adopt the following rules:
-
-    If start_offset falls inside a line (any character except the first one)
-    then the source will skip the line and start with the next one.
-
-    If end_offset falls inside a line (any character except the first one) then
-    the source will contain that entire line.
-    """
-    super(TextFileSource, self).__init__(
-        file_path,
-        start_offset=start_offset,
-        end_offset=end_offset,
-        coder=coder,
-        compression_type=compression_type)
-    self.strip_trailing_newlines = strip_trailing_newlines
-
-  @property
-  def format(self):
-    """Source format name required for remote execution."""
-    return 'text'
-
-  def __eq__(self, other):
-    return (super(TextFileSource, self).__eq__(other) and
-            self.strip_trailing_newlines == other.strip_trailing_newlines)
-
-  def reader(self):
-    # If a multi-file pattern was specified as a source then make sure the
-    # start/end offsets use the default values for reading the entire file.
-    if re.search(r'[*?\[\]]', self.file_path) is not None:
-      if self.start_offset is not None:
-        raise ValueError(
-            'start offset cannot be specified for a multi-file source: '
-            '%s' % self.file_path)
-      if self.end_offset is not None:
-        raise ValueError(
-            'End offset cannot be specified for a multi-file source: '
-            '%s' % self.file_path)
-      return TextMultiFileReader(self)
-    else:
-      return TextFileReader(self)
-
-
 class ChannelFactory(object):
   # TODO: Generalize into extensible framework.
 
@@ -1114,221 +792,3 @@ class TextFileSink(FileSink):
     file_handle.write(encoded_value)
     if self.append_trailing_newlines:
       file_handle.write('\n')
-
-
-class NativeFileSink(dataflow_io.NativeSink):
-  """A sink implemented by Dataflow service to a GCS or local file or files.
-
-  This class is to be only inherited by sinks natively implemented by Cloud
-  Dataflow service, hence should not be sub-classed by users.
-  """
-
-  def __init__(self,
-               file_path_prefix,
-               file_name_suffix='',
-               num_shards=0,
-               shard_name_template=None,
-               validate=True,
-               coder=coders.BytesCoder(),
-               mime_type='application/octet-stream',
-               compression_type=CompressionTypes.AUTO):
-    if not CompressionTypes.is_valid_compression_type(compression_type):
-      raise TypeError('compression_type must be CompressionType object but '
-                      'was %s' % type(compression_type))
-
-    # We initialize a file_path attribute containing just the prefix part for
-    # local runner environment. For now, sharding is not supported in the local
-    # runner and sharding options (template, num, suffix) are ignored.
-    # The attribute is also used in the worker environment when we just write
-    # to a specific file.
-    # TODO: Add support for file sharding in the local runner.
-    self.file_path = file_path_prefix
-    self.coder = coder
-    self.file_name_prefix = file_path_prefix
-    self.file_name_suffix = file_name_suffix
-    self.num_shards = num_shards
-    # TODO: Update this when the service supports more patterns.
-    self.shard_name_template = (DEFAULT_SHARD_NAME_TEMPLATE if
-                                shard_name_template is None else
-                                shard_name_template)
-    # TODO: Implement sink validation.
-    self.validate = validate
-    self.mime_type = mime_type
-    self.compression_type = compression_type
-
-  def display_data(self):
-    file_name_pattern = '{}{}{}'.format(self.file_name_prefix,
-                                        self.shard_name_template,
-                                        self.file_name_suffix)
-    return {'shards':
-            DisplayDataItem(self.num_shards,
-                            label='Number of Shards'),
-            'file_pattern':
-            DisplayDataItem(file_name_pattern,
-                            label='File Name Pattern'),
-            'compression':
-            DisplayDataItem(str(self.compression_type),
-                            label='Compression Type')}
-
-  @property
-  def path(self):
-    return self.file_path
-
-  def writer(self):
-    return NativeFileSinkWriter(self)
-
-  def __eq__(self, other):
-    return (self.file_path == other.file_path and self.coder == other.coder and
-            self.file_name_prefix == other.file_name_prefix and
-            self.file_name_suffix == other.file_name_suffix and
-            self.num_shards == other.num_shards and
-            self.shard_name_template == other.shard_name_template and
-            self.validate == other.validate and
-            self.mime_type == other.mime_type and
-            self.compression_type == other.compression_type)
-
-
-class NativeFileSinkWriter(dataflow_io.NativeSinkWriter):
-  """The sink writer for a NativeFileSink.
-
-  This class is to be only inherited by sink writers natively implemented by
-  Cloud Dataflow service, hence should not be sub-classed by users.
-  """
-
-  def __init__(self, sink):
-    self.sink = sink
-
-  def __enter__(self):
-    self.file = ChannelFactory.open(
-        self.sink.file_path,
-        'wb',
-        mime_type=self.sink.mime_type,
-        compression_type=self.sink.compression_type)
-
-    if (ChannelFactory.is_compressed(self.file) and
-        not isinstance(self.sink, NativeTextFileSink)):
-      raise ValueError(
-          'Unexpected compressed file for a non-NativeTextFileSink.')
-
-    return self
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    self.file.close()
-
-  def Write(self, value):
-    self.file.write(self.sink.coder.encode(value))
-
-
-class NativeTextFileSink(NativeFileSink):
-  """A sink to a GCS or local text file or files."""
-
-  def __init__(self,
-               file_path_prefix,
-               append_trailing_newlines=True,
-               file_name_suffix='',
-               num_shards=0,
-               shard_name_template=None,
-               validate=True,
-               coder=coders.ToStringCoder(),
-               mime_type='text/plain',
-               compression_type=CompressionTypes.AUTO):
-    super(NativeTextFileSink, self).__init__(
-        file_path_prefix,
-        file_name_suffix=file_name_suffix,
-        num_shards=num_shards,
-        shard_name_template=shard_name_template,
-        validate=validate,
-        coder=coder,
-        mime_type=mime_type,
-        compression_type=compression_type)
-    self.append_trailing_newlines = append_trailing_newlines
-
-  @property
-  def format(self):
-    """Sink format name required for remote execution."""
-    return 'text'
-
-  def writer(self):
-    return TextFileWriter(self)
-
-  def __eq__(self, other):
-    return (super(NativeTextFileSink, self).__eq__(other) and
-            self.append_trailing_newlines == other.append_trailing_newlines)
-
-# -----------------------------------------------------------------------------
-# TextFileReader, TextMultiFileReader.
-
-
-class TextFileReader(NativeFileSourceReader):
-  """A reader for a text file source."""
-
-  def seek_to_true_start_offset(self):
-    if ChannelFactory.is_compressed(self.file):
-      # When compression is enabled both initial and dynamic splitting should be
-      # prevented. Here we don't perform any seeking to a different offset, nor
-      # do we update the current_offset so that the rest of the framework can
-      # properly deal with compressed files.
-      return
-
-    if self.start_offset > 0:
-      # Read one byte before. This operation will either consume a previous
-      # newline if start_offset was at the beginning of a line or consume the
-      # line if we were in the middle of it. Either way we get the read
-      # position exactly where we wanted: at the beginning of the first full
-      # line.
-      self.file.seek(self.start_offset - 1)
-      self.current_offset -= 1
-      line = self.file.readline()
-      self.notify_observers(line, is_encoded=True)
-      self.current_offset += len(line)
-
-  def read_records(self):
-    line = self.file.readline()
-    delta_offset = len(line)
-
-    if delta_offset == 0:
-      yield True, None, None  # Reached EOF.
-    else:
-      if self.source.strip_trailing_newlines:
-        line = line.rstrip('\n')
-      yield False, self.source.coder.decode(line), delta_offset
-
-
-class TextMultiFileReader(dataflow_io.NativeSourceReader):
-  """A reader for a multi-file text source."""
-
-  def __init__(self, source):
-    self.source = source
-    self.file_paths = ChannelFactory.glob(self.source.file_path)
-    if not self.file_paths:
-      raise RuntimeError('No files found for path: %s' % self.source.file_path)
-
-  def __enter__(self):
-    return self
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    pass
-
-  def __iter__(self):
-    index = 0
-    for path in self.file_paths:
-      index += 1
-      logging.info('Reading from %s (%d/%d)', path, index, len(self.file_paths))
-      with TextFileSource(
-          path,
-          strip_trailing_newlines=self.source.strip_trailing_newlines,
-          coder=self.source.coder).reader() as reader:
-        for line in reader:
-          yield line
-
-# -----------------------------------------------------------------------------
-# TextFileWriter.
-
-
-class TextFileWriter(NativeFileSinkWriter):
-  """The sink writer for a TextFileSink."""
-
-  def Write(self, value):
-    super(TextFileWriter, self).Write(value)
-    if self.sink.append_trailing_newlines:
-      self.file.write('\n')

http://git-wip-us.apache.org/repos/asf/beam/blob/52fc95dd/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index f75bc5d..ad77dc5 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -18,14 +18,11 @@
 
 """Unit tests for local and GCS sources and sinks."""
 
-import bz2
 import glob
-import gzip
 import logging
 import os
 import tempfile
 import unittest
-import zlib
 
 import hamcrest as hc
 import mock
@@ -33,555 +30,12 @@ import mock
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import fileio
-from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 
-# TODO: Add tests for file patterns (ie not just individual files) for both
-# compressed and uncompressed files.
-
-
-class TestTextFileSource(unittest.TestCase):
-
-  def create_temp_file(self, text, suffix=''):
-    temp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
-    with temp.file as tmp:
-      tmp.write(text)
-    return temp.name
-
-  def read_with_offsets(self,
-                        input_lines,
-                        output_lines,
-                        start_offset=None,
-                        end_offset=None):
-    file_name = self.create_temp_file('\n'.join(input_lines))
-    source = fileio.TextFileSource(
-        file_path=file_name,
-        start_offset=start_offset,
-        end_offset=end_offset)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, output_lines)
-    dd = DisplayData.create_from(source)
-    expected_items = [
-        DisplayDataItemMatcher('file_pattern', file_name),
-        DisplayDataItemMatcher('compression', 'auto')]
-    hc.assert_that(dd.items,
-                   hc.contains_inanyorder(*expected_items))
-
-  def progress_with_offsets(self,
-                            input_lines,
-                            start_offset=None,
-                            end_offset=None):
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file('\n'.join(input_lines)),
-        start_offset=start_offset,
-        end_offset=end_offset)
-    progress_record = []
-    with source.reader() as reader:
-      self.assertEqual(reader.get_progress().position.byte_offset, -1)
-      for line in reader:
-        self.assertIsNotNone(line)
-        progress_record.append(reader.get_progress().position.byte_offset)
-
-    previous = 0
-    for current in progress_record:
-      self.assertGreater(current, previous)
-      previous = current
-
-  def test_read_entire_file(self):
-    lines = ['First', 'Second', 'Third']
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file('\n'.join(lines)))
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, lines)
-
-  def test_read_entire_file_empty(self):
-    source = fileio.TextFileSource(file_path=self.create_temp_file(''))
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, [])
-
-  def test_read_entire_file_gzip(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.GZIP)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, lines)
-
-  def test_read_entire_file_gzip_auto(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(file_path=self.create_temp_file(
-        data, suffix='.gz'))
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, lines)
-
-  def test_read_entire_file_gzip_empty(self):
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
-    data = compressor.compress('') + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.GZIP)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, [])
-
-  def test_read_entire_file_gzip_large(self):
-    lines = ['Line %d' % d for d in range(100 * 1000)]
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.GZIP)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, lines)
-
-  def test_read_entire_file_bzip2(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = bz2.BZ2Compressor()
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.BZIP2)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, lines)
-
-  def test_read_entire_file_bzip2_auto(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = bz2.BZ2Compressor()
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(file_path=self.create_temp_file(
-        data, suffix='.bz2'))
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, lines)
-
-  def test_read_entire_file_bzip2_empty(self):
-    compressor = bz2.BZ2Compressor()
-    data = compressor.compress('') + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.BZIP2)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, [])
-
-  def test_read_entire_file_bzip2_large(self):
-    lines = ['Line %d' % d for d in range(100 * 1000)]
-    compressor = bz2.BZ2Compressor()
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.BZIP2)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, lines)
-
-  def test_progress_entire_file(self):
-    lines = ['First', 'Second', 'Third']
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file('\n'.join(lines)))
-    progress_record = []
-    with source.reader() as reader:
-      self.assertEqual(-1, reader.get_progress().position.byte_offset)
-      for line in reader:
-        self.assertIsNotNone(line)
-        progress_record.append(reader.get_progress().position.byte_offset)
-      self.assertEqual(13, reader.get_progress().position.byte_offset)
-
-    self.assertEqual(len(progress_record), 3)
-    self.assertEqual(progress_record, [0, 6, 13])
-
-  def test_progress_entire_file_gzip(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.GZIP)
-    progress_record = []
-    with source.reader() as reader:
-      self.assertEqual(-1, reader.get_progress().position.byte_offset)
-      for line in reader:
-        self.assertIsNotNone(line)
-        progress_record.append(reader.get_progress().position.byte_offset)
-      self.assertEqual(18,  # Reading the entire contents before we decide EOF.
-                       reader.get_progress().position.byte_offset)
-
-    self.assertEqual(len(progress_record), 3)
-    self.assertEqual(progress_record, [0, 6, 13])
-
-  def test_progress_entire_file_bzip2(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = bz2.BZ2Compressor()
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.BZIP2)
-    progress_record = []
-    with source.reader() as reader:
-      self.assertEqual(-1, reader.get_progress().position.byte_offset)
-      for line in reader:
-        self.assertIsNotNone(line)
-        progress_record.append(reader.get_progress().position.byte_offset)
-      self.assertEqual(18,  # Reading the entire contents before we decide EOF.
-                       reader.get_progress().position.byte_offset)
-
-    self.assertEqual(len(progress_record), 3)
-    self.assertEqual(progress_record, [0, 6, 13])
-
-  def try_splitting_reader_at(self, reader, split_request, expected_response):
-    actual_response = reader.request_dynamic_split(split_request)
-
-    if expected_response is None:
-      self.assertIsNone(actual_response)
-    else:
-      self.assertIsNotNone(actual_response.stop_position)
-      self.assertIsInstance(actual_response.stop_position,
-                            dataflow_io.ReaderPosition)
-      self.assertIsNotNone(actual_response.stop_position.byte_offset)
-      self.assertEqual(expected_response.stop_position.byte_offset,
-                       actual_response.stop_position.byte_offset)
-
-      return actual_response
-
-  def test_file_unsplittable_gzip(self):
-    lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-
-    with self.assertRaises(ValueError):  # Unsplittable initially.
-      source = fileio.TextFileSource(
-          file_path=self.create_temp_file(data),
-          compression_type=fileio.CompressionTypes.GZIP,
-          start_offset=1)  # Anything other than 0 will do.
-      with source.reader():
-        pass
-
-    # Unsplittable dynamically.
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.GZIP)
-
-    with source.reader() as reader:
-      percents_complete = [x / 100.0 for x in range(101)]
-
-      # Cursor at beginning of file.
-      for percent_complete in percents_complete:
-        self.try_splitting_reader_at(
-            reader,
-            dataflow_io.DynamicSplitRequest(
-                dataflow_io.ReaderProgress(percent_complete=percent_complete)),
-            None)
-
-      # Cursor passed beginning of file.
-      reader_iter = iter(reader)
-      next(reader_iter)
-      next(reader_iter)
-      for percent_complete in percents_complete:
-        self.try_splitting_reader_at(
-            reader,
-            dataflow_io.DynamicSplitRequest(
-                dataflow_io.ReaderProgress(percent_complete=percent_complete)),
-            None)
-
-  def test_file_unsplittable_bzip2(self):
-    lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
-    compressor = bz2.BZ2Compressor()
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-
-    with self.assertRaises(ValueError):  # Unsplittable initially.
-      source = fileio.TextFileSource(
-          file_path=self.create_temp_file(data),
-          compression_type=fileio.CompressionTypes.BZIP2,
-          start_offset=1)  # Anything other than 0 will do.
-      with source.reader():
-        pass
-
-    # Unsplittable dynamically.
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.BZIP2)
-    with source.reader() as reader:
-      percents_complete = [x / 100.0 for x in range(101)]
-
-      # Cursor at beginning of file.
-      for percent_complete in percents_complete:
-        self.try_splitting_reader_at(
-            reader,
-            dataflow_io.DynamicSplitRequest(
-                dataflow_io.ReaderProgress(percent_complete=percent_complete)),
-            None)
-
-      # Cursor passed beginning of file.
-      reader_iter = iter(reader)
-      next(reader_iter)
-      next(reader_iter)
-      for percent_complete in percents_complete:
-        self.try_splitting_reader_at(
-            reader,
-            dataflow_io.DynamicSplitRequest(
-                dataflow_io.ReaderProgress(percent_complete=percent_complete)),
-            None)
-
-  def test_update_stop_position_for_percent_complete(self):
-    lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file('\n'.join(lines)))
-    with source.reader() as reader:
-      # Reading two lines
-      reader_iter = iter(reader)
-      next(reader_iter)
-      next(reader_iter)
-      next(reader_iter)
-
-      # Splitting at end of the range should be unsuccessful
-      self.try_splitting_reader_at(
-          reader,
-          dataflow_io.DynamicSplitRequest(
-              dataflow_io.ReaderProgress(percent_complete=0)),
-          None)
-      self.try_splitting_reader_at(
-          reader,
-          dataflow_io.DynamicSplitRequest(
-              dataflow_io.ReaderProgress(percent_complete=1)),
-          None)
-
-      # Splitting at positions on or before start offset of the last record
-      self.try_splitting_reader_at(
-          reader,
-          dataflow_io.DynamicSplitRequest(
-              dataflow_io.ReaderProgress(percent_complete=0.2)),
-          None)
-      self.try_splitting_reader_at(
-          reader,
-          dataflow_io.DynamicSplitRequest(
-              dataflow_io.ReaderProgress(percent_complete=0.4)),
-          None)
-
-      # Splitting at a position after the start offset of the last record should
-      # be successful
-      self.try_splitting_reader_at(
-          reader,
-          dataflow_io.DynamicSplitRequest(
-              dataflow_io.ReaderProgress(percent_complete=0.6)),
-          dataflow_io.DynamicSplitResultWithPosition(
-              dataflow_io.ReaderPosition(byte_offset=15)))
-
-  def test_update_stop_position_percent_complete_for_position(self):
-    lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file('\n'.join(lines)))
-    with source.reader() as reader:
-      # Reading two lines
-      reader_iter = iter(reader)
-      next(reader_iter)
-      next(reader_iter)
-      next(reader_iter)
-
-      # Splitting at end of the range should be unsuccessful
-      self.try_splitting_reader_at(
-          reader,
-          dataflow_io.DynamicSplitRequest(
-              dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition(
-                  byte_offset=0))),
-          None)
-      self.try_splitting_reader_at(
-          reader,
-          dataflow_io.DynamicSplitRequest(
-              dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition(
-                  byte_offset=25))),
-          None)
-
-      # Splitting at positions on or before start offset of the last record
-      self.try_splitting_reader_at(
-          reader,
-          dataflow_io.DynamicSplitRequest(
-              dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition(
-                  byte_offset=5))),
-          None)
-      self.try_splitting_reader_at(
-          reader,
-          dataflow_io.DynamicSplitRequest(
-              dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition(
-                  byte_offset=10))),
-          None)
-
-      # Splitting at a position after the start offset of the last record should
-      # be successful
-      self.try_splitting_reader_at(
-          reader,
-          dataflow_io.DynamicSplitRequest(
-              dataflow_io.ReaderProgress(position=dataflow_io.ReaderPosition(
-                  byte_offset=15))),
-          dataflow_io.DynamicSplitResultWithPosition(
-              dataflow_io.ReaderPosition(byte_offset=15)))
-
-  def run_update_stop_position_exhaustive(self, lines, newline):
-    """An exhaustive test for dynamic splitting.
-
-    For the given set of data items, try to perform a split at all possible
-    combinations of the following.
-
-    * start position
-    * original stop position
-    * updated stop position
-    * number of items read
-
-    Args:
-      lines: set of data items to be used to create the file
-      newline: separator to be used when writing the given set of lines to a
-        text file.
-    """
-
-    file_path = self.create_temp_file(newline.join(lines))
-
-    total_records = len(lines)
-    total_bytes = 0
-
-    for line in lines:
-      total_bytes += len(line)
-    total_bytes += len(newline) * (total_records - 1)
-
-    for start in xrange(0, total_bytes - 1):
-      for end in xrange(start + 1, total_bytes):
-        for stop in xrange(start, end):
-          for records_to_read in range(0, total_records):
-            self.run_update_stop_position(start, end, stop, records_to_read,
-                                          file_path)
-
-  def test_update_stop_position_exhaustive(self):
-    self.run_update_stop_position_exhaustive(
-        ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee'], '\n')
-
-  def test_update_stop_position_exhaustive_with_empty_lines(self):
-    self.run_update_stop_position_exhaustive(
-        ['', 'aaaa', '', 'bbbb', 'cccc', '', 'dddd', 'eeee', ''], '\n')
-
-  def test_update_stop_position_exhaustive_windows_newline(self):
-    self.run_update_stop_position_exhaustive(
-        ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee'], '\r\n')
-
-  def test_update_stop_position_exhaustive_multi_byte(self):
-    self.run_update_stop_position_exhaustive([u'\u0d85\u0d85\u0d85\u0d85'.encode('utf-8'),
-                                              u'\u0db6\u0db6\u0db6\u0db6'.encode('utf-8'),
-                                              u'\u0d9a\u0d9a\u0d9a\u0d9a'.encode('utf-8')], '\n')
-
-  def run_update_stop_position(self, start_offset, end_offset, stop_offset,
-                               records_to_read, file_path):
-    source = fileio.TextFileSource(file_path, start_offset, end_offset)
-
-    records_of_first_split = ''
-
-    with source.reader() as reader:
-      reader_iter = iter(reader)
-      i = 0
-
-      try:
-        while i < records_to_read:
-          records_of_first_split += next(reader_iter)
-          i += 1
-      except StopIteration:
-        # Invalid case, given source does not contain this many records.
-        return
-
-      last_record_start_after_reading = reader.range_tracker.last_record_start
-
-      if stop_offset <= last_record_start_after_reading:
-        expected_split_response = None
-      elif stop_offset == start_offset or stop_offset == end_offset:
-        expected_split_response = None
-      elif records_to_read == 0:
-        expected_split_response = None  # unstarted
-      else:
-        expected_split_response = dataflow_io.DynamicSplitResultWithPosition(
-            stop_position=dataflow_io.ReaderPosition(byte_offset=stop_offset))
-
-      split_response = self.try_splitting_reader_at(
-          reader,
-          dataflow_io.DynamicSplitRequest(progress=dataflow_io.ReaderProgress(
-              dataflow_io.ReaderPosition(byte_offset=stop_offset))),
-          expected_split_response)
-
-      # Reading remaining records from the updated reader.
-      for line in reader:
-        records_of_first_split += line
-
-    if split_response is not None:
-      # Total contents received by reading the two splits should be equal to the
-      # result obtained by reading the original source.
-      records_of_original = ''
-      records_of_second_split = ''
-
-      with source.reader() as original_reader:
-        for line in original_reader:
-          records_of_original += line
-
-      new_source = fileio.TextFileSource(
-          file_path, split_response.stop_position.byte_offset, end_offset)
-      with new_source.reader() as reader:
-        for line in reader:
-          records_of_second_split += line
-
-      self.assertEqual(records_of_original,
-                       records_of_first_split + records_of_second_split)
-
-  def test_various_offset_combination_with_local_file_for_read(self):
-    lines = ['01234', '6789012', '456789012']
-    self.read_with_offsets(lines, lines[1:], start_offset=5)
-    self.read_with_offsets(lines, lines[1:], start_offset=6)
-    self.read_with_offsets(lines, lines[2:], start_offset=7)
-    self.read_with_offsets(lines, lines[1:2], start_offset=5, end_offset=13)
-    self.read_with_offsets(lines, lines[1:2], start_offset=5, end_offset=14)
-    self.read_with_offsets(lines, lines[1:], start_offset=5, end_offset=16)
-    self.read_with_offsets(lines, lines[2:], start_offset=14, end_offset=20)
-    self.read_with_offsets(lines, lines[2:], start_offset=14)
-    self.read_with_offsets(lines, [], start_offset=20, end_offset=20)
-
-  def test_various_offset_combination_with_local_file_for_progress(self):
-    lines = ['01234', '6789012', '456789012']
-    self.progress_with_offsets(lines, start_offset=5)
-    self.progress_with_offsets(lines, start_offset=6)
-    self.progress_with_offsets(lines, start_offset=7)
-    self.progress_with_offsets(lines, start_offset=5, end_offset=13)
-    self.progress_with_offsets(lines, start_offset=5, end_offset=14)
-    self.progress_with_offsets(lines, start_offset=5, end_offset=16)
-    self.progress_with_offsets(lines, start_offset=14, end_offset=20)
-    self.progress_with_offsets(lines, start_offset=14)
-    self.progress_with_offsets(lines, start_offset=20, end_offset=20)
+
+class TestChannelFactory(unittest.TestCase):
 
   @mock.patch('apache_beam.io.fileio.gcsio')
   def test_size_of_files_in_glob_complete(self, *unused_args):
@@ -625,185 +79,6 @@ class TestTextFileSource(unittest.TestCase):
     gcsio_mock.size.assert_called_once_with('gs://bucket/file2')
 
 
-class TestNativeTextFileSink(unittest.TestCase):
-
-  def setUp(self):
-    self.lines = ['Line %d' % d for d in range(100)]
-    self.path = tempfile.NamedTemporaryFile().name
-
-  def _write_lines(self, sink, lines):
-    with sink.writer() as writer:
-      for line in lines:
-        writer.Write(line)
-
-  def test_write_text_file(self):
-    sink = fileio.NativeTextFileSink(self.path)
-    self._write_lines(sink, self.lines)
-
-    with open(self.path, 'r') as f:
-      self.assertEqual(f.read().splitlines(), self.lines)
-
-  def test_text_file_display_data(self):
-    sink = fileio.NativeTextFileSink(self.path)
-    dd = DisplayData.create_from(sink)
-    expected_items = [
-        DisplayDataItemMatcher(
-            'file_pattern',
-            '{}{}'.format(self.path, '-SSSSS-of-NNNNN')),
-        DisplayDataItemMatcher(
-            'compression',
-            'auto'),
-        DisplayDataItemMatcher(
-            'shards',
-            0)]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-  def test_text_file_display_data_suffix(self):
-    sink = fileio.NativeTextFileSink(self.path, file_name_suffix='.pdf')
-    dd = DisplayData.create_from(sink)
-    expected_items = [
-        DisplayDataItemMatcher(
-            'file_pattern',
-            '{}{}{}'.format(self.path, '-SSSSS-of-NNNNN', '.pdf')),
-        DisplayDataItemMatcher(
-            'compression',
-            'auto'),
-        DisplayDataItemMatcher(
-            'shards',
-            0)]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-  def test_write_text_file_empty(self):
-    sink = fileio.NativeTextFileSink(self.path)
-    self._write_lines(sink, [])
-
-    with open(self.path, 'r') as f:
-      self.assertEqual(f.read().splitlines(), [])
-
-  def test_write_text_gzip_file(self):
-    sink = fileio.NativeTextFileSink(
-        self.path, compression_type=fileio.CompressionTypes.GZIP)
-    self._write_lines(sink, self.lines)
-
-    with gzip.GzipFile(self.path, 'r') as f:
-      self.assertEqual(f.read().splitlines(), self.lines)
-
-  def test_display_data_gzip_file(self):
-    sink = fileio.NativeTextFileSink(
-        self.path, compression_type=fileio.CompressionTypes.GZIP)
-    dd = DisplayData.create_from(sink)
-    expected_items = [
-        DisplayDataItemMatcher(
-            'file_pattern',
-            '{}{}'.format(self.path, '-SSSSS-of-NNNNN')),
-        DisplayDataItemMatcher(
-            'compression',
-            'gzip'),
-        DisplayDataItemMatcher(
-            'shards',
-            0)]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-  def test_write_text_gzip_file_auto(self):
-    self.path = tempfile.NamedTemporaryFile(suffix='.gz').name
-    sink = fileio.NativeTextFileSink(self.path)
-    self._write_lines(sink, self.lines)
-
-    with gzip.GzipFile(self.path, 'r') as f:
-      self.assertEqual(f.read().splitlines(), self.lines)
-
-  def test_write_text_gzip_file_empty(self):
-    sink = fileio.NativeTextFileSink(
-        self.path, compression_type=fileio.CompressionTypes.GZIP)
-    self._write_lines(sink, [])
-
-    with gzip.GzipFile(self.path, 'r') as f:
-      self.assertEqual(f.read().splitlines(), [])
-
-  def test_write_text_bzip2_file(self):
-    sink = fileio.NativeTextFileSink(
-        self.path, compression_type=fileio.CompressionTypes.BZIP2)
-    self._write_lines(sink, self.lines)
-
-    with bz2.BZ2File(self.path, 'r') as f:
-      self.assertEqual(f.read().splitlines(), self.lines)
-
-  def test_display_data_bzip2_file(self):
-    sink = fileio.NativeTextFileSink(
-        self.path, compression_type=fileio.CompressionTypes.BZIP2)
-    dd = DisplayData.create_from(sink)
-    expected_items = [
-        DisplayDataItemMatcher(
-            'file_pattern',
-            '{}{}'.format(self.path, '-SSSSS-of-NNNNN')),
-        DisplayDataItemMatcher(
-            'compression',
-            'bzip2'),
-        DisplayDataItemMatcher(
-            'shards',
-            0)]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-  def test_write_text_bzip2_file_auto(self):
-    self.path = tempfile.NamedTemporaryFile(suffix='.bz2').name
-    sink = fileio.NativeTextFileSink(self.path)
-    self._write_lines(sink, self.lines)
-
-    with bz2.BZ2File(self.path, 'r') as f:
-      self.assertEqual(f.read().splitlines(), self.lines)
-
-  def test_write_text_bzip2_file_empty(self):
-    sink = fileio.NativeTextFileSink(
-        self.path, compression_type=fileio.CompressionTypes.BZIP2)
-    self._write_lines(sink, [])
-
-    with bz2.BZ2File(self.path, 'r') as f:
-      self.assertEqual(f.read().splitlines(), [])
-
-  def test_write_native(self):
-    pipeline = TestPipeline()
-    pcoll = pipeline | beam.core.Create(self.lines)
-    pcoll | beam.Write(fileio.NativeTextFileSink(self.path))  # pylint: disable=expression-not-assigned
-    pipeline.run()
-
-    read_result = []
-    for file_name in glob.glob(self.path + '*'):
-      with open(file_name, 'r') as f:
-        read_result.extend(f.read().splitlines())
-
-    self.assertEqual(read_result, self.lines)
-
-  def test_write_native_auto_compression(self):
-    pipeline = TestPipeline()
-    pcoll = pipeline | beam.core.Create(self.lines)
-    pcoll | beam.Write(  # pylint: disable=expression-not-assigned
-        fileio.NativeTextFileSink(
-            self.path, file_name_suffix='.gz'))
-    pipeline.run()
-
-    read_result = []
-    for file_name in glob.glob(self.path + '*'):
-      with gzip.GzipFile(file_name, 'r') as f:
-        read_result.extend(f.read().splitlines())
-
-    self.assertEqual(read_result, self.lines)
-
-  def test_write_native_auto_compression_unsharded(self):
-    pipeline = TestPipeline()
-    pcoll = pipeline | beam.core.Create(self.lines)
-    pcoll | beam.Write(  # pylint: disable=expression-not-assigned
-        fileio.NativeTextFileSink(
-            self.path + '.gz', shard_name_template=''))
-    pipeline.run()
-
-    read_result = []
-    for file_name in glob.glob(self.path + '*'):
-      with gzip.GzipFile(file_name, 'r') as f:
-        read_result.extend(f.read().splitlines())
-
-    self.assertEqual(read_result, self.lines)
-
-
 class MyFileSink(fileio.FileSink):
 
   def open(self, temp_path):

http://git-wip-us.apache.org/repos/asf/beam/blob/52fc95dd/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index ec2b3a1..e8d8c4c 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -548,11 +548,6 @@ class _NativeWriteEvaluator(_TransformEvaluator):
         # Ignore empty bundles that arrive after the output is produced.
         assert self.state == []
       else:
-        if isinstance(self._sink, io.fileio.NativeTextFileSink):
-          assert self._sink.num_shards in (0, 1)
-          if self._sink.shard_name_template:
-            self._sink.file_path += '-00000-of-00001'
-            self._sink.file_path += self._sink.file_name_suffix
         self._sink.pipeline_options = self._evaluation_context.pipeline_options
         with self._sink.writer() as writer:
           for v in self.state: