Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/20 17:29:06 UTC

[GitHub] [beam] Ardagan opened a new pull request #11182: [DO NOT REVIEW] Heartbeat Transform and MultipleRead

Ardagan opened a new pull request #11182: [DO NOT REVIEW] Heartbeat Transform and MultipleRead
URL: https://github.com/apache/beam/pull/11182
 
 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] Ardagan commented on issue #11182: [DO NOT REVIEW] Heartbeat Transform and MultipleRead

Posted by GitBox <gi...@apache.org>.
Ardagan commented on issue #11182: [DO NOT REVIEW] Heartbeat Transform and MultipleRead
URL: https://github.com/apache/beam/pull/11182#issuecomment-604668038
 
 
   retest this please


[GitHub] [beam] soyrice commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
soyrice commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r408192891
 
 

 ##########
 File path: website/src/documentation/patterns/side-inputs.md
 ##########
 @@ -45,4 +45,30 @@ For instance, the following code sample uses a `Map` to create a `DoFn`. The `Ma
 ```java
 {% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:SideInputPatternSlowUpdateGlobalWindowSnip1
 %}
-```
\ No newline at end of file
+```
+
+
+## Slowly updating side input using windowing
+
+You can read side input pcollection periodically into distinct windows.
 
 Review comment:
   I think this should either be "You can read... from distinct windows" or "You can write... to distinct windows." I'm not sure what it means to "read... into distinct windows."
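
To make the wording question concrete, here is a minimal sketch in plain Python (not Beam code) of one way to interpret "read periodically into distinct windows": each periodic read lands in its own fixed window, and a main-input element uses the snapshot from the window its timestamp falls into. The helper names `fixed_window`, `periodic_reads`, and `lookup`, the 10-second window size, and the snapshot strings are all assumptions made for illustration.

```python
WINDOW_SIZE = 10  # seconds; hypothetical value for illustration


def fixed_window(timestamp, size=WINDOW_SIZE):
    """Return the [start, end) fixed window that contains `timestamp`."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def periodic_reads(first, last, interval):
    """Yield the timestamps at which the side input would be re-read."""
    ts = first
    while ts <= last:
        yield ts
        ts += interval


# One side-input snapshot per window: each periodic read is stored under
# the window that its timestamp falls into.
side_input_by_window = {
    fixed_window(ts): "snapshot@%d" % ts
    for ts in periodic_reads(0, 30, WINDOW_SIZE)
}


def lookup(main_element_timestamp):
    """Pick the snapshot whose window contains the main element."""
    return side_input_by_window[fixed_window(main_element_timestamp)]


print(lookup(17))
```

Under this reading, the periodic refresh is the "write to distinct windows" step and the per-element lookup is the "read from distinct windows" step.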


[GitHub] [beam] Ardagan commented on issue #11182: Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
Ardagan commented on issue #11182: Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#issuecomment-605283633
 
 
   Run Python PreCommit


[GitHub] [beam] pabloem commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r405089339
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/periodicsequence.py
 ##########
 @@ -0,0 +1,165 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import time
+
+import apache_beam as beam
+import apache_beam.runners.sdf_utils as sdf_utils
+from apache_beam.io.restriction_trackers import OffsetRange
+from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
+from apache_beam.transforms import core
+from apache_beam.transforms import window
+from apache_beam.transforms.ptransform import PTransform
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+from apache_beam.utils.timestamp import MAX_TIMESTAMP
+from apache_beam.utils.timestamp import Timestamp
+
+
+class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
+  def initial_restriction(self, element):
+    start, end, interval = element
+    return OffsetRange(start - interval, end)
+
+  def create_tracker(self, restriction):
+    return ImpulseSeqGenRestrictionTracker(restriction)
+
+  def restriction_size(self, unused_element, restriction):
+    return restriction.size()
+
+
+class ImpulseSeqGenRestrictionTracker(OffsetRestrictionTracker):
+  def try_split(self, fraction_of_remainder):
+    if not self._checkpointed:
+      if fraction_of_remainder != 0:
+        return None
+
+      if self._current_position is None:
+        cur = self._range.start
+      else:
+        cur = self._current_position
+      split_point = cur
+
+      if split_point < self._range.stop:
+        self._checkpointed = True
+        self._range, residual_range = self._range.split_at(split_point)
+        return self._range, residual_range
+
+  def cur_pos(self):
+    return self._current_position
+
+  def try_claim(self, pos):
+    if ((self._last_claim_attempt is None) or
+        (pos > self._last_claim_attempt and pos == self._range.stop)):
+      self._last_claim_attempt = pos
+      return True
+    else:
+      return super(ImpulseSeqGenRestrictionTracker, self).try_claim(pos)
+
+
+class ImpulseSeqGenDoFn(beam.DoFn):
+  def process(
+      self,
+      element,
+      restriction_tracker=beam.DoFn.RestrictionParam(
+          ImpulseSeqGenRestrictionProvider())):
+    '''
+    The PeriodicSequence transform receives tuple elements with three parts:
+
+    * first_timestamp = first timestamp to output an element for.
+    * last_timestamp = last timestamp/time to output an element for.
+    * fire_interval = how often to fire an element.
+
+    For each input element received, the PeriodicSequence transform starts
+    generating output elements in the following pattern:
+
+    * if the element timestamp is less than the current runtime, output it.
+    * if the element timestamp is greater than the current runtime, wait until
+      the next element timestamp.
+
+    PeriodicSequence can't guarantee that each element is output at the exact
+    time. It does guarantee that elements are not output before their given
+    runtime timestamp.
+
+    :param element: (start_timestamp, end_timestamp, interval)
+    :param restriction_tracker:
+    :return: yields elements at processing real-time intervals, with the value
+      of the target output timestamp for the element.
+    '''
+    _, _, interval = element
+
+    assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)
+
+    t = time.time()
+    cr = restriction_tracker.current_restriction()
+    current_timestamp = cr.start
+
+    restriction_tracker.try_claim(current_timestamp)
+    if current_timestamp <= t:
+      if restriction_tracker.try_claim(current_timestamp + interval):
+        current_timestamp += interval
+        yield current_timestamp
+
+    if current_timestamp + interval >= cr.stop:
+      restriction_tracker.try_claim(cr.stop)
+    else:
+      restriction_tracker.defer_remainder(
+          timestamp.Timestamp(current_timestamp))
+
+
+class PeriodicSequence(PTransform):
+  """
+  See ImpulseSeqGenDoFn.
 
 Review comment:
   Sorry about the back and forth. I think the top-level PTransform (which users will rely on) should have its own pydocs instead of referencing the DoFn.
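
As a starting point for such pydocs, the behaviour described in the DoFn docstring can be modelled in plain Python. This is only a sketch of the documented semantics, not the transform itself: the `now` and `sleep` parameters and the `FakeClock` helper are hypothetical, added so the example runs without real waiting, and treating `last_timestamp` as inclusive is an assumption based on the docstring wording.

```python
import time


def periodic_sequence(first_timestamp, last_timestamp, fire_interval,
                      now=time.time, sleep=time.sleep):
    """Yield target timestamps, never before their own wall-clock time.

    Timestamps already in the past are emitted immediately; future ones
    are delayed until the clock reaches them.
    """
    ts = first_timestamp
    while ts <= last_timestamp:  # inclusive end: an assumption of this sketch
        delay = ts - now()
        if delay > 0:
            sleep(delay)
        yield ts
        ts += fire_interval


class FakeClock(object):
    """Hypothetical test clock: sleeping just advances the current time."""

    def __init__(self, start):
        self.t = start

    def now(self):
        return self.t

    def sleep(self, seconds):
        self.t += seconds


clock = FakeClock(100.0)
emitted = list(
    periodic_sequence(98.0, 104.0, 2.0, now=clock.now, sleep=clock.sleep))
print(emitted, clock.t)
```

The injectable clock is a design choice of the sketch only; it keeps the example deterministic and says nothing about how the real transform obtains the current time.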


[GitHub] [beam] pabloem commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r405089392
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/periodicsequence.py
 ##########
 @@ -0,0 +1,165 @@
+class PeriodicSequence(PTransform):
+  """
+  See ImpulseSeqGenDoFn.
+  """
+  def __init_(self):
+    pass
+
+  def expand(self, pbegin):
+    return (
+        pbegin
+        | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn())
+        | 'MapToTimestamped' >> beam.Map(lambda tt: TimestampedValue(tt, tt)))
+
+
+class PeriodicImpulse(PTransform):
+  """
+  See ImpulseSeqGenDoFn.
 
 Review comment:
   Sorry about the back and forth. I think the top-level PTransform (which users will rely on) should have its own pydocs instead of referencing the DoFn.


[GitHub] [beam] pabloem commented on a change in pull request #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r401856286
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/periodicsequence.py
 ##########
 @@ -0,0 +1,160 @@
+class ImpulseSeqGenDoFn(beam.DoFn):
+  def process(
 
 Review comment:
   Can you add comments in this method?
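
One possible shape for those comments, shown on a simplified plain-Python model of the method's control flow. `SketchTracker` is a hypothetical stand-in, not Beam's restriction tracker, and `process_once` returns its outputs instead of yielding them; both are assumptions made so the sketch is self-contained.

```python
class SketchTracker(object):
    """Hypothetical stand-in for a restriction tracker (not Beam's API)."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop
        self.deferred_until = None

    def try_claim(self, pos):
        # In this model a claim succeeds for any position inside the range,
        # including the stop position itself.
        return self.start <= pos <= self.stop

    def defer_remainder(self, resume_at):
        # Record when the remaining work should be resumed.
        self.deferred_until = resume_at


def process_once(interval, tracker, now):
    """Model one invocation: emit at most one timestamp, then end or defer."""
    outputs = []
    # The restriction starts one interval before the first output, so the
    # first claim only marks the starting position.
    current = tracker.start
    tracker.try_claim(current)
    # Emit the next timestamp only when the current position is not in the
    # future and the next position can still be claimed.
    if current <= now and tracker.try_claim(current + interval):
        current += interval
        outputs.append(current)
    if current + interval >= tracker.stop:
        # No further step fits in the range: claim the end to finish.
        tracker.try_claim(tracker.stop)
        return outputs, True
    # Otherwise hand the remainder back so it can be resumed later.
    tracker.defer_remainder(current)
    return outputs, False


tracker = SketchTracker(-10, 30)
first_outputs, finished = process_once(10, tracker, now=5)
print(first_outputs, finished, tracker.deferred_until)
```

The sketch keeps the same three phases as the quoted method (claim the start, conditionally emit, then finish or defer), which is where inline comments would likely help a reader most.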


[GitHub] [beam] rezarokni commented on issue #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
rezarokni commented on issue #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#issuecomment-610106616
 
 
   What is the expected behaviour around lifecycle events for runners that support drain / update? Does it need to be explicitly documented?


[GitHub] [beam] Ardagan removed a comment on issue #11182: Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
Ardagan removed a comment on issue #11182: Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#issuecomment-605130477
 
 
   Run Portable_Python PreCommit


[GitHub] [beam] pabloem commented on a change in pull request #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r401877890
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/periodicsequence_test.py
 ##########
 @@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the PeriodicSequence and PeriodicImpulse transforms."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import os
+import tempfile
+import time
+import unittest
+from builtins import range
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.periodicsequence import PeriodicSequence
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+# Disable frequent lint warning due to pipe operator for chaining transforms.
+# pylint: disable=expression-not-assigned
+
+
+class PeriodicSequenceTest(unittest.TestCase):
+  # Enable nose tests running in parallel
+
+  def test_heartbeat_outputs_valid_sequence(self):
 
 Review comment:
   I feel silly but how long does this test take to run? 3 seconds? Fractions of a second?
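
A rough way to reason about the run time, assuming the transform waits for wall-clock time to reach each target timestamp. The test body is not shown in this thread, so the helper names and the numbers below are hypothetical.

```python
import math


def expected_elements(start, stop, interval):
    """Count the timestamps start, start + interval, ... that are <= stop."""
    if stop < start:
        return 0
    return int(math.floor((stop - start) / interval)) + 1


def min_wall_clock_seconds(start, stop, interval, now):
    """Lower bound on run time: the wait until the last timestamp is due."""
    count = expected_elements(start, stop, interval)
    if count == 0:
        return 0.0
    last = start + (count - 1) * interval
    return max(0.0, float(last - now))


# Hypothetical case: a sequence starting "now" and spanning three seconds.
print(expected_elements(100, 103, 1))
print(min_wall_clock_seconds(100, 103, 1, now=100))
```

If the sequence starts at the current time, the lower bound equals the span of the sequence; if all timestamps are already in the past, the bound drops to zero and the run time would be dominated by pipeline overhead.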


[GitHub] [beam] Ardagan removed a comment on issue #11182: Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
Ardagan removed a comment on issue #11182: Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#issuecomment-605161735
 
 
   Run Python PreCommit


[GitHub] [beam] pabloem commented on a change in pull request #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r401866095
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/periodicsequence.py
 ##########
 @@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import time
+
+import apache_beam as beam
+import apache_beam.runners.sdf_utils as sdf_utils
+from apache_beam.io.restriction_trackers import OffsetRange
+from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
+from apache_beam.transforms import core
+from apache_beam.transforms import window
+from apache_beam.transforms.ptransform import PTransform
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+from apache_beam.utils.timestamp import MAX_TIMESTAMP
+from apache_beam.utils.timestamp import Timestamp
+
+
+class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
+  def initial_restriction(self, element):
+    start, end, interval = element
+    return OffsetRange(start - interval, end)
+
+  def create_tracker(self, restriction):
+    return ImpulseSeqGenRestrictionTracker(restriction)
+
+  def restriction_size(self, unused_element, restriction):
+    return restriction.size()
+
+
+class ImpulseSeqGenRestrictionTracker(OffsetRestrictionTracker):
+  def try_split(self, fraction_of_remainder):
+    if not self._checkpointed:
+      if fraction_of_remainder != 0:
+        return None
+
+      if self._current_position is None:
+        cur = self._range.start
+      else:
+        cur = self._current_position
+      split_point = cur
+
+      if split_point < self._range.stop:
+        self._checkpointed = True
+        self._range, residual_range = self._range.split_at(split_point)
+        return self._range, residual_range
+
+  def cur_pos(self):
+    return self._current_position
+
+  def try_claim(self, pos):
+    if ((self._last_claim_attempt is None) or
+        (pos > self._last_claim_attempt and pos == self._range.stop)):
+      self._last_claim_attempt = pos
+      return True
+    else:
+      return super(ImpulseSeqGenRestrictionTracker, self).try_claim(pos)
+
+
+class ImpulseSeqGenDoFn(beam.DoFn):
+  def process(
+      self,
+      element,
+      restriction_tracker=beam.DoFn.RestrictionParam(
+          ImpulseSeqGenRestrictionProvider())):
+
+    _, _, interval = element
+
+    assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)
+
+    t = time.time()
+    cr = restriction_tracker.current_restriction()
+    current_timestamp = cr.start
 
 Review comment:
   nit: would this be something like `last_fired_element`, and `current_timestamp = time.time()`?


[GitHub] [beam] soyrice commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
soyrice commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r408198261
 
 

 ##########
 File path: website/src/documentation/patterns/side-inputs.md
 ##########
 @@ -45,4 +45,30 @@ For instance, the following code sample uses a `Map` to create a `DoFn`. The `Ma
 ```java
 {% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:SideInputPatternSlowUpdateGlobalWindowSnip1
 %}
-```
\ No newline at end of file
+```
+
+
+## Slowly updating side input using windowing
+
+You can read side input pcollection periodically into distinct windows.
+Later, when you apply side input to your main input, windows will be matched automatically 1:1.
+This way, you can guarantee side input consistency on the duration of the single window.
+
+To do this, you can utilize PeriodicSequence PTransform that will generate infinite sequence
 
 Review comment:
   It's probably best to replace "To do this" with "To do ABC" - it'll help the reader figure out what "this" refers to. Otherwise, if the reader is just skimming the page and starts at this paragraph, they have to read the previous paragraph to figure out what the pronoun refers to.


[GitHub] [beam] Ardagan commented on issue #11182: Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
Ardagan commented on issue #11182: Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#issuecomment-605130477
 
 
   Run Portable_Python PreCommit


[GitHub] [beam] Ardagan merged pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
Ardagan merged pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182
 
 
   


[GitHub] [beam] Ardagan commented on issue #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
Ardagan commented on issue #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#issuecomment-613237708
 
 
   Run Python PreCommit


[GitHub] [beam] Ardagan commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
Ardagan commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r408337414
 
 

 ##########
 File path: website/src/documentation/patterns/side-inputs.md
 ##########
 @@ -45,4 +45,30 @@ For instance, the following code sample uses a `Map` to create a `DoFn`. The `Ma
 ```java
 {% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:SideInputPatternSlowUpdateGlobalWindowSnip1
 %}
-```
\ No newline at end of file
+```
+
+
+## Slowly updating side input using windowing
+
+You can read side input pcollection periodically into distinct windows.
+Later, when you apply side input to your main input, windows will be matched automatically 1:1.
+This way, you can guarantee side input consistency on the duration of the single window.
+
+To do this, you can utilize PeriodicSequence PTransform that will generate infinite sequence
+of elements with some real-time period:
+
+1. Use the PeriodicImpulse transform to generate windowed periodic sequence.
+
+    a. MAX_TIMESTAMP can be replaced with some closer boundary if you want to stop generating elements at some point.
+
+1. Read data using Read operation triggered by arrival of PCollection element.
+
+1. Apply side input.
+
+```python
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:SideInputSlowUpdateSnip1
+%}
 
 Review comment:
   Addressed here: https://github.com/apache/beam/pull/11415


[GitHub] [beam] Ardagan commented on issue #11182: Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
Ardagan commented on issue #11182: Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#issuecomment-605161735
 
 
   Run Python PreCommit


[GitHub] [beam] pabloem commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r405088559
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/periodicsequence.py
 ##########
 @@ -0,0 +1,165 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import time
+
+import apache_beam as beam
+import apache_beam.runners.sdf_utils as sdf_utils
+from apache_beam.io.restriction_trackers import OffsetRange
+from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
+from apache_beam.transforms import core
+from apache_beam.transforms import window
+from apache_beam.transforms.ptransform import PTransform
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+from apache_beam.utils.timestamp import MAX_TIMESTAMP
+from apache_beam.utils.timestamp import Timestamp
+
+
+class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
+  def initial_restriction(self, element):
+    start, end, interval = element
+    return OffsetRange(start - interval, end)
+
+  def create_tracker(self, restriction):
+    return ImpulseSeqGenRestrictionTracker(restriction)
+
+  def restriction_size(self, unused_element, restriction):
+    return restriction.size()
+
+
+class ImpulseSeqGenRestrictionTracker(OffsetRestrictionTracker):
+  def try_split(self, fraction_of_remainder):
+    if not self._checkpointed:
+      if fraction_of_remainder != 0:
+        return None
+
+      if self._current_position is None:
+        cur = self._range.start
+      else:
+        cur = self._current_position
+      split_point = cur
+
+      if split_point < self._range.stop:
+        self._checkpointed = True
+        self._range, residual_range = self._range.split_at(split_point)
+        return self._range, residual_range
+
+  def cur_pos(self):
+    return self._current_position
+
+  def try_claim(self, pos):
+    if ((self._last_claim_attempt is None) or
+        (pos > self._last_claim_attempt and pos == self._range.stop)):
+      self._last_claim_attempt = pos
+      return True
+    else:
+      return super(ImpulseSeqGenRestrictionTracker, self).try_claim(pos)
+
+
+class ImpulseSeqGenDoFn(beam.DoFn):
+  def process(
+      self,
+      element,
+      restriction_tracker=beam.DoFn.RestrictionParam(
+          ImpulseSeqGenRestrictionProvider())):
+    '''
+    PeriodicSequence transform receives tuple elements with three parts:
+
+    * first_timestamp = first timestamp to output element for.
+    * last_timestamp = last timestamp/time to output element for.
+    * fire_interval = how often to fire an element.
+
+    For each input element received, PeriodicSequence transform will start
+    generating output elements in following pattern:
+
+    * if element timestamp is less than current runtime then output element.
+    * if element timestamp is greater than current runtime, wait until next
+      element timestamp.
+
+    PeriodicSequence can't guarantee that each element is output at exact time.
+    PeriodicSequence guarantees that elements would not be output prior to given
+    runtime timestamp.
+
+    :param element: (start_timestamp, end_timestamp, interval)
+    :param restriction_tracker:
+    :return: yields elements at processing real-time intervals with value of
+      target output timestamp for the element.
+    '''
+    _, _, interval = element
+
+    assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)
+
+    t = time.time()
+    cr = restriction_tracker.current_restriction()
+    current_timestamp = cr.start
+
+    restriction_tracker.try_claim(current_timestamp)
+    if current_timestamp <= t:
+      if restriction_tracker.try_claim(current_timestamp + interval):
+        current_timestamp += interval
+        yield current_timestamp
+
+    if current_timestamp + interval >= cr.stop:
+      restriction_tracker.try_claim(cr.stop)
+    else:
+      restriction_tracker.defer_remainder(
+          timestamp.Timestamp(current_timestamp))
+
+
+class PeriodicSequence(PTransform):
+  """
+  See ImpulseSeqGenDoFn.
+  """
+  def __init__(self):
+    pass
+
+  def expand(self, pbegin):
+    return (
+        pbegin
 
 Review comment:
   Suggest renaming to `pcoll`, since the input is an actual PCollection, right?
   
   ```suggestion
     def expand(self, pcoll):
       return (
           pcoll
   ```
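
The firing rule described in the docstring quoted above can be sketched in plain Python, without Beam. This is only an illustration and not the splittable DoFn from the PR: the helper name `due_timestamps` is hypothetical, and treating the end timestamp as inclusive is an assumption based on the docstring's "last timestamp/time to output element for".

```python
def due_timestamps(start, end, interval, now):
    """Return the output timestamps that are already due at wall-clock time `now`.

    Hypothetical helper: it models only the rule "output an element once its
    timestamp is not later than the current time", with `end` assumed inclusive.
    """
    if interval <= 0:
        raise ValueError("interval must be positive")
    due = []
    ts = start
    while ts <= end and ts <= now:
        due.append(ts)
        ts += interval
    return due


# With start=100, end=130, interval=10, only 100 and 110 are due at time 115.
print(due_timestamps(100, 130, 10, now=115))  # prints [100, 110]
```

Timestamps later than `now` are not returned; in the transform they would be emitted on a later invocation, once their time has come.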


[GitHub] [beam] Ardagan commented on issue #11182: Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
Ardagan commented on issue #11182: Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#issuecomment-605213297
 
 
   Run Python PreCommit


[GitHub] [beam] soyrice commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
soyrice commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r408196632
 
 

 ##########
 File path: website/src/documentation/patterns/side-inputs.md
 ##########
 @@ -45,4 +45,30 @@ For instance, the following code sample uses a `Map` to create a `DoFn`. The `Ma
 ```java
 {% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:SideInputPatternSlowUpdateGlobalWindowSnip1
 %}
-```
\ No newline at end of file
+```
+
+
+## Slowly updating side input using windowing
+
+You can read side input pcollection periodically into distinct windows.
+Later, when you apply side input to your main input, windows will be matched automatically 1:1.
+This way, you can guarantee side input consistency on the duration of the single window.
 
 Review comment:
   This might need additional context. What does the term "side input consistency" mean?
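
One possible reading of "side input consistency": every main-input element that falls into a given window sees the side-input value read for that same window, never a mix of two refreshes. The plain-Python sketch below illustrates that 1:1 window matching. It assumes fixed windows of equal size on both inputs; the names `window_start` and `join_with_side_input` and the sample data are hypothetical, and in a real pipeline the runner performs this matching.

```python
def window_start(timestamp, size):
    """Start of the fixed window of length `size` that contains `timestamp`."""
    return timestamp - (timestamp % size)


def join_with_side_input(main_elements, side_values, size):
    """Pair each (timestamp, value) main element with its window's side value.

    `side_values` maps a window start to the side-input value read for that
    window; elements whose window has no side value are paired with None.
    """
    joined = []
    for ts, value in main_elements:
        joined.append((value, side_values.get(window_start(ts, size))))
    return joined


# Side input refreshed once per 10-second window (hypothetical data).
side = {0: "config-v1", 10: "config-v2"}
main = [(1, "a"), (9, "b"), (12, "c")]
print(join_with_side_input(main, side, size=10))
# prints [('a', 'config-v1'), ('b', 'config-v1'), ('c', 'config-v2')]
```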


[GitHub] [beam] soyrice commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
soyrice commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r408198694
 
 

 ##########
 File path: website/src/documentation/patterns/side-inputs.md
 ##########
 @@ -45,4 +45,30 @@ For instance, the following code sample uses a `Map` to create a `DoFn`. The `Ma
 ```java
 {% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:SideInputPatternSlowUpdateGlobalWindowSnip1
 %}
-```
\ No newline at end of file
+```
+
+
+## Slowly updating side input using windowing
+
+You can read side input pcollection periodically into distinct windows.
+Later, when you apply side input to your main input, windows will be matched automatically 1:1.
+This way, you can guarantee side input consistency on the duration of the single window.
+
+To do this, you can utilize PeriodicSequence PTransform that will generate infinite sequence
+of elements with some real-time period:
+
+1. Use the PeriodicImpulse transform to generate windowed periodic sequence.
+
+    a. MAX_TIMESTAMP can be replaced with some closer boundary if you want to stop generating elements at some point.
 
 Review comment:
   MAX_TIMESTAMP is a parameter, right? If so, let's put it in code font. I think "PeriodicImpulse" in the list item above and "Read" (from "Read operation") in the list item below should also be in code font. Putting "Read" in code font will help distinguish it from the word "read," which is also in the sentence.
   
   We can also simplify this sentence a bit: "To stop generating elements, replace MAX_TIMESTAMP with some closer boundary" or "You can replace MAX_TIMESTAMP with..." or something like that


[GitHub] [beam] Ardagan commented on issue #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
Ardagan commented on issue #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#issuecomment-610150038
 
 
   > 
   > 
   > What is the expected behaviour around lifecycle events for runners that support drain / update? Does it need to be explicitly documented?
   
   Processing will stop on drain, so it should not cause any issues.


[GitHub] [beam] pabloem commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r405090794
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/periodicsequence.py
 ##########
 @@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import time
+
+import apache_beam as beam
+import apache_beam.runners.sdf_utils as sdf_utils
+from apache_beam.io.restriction_trackers import OffsetRange
+from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
+from apache_beam.transforms import core
+from apache_beam.transforms import window
+from apache_beam.transforms.ptransform import PTransform
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+from apache_beam.utils.timestamp import MAX_TIMESTAMP
+from apache_beam.utils.timestamp import Timestamp
+
+
+class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
+  def initial_restriction(self, element):
+    start, end, interval = element
+    return OffsetRange(start - interval, end)
+
+  def create_tracker(self, restriction):
+    return ImpulseSeqGenRestrictionTracker(restriction)
+
+  def restriction_size(self, unused_element, restriction):
+    return restriction.size()
+
+
+class ImpulseSeqGenRestrictionTracker(OffsetRestrictionTracker):
+  def try_split(self, fraction_of_remainder):
+    if not self._checkpointed:
+      if fraction_of_remainder != 0:
+        return None
+
+      if self._current_position is None:
+        cur = self._range.start
+      else:
+        cur = self._current_position
+      split_point = cur
+
+      if split_point < self._range.stop:
+        self._checkpointed = True
+        self._range, residual_range = self._range.split_at(split_point)
+        return self._range, residual_range
+
+  def cur_pos(self):
+    return self._current_position
+
+  def try_claim(self, pos):
+    if ((self._last_claim_attempt is None) or
+        (pos > self._last_claim_attempt and pos == self._range.stop)):
+      self._last_claim_attempt = pos
+      return True
+    else:
+      return super(ImpulseSeqGenRestrictionTracker, self).try_claim(pos)
+
+
+class ImpulseSeqGenDoFn(beam.DoFn):
+  def process(
+      self,
+      element,
+      restriction_tracker=beam.DoFn.RestrictionParam(
+          ImpulseSeqGenRestrictionProvider())):
+
+    _, _, interval = element
+
+    assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)
+
+    t = time.time()
+    cr = restriction_tracker.current_restriction()
+    current_timestamp = cr.start
+
+    restriction_tracker.try_claim(current_timestamp)
 
 Review comment:
   fair enough!


[GitHub] [beam] soyrice commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
soyrice commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r408193130
 
 

 ##########
 File path: website/src/documentation/patterns/side-inputs.md
 ##########
 @@ -45,4 +45,30 @@ For instance, the following code sample uses a `Map` to create a `DoFn`. The `Ma
 ```java
 {% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:SideInputPatternSlowUpdateGlobalWindowSnip1
 %}
-```
\ No newline at end of file
+```
+
+
+## Slowly updating side input using windowing
+
+You can read side input pcollection periodically into distinct windows.
 
 Review comment:
   "PCollection" (capital P and C)


[GitHub] [beam] pabloem commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r405090182
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/periodicsequence_test.py
 ##########
 @@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the PTransform and descendants."""
 
 Review comment:
   Let's make the tests in this file run as fast as possible while still validating the transform
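
A rough way to reason about test duration, assuming the transform waits in real time until each timestamp is due: the pipeline cannot finish before the last timestamp in the sequence. The sketch below is plain Python with a hypothetical helper name, `minimum_wait_seconds`; it is not taken from the test file.

```python
def minimum_wait_seconds(start, end, interval, now):
    """Lower bound on the wall-clock wait before the last element can fire.

    Assumes elements fire at start, start + interval, ... up to `end`
    (inclusive) and never before their own timestamp.
    """
    if interval <= 0:
        raise ValueError("interval must be positive")
    if end < start:
        return 0  # empty sequence, nothing to wait for
    steps = (end - start) // interval
    last_timestamp = start + steps * interval
    return max(0, last_timestamp - now)


now = 1000
# Sequence starting now and spanning 3 seconds: at least a 3-second wait.
print(minimum_wait_seconds(now, now + 3, 1, now))  # prints 3
# Same sequence shifted fully into the past: no waiting is needed.
print(minimum_wait_seconds(now - 3, now, 1, now))  # prints 0
```

Under these assumptions, choosing timestamps that already lie in the past should keep a test close to instant while still exercising every element.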


[GitHub] [beam] Ardagan commented on a change in pull request #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
Ardagan commented on a change in pull request #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r404420904
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/periodicsequence.py
 ##########
 @@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import time
+
+import apache_beam as beam
+import apache_beam.runners.sdf_utils as sdf_utils
+from apache_beam.io.restriction_trackers import OffsetRange
+from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
+from apache_beam.transforms import core
+from apache_beam.transforms import window
+from apache_beam.transforms.ptransform import PTransform
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+from apache_beam.utils.timestamp import MAX_TIMESTAMP
+from apache_beam.utils.timestamp import Timestamp
+
+
+class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
+  def initial_restriction(self, element):
+    start, end, interval = element
+    return OffsetRange(start - interval, end)
+
+  def create_tracker(self, restriction):
+    return ImpulseSeqGenRestrictionTracker(restriction)
+
+  def restriction_size(self, unused_element, restriction):
+    return restriction.size()
+
+
+class ImpulseSeqGenRestrictionTracker(OffsetRestrictionTracker):
+  def try_split(self, fraction_of_remainder):
+    if not self._checkpointed:
+      if fraction_of_remainder != 0:
+        return None
+
+      if self._current_position is None:
+        cur = self._range.start
+      else:
+        cur = self._current_position
+      split_point = cur
+
+      if split_point < self._range.stop:
+        self._checkpointed = True
+        self._range, residual_range = self._range.split_at(split_point)
+        return self._range, residual_range
+
+  def cur_pos(self):
+    return self._current_position
+
+  def try_claim(self, pos):
+    if ((self._last_claim_attempt is None) or
+        (pos > self._last_claim_attempt and pos == self._range.stop)):
+      self._last_claim_attempt = pos
+      return True
+    else:
+      return super(ImpulseSeqGenRestrictionTracker, self).try_claim(pos)
+
+
+class ImpulseSeqGenDoFn(beam.DoFn):
+  def process(
+      self,
+      element,
+      restriction_tracker=beam.DoFn.RestrictionParam(
+          ImpulseSeqGenRestrictionProvider())):
+
+    _, _, interval = element
+
+    assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)
+
+    t = time.time()
+    cr = restriction_tracker.current_restriction()
+    current_timestamp = cr.start
+
+    restriction_tracker.try_claim(current_timestamp)
 
 Review comment:
   This guarantees that the left part of the restriction is treated as complete when the remainder is deferred.


[GitHub] [beam] Ardagan removed a comment on issue #11182: [DO NOT REVIEW] Heartbeat Transform and MultipleRead

Posted by GitBox <gi...@apache.org>.
Ardagan removed a comment on issue #11182: [DO NOT REVIEW] Heartbeat Transform and MultipleRead
URL: https://github.com/apache/beam/pull/11182#issuecomment-604668038
 
 
   retest this please


[GitHub] [beam] Ardagan commented on issue #11182: Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
Ardagan commented on issue #11182: Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#issuecomment-605129956
 
 
   R: @rezarokni 


[GitHub] [beam] Ardagan commented on a change in pull request #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
Ardagan commented on a change in pull request #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r402623188
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/periodicsequence_test.py
 ##########
 @@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the PTransform and descendants."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import os
+import tempfile
+import time
+import unittest
+from builtins import range
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.periodicsequence import PeriodicSequence
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
+
+# Disable frequent lint warning due to pipe operator for chaining transforms.
+# pylint: disable=expression-not-assigned
+
+
+class PeriodicSequenceTest(unittest.TestCase):
+  # Enable nose tests running in parallel
+
+  def test_heartbeat_outputs_valid_sequence(self):
 
 Review comment:
   Unfortunately, this one runs for 3 seconds because of the delays.
   On second thought, we can set fractional durations/intervals and it should still work.
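
   If the test moves to fractional values, the expected output could be computed with a helper along these lines. This is only a sketch: `expected_sequence` is a hypothetical name, and the half-open end bound is an assumption rather than something taken from the PR.

```python
def expected_sequence(start, stop, interval):
  """Returns start, start + interval, ... for values strictly below stop.

  The half-open [start, stop) bound is an assumption made for this sketch.
  """
  result = []
  i = 0
  # Multiply by the step index instead of accumulating, to limit float drift.
  while start + i * interval < stop:
    result.append(start + i * interval)
    i += 1
  return result


# Four elements within one second instead of three elements over three seconds.
fractional = expected_sequence(0, 1, 0.25)
```

   With an interval of 0.25 the sequence covers a single second, so the test delay would shrink accordingly.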


[GitHub] [beam] Ardagan commented on issue #11182: [DO NOT REVIEW] Heartbeat Transform and MultipleRead

Posted by GitBox <gi...@apache.org>.
Ardagan commented on issue #11182: [DO NOT REVIEW] Heartbeat Transform and MultipleRead
URL: https://github.com/apache/beam/pull/11182#issuecomment-604825147
 
 
   retest this please


[GitHub] [beam] soyrice commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
soyrice commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r408195934
 
 

 ##########
 File path: website/src/documentation/patterns/side-inputs.md
 ##########
 @@ -45,4 +45,30 @@ For instance, the following code sample uses a `Map` to create a `DoFn`. The `Ma
 ```java
 {% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:SideInputPatternSlowUpdateGlobalWindowSnip1
 %}
-```
\ No newline at end of file
+```
+
+
+## Slowly updating side input using windowing
+
+You can read side input pcollection periodically into distinct windows.
+Later, when you apply side input to your main input, windows will be matched automatically 1:1.
 
 Review comment:
   Missing "the" -> "when you apply the side input to your main input"
   
   Present tense -> "windows are automatically matched"
   
   It might be less ambiguous to say "each side input window is matched to a main input window" (or something like that), instead of "windows will be matched automatically 1:1." I'm not sure it's clear what it means to be "matched automatically 1:1."
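
   To make the intended meaning concrete, here is a small pure-Python sketch of fixed-window assignment. It assumes both inputs use fixed windows of the same size with no offset; `fixed_window` is a hypothetical helper, not a Beam API.

```python
def fixed_window(timestamp, size):
  """Returns the [start, end) fixed window of `size` seconds holding timestamp."""
  start = timestamp - (timestamp % size)
  return (start, start + size)


# A main input element at t=17 and a side input element at t=12 land in the
# same 10-second window, so that side input value is the one the element sees.
main_window = fixed_window(17, 10)
side_window = fixed_window(12, 10)
```

   Under that assumption, "matched" means the main input element's window and the side input window are the same interval.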


[GitHub] [beam] pabloem commented on a change in pull request #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r401878484
 
 

 ##########
 File path: website/src/documentation/patterns/side-inputs.md
 ##########
 @@ -45,4 +45,30 @@ For instance, the following code sample uses a `Map` to create a `DoFn`. The `Ma
 ```java
 {% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:SideInputPatternSlowUpdateGlobalWindowSnip1
 %}
-```
\ No newline at end of file
+```
+
+
+## Slowly updating side input using windowing
+
+You can read side input pcollection periodically into distinct windows.
+Later, when you apply side input to your main input, windows will be matched automatically 1:1.
+This way, you can guarantee side input consistency on the duration of the single window.
+
+To do this, you can utilize Heartbeat PTransform that will generate infinite sequence of elements
 
 Review comment:
   Please correct the name of the transform.


[GitHub] [beam] pabloem commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r405090695
 
 

 ##########
 File path: website/src/documentation/patterns/side-inputs.md
 ##########
 @@ -45,4 +45,30 @@ For instance, the following code sample uses a `Map` to create a `DoFn`. The `Ma
 ```java
 {% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:SideInputPatternSlowUpdateGlobalWindowSnip1
 %}
-```
\ No newline at end of file
+```
+
+
+## Slowly updating side input using windowing
+
+You can read side input pcollection periodically into distinct windows.
+Later, when you apply side input to your main input, windows will be matched automatically 1:1.
+This way, you can guarantee side input consistency on the duration of the single window.
+
+To do this, you can utilize PeriodicSequence PTransform that will generate infinite sequence
+of elements with some real-time period:
+
+1. Use the PeriodicImpulse transform to genearate windowed periodic sequence.
+
+    a. MAX_TIMESTAMP can be replaced with some closer boundary if you want to stop generating elements at some point.
+
+1. Read data using Read operation triggered by arrival of PCollection element.
+
+1. Apply side input.
+
+```python
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:SideInputSlowUpdateSnip1
+%}
 
 Review comment:
   I feel that it may be best to get a tech writer review for this. @rosetn @soyrice can one of you validate this doc section?


[GitHub] [beam] pabloem commented on a change in pull request #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r401885466
 
 

 ##########
 File path: sdks/python/apache_beam/transforms/periodicsequence.py
 ##########
 @@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import time
+
+import apache_beam as beam
+import apache_beam.runners.sdf_utils as sdf_utils
+from apache_beam.io.restriction_trackers import OffsetRange
+from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
+from apache_beam.transforms import core
+from apache_beam.transforms import window
+from apache_beam.transforms.ptransform import PTransform
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+from apache_beam.utils.timestamp import MAX_TIMESTAMP
+from apache_beam.utils.timestamp import Timestamp
+
+
+class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
+  def initial_restriction(self, element):
+    start, end, interval = element
+    return OffsetRange(start - interval, end)
+
+  def create_tracker(self, restriction):
+    return ImpulseSeqGenRestrictionTracker(restriction)
+
+  def restriction_size(self, unused_element, restriction):
+    return restriction.size()
+
+
+class ImpulseSeqGenRestrictionTracker(OffsetRestrictionTracker):
+  def try_split(self, fraction_of_remainder):
+    if not self._checkpointed:
+      if fraction_of_remainder != 0:
+        return None
+
+      if self._current_position is None:
+        cur = self._range.start
+      else:
+        cur = self._current_position
+      split_point = cur
+
+      if split_point < self._range.stop:
+        self._checkpointed = True
+        self._range, residual_range = self._range.split_at(split_point)
+        return self._range, residual_range
+
+  def cur_pos(self):
+    return self._current_position
+
+  def try_claim(self, pos):
+    if ((self._last_claim_attempt is None) or
+        (pos > self._last_claim_attempt and pos == self._range.stop)):
+      self._last_claim_attempt = pos
+      return True
+    else:
+      return super(ImpulseSeqGenRestrictionTracker, self).try_claim(pos)
+
+
+class ImpulseSeqGenDoFn(beam.DoFn):
+  def process(
+      self,
+      element,
+      restriction_tracker=beam.DoFn.RestrictionParam(
+          ImpulseSeqGenRestrictionProvider())):
+
+    _, _, interval = element
+
+    assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)
+
+    t = time.time()
+    cr = restriction_tracker.current_restriction()
+    current_timestamp = cr.start
+
+    restriction_tracker.try_claim(current_timestamp)
 
 Review comment:
   If you defined your initial restriction as `OffsetRange(start, end)`, could you avoid this extra call to `try_claim`?
   You would just do `if restriction_tracker.try_claim(current_timestamp): yield current_timestamp`, and defer the remainder to `current_timestamp+interval`.
   
   I may be missing something - the logic is a little convoluted. LMK if I'm going down the wrong path.
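
   Roughly what I have in mind, as a pure-Python sketch. `ToyTracker`, `process_once` and `run_to_completion` are hypothetical stand-ins, not Beam's `OffsetRestrictionTracker` or the real deferral API; deferring is modeled here by returning the residual range.

```python
class ToyTracker(object):
  """Hypothetical stand-in for an offset tracker over [start, stop)."""
  def __init__(self, start, stop):
    self.start = start
    self.stop = stop

  def try_claim(self, pos):
    # A claim succeeds only for positions inside the restriction.
    return self.start <= pos < self.stop


def process_once(start, stop, interval):
  """Models one process() call; returns (outputs, residual range or None)."""
  tracker = ToyTracker(start, stop)
  current_timestamp = start
  outputs = []
  if tracker.try_claim(current_timestamp):
    outputs.append(current_timestamp)
    # Defer the remainder so the next call starts one interval later.
    return outputs, (current_timestamp + interval, stop)
  return outputs, None


def run_to_completion(start, stop, interval):
  """Re-runs process_once on each residual until nothing is left to claim."""
  emitted = []
  residual = (start, stop)
  while residual is not None:
    outputs, residual = process_once(residual[0], residual[1], interval)
    emitted.extend(outputs)
  return emitted
```

   In this model the restriction starts at `start` itself, so a single `try_claim` per call both guards the output and marks progress.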


[GitHub] [beam] Ardagan removed a comment on issue #11182: Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
Ardagan removed a comment on issue #11182: Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#issuecomment-605213297
 
 
   Run Python PreCommit


[GitHub] [beam] pabloem commented on issue #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11182: [BEAM-9650] Add Heartbeat Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#issuecomment-607399989
 
 
   retest this please


[GitHub] [beam] pabloem commented on issue #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#issuecomment-611209492
 
 
   LGTM.
   It would be good to get a tech writer to edit the documentation. Thanks!


[GitHub] [beam] soyrice commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation

Posted by GitBox <gi...@apache.org>.
soyrice commented on a change in pull request #11182: [BEAM-9650] Add PeriodicImpulse Transform and slowly changing side input documentation
URL: https://github.com/apache/beam/pull/11182#discussion_r408191424
 
 

 ##########
 File path: website/src/documentation/patterns/side-inputs.md
 ##########
 @@ -45,4 +45,30 @@ For instance, the following code sample uses a `Map` to create a `DoFn`. The `Ma
 ```java
 {% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:SideInputPatternSlowUpdateGlobalWindowSnip1
 %}
-```
\ No newline at end of file
+```
+
+
+## Slowly updating side input using windowing
+
+You can read side input pcollection periodically into distinct windows.
+Later, when you apply side input to your main input, windows will be matched automatically 1:1.
+This way, you can guarantee side input consistency on the duration of the single window.
+
+To do this, you can utilize PeriodicSequence PTransform that will generate infinite sequence
+of elements with some real-time period:
+
+1. Use the PeriodicImpulse transform to genearate windowed periodic sequence.
+
+    a. MAX_TIMESTAMP can be replaced with some closer boundary if you want to stop generating elements at some point.
+
+1. Read data using Read operation triggered by arrival of PCollection element.
+
+1. Apply side input.
+
+```python
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:SideInputSlowUpdateSnip1
+%}
 
 Review comment:
   I have time today/tomorrow to review, but I see that the PR was merged. I'll still leave some comments here. Can we revert/roll back, address the comments, and then merge again?
