Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/20 21:06:47 UTC

[GitHub] [beam] pabloem opened a new pull request #13154: Implementing Python Bounded Source Reader DoFn

pabloem opened a new pull request #13154:
URL: https://github.com/apache/beam/pull/13154


   This is useful for BigQuery side inputs that fire repeatedly.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13154:
URL: https://github.com/apache/beam/pull/13154#issuecomment-718942037


   Run Python 3.8 PostCommit





[GitHub] [beam] pabloem commented on pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13154:
URL: https://github.com/apache/beam/pull/13154#issuecomment-718942527


   > > > I'm curious: do we have a plan to build an actual SDF for BQ instead of still relying on a BoundedSource implementation?
   > > 
   > > 
   > > In this case, we will have a simple DoFn that _starts_ the read from BQ, but it eventually returns multiple Avro file sources that can be read individually. This is different from what we had before, where all of the BQ reading logic was part of a BoundedSource. In fact, the _CustomBigQuerySource will be removed eventually.
   > 
   > I see. It seems like you will use `SDFBoundedSourceReader` in your BQ readAll transform. I think it would be nice not to build anything new directly on top of `BoundedSource`, since overall we want to deprecate `BoundedSource` in the future. The bounded SDF wrapper is meant to help us migrate smoothly. What do you think?
   
   I think that's reasonable. If we make further improvements to ReadAllFromBQ, we can make sure they don't rely on BoundedSource.





[GitHub] [beam] boyuanzz commented on a change in pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13154:
URL: https://github.com/apache/beam/pull/13154#discussion_r513112076



##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1618,3 +1628,48 @@ def display_data(self):
         'source': DisplayDataItem(self.source.__class__, label='Read Source'),
         'source_dd': self.source
     }
+
+
+class SDFBoundedSourceReader(PTransform):

Review comment:
       It seems like the major difference between `SDFBoundedSourceWrapper` and `SDFBoundedSourceReader` is that `SDFBoundedSourceWrapper` takes the source as a construction parameter, whereas `SDFBoundedSourceReader` takes the source as an input element. We could change the implementation of `SDFBoundedSourceWrapper` as well.
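To make the difference concrete, here is a minimal, Beam-free sketch. `ListSource`, `WrapperStyleDoFn` and `ReaderStyleDoFn` are hypothetical stand-ins, not Beam classes; the only thing they illustrate is where the source comes from: the constructor (wrapper shape) or the element being processed (reader shape).

```python
class ListSource:
    """Hypothetical stand-in for a BoundedSource: reads a fixed list of records."""

    def __init__(self, records):
        self.records = list(records)

    def read(self):
        return iter(self.records)


class WrapperStyleDoFn:
    """Source fixed at construction time (the SDFBoundedSourceWrapper shape)."""

    def __init__(self, source):
        self.source = source

    def process(self, unused_element):
        # The element only triggers the read; it never chooses the source.
        yield from self.source.read()


class ReaderStyleDoFn:
    """Source passed in as the element (the SDFBoundedSourceReader shape)."""

    def process(self, source):
        yield from source.read()


wrapper = WrapperStyleDoFn(ListSource(['a', 'b']))
print(list(wrapper.process(None)))  # ['a', 'b']

reader = ReaderStyleDoFn()
sources = [ListSource(['a', 'b']), ListSource(['c'])]
records = []
for src in sources:
    records.extend(reader.process(src))
print(records)  # ['a', 'b', 'c']
```

With the element-based shape, an upstream step decides at run time which sources (and how many) get read, which is what a repeatedly firing side input needs; a construction-time source can still be fed in as a single element.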







[GitHub] [beam] pabloem commented on pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13154:
URL: https://github.com/apache/beam/pull/13154#issuecomment-721402049


   Run Python 3.8 PostCommit





[GitHub] [beam] pabloem commented on pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13154:
URL: https://github.com/apache/beam/pull/13154#issuecomment-718126843


   > I'm curious: do we have a plan to build an actual SDF for BQ instead of still relying on a BoundedSource implementation?
   
   In this case, we will have a simple DoFn that _starts_ the read from BQ, but it eventually returns multiple Avro file sources that can be read individually. This is different from what we had before, where all of the BQ reading logic was part of a BoundedSource. In fact, the _CustomBigQuerySource will be removed eventually.
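A rough, Beam-free sketch of the two-stage shape described above, under clearly stated assumptions: `start_bigquery_read` and `FakeAvroFileSource` are hypothetical. In the design described, the first stage would start the BigQuery read and emit one source per Avro file; here the "export" just slices an in-memory list. The point is only that once the first stage has produced independent file sources, each can be read on its own, here concurrently with `concurrent.futures`.

```python
from concurrent.futures import ThreadPoolExecutor


class FakeAvroFileSource:
    """Hypothetical stand-in for one exported Avro file, holding its rows in memory."""

    def __init__(self, name, rows):
        self.name = name
        self.rows = list(rows)

    def read(self):
        return list(self.rows)


def start_bigquery_read(table_rows, rows_per_file):
    """Hypothetical first stage: split the rows into independent file sources.

    A real implementation would start the BigQuery read and list the Avro
    files it produced; here the "files" are slices of an in-memory list.
    """
    sources = []
    for index, start in enumerate(range(0, len(table_rows), rows_per_file)):
        chunk = table_rows[start:start + rows_per_file]
        sources.append(FakeAvroFileSource('export-%d.avro' % index, chunk))
    return sources


def read_all(sources, max_workers=4):
    """Second stage: every file source is self-contained, so read them concurrently."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in input order, so the output is deterministic.
        per_file = list(pool.map(FakeAvroFileSource.read, sources))
    rows = []
    for file_rows in per_file:
        rows.extend(file_rows)
    return rows


sources = start_bigquery_read(list(range(10)), rows_per_file=4)
print([s.name for s in sources])  # ['export-0.avro', 'export-1.avro', 'export-2.avro']
print(read_all(sources))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```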





[GitHub] [beam] boyuanzz commented on pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #13154:
URL: https://github.com/apache/beam/pull/13154#issuecomment-718928559


   > > I'm curious: do we have a plan to build an actual SDF for BQ instead of still relying on a BoundedSource implementation?
   > 
   > In this case, we will have a simple DoFn that _starts_ the read from BQ, but it eventually returns multiple Avro file sources that can be read individually. This is different from what we had before, where all of the BQ reading logic was part of a BoundedSource. In fact, the _CustomBigQuerySource will be removed eventually.
   
   I see. It seems like you will use `SDFBoundedSourceReader` in your BQ readAll transform. I think it would be nice not to build anything new directly on top of `BoundedSource`, since overall we want to deprecate `BoundedSource` in the future. The bounded SDF wrapper is meant to help us migrate smoothly. What do you think?





[GitHub] [beam] pabloem commented on pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13154:
URL: https://github.com/apache/beam/pull/13154#issuecomment-718953430


   Run Portable_Python PreCommit





[GitHub] [beam] boyuanzz commented on a change in pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #13154:
URL: https://github.com/apache/beam/pull/13154#discussion_r514457389



##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1427,194 +1432,184 @@ def with_completed(self, completed):
         fraction=self._fraction, remaining=self._remaining, completed=completed)
 
 
-class _SDFBoundedSourceWrapper(ptransform.PTransform):
-  """A ``PTransform`` that uses SDF to read from a ``BoundedSource``.
+class _SDFBoundedSourceRestriction(object):
+  """ A restriction wraps SourceBundle and RangeTracker. """
+  def __init__(self, source_bundle, range_tracker=None):
+    self._source_bundle = source_bundle
+    self._range_tracker = range_tracker
 
-  NOTE: This transform can only be used with beam_fn_api enabled.
+  def __reduce__(self):
+    # The instance of RangeTracker shouldn't be serialized.
+    return (self.__class__, (self._source_bundle, ))
+
+  def range_tracker(self):
+    if not self._range_tracker:
+      self._range_tracker = self._source_bundle.source.get_range_tracker(
+          self._source_bundle.start_position, self._source_bundle.stop_position)
+    return self._range_tracker
+
+  def weight(self):
+    return self._source_bundle.weight
+
+  def source(self):
+    return self._source_bundle.source
+
+  def try_split(self, fraction_of_remainder):
+    consumed_fraction = self.range_tracker().fraction_consumed()
+    fraction = (
+        consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder)
+    position = self.range_tracker().position_at_fraction(fraction)
+    # Need to stash current stop_pos before splitting since
+    # range_tracker.split will update its stop_pos if splits
+    # successfully.
+    stop_pos = self._source_bundle.stop_position
+    split_result = self.range_tracker().try_split(position)
+    if split_result:
+      split_pos, split_fraction = split_result
+      primary_weight = self._source_bundle.weight * split_fraction
+      residual_weight = self._source_bundle.weight - primary_weight
+      # Update self to primary weight and end position.
+      self._source_bundle = SourceBundle(
+          primary_weight,
+          self._source_bundle.source,
+          self._source_bundle.start_position,
+          split_pos)
+      return (
+          self,
+          _SDFBoundedSourceRestriction(
+              SourceBundle(
+                  residual_weight,
+                  self._source_bundle.source,
+                  split_pos,
+                  stop_pos)))
+
+
+class _SDFBoundedSourceRestrictionTracker(RestrictionTracker):
+  """An `iobase.RestrictionTracker` implementations for wrapping BoundedSource
+  with SDF. The tracked restriction is a _SDFBoundedSourceRestriction, which
+  wraps SourceBundle and RangeTracker.
+
+  Delegated RangeTracker guarantees synchronization safety.
   """
-  class _SDFBoundedSourceRestriction(object):
-    """ A restriction wraps SourceBundle and RangeTracker. """
-    def __init__(self, source_bundle, range_tracker=None):
-      self._source_bundle = source_bundle
-      self._range_tracker = range_tracker
-
-    def __reduce__(self):
-      # The instance of RangeTracker shouldn't be serialized.
-      return (self.__class__, (self._source_bundle, ))
-
-    def range_tracker(self):
-      if not self._range_tracker:
-        self._range_tracker = self._source_bundle.source.get_range_tracker(
-            self._source_bundle.start_position,
-            self._source_bundle.stop_position)
-      return self._range_tracker
-
-    def weight(self):
-      return self._source_bundle.weight
-
-    def source(self):
-      return self._source_bundle.source
-
-    def try_split(self, fraction_of_remainder):
-      consumed_fraction = self.range_tracker().fraction_consumed()
-      fraction = (
-          consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder)
-      position = self.range_tracker().position_at_fraction(fraction)
-      # Need to stash current stop_pos before splitting since
-      # range_tracker.split will update its stop_pos if splits
-      # successfully.
-      stop_pos = self._source_bundle.stop_position
-      split_result = self.range_tracker().try_split(position)
-      if split_result:
-        split_pos, split_fraction = split_result
-        primary_weight = self._source_bundle.weight * split_fraction
-        residual_weight = self._source_bundle.weight - primary_weight
-        # Update self to primary weight and end position.
-        self._source_bundle = SourceBundle(
-            primary_weight,
-            self._source_bundle.source,
-            self._source_bundle.start_position,
-            split_pos)
-        return (
-            self,
-            _SDFBoundedSourceWrapper._SDFBoundedSourceRestriction(
-                SourceBundle(
-                    residual_weight,
-                    self._source_bundle.source,
-                    split_pos,
-                    stop_pos)))
-
-  class _SDFBoundedSourceRestrictionTracker(RestrictionTracker):
-    """An `iobase.RestrictionTracker` implementations for wrapping BoundedSource
-    with SDF. The tracked restriction is a _SDFBoundedSourceRestriction, which
-    wraps SourceBundle and RangeTracker.
-
-    Delegated RangeTracker guarantees synchronization safety.
-    """
-    def __init__(self, restriction):
-      if not isinstance(restriction,
-                        _SDFBoundedSourceWrapper._SDFBoundedSourceRestriction):
-        raise ValueError(
-            'Initializing SDFBoundedSourceRestrictionTracker'
-            ' requires a _SDFBoundedSourceRestriction')
-      self.restriction = restriction
-
-    def current_progress(self):
-      # type: () -> RestrictionProgress
-      return RestrictionProgress(
-          fraction=self.restriction.range_tracker().fraction_consumed())
-
-    def current_restriction(self):
-      self.restriction.range_tracker()
-      return self.restriction
-
-    def start_pos(self):
-      return self.restriction.range_tracker().start_position()
-
-    def stop_pos(self):
-      return self.restriction.range_tracker().stop_position()
-
-    def try_claim(self, position):
-      return self.restriction.range_tracker().try_claim(position)
-
-    def try_split(self, fraction_of_remainder):
-      return self.restriction.try_split(fraction_of_remainder)
-
-    def check_done(self):
-      return self.restriction.range_tracker().fraction_consumed() >= 1.0
-
-    def is_bounded(self):
-      return True
-
-  class _SDFBoundedSourceRestrictionProvider(core.RestrictionProvider):
-    """A `RestrictionProvider` that is used by SDF for `BoundedSource`."""
-    def __init__(self, source, desired_chunk_size=None):
-      self._source = source
-      self._desired_chunk_size = desired_chunk_size
-
-    def initial_restriction(self, element):
-      # Get initial range_tracker from source
-      range_tracker = self._source.get_range_tracker(None, None)
-      return _SDFBoundedSourceWrapper._SDFBoundedSourceRestriction(
-          SourceBundle(
-              None,
-              self._source,
-              range_tracker.start_position(),
-              range_tracker.stop_position()))
-
-    def create_tracker(self, restriction):
-      return _SDFBoundedSourceWrapper._SDFBoundedSourceRestrictionTracker(
-          restriction)
-
-    def split(self, element, restriction):
-      if self._desired_chunk_size is None:
-        try:
-          estimated_size = self._source.estimate_size()
-        except NotImplementedError:
-          estimated_size = None
-        self._desired_chunk_size = Read.get_desired_chunk_size(estimated_size)
-
-      # Invoke source.split to get initial splitting results.
-      source_bundles = self._source.split(self._desired_chunk_size)
-      for source_bundle in source_bundles:
-        yield _SDFBoundedSourceWrapper._SDFBoundedSourceRestriction(
-            source_bundle)
-
-    def restriction_size(self, element, restriction):
-      return restriction.weight()
-
-    def restriction_coder(self):
-      return coders.DillCoder()
+  def __init__(self, restriction):
+    if not isinstance(restriction, _SDFBoundedSourceRestriction):
+      raise ValueError(
+          'Initializing SDFBoundedSourceRestrictionTracker'
+          ' requires a _SDFBoundedSourceRestriction')
+    self.restriction = restriction
 
-  def __init__(self, source):
-    if not isinstance(source, BoundedSource):
-      raise RuntimeError('SDFBoundedSourceWrapper can only wrap BoundedSource')
-    super(_SDFBoundedSourceWrapper, self).__init__()
-    self.source = source
+  def current_progress(self):
+    # type: () -> RestrictionProgress
+    return RestrictionProgress(
+        fraction=self.restriction.range_tracker().fraction_consumed())
 
-  def _create_sdf_bounded_source_dofn(self):
-    source = self.source
+  def current_restriction(self):
+    self.restriction.range_tracker()
+    return self.restriction
 
-    class SDFBoundedSourceDoFn(core.DoFn):
-      def __init__(self, read_source):
-        self.source = read_source
+  def start_pos(self):
+    return self.restriction.range_tracker().start_position()
+
+  def stop_pos(self):
+    return self.restriction.range_tracker().stop_position()
+
+  def try_claim(self, position):
+    return self.restriction.range_tracker().try_claim(position)
 
-      def display_data(self):
-        return {
-            'source': DisplayDataItem(
-                self.source.__class__, label='Read Source'),
-            'source_dd': self.source
-        }
+  def try_split(self, fraction_of_remainder):
+    return self.restriction.try_split(fraction_of_remainder)
+
+  def check_done(self):
+    return self.restriction.range_tracker().fraction_consumed() >= 1.0
+
+  def is_bounded(self):
+    return True
+
+
+class _SDFBoundedSourceRestrictionProvider(core.RestrictionProvider):
+  """
+  A `RestrictionProvider` that is used by SDF for `BoundedSource`.
+
+  If source is provided, uses it for initializing restriction. Otherwise
+  initializes restriction based on input element that is expected to be of
+  BoundedSource type.
+  """
+  def __init__(self, desired_chunk_size=None):
+    self._desired_chunk_size = desired_chunk_size
+
+  def _check_source(self, src):
+    if src is not None and not isinstance(src, BoundedSource):

Review comment:
       The `src` cannot be `None`, right?
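The reviewer's point can be checked with a small, Beam-free sketch. `BoundedSource` below is a local stand-in class (an assumption, not the real `apache_beam.io.iobase.BoundedSource`), and the messages are hypothetical; the `RuntimeError` type is borrowed from the check in the removed `_SDFBoundedSourceWrapper.__init__`. Because `isinstance(None, BoundedSource)` is `False`, dropping the `src is not None` guard is enough to make `None` fail the check too.

```python
class BoundedSource:
    """Local stand-in for apache_beam.io.iobase.BoundedSource (assumption)."""


def check_source_with_none_guard(src):
    # Same condition as the diff: None passes through without an error.
    if src is not None and not isinstance(src, BoundedSource):
        raise RuntimeError('Expected a BoundedSource, got %r' % (src,))


def check_source(src):
    # isinstance(None, BoundedSource) is False, so None is rejected here too.
    if not isinstance(src, BoundedSource):
        raise RuntimeError('Expected a BoundedSource, got %r' % (src,))


check_source_with_none_guard(None)  # silently accepted
check_source(BoundedSource())  # accepted
try:
    check_source(None)
except RuntimeError as e:
    print(e)  # Expected a BoundedSource, got None
```

Whether the guard should go depends on whether any caller legitimately passes `None`, which the diff excerpt alone does not show.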

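The split arithmetic in `_SDFBoundedSourceRestriction.try_split`, and its `__reduce__` trick for not serializing the range tracker, can be exercised with a toy model. `ToyOffsetTracker` and `ToyRestriction` are hypothetical simplifications, not Beam's `OffsetRangeTracker` or `SourceBundle`. The tracker covers an integer range `[start, stop)`, and its `try_split` returns the split position together with the fraction of the original range kept by the primary, which is the role `split_fraction` plays in the diff. The requested split point is `consumed + (1 - consumed) * fraction_of_remainder` of the whole range, and the weight is divided in proportion to `split_fraction`.

```python
import copy
import math


class ToyOffsetTracker:
    """Hypothetical, simplified tracker over the integer range [start, stop)."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop
        self.last_claimed = None

    def try_claim(self, pos):
        if self.start <= pos < self.stop and (
                self.last_claimed is None or pos > self.last_claimed):
            self.last_claimed = pos
            return True
        return False

    def fraction_consumed(self):
        if self.last_claimed is None:
            return 0.0
        return (self.last_claimed - self.start) / (self.stop - self.start)

    def position_at_fraction(self, fraction):
        return int(math.ceil(self.start + fraction * (self.stop - self.start)))

    def try_split(self, pos):
        # The primary must keep everything already claimed and stay non-empty.
        lowest = (self.start if self.last_claimed is None else self.last_claimed) + 1
        if not lowest <= pos < self.stop:
            return None
        split_fraction = (pos - self.start) / (self.stop - self.start)
        self.stop = pos  # the tracker now only covers the primary
        return pos, split_fraction


class ToyRestriction:
    """Hypothetical analogue of _SDFBoundedSourceRestriction: weight plus range."""

    def __init__(self, weight, start, stop, tracker=None):
        self.weight = weight
        self.start = start
        self.stop = stop
        self._tracker = tracker

    def __reduce__(self):
        # As in the diff: rebuild from the bundle fields only, dropping the tracker.
        return (self.__class__, (self.weight, self.start, self.stop))

    def tracker(self):
        if self._tracker is None:
            self._tracker = ToyOffsetTracker(self.start, self.stop)
        return self._tracker

    def try_split(self, fraction_of_remainder):
        consumed = self.tracker().fraction_consumed()
        fraction = consumed + (1 - consumed) * fraction_of_remainder
        position = self.tracker().position_at_fraction(fraction)
        stop = self.stop  # stash the old stop before self is shrunk below
        result = self.tracker().try_split(position)
        if result is None:
            return None
        split_pos, split_fraction = result
        primary_weight = self.weight * split_fraction
        residual_weight = self.weight - primary_weight
        self.weight, self.stop = primary_weight, split_pos
        return self, ToyRestriction(residual_weight, split_pos, stop)


restriction = ToyRestriction(100.0, 0, 100)
for pos in range(20):  # claim positions 0..19
    restriction.tracker().try_claim(pos)
primary, residual = restriction.try_split(0.5)
print(primary.start, primary.stop, primary.weight)     # 0 60 60.0
print(residual.start, residual.stop, residual.weight)  # 60 100 40.0

# copy.deepcopy goes through the same __reduce__ hook that pickle uses.
clone = copy.deepcopy(primary)
print(primary._tracker is not None, clone._tracker is None)  # True True
```

Here 19 of 100 positions are consumed, so a 0.5 split of the remainder targets 0.19 + 0.81 * 0.5 = 0.595 of the range, which rounds up to position 60; the primary keeps 60% of the weight.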
##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1427,194 +1432,184 @@ def with_completed(self, completed):
         fraction=self._fraction, remaining=self._remaining, completed=completed)
 
 
-class _SDFBoundedSourceWrapper(ptransform.PTransform):
-  """A ``PTransform`` that uses SDF to read from a ``BoundedSource``.
+class _SDFBoundedSourceRestriction(object):
+  """ A restriction wraps SourceBundle and RangeTracker. """
+  def __init__(self, source_bundle, range_tracker=None):
+    self._source_bundle = source_bundle
+    self._range_tracker = range_tracker
 
-  NOTE: This transform can only be used with beam_fn_api enabled.
+  def __reduce__(self):
+    # The instance of RangeTracker shouldn't be serialized.
+    return (self.__class__, (self._source_bundle, ))
+
+  def range_tracker(self):
+    if not self._range_tracker:
+      self._range_tracker = self._source_bundle.source.get_range_tracker(
+          self._source_bundle.start_position, self._source_bundle.stop_position)
+    return self._range_tracker
+
+  def weight(self):
+    return self._source_bundle.weight
+
+  def source(self):
+    return self._source_bundle.source
+
+  def try_split(self, fraction_of_remainder):
+    consumed_fraction = self.range_tracker().fraction_consumed()
+    fraction = (
+        consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder)
+    position = self.range_tracker().position_at_fraction(fraction)
+    # Need to stash current stop_pos before splitting since
+    # range_tracker.split will update its stop_pos if splits
+    # successfully.
+    stop_pos = self._source_bundle.stop_position
+    split_result = self.range_tracker().try_split(position)
+    if split_result:
+      split_pos, split_fraction = split_result
+      primary_weight = self._source_bundle.weight * split_fraction
+      residual_weight = self._source_bundle.weight - primary_weight
+      # Update self to primary weight and end position.
+      self._source_bundle = SourceBundle(
+          primary_weight,
+          self._source_bundle.source,
+          self._source_bundle.start_position,
+          split_pos)
+      return (
+          self,
+          _SDFBoundedSourceRestriction(
+              SourceBundle(
+                  residual_weight,
+                  self._source_bundle.source,
+                  split_pos,
+                  stop_pos)))
+
+
+class _SDFBoundedSourceRestrictionTracker(RestrictionTracker):
+  """An `iobase.RestrictionTracker` implementations for wrapping BoundedSource
+  with SDF. The tracked restriction is a _SDFBoundedSourceRestriction, which
+  wraps SourceBundle and RangeTracker.
+
+  Delegated RangeTracker guarantees synchronization safety.
   """
-  class _SDFBoundedSourceRestriction(object):
-    """ A restriction wraps SourceBundle and RangeTracker. """
-    def __init__(self, source_bundle, range_tracker=None):
-      self._source_bundle = source_bundle
-      self._range_tracker = range_tracker
-
-    def __reduce__(self):
-      # The instance of RangeTracker shouldn't be serialized.
-      return (self.__class__, (self._source_bundle, ))
-
-    def range_tracker(self):
-      if not self._range_tracker:
-        self._range_tracker = self._source_bundle.source.get_range_tracker(
-            self._source_bundle.start_position,
-            self._source_bundle.stop_position)
-      return self._range_tracker
-
-    def weight(self):
-      return self._source_bundle.weight
-
-    def source(self):
-      return self._source_bundle.source
-
-    def try_split(self, fraction_of_remainder):
-      consumed_fraction = self.range_tracker().fraction_consumed()
-      fraction = (
-          consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder)
-      position = self.range_tracker().position_at_fraction(fraction)
-      # Need to stash current stop_pos before splitting since
-      # range_tracker.split will update its stop_pos if splits
-      # successfully.
-      stop_pos = self._source_bundle.stop_position
-      split_result = self.range_tracker().try_split(position)
-      if split_result:
-        split_pos, split_fraction = split_result
-        primary_weight = self._source_bundle.weight * split_fraction
-        residual_weight = self._source_bundle.weight - primary_weight
-        # Update self to primary weight and end position.
-        self._source_bundle = SourceBundle(
-            primary_weight,
-            self._source_bundle.source,
-            self._source_bundle.start_position,
-            split_pos)
-        return (
-            self,
-            _SDFBoundedSourceWrapper._SDFBoundedSourceRestriction(
-                SourceBundle(
-                    residual_weight,
-                    self._source_bundle.source,
-                    split_pos,
-                    stop_pos)))
-
-  class _SDFBoundedSourceRestrictionTracker(RestrictionTracker):
-    """An `iobase.RestrictionTracker` implementations for wrapping BoundedSource
-    with SDF. The tracked restriction is a _SDFBoundedSourceRestriction, which
-    wraps SourceBundle and RangeTracker.
-
-    Delegated RangeTracker guarantees synchronization safety.
-    """
-    def __init__(self, restriction):
-      if not isinstance(restriction,
-                        _SDFBoundedSourceWrapper._SDFBoundedSourceRestriction):
-        raise ValueError(
-            'Initializing SDFBoundedSourceRestrictionTracker'
-            ' requires a _SDFBoundedSourceRestriction')
-      self.restriction = restriction
-
-    def current_progress(self):
-      # type: () -> RestrictionProgress
-      return RestrictionProgress(
-          fraction=self.restriction.range_tracker().fraction_consumed())
-
-    def current_restriction(self):
-      self.restriction.range_tracker()
-      return self.restriction
-
-    def start_pos(self):
-      return self.restriction.range_tracker().start_position()
-
-    def stop_pos(self):
-      return self.restriction.range_tracker().stop_position()
-
-    def try_claim(self, position):
-      return self.restriction.range_tracker().try_claim(position)
-
-    def try_split(self, fraction_of_remainder):
-      return self.restriction.try_split(fraction_of_remainder)
-
-    def check_done(self):
-      return self.restriction.range_tracker().fraction_consumed() >= 1.0
-
-    def is_bounded(self):
-      return True
-
-  class _SDFBoundedSourceRestrictionProvider(core.RestrictionProvider):
-    """A `RestrictionProvider` that is used by SDF for `BoundedSource`."""
-    def __init__(self, source, desired_chunk_size=None):
-      self._source = source
-      self._desired_chunk_size = desired_chunk_size
-
-    def initial_restriction(self, element):
-      # Get initial range_tracker from source
-      range_tracker = self._source.get_range_tracker(None, None)
-      return _SDFBoundedSourceWrapper._SDFBoundedSourceRestriction(
-          SourceBundle(
-              None,
-              self._source,
-              range_tracker.start_position(),
-              range_tracker.stop_position()))
-
-    def create_tracker(self, restriction):
-      return _SDFBoundedSourceWrapper._SDFBoundedSourceRestrictionTracker(
-          restriction)
-
-    def split(self, element, restriction):
-      if self._desired_chunk_size is None:
-        try:
-          estimated_size = self._source.estimate_size()
-        except NotImplementedError:
-          estimated_size = None
-        self._desired_chunk_size = Read.get_desired_chunk_size(estimated_size)
-
-      # Invoke source.split to get initial splitting results.
-      source_bundles = self._source.split(self._desired_chunk_size)
-      for source_bundle in source_bundles:
-        yield _SDFBoundedSourceWrapper._SDFBoundedSourceRestriction(
-            source_bundle)
-
-    def restriction_size(self, element, restriction):
-      return restriction.weight()
-
-    def restriction_coder(self):
-      return coders.DillCoder()
+  def __init__(self, restriction):
+    if not isinstance(restriction, _SDFBoundedSourceRestriction):
+      raise ValueError(
+          'Initializing SDFBoundedSourceRestrictionTracker'
+          ' requires a _SDFBoundedSourceRestriction')
+    self.restriction = restriction
 
-  def __init__(self, source):
-    if not isinstance(source, BoundedSource):
-      raise RuntimeError('SDFBoundedSourceWrapper can only wrap BoundedSource')
-    super(_SDFBoundedSourceWrapper, self).__init__()
-    self.source = source
+  def current_progress(self):
+    # type: () -> RestrictionProgress
+    return RestrictionProgress(
+        fraction=self.restriction.range_tracker().fraction_consumed())
 
-  def _create_sdf_bounded_source_dofn(self):
-    source = self.source
+  def current_restriction(self):
+    self.restriction.range_tracker()
+    return self.restriction
 
-    class SDFBoundedSourceDoFn(core.DoFn):
-      def __init__(self, read_source):
-        self.source = read_source
+  def start_pos(self):
+    return self.restriction.range_tracker().start_position()
+
+  def stop_pos(self):
+    return self.restriction.range_tracker().stop_position()
+
+  def try_claim(self, position):
+    return self.restriction.range_tracker().try_claim(position)
 
-      def display_data(self):
-        return {
-            'source': DisplayDataItem(
-                self.source.__class__, label='Read Source'),
-            'source_dd': self.source
-        }
+  def try_split(self, fraction_of_remainder):
+    return self.restriction.try_split(fraction_of_remainder)
+
+  def check_done(self):
+    return self.restriction.range_tracker().fraction_consumed() >= 1.0
+
+  def is_bounded(self):
+    return True
+
+
+class _SDFBoundedSourceRestrictionProvider(core.RestrictionProvider):
+  """
+  A `RestrictionProvider` that is used by SDF for `BoundedSource`.
+
+  If source is provided, uses it for initializing restriction. Otherwise

Review comment:
       It seems we need to update the pydoc here as well.
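   The `try_split` in the diff hunk quoted above converts a fraction of the *remaining* work into an absolute fraction, asks the range tracker to split at the matching position, and divides the bundle's weight between primary and residual. Below is a minimal, Beam-free sketch of that arithmetic. It assumes integer offsets over `[start, stop)` and uses a hypothetical `ToyRangeTracker` in place of a real `RangeTracker`; neither class nor function is part of Beam.

```python
class ToyRangeTracker(object):
  """Hypothetical offset tracker over [start, stop); not a Beam class."""

  def __init__(self, start, stop):
    self.start = start
    self.stop = stop
    self.last_claimed = None

  def fraction_consumed(self):
    # Fraction of the range up to the last claimed offset.
    if self.last_claimed is None:
      return 0.0
    return float(self.last_claimed - self.start) / (self.stop - self.start)

  def position_at_fraction(self, fraction):
    return int(round(self.start + fraction * (self.stop - self.start)))

  def try_claim(self, position):
    if position >= self.stop:
      return False
    self.last_claimed = position
    return True

  def try_split(self, position):
    # Refuse splits at or before already-claimed work, or outside the range.
    lower = self.start if self.last_claimed is None else self.last_claimed
    if position <= lower or position >= self.stop:
      return None
    split_fraction = float(position - self.start) / (self.stop - self.start)
    self.stop = position  # the tracker now covers only the primary
    return position, split_fraction


def split_bundle(weight, tracker, fraction_of_remainder):
  """Hypothetical helper: returns (primary, residual) tuples or None."""
  consumed = tracker.fraction_consumed()
  fraction = consumed + (1 - consumed) * fraction_of_remainder
  position = tracker.position_at_fraction(fraction)
  # Stash the stop before try_split, which shrinks the tracker's range.
  original_stop = tracker.stop
  result = tracker.try_split(position)
  if result is None:
    return None
  split_pos, split_fraction = result
  primary_weight = weight * split_fraction
  residual_weight = weight - primary_weight
  primary = (primary_weight, tracker.start, split_pos)
  residual = (residual_weight, split_pos, original_stop)
  return primary, residual
```

   Because the split position is rounded and may be adjusted by the tracker, the weights are computed from the `split_fraction` the tracker returns rather than from the requested fraction, which is also what the diff does.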




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13154:
URL: https://github.com/apache/beam/pull/13154#issuecomment-716029832


   Run Python 3.8 PostCommit





[GitHub] [beam] pabloem commented on pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13154:
URL: https://github.com/apache/beam/pull/13154#issuecomment-713176902


   Run Python 3.8 PostCommit





[GitHub] [beam] pabloem merged pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #13154:
URL: https://github.com/apache/beam/pull/13154


   





[GitHub] [beam] pabloem closed pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
pabloem closed pull request #13154:
URL: https://github.com/apache/beam/pull/13154


   





[GitHub] [beam] pabloem commented on pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13154:
URL: https://github.com/apache/beam/pull/13154#issuecomment-718126946


   Run Python 3.8 PostCommit





[GitHub] [beam] pabloem commented on pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13154:
URL: https://github.com/apache/beam/pull/13154#issuecomment-716029874


   Run PythonDocker PreCommit





[GitHub] [beam] pabloem commented on a change in pull request #13154: Implementing Python Bounded Source Reader DoFn

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #13154:
URL: https://github.com/apache/beam/pull/13154#discussion_r513670976



##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1618,3 +1628,48 @@ def display_data(self):
         'source': DisplayDataItem(self.source.__class__, label='Read Source'),
         'source_dd': self.source
     }
+
+
+class SDFBoundedSourceReader(PTransform):

Review comment:
       I've done this, but I still allow the source to be passed either through the constructor or as an input element. The intention is to keep the display data for simple Read transforms, where the source is known at construction time.
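   The design described in this comment (and in the truncated `_SDFBoundedSourceRestrictionProvider` docstring, "If source is provided, uses it for initializing restriction. Otherwise") can be sketched without Beam as follows. `ToySource` and `ToyBoundedSourceReader` are hypothetical stand-ins, not Beam's API: a construction-time source makes display data available up front, while a reader built without one treats each input element as the source.

```python
class ToySource(object):
  """Hypothetical stand-in for a BoundedSource."""

  def __init__(self, records):
    self.records = list(records)

  def read(self):
    return iter(self.records)


class ToyBoundedSourceReader(object):
  """Hypothetical reader mirroring the optional-source design."""

  def __init__(self, source=None):
    self._source = source

  def display_data(self):
    # Only a construction-time source can be described before execution.
    if self._source is None:
      return {}
    return {'source': self._source.__class__.__name__}

  def process(self, element):
    # Prefer the construction-time source; otherwise the element is the source.
    source = self._source if self._source is not None else element
    for record in source.read():
      yield record
```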



