You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/09/23 21:21:37 UTC

[GitHub] [beam] MarcoRob opened a new pull request #15572: BEAM-8218 - PulsarIO (not completed, still working on), looking forward for initial feedback

MarcoRob opened a new pull request #15572:
URL: https://github.com/apache/beam/pull/15572


   Initial approach for PulsarIO, still working on SDF. 
   
   Submitting this PR as DRAFT, in order to 
   1 - get feedback about the initial approach for SDF with PulsarIO
   2 - validate if restrictions are good
   3 - discuss about GetSize and Windowing. 
   
   Restrictions for SDF that i am considering are the MessageId which is who PulsarIO manage the offsets. Each MessageId is incremental but not necessarily sequential (e.g. 3066338213929, 3066338213930, 3066338213931, 3208340570112, 3633810767872, 3633810767873) since MessageId is a combinations of id's (in this case entry id and ledger id to make a unique message id). I am converting the MessageId into an offset with Pulsar default utils. 
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   `ValidatesRunner` compliance status (on master branch)
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/badge/icon?subject=V1+Streaming">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon?subject=V1+Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/badge/icon?subject=V2+Streaming">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon?subject=Java+8">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon?subject=Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon?subject=Portable+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/badge/icon?subject=Portable">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon?subject=Structured+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon?subject=ValCont">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Examples testing status on various runners
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/badge/icon?subject=V1+Java11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Post-Commit SDK/Transform Integration Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Go</th>
         <th>Java</th>
         <th>Python</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon?subject=3.6">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon?subject=3.7">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon?subject=3.8">
           </a>
         </td>
       </tr>
     </tbody>
   </table>
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>---</th>
         <th>Java</th>
         <th>Python</th>
         <th>Go</th>
         <th>Website</th>
         <th>Whitespace</th>
         <th>Typescript</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Non-portable</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon?subject=Tests">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon?subject=Lint">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon?subject=Docker">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon?subject=Docs">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Portable</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #15572: [BEAM-8218] PulsarIO (not completed, still working on), looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15572:
URL: https://github.com/apache/beam/pull/15572#discussion_r728438258



##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@DoFn.UnboundedPerElement
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {

Review comment:
       Yeah, you have a private or package private class that is applied by the `PTransform` expand method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: BEAM-8218 - PulsarIO (not completed, still working on), looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ccca353) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **decrease** coverage by `0.02%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   - Coverage   83.78%   83.75%   -0.03%     
   ==========================================
     Files         439      444       +5     
     Lines       59219    60331    +1112     
   ==========================================
   + Hits        49618    50532     +914     
   - Misses       9601     9799     +198     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [.../python/apache\_beam/testing/test\_stream\_service.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbV9zZXJ2aWNlLnB5) | `88.37% <0.00%> (-4.66%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `90.74% <0.00%> (-1.79%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `96.72% <0.00%> (-1.64%)` | :arrow_down: |
   | [sdks/python/apache\_beam/testing/test\_stream.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbS5weQ==) | `91.08% <0.00%> (-1.31%)` | :arrow_down: |
   | [.../apache\_beam/io/gcp/datastore/v1new/datastoreio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2RhdGFzdG9yZS92MW5ldy9kYXRhc3RvcmVpby5weQ==) | `86.45% <0.00%> (-0.99%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.53% <0.00%> (-0.85%)` | :arrow_down: |
   | ... and [76 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...ccca353](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] MarcoRob commented on pull request #15572: BEAM-8218 - PulsarIO (not completed, still working on), looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
MarcoRob commented on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926174077


   R: @alexvanboxel 
   cc: @kileys @fernando-wizeline @MiguelAnzoWizeline 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (96bb873) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **increase** coverage by `0.02%`.
   > The diff coverage is `n/a`.
   
   > :exclamation: Current head 96bb873 differs from pull request most recent head ac4ad75. Consider uploading reports for the commit ac4ad75 to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   + Coverage   83.78%   83.80%   +0.02%     
   ==========================================
     Files         439      444       +5     
     Lines       59219    60474    +1255     
   ==========================================
   + Hits        49618    50682    +1064     
   - Misses       9601     9792     +191     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `92.68% <0.00%> (-2.44%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `96.72% <0.00%> (-1.64%)` | :arrow_down: |
   | [sdks/python/apache\_beam/testing/test\_stream.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbS5weQ==) | `91.08% <0.00%> (-1.31%)` | :arrow_down: |
   | [.../apache\_beam/io/gcp/datastore/v1new/datastoreio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2RhdGFzdG9yZS92MW5ldy9kYXRhc3RvcmVpby5weQ==) | `86.45% <0.00%> (-0.99%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.56% <0.00%> (-0.82%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `86.73% <0.00%> (-0.76%)` | :arrow_down: |
   | ... and [98 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...ac4ad75](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] devinbost commented on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
devinbost commented on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-1020580084






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] PulsarIO (not completed, still working on), looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ccca353) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **decrease** coverage by `0.02%`.
   > The diff coverage is `n/a`.
   
   > :exclamation: Current head ccca353 differs from pull request most recent head 96bb873. Consider uploading reports for the commit 96bb873 to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   - Coverage   83.78%   83.75%   -0.03%     
   ==========================================
     Files         439      444       +5     
     Lines       59219    60331    +1112     
   ==========================================
   + Hits        49618    50532     +914     
   - Misses       9601     9799     +198     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [.../python/apache\_beam/testing/test\_stream\_service.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbV9zZXJ2aWNlLnB5) | `88.37% <0.00%> (-4.66%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `90.74% <0.00%> (-1.79%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `96.72% <0.00%> (-1.64%)` | :arrow_down: |
   | [sdks/python/apache\_beam/testing/test\_stream.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbS5weQ==) | `91.08% <0.00%> (-1.31%)` | :arrow_down: |
   | [.../apache\_beam/io/gcp/datastore/v1new/datastoreio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2RhdGFzdG9yZS92MW5ldy9kYXRhc3RvcmVpby5weQ==) | `86.45% <0.00%> (-0.99%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.53% <0.00%> (-0.85%)` | :arrow_down: |
   | ... and [76 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...96bb873](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] PulsarIO (not completed, still working on), looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (96bb873) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **increase** coverage by `0.02%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   + Coverage   83.78%   83.80%   +0.02%     
   ==========================================
     Files         439      444       +5     
     Lines       59219    60474    +1255     
   ==========================================
   + Hits        49618    50682    +1064     
   - Misses       9601     9792     +191     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `92.68% <0.00%> (-2.44%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `96.72% <0.00%> (-1.64%)` | :arrow_down: |
   | [sdks/python/apache\_beam/testing/test\_stream.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbS5weQ==) | `91.08% <0.00%> (-1.31%)` | :arrow_down: |
   | [.../apache\_beam/io/gcp/datastore/v1new/datastoreio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2RhdGFzdG9yZS92MW5ldy9kYXRhc3RvcmVpby5weQ==) | `86.45% <0.00%> (-0.99%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.56% <0.00%> (-0.82%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `86.73% <0.00%> (-0.76%)` | :arrow_down: |
   | ... and [98 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...96bb873](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-949067175


   > Hi @lukecwik , I am working on the watermark estimator, and taking a look to KafkaIO, I see it has two types of watermark (MonotonicallyIncreasing and Manual), for first version of PulsarIO, I was consider only using MonotonicallyIncreasing, I want to know your input on this concern
   
   The purpose of the watermark tracking is to be able to fire certain triggers downstream of this transform instead of waiting till all the output of this transform is produced before being able to advance the watermark. In this case if the timestamp of the record is always equivalent to `currentOffset` then using `MonotonicallyIncreasing` makes sense. You'll also want to use `outputWithTimestamp(record, new Instant(currentOffset))` instead of the existing `output(record)` method.
   
   If the timestamp of the record is from some data associated within the record then it is dependent on the ordering the user provides and you'll want to provide some user some configuration options that allow you to convert a record to a timestamp and also to state how they want the watermark estimation to happen.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ac4ad75) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **decrease** coverage by `0.27%`.
   > The diff coverage is `n/a`.
   
   > :exclamation: Current head ac4ad75 differs from pull request most recent head 818e954. Consider uploading reports for the commit 818e954 to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   - Coverage   83.78%   83.51%   -0.28%     
   ==========================================
     Files         439      445       +6     
     Lines       59219    61371    +2152     
   ==========================================
   + Hits        49618    51254    +1636     
   - Misses       9601    10117     +516     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `62.72% <0.00%> (-13.66%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `87.80% <0.00%> (-7.32%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/experimental/spannerio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2V4cGVyaW1lbnRhbC9zcGFubmVyaW8ucHk=) | `82.52% <0.00%> (-5.69%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [...thon/apache\_beam/runners/worker/operation\_specs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9uX3NwZWNzLnB5) | `40.67% <0.00%> (-4.90%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `82.91% <0.00%> (-4.57%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/bigquery\_read\_internal.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3JlYWRfaW50ZXJuYWwucHk=) | `53.92% <0.00%> (-4.55%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | ... and [112 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...818e954](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (818e954) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **decrease** coverage by `0.26%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   - Coverage   83.78%   83.52%   -0.27%     
   ==========================================
     Files         439      445       +6     
     Lines       59219    61385    +2166     
   ==========================================
   + Hits        49618    51272    +1654     
   - Misses       9601    10113     +512     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `62.72% <0.00%> (-13.66%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/experimental/spannerio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2V4cGVyaW1lbnRhbC9zcGFubmVyaW8ucHk=) | `82.52% <0.00%> (-5.69%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [...thon/apache\_beam/runners/worker/operation\_specs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9uX3NwZWNzLnB5) | `40.67% <0.00%> (-4.90%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `82.91% <0.00%> (-4.57%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/bigquery\_read\_internal.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3JlYWRfaW50ZXJuYWwucHk=) | `53.92% <0.00%> (-4.55%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | [sdks/python/apache\_beam/testing/test\_stream.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbS5weQ==) | `91.08% <0.00%> (-1.31%)` | :arrow_down: |
   | ... and [112 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...818e954](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] devinbost commented on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
devinbost commented on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-1020580932


   If you could at least rebase this PR so we can see it compared to the current master, that would help. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] genert commented on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
genert commented on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-986313968


   What is the status of the PR? How can I help?
   
   CC @MarcoRob @lukecwik 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] MarcoRob edited a comment on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
MarcoRob edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-1023547897


   Hi folks,
   
   I am closing this PR since I finished the first version of PulsarIO and have a cleaner PR with commits. Here you can find the new PR created with the latest changes commited, [PR-16634](https://github.com/apache/beam/pull/16634)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] MarcoRob closed pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
MarcoRob closed pull request #15572:
URL: https://github.com/apache/beam/pull/15572


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (818e954) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **decrease** coverage by `0.26%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   - Coverage   83.78%   83.52%   -0.27%     
   ==========================================
     Files         439      445       +6     
     Lines       59219    61385    +2166     
   ==========================================
   + Hits        49618    51272    +1654     
   - Misses       9601    10113     +512     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `62.72% <0.00%> (-13.66%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/experimental/spannerio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2V4cGVyaW1lbnRhbC9zcGFubmVyaW8ucHk=) | `82.52% <0.00%> (-5.69%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [...thon/apache\_beam/runners/worker/operation\_specs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9uX3NwZWNzLnB5) | `40.67% <0.00%> (-4.90%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `82.91% <0.00%> (-4.57%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/bigquery\_read\_internal.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3JlYWRfaW50ZXJuYWwucHk=) | `53.92% <0.00%> (-4.55%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | [sdks/python/apache\_beam/testing/test\_stream.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbS5weQ==) | `91.08% <0.00%> (-1.31%)` | :arrow_down: |
   | ... and [112 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...818e954](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] MarcoRob commented on pull request #15572: [BEAM-8218] PulsarIO (not completed, still working on), looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
MarcoRob commented on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-948840097


   Hi @lukecwik , I am working on the watermark estimator, and taking a look to KafkaIO, I see it has two types of watermark (MonotonicallyIncreasing and Manual), for first version of PulsarIO, I was consider only using MonotonicallyIncreasing, I want to know your input on this concern


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] PulsarIO (not completed, still working on), looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (96bb873) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **increase** coverage by `0.02%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   + Coverage   83.78%   83.80%   +0.02%     
   ==========================================
     Files         439      444       +5     
     Lines       59219    60474    +1255     
   ==========================================
   + Hits        49618    50682    +1064     
   - Misses       9601     9792     +191     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `92.68% <0.00%> (-2.44%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `96.72% <0.00%> (-1.64%)` | :arrow_down: |
   | [sdks/python/apache\_beam/testing/test\_stream.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbS5weQ==) | `91.08% <0.00%> (-1.31%)` | :arrow_down: |
   | [.../apache\_beam/io/gcp/datastore/v1new/datastoreio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2RhdGFzdG9yZS92MW5ldy9kYXRhc3RvcmVpby5weQ==) | `86.45% <0.00%> (-0.99%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.56% <0.00%> (-0.82%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `86.73% <0.00%> (-0.76%)` | :arrow_down: |
   | ... and [98 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...96bb873](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] MarcoRob commented on pull request #15572: [BEAM-8218] PulsarIO (not completed, still working on), looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
MarcoRob commented on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926833776


   cc @lukecwik 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] MarcoRob commented on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
MarcoRob commented on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-965675292


   Hi @lukecwik I made some changes to PulsarIO, still WIP, I need to work on PulsarIO tests, meanwhile I would like to review the new approach with the suggested changes.
   Some changes made:
   - Watermark
   - PTransform builder
   - Read and Write for IO


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (818e954) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **decrease** coverage by `0.26%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   - Coverage   83.78%   83.52%   -0.27%     
   ==========================================
     Files         439      445       +6     
     Lines       59219    61385    +2166     
   ==========================================
   + Hits        49618    51272    +1654     
   - Misses       9601    10113     +512     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `62.72% <0.00%> (-13.66%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/experimental/spannerio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2V4cGVyaW1lbnRhbC9zcGFubmVyaW8ucHk=) | `82.52% <0.00%> (-5.69%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [...thon/apache\_beam/runners/worker/operation\_specs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9uX3NwZWNzLnB5) | `40.67% <0.00%> (-4.90%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `82.91% <0.00%> (-4.57%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/bigquery\_read\_internal.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3JlYWRfaW50ZXJuYWwucHk=) | `53.92% <0.00%> (-4.55%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | [sdks/python/apache\_beam/testing/test\_stream.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbS5weQ==) | `91.08% <0.00%> (-1.31%)` | :arrow_down: |
   | ... and [112 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...818e954](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] devinbost commented on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
devinbost commented on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-1020580084


   I'd also like to help. What else needs to be done? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #15572: [BEAM-8218] PulsarIO (not completed, still working on), looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15572:
URL: https://github.com/apache/beam/pull/15572#discussion_r719730118



##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIOUtils.java
##########
@@ -0,0 +1,6 @@
+package org.apache.beam.sdk.io.pulsar;
+
+public class PulsarIOUtils {

Review comment:
       package private

##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@DoFn.UnboundedPerElement
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {
+
+    private PulsarClient client;
+    private String topic;
+    private List<String> topics;
+    private PulsarAdmin admin;
+
+    private String clientUrl;
+    private String adminUrl;
+
+
+    public void setServiceUrl(String clientUrl, String adminUrl) throws PulsarClientException {
+       this.clientUrl = clientUrl;
+       this.adminUrl = adminUrl;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    private void initPulsarClients() throws PulsarClientException {
+        if(this.adminUrl == null && this.clientUrl == null) {
+            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+            this.clientUrl = PulsarIOUtils.SERVICE_URL;
+        }
+        this.client = PulsarClient.builder()
+                .serviceUrl(clientUrl)
+                .build();
+
+        //TODO fix auth for admin connection
+        boolean tlsAllowInsecureConnection = false;
+        String tlsTrustCertsFilePath = null;
+        this.admin = PulsarAdmin.builder()
+                // .authentication(authPluginClassName,authParams)
+                .serviceHttpUrl(adminUrl)
+                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+                .build();
+    }
+
+    private void closePulsarClients() throws PulsarClientException {
+        this.admin.close();
+        this.client.close();
+    }
+
+    // Open connection to Pulsar clients
+    @Setup
+    public void setup() throws Exception {
+        this.initPulsarClients();
+    }
+    // Close connection to Pulsar clients
+    @Teardown
+    public void teardown() throws Exception {
+        this.closePulsarClients();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element PulsarSource pulsarSource) {
+        // Reading a topic from starting point with offset 0
+        long startOffset = 0;
+        if(pulsarSource.getStartOffset() != null) {
+            startOffset = pulsarSource.getStartOffset();
+        }
+
+        return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+
+    /*
+    It may define a DoFn.GetSize method or ensure that the RestrictionTracker implements
+    RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or splitting may result
+     if size or progress is an inaccurate representation of work.
+     See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+     */
+    @GetSize
+    public double getSize(@Element PulsarSource pulsarSource, @Restriction OffsetRange offsetRange) {
+        //TODO improve getsize estiamate, check pulsar stats to improve get size estimate
+        // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+        double estimateRecords = restrictionTracker(pulsarSource, offsetRange).getProgress().getWorkRemaining();

Review comment:
       I believe this is the default implementation if not overridden as per:
   https://github.com/apache/beam/blob/72595b9018754512181f4ab3f35d0ceac11536cf/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L980

##########
File path: settings.gradle.kts
##########
@@ -225,3 +226,5 @@ include("beam-test-jenkins")
 project(":beam-test-jenkins").projectDir = file(".test-infra/jenkins")
 include("beam-validate-runner")
 project(":beam-validate-runner").projectDir = file(".test-infra/validate-runner")
+include("sdks:java:io:pulsar")

Review comment:
       ```suggestion
   ```
   
   Duplicate of above.

##########
File path: sdks/java/io/pulsar/build.gradle
##########
@@ -0,0 +1,23 @@
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.pulsar')
+
+version '2.32.0-SNAPSHOT'
+
+def pulsarVersion = '2.8.0'
+
+dependencies {
+    compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
+    compile group: 'org.apache.pulsar', name: 'pulsar-client-admin', version: pulsarVersion
+    compile project(path: ":sdks:java:core", configuration: "shadow")
+
+
+    testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime")
+    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
+    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
+
+    testImplementation "org.testcontainers:pulsar:1.15.3"
+}
+
+test {
+    useJUnitPlatform()
+}

Review comment:
       nit: missing new line

##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSource.java
##########
@@ -0,0 +1,27 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import javax.annotation.Nullable;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.pulsar.client.api.MessageId;
+
+import java.io.Serializable;
+
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PulsarSource implements Serializable {
+
+    @SchemaFieldName("start_offset")
+    @Nullable
+    abstract Long getStartOffset();

Review comment:
       You could expand this to contain the serviceUrl/adminUrl/... allowing one to dynamically read from multiple Pulsar sources instead of only one.
   
   This has come up in Kafka where users want to read from 10s or 100s of clusters and specifying each transform individually during pipeline creation is a hassle.

##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSource.java
##########
@@ -0,0 +1,27 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import javax.annotation.Nullable;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.pulsar.client.api.MessageId;
+
+import java.io.Serializable;
+
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PulsarSource implements Serializable {

Review comment:
       nit: PulsarSource -> PulsarSourceDescriptor

##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@DoFn.UnboundedPerElement
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {

Review comment:
       I understand that this is a WIP but typically we expose a PTransform builder pattern that expands and applies any necessary DoFns. Using transforms allows one to create composite sub-graphs of logic.
   
   See https://beam.apache.org/contribute/ptransform-style-guide/ for more details.

##########
File path: settings.gradle.kts
##########
@@ -225,3 +226,5 @@ include("beam-test-jenkins")
 project(":beam-test-jenkins").projectDir = file(".test-infra/jenkins")
 include("beam-validate-runner")
 project(":beam-validate-runner").projectDir = file(".test-infra/validate-runner")
+include("sdks:java:io:pulsar")
+findProject(":sdks:java:io:pulsar")?.name = "pulsar"

Review comment:
       ```suggestion
   ```
   
   You only need to specify project if the project name doesn't match the project path when replacing `:` with `/`.
   

##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@DoFn.UnboundedPerElement
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {
+
+    private PulsarClient client;
+    private String topic;
+    private List<String> topics;
+    private PulsarAdmin admin;
+
+    private String clientUrl;
+    private String adminUrl;
+
+
+    public void setServiceUrl(String clientUrl, String adminUrl) throws PulsarClientException {
+       this.clientUrl = clientUrl;
+       this.adminUrl = adminUrl;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    private void initPulsarClients() throws PulsarClientException {
+        if(this.adminUrl == null && this.clientUrl == null) {
+            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+            this.clientUrl = PulsarIOUtils.SERVICE_URL;
+        }
+        this.client = PulsarClient.builder()
+                .serviceUrl(clientUrl)
+                .build();
+
+        //TODO fix auth for admin connection
+        boolean tlsAllowInsecureConnection = false;
+        String tlsTrustCertsFilePath = null;
+        this.admin = PulsarAdmin.builder()
+                // .authentication(authPluginClassName,authParams)
+                .serviceHttpUrl(adminUrl)
+                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+                .build();
+    }
+
+    private void closePulsarClients() throws PulsarClientException {
+        this.admin.close();
+        this.client.close();
+    }
+
+    // Open connection to Pulsar clients
+    @Setup
+    public void setup() throws Exception {
+        this.initPulsarClients();
+    }
+    // Close connection to Pulsar clients
+    @Teardown
+    public void teardown() throws Exception {
+        this.closePulsarClients();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element PulsarSource pulsarSource) {
+        // Reading a topic from starting point with offset 0
+        long startOffset = 0;
+        if(pulsarSource.getStartOffset() != null) {
+            startOffset = pulsarSource.getStartOffset();
+        }
+
+        return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+
+    /*
+    It may define a DoFn.GetSize method or ensure that the RestrictionTracker implements
+    RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or splitting may result
+     if size or progress is an inaccurate representation of work.
+     See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+     */
+    @GetSize
+    public double getSize(@Element PulsarSource pulsarSource, @Restriction OffsetRange offsetRange) {
+        //TODO improve getsize estiamate, check pulsar stats to improve get size estimate
+        // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+        double estimateRecords = restrictionTracker(pulsarSource, offsetRange).getProgress().getWorkRemaining();
+        return estimateRecords;
+    }
+
+    private Reader<byte[]> newReader(PulsarClient client, MessageId startMessageId) throws PulsarClientException {
+        ReaderBuilder<byte[]> builder = client.newReader().topic(topic).startMessageId(startMessageId);
+        return builder.create();
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+            @Element PulsarRecord pulsarRecord,
+            RestrictionTracker<OffsetRange, Long> tracker,
+            OutputReceiver<PulsarRecord> output) throws IOException {
+
+        long startOffset = tracker.currentRestriction().getFrom();
+        //long expectedOffset = startOffset;
+        MessageId startMessageId = (startOffset != 0) ?
+                        MessageIdUtils.getMessageId(startOffset) : MessageId.earliest;
+
+        //TODO: if topic is partitioned need to create n readers for n topic-partitions
+        try(Reader<byte[]> reader = newReader(client, startMessageId)) {
+
+            while (true) {
+                Message<byte[]> message = reader.readNext();
+                MessageId messageId = message.getMessageId();
+                long currentOffset = MessageIdUtils.getOffset(messageId);
+                // if tracker.tryclaim() return true, sdf must execute work otherwise
+                // doFn must exit processElement() without doing any work associated
+                // or claiming more work
+                if (!tracker.tryClaim(currentOffset)) {
+                    return ProcessContinuation.stop();
+                }
+                PulsarRecord<K, V> newPulsarRecord =

Review comment:
       Why not output the Pulsar `Message` itself instead of creating your own type?
   
   The case for Kafka is that when reading from Kafka we don't have all this data about topic/partition/offset when reading and capture local state to generate a richer message but that seems redundant here.

##########
File path: sdks/java/io/pulsar/build.gradle
##########
@@ -0,0 +1,23 @@
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.pulsar')
+
+version '2.32.0-SNAPSHOT'

Review comment:
       I don't think this is necessary.
   ```suggestion
   ```

##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@DoFn.UnboundedPerElement
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {
+
+    private PulsarClient client;
+    private String topic;
+    private List<String> topics;
+    private PulsarAdmin admin;
+
+    private String clientUrl;
+    private String adminUrl;
+
+
+    public void setServiceUrl(String clientUrl, String adminUrl) throws PulsarClientException {
+       this.clientUrl = clientUrl;
+       this.adminUrl = adminUrl;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    private void initPulsarClients() throws PulsarClientException {
+        if(this.adminUrl == null && this.clientUrl == null) {
+            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+            this.clientUrl = PulsarIOUtils.SERVICE_URL;
+        }
+        this.client = PulsarClient.builder()
+                .serviceUrl(clientUrl)
+                .build();
+
+        //TODO fix auth for admin connection
+        boolean tlsAllowInsecureConnection = false;
+        String tlsTrustCertsFilePath = null;
+        this.admin = PulsarAdmin.builder()
+                // .authentication(authPluginClassName,authParams)
+                .serviceHttpUrl(adminUrl)
+                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+                .build();
+    }
+
+    private void closePulsarClients() throws PulsarClientException {
+        this.admin.close();
+        this.client.close();
+    }
+
+    // Open connection to Pulsar clients
+    @Setup
+    public void setup() throws Exception {
+        this.initPulsarClients();
+    }
+    // Close connection to Pulsar clients
+    @Teardown
+    public void teardown() throws Exception {
+        this.closePulsarClients();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element PulsarSource pulsarSource) {
+        // Reading a topic from starting point with offset 0
+        long startOffset = 0;
+        if(pulsarSource.getStartOffset() != null) {
+            startOffset = pulsarSource.getStartOffset();
+        }
+
+        return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+
+    /*
+    It may define a DoFn.GetSize method or ensure that the RestrictionTracker implements
+    RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or splitting may result
+     if size or progress is an inaccurate representation of work.
+     See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+     */
+    @GetSize
+    public double getSize(@Element PulsarSource pulsarSource, @Restriction OffsetRange offsetRange) {
+        //TODO improve getsize estiamate, check pulsar stats to improve get size estimate
+        // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+        double estimateRecords = restrictionTracker(pulsarSource, offsetRange).getProgress().getWorkRemaining();
+        return estimateRecords;
+    }
+
+    private Reader<byte[]> newReader(PulsarClient client, MessageId startMessageId) throws PulsarClientException {
+        ReaderBuilder<byte[]> builder = client.newReader().topic(topic).startMessageId(startMessageId);
+        return builder.create();
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+            @Element PulsarRecord pulsarRecord,
+            RestrictionTracker<OffsetRange, Long> tracker,
+            OutputReceiver<PulsarRecord> output) throws IOException {
+
+        long startOffset = tracker.currentRestriction().getFrom();
+        //long expectedOffset = startOffset;
+        MessageId startMessageId = (startOffset != 0) ?
+                        MessageIdUtils.getMessageId(startOffset) : MessageId.earliest;
+
+        //TODO: if topic is partitioned need to create n readers for n topic-partitions
+        try(Reader<byte[]> reader = newReader(client, startMessageId)) {

Review comment:
       Can Pulsar change the number of partitions dynamically?
   If not, it seems to make sense that each partition either be part of the PulsarSourceDescriptor or the restriction itself so this way each SDF instance that is executing reads from one and only one partition. This allows runners to read from all the partitions in parallel on different workers allowing for better performance.
   
   Initial splitting can be used to enumerate all the partitions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] MarcoRob commented on a change in pull request #15572: [BEAM-8218] PulsarIO (not completed, still working on), looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
MarcoRob commented on a change in pull request #15572:
URL: https://github.com/apache/beam/pull/15572#discussion_r728377024



##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@DoFn.UnboundedPerElement
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {

Review comment:
       Okey, got it. 
   
   Then I should do something similar like KafkaIO line #1344, right?
   https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1344
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] commented on pull request #15572: BEAM-8218 - PulsarIO (not completed, still working on), looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ccca353) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **decrease** coverage by `0.02%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   - Coverage   83.78%   83.75%   -0.03%     
   ==========================================
     Files         439      444       +5     
     Lines       59219    60331    +1112     
   ==========================================
   + Hits        49618    50532     +914     
   - Misses       9601     9799     +198     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [.../python/apache\_beam/testing/test\_stream\_service.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbV9zZXJ2aWNlLnB5) | `88.37% <0.00%> (-4.66%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `90.74% <0.00%> (-1.79%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `96.72% <0.00%> (-1.64%)` | :arrow_down: |
   | [sdks/python/apache\_beam/testing/test\_stream.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbS5weQ==) | `91.08% <0.00%> (-1.31%)` | :arrow_down: |
   | [.../apache\_beam/io/gcp/datastore/v1new/datastoreio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2RhdGFzdG9yZS92MW5ldy9kYXRhc3RvcmVpby5weQ==) | `86.45% <0.00%> (-0.99%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.53% <0.00%> (-0.85%)` | :arrow_down: |
   | ... and [76 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...ccca353](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ac4ad75) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **decrease** coverage by `0.27%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   - Coverage   83.78%   83.51%   -0.28%     
   ==========================================
     Files         439      445       +6     
     Lines       59219    61371    +2152     
   ==========================================
   + Hits        49618    51254    +1636     
   - Misses       9601    10117     +516     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `62.72% <0.00%> (-13.66%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `87.80% <0.00%> (-7.32%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/experimental/spannerio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2V4cGVyaW1lbnRhbC9zcGFubmVyaW8ucHk=) | `82.52% <0.00%> (-5.69%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [...thon/apache\_beam/runners/worker/operation\_specs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9uX3NwZWNzLnB5) | `40.67% <0.00%> (-4.90%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `82.91% <0.00%> (-4.57%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/bigquery\_read\_internal.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3JlYWRfaW50ZXJuYWwucHk=) | `53.92% <0.00%> (-4.55%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | ... and [112 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...ac4ad75](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ac4ad75) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **decrease** coverage by `0.27%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   - Coverage   83.78%   83.51%   -0.28%     
   ==========================================
     Files         439      445       +6     
     Lines       59219    61371    +2152     
   ==========================================
   + Hits        49618    51254    +1636     
   - Misses       9601    10117     +516     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `62.72% <0.00%> (-13.66%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `87.80% <0.00%> (-7.32%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/experimental/spannerio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2V4cGVyaW1lbnRhbC9zcGFubmVyaW8ucHk=) | `82.52% <0.00%> (-5.69%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [...thon/apache\_beam/runners/worker/operation\_specs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9uX3NwZWNzLnB5) | `40.67% <0.00%> (-4.90%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `82.91% <0.00%> (-4.57%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/bigquery\_read\_internal.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3JlYWRfaW50ZXJuYWwucHk=) | `53.92% <0.00%> (-4.55%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | ... and [112 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...ac4ad75](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] MarcoRob commented on a change in pull request #15572: [BEAM-8218] PulsarIO (not completed, still working on), looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
MarcoRob commented on a change in pull request #15572:
URL: https://github.com/apache/beam/pull/15572#discussion_r728451852



##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@DoFn.UnboundedPerElement
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {
+
+    private PulsarClient client;
+    private String topic;
+    private List<String> topics;
+    private PulsarAdmin admin;
+
+    private String clientUrl;
+    private String adminUrl;
+
+
+    public void setServiceUrl(String clientUrl, String adminUrl) throws PulsarClientException {
+       this.clientUrl = clientUrl;
+       this.adminUrl = adminUrl;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    private void initPulsarClients() throws PulsarClientException {
+        if(this.adminUrl == null && this.clientUrl == null) {
+            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+            this.clientUrl = PulsarIOUtils.SERVICE_URL;
+        }
+        this.client = PulsarClient.builder()
+                .serviceUrl(clientUrl)
+                .build();
+
+        //TODO fix auth for admin connection
+        boolean tlsAllowInsecureConnection = false;
+        String tlsTrustCertsFilePath = null;
+        this.admin = PulsarAdmin.builder()
+                // .authentication(authPluginClassName,authParams)
+                .serviceHttpUrl(adminUrl)
+                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+                .build();
+    }
+
+    private void closePulsarClients() throws PulsarClientException {
+        this.admin.close();
+        this.client.close();
+    }
+
+    // Open connection to Pulsar clients
+    @Setup
+    public void setup() throws Exception {
+        this.initPulsarClients();
+    }
+    // Close connection to Pulsar clients
+    @Teardown
+    public void teardown() throws Exception {
+        this.closePulsarClients();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element PulsarSource pulsarSource) {
+        // Reading a topic from starting point with offset 0
+        long startOffset = 0;
+        if(pulsarSource.getStartOffset() != null) {
+            startOffset = pulsarSource.getStartOffset();
+        }
+
+        return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+
+    /*
+    It may define a DoFn.GetSize method or ensure that the RestrictionTracker implements
+    RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or splitting may result
+     if size or progress is an inaccurate representation of work.
+     See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+     */
+    @GetSize
+    public double getSize(@Element PulsarSource pulsarSource, @Restriction OffsetRange offsetRange) {
+        //TODO improve getsize estiamate, check pulsar stats to improve get size estimate
+        // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+        double estimateRecords = restrictionTracker(pulsarSource, offsetRange).getProgress().getWorkRemaining();
+        return estimateRecords;
+    }
+
+    private Reader<byte[]> newReader(PulsarClient client, MessageId startMessageId) throws PulsarClientException {
+        ReaderBuilder<byte[]> builder = client.newReader().topic(topic).startMessageId(startMessageId);
+        return builder.create();
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+            @Element PulsarRecord pulsarRecord,
+            RestrictionTracker<OffsetRange, Long> tracker,
+            OutputReceiver<PulsarRecord> output) throws IOException {
+
+        long startOffset = tracker.currentRestriction().getFrom();
+        //long expectedOffset = startOffset;
+        MessageId startMessageId = (startOffset != 0) ?
+                        MessageIdUtils.getMessageId(startOffset) : MessageId.earliest;
+
+        //TODO: if topic is partitioned need to create n readers for n topic-partitions
+        try(Reader<byte[]> reader = newReader(client, startMessageId)) {

Review comment:
       You can increase the number of partitions but not decrease it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] MarcoRob edited a comment on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
MarcoRob edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-965675292


   Hi @lukecwik I made some changes to PulsarIO, still WIP, I need to work on PulsarIO tests, meanwhile I would like to review the new approach with the suggested changes.
   Some changes made:
   - Watermark
   - PTransform builder
   - Read and Write for IO
   
   As well want to validate getSize operation on ReadSDF, I was considering handle something similar like KafkaIO, where keeps tracking on the average size of the records, which can be done similar in PulsarIO


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (818e954) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **decrease** coverage by `0.26%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   - Coverage   83.78%   83.52%   -0.27%     
   ==========================================
     Files         439      445       +6     
     Lines       59219    61385    +2166     
   ==========================================
   + Hits        49618    51272    +1654     
   - Misses       9601    10113     +512     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `62.72% <0.00%> (-13.66%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/experimental/spannerio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2V4cGVyaW1lbnRhbC9zcGFubmVyaW8ucHk=) | `82.52% <0.00%> (-5.69%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [...thon/apache\_beam/runners/worker/operation\_specs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9uX3NwZWNzLnB5) | `40.67% <0.00%> (-4.90%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `82.91% <0.00%> (-4.57%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/bigquery\_read\_internal.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3JlYWRfaW50ZXJuYWwucHk=) | `53.92% <0.00%> (-4.55%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | [sdks/python/apache\_beam/testing/test\_stream.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbS5weQ==) | `91.08% <0.00%> (-1.31%)` | :arrow_down: |
   | ... and [112 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...818e954](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #15572: [BEAM-8218] PulsarIO (not completed, still working on), looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15572:
URL: https://github.com/apache/beam/pull/15572#discussion_r719730118



##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIOUtils.java
##########
@@ -0,0 +1,6 @@
+package org.apache.beam.sdk.io.pulsar;
+
+public class PulsarIOUtils {

Review comment:
       package private

##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@DoFn.UnboundedPerElement
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {
+
+    private PulsarClient client;
+    private String topic;
+    private List<String> topics;
+    private PulsarAdmin admin;
+
+    private String clientUrl;
+    private String adminUrl;
+
+
+    public void setServiceUrl(String clientUrl, String adminUrl) throws PulsarClientException {
+       this.clientUrl = clientUrl;
+       this.adminUrl = adminUrl;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    private void initPulsarClients() throws PulsarClientException {
+        if(this.adminUrl == null && this.clientUrl == null) {
+            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+            this.clientUrl = PulsarIOUtils.SERVICE_URL;
+        }
+        this.client = PulsarClient.builder()
+                .serviceUrl(clientUrl)
+                .build();
+
+        //TODO fix auth for admin connection
+        boolean tlsAllowInsecureConnection = false;
+        String tlsTrustCertsFilePath = null;
+        this.admin = PulsarAdmin.builder()
+                // .authentication(authPluginClassName,authParams)
+                .serviceHttpUrl(adminUrl)
+                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+                .build();
+    }
+
+    private void closePulsarClients() throws PulsarClientException {
+        this.admin.close();
+        this.client.close();
+    }
+
+    // Open connection to Pulsar clients
+    @Setup
+    public void setup() throws Exception {
+        this.initPulsarClients();
+    }
+    // Close connection to Pulsar clients
+    @Teardown
+    public void teardown() throws Exception {
+        this.closePulsarClients();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element PulsarSource pulsarSource) {
+        // Reading a topic from starting point with offset 0
+        long startOffset = 0;
+        if(pulsarSource.getStartOffset() != null) {
+            startOffset = pulsarSource.getStartOffset();
+        }
+
+        return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+
+    /*
+    It may define a DoFn.GetSize method or ensure that the RestrictionTracker implements
+    RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or splitting may result
+     if size or progress is an inaccurate representation of work.
+     See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+     */
+    @GetSize
+    public double getSize(@Element PulsarSource pulsarSource, @Restriction OffsetRange offsetRange) {
+        //TODO improve getsize estiamate, check pulsar stats to improve get size estimate
+        // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+        double estimateRecords = restrictionTracker(pulsarSource, offsetRange).getProgress().getWorkRemaining();

Review comment:
       I believe this is the default implementation if not overridden as per:
   https://github.com/apache/beam/blob/72595b9018754512181f4ab3f35d0ceac11536cf/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L980

##########
File path: settings.gradle.kts
##########
@@ -225,3 +226,5 @@ include("beam-test-jenkins")
 project(":beam-test-jenkins").projectDir = file(".test-infra/jenkins")
 include("beam-validate-runner")
 project(":beam-validate-runner").projectDir = file(".test-infra/validate-runner")
+include("sdks:java:io:pulsar")

Review comment:
       ```suggestion
   ```
   
   Duplicate of above.

##########
File path: sdks/java/io/pulsar/build.gradle
##########
@@ -0,0 +1,23 @@
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.pulsar')
+
+version '2.32.0-SNAPSHOT'
+
+def pulsarVersion = '2.8.0'
+
+dependencies {
+    compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
+    compile group: 'org.apache.pulsar', name: 'pulsar-client-admin', version: pulsarVersion
+    compile project(path: ":sdks:java:core", configuration: "shadow")
+
+
+    testCompile project(path: ":sdks:java:io:common", configuration: "testRuntime")
+    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
+    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
+
+    testImplementation "org.testcontainers:pulsar:1.15.3"
+}
+
+test {
+    useJUnitPlatform()
+}

Review comment:
       nit: missing new line

##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSource.java
##########
@@ -0,0 +1,27 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import javax.annotation.Nullable;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.pulsar.client.api.MessageId;
+
+import java.io.Serializable;
+
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PulsarSource implements Serializable {
+
+    @SchemaFieldName("start_offset")
+    @Nullable
+    abstract Long getStartOffset();

Review comment:
       You could expand this to contain the serviceUrl/adminUrl/... allowing one to dynamically read from multiple Pulsar sources instead of only one.
   
   This has come up in Kafka where users want to read from 10s or 100s of clusters and specifying each transform individually during pipeline creation is a hassle.

##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSource.java
##########
@@ -0,0 +1,27 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import javax.annotation.Nullable;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.pulsar.client.api.MessageId;
+
+import java.io.Serializable;
+
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class PulsarSource implements Serializable {

Review comment:
       nit: PulsarSource -> PulsarSourceDescriptor

##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@DoFn.UnboundedPerElement
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {

Review comment:
       I understand that this is a WIP but typically we expose a PTransform builder pattern that expands and applies any necessary DoFns. Using transforms allows one to create composite sub-graphs of logic.
   
   See https://beam.apache.org/contribute/ptransform-style-guide/ for more details.

##########
File path: settings.gradle.kts
##########
@@ -225,3 +226,5 @@ include("beam-test-jenkins")
 project(":beam-test-jenkins").projectDir = file(".test-infra/jenkins")
 include("beam-validate-runner")
 project(":beam-validate-runner").projectDir = file(".test-infra/validate-runner")
+include("sdks:java:io:pulsar")
+findProject(":sdks:java:io:pulsar")?.name = "pulsar"

Review comment:
       ```suggestion
   ```
   
   You only need to specify project if the project name doesn't match the project path when replacing `:` with `/`.
   

##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@DoFn.UnboundedPerElement
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {
+
+    private PulsarClient client;
+    private String topic;
+    private List<String> topics;
+    private PulsarAdmin admin;
+
+    private String clientUrl;
+    private String adminUrl;
+
+
+    public void setServiceUrl(String clientUrl, String adminUrl) throws PulsarClientException {
+       this.clientUrl = clientUrl;
+       this.adminUrl = adminUrl;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    private void initPulsarClients() throws PulsarClientException {
+        if(this.adminUrl == null && this.clientUrl == null) {
+            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+            this.clientUrl = PulsarIOUtils.SERVICE_URL;
+        }
+        this.client = PulsarClient.builder()
+                .serviceUrl(clientUrl)
+                .build();
+
+        //TODO fix auth for admin connection
+        boolean tlsAllowInsecureConnection = false;
+        String tlsTrustCertsFilePath = null;
+        this.admin = PulsarAdmin.builder()
+                // .authentication(authPluginClassName,authParams)
+                .serviceHttpUrl(adminUrl)
+                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+                .build();
+    }
+
+    private void closePulsarClients() throws PulsarClientException {
+        this.admin.close();
+        this.client.close();
+    }
+
+    // Open connection to Pulsar clients
+    @Setup
+    public void setup() throws Exception {
+        this.initPulsarClients();
+    }
+    // Close connection to Pulsar clients
+    @Teardown
+    public void teardown() throws Exception {
+        this.closePulsarClients();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element PulsarSource pulsarSource) {
+        // Reading a topic from starting point with offset 0
+        long startOffset = 0;
+        if(pulsarSource.getStartOffset() != null) {
+            startOffset = pulsarSource.getStartOffset();
+        }
+
+        return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+
+    /*
+    It may define a DoFn.GetSize method or ensure that the RestrictionTracker implements
+    RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or splitting may result
+     if size or progress is an inaccurate representation of work.
+     See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+     */
+    @GetSize
+    public double getSize(@Element PulsarSource pulsarSource, @Restriction OffsetRange offsetRange) {
+        //TODO improve getsize estiamate, check pulsar stats to improve get size estimate
+        // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+        double estimateRecords = restrictionTracker(pulsarSource, offsetRange).getProgress().getWorkRemaining();
+        return estimateRecords;
+    }
+
+    private Reader<byte[]> newReader(PulsarClient client, MessageId startMessageId) throws PulsarClientException {
+        ReaderBuilder<byte[]> builder = client.newReader().topic(topic).startMessageId(startMessageId);
+        return builder.create();
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+            @Element PulsarRecord pulsarRecord,
+            RestrictionTracker<OffsetRange, Long> tracker,
+            OutputReceiver<PulsarRecord> output) throws IOException {
+
+        long startOffset = tracker.currentRestriction().getFrom();
+        //long expectedOffset = startOffset;
+        MessageId startMessageId = (startOffset != 0) ?
+                        MessageIdUtils.getMessageId(startOffset) : MessageId.earliest;
+
+        //TODO: if topic is partitioned need to create n readers for n topic-partitions
+        try(Reader<byte[]> reader = newReader(client, startMessageId)) {
+
+            while (true) {
+                Message<byte[]> message = reader.readNext();
+                MessageId messageId = message.getMessageId();
+                long currentOffset = MessageIdUtils.getOffset(messageId);
+                // if tracker.tryclaim() return true, sdf must execute work otherwise
+                // doFn must exit processElement() without doing any work associated
+                // or claiming more work
+                if (!tracker.tryClaim(currentOffset)) {
+                    return ProcessContinuation.stop();
+                }
+                PulsarRecord<K, V> newPulsarRecord =

Review comment:
       Why not output the Pulsar `Message` itself instead of creating your own type?
   
   The case for Kafka is that when reading from Kafka we don't have all this data about topic/partition/offset when reading and capture local state to generate a richer message but that seems redundant here.

##########
File path: sdks/java/io/pulsar/build.gradle
##########
@@ -0,0 +1,23 @@
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.io.pulsar')
+
+version '2.32.0-SNAPSHOT'

Review comment:
       I don't think this is necessary.
   ```suggestion
   ```

##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@DoFn.UnboundedPerElement
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {
+
+    private PulsarClient client;
+    private String topic;
+    private List<String> topics;
+    private PulsarAdmin admin;
+
+    private String clientUrl;
+    private String adminUrl;
+
+
+    public void setServiceUrl(String clientUrl, String adminUrl) throws PulsarClientException {
+       this.clientUrl = clientUrl;
+       this.adminUrl = adminUrl;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    private void initPulsarClients() throws PulsarClientException {
+        if(this.adminUrl == null && this.clientUrl == null) {
+            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+            this.clientUrl = PulsarIOUtils.SERVICE_URL;
+        }
+        this.client = PulsarClient.builder()
+                .serviceUrl(clientUrl)
+                .build();
+
+        //TODO fix auth for admin connection
+        boolean tlsAllowInsecureConnection = false;
+        String tlsTrustCertsFilePath = null;
+        this.admin = PulsarAdmin.builder()
+                // .authentication(authPluginClassName,authParams)
+                .serviceHttpUrl(adminUrl)
+                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+                .build();
+    }
+
+    private void closePulsarClients() throws PulsarClientException {
+        this.admin.close();
+        this.client.close();
+    }
+
+    // Open connection to Pulsar clients
+    @Setup
+    public void setup() throws Exception {
+        this.initPulsarClients();
+    }
+    // Close connection to Pulsar clients
+    @Teardown
+    public void teardown() throws Exception {
+        this.closePulsarClients();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element PulsarSource pulsarSource) {
+        // Reading a topic from starting point with offset 0
+        long startOffset = 0;
+        if(pulsarSource.getStartOffset() != null) {
+            startOffset = pulsarSource.getStartOffset();
+        }
+
+        return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+
+    /*
+    It may define a DoFn.GetSize method or ensure that the RestrictionTracker implements
+    RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or splitting may result
+     if size or progress is an inaccurate representation of work.
+     See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+     */
+    @GetSize
+    public double getSize(@Element PulsarSource pulsarSource, @Restriction OffsetRange offsetRange) {
+        //TODO improve getsize estiamate, check pulsar stats to improve get size estimate
+        // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+        double estimateRecords = restrictionTracker(pulsarSource, offsetRange).getProgress().getWorkRemaining();
+        return estimateRecords;
+    }
+
+    private Reader<byte[]> newReader(PulsarClient client, MessageId startMessageId) throws PulsarClientException {
+        ReaderBuilder<byte[]> builder = client.newReader().topic(topic).startMessageId(startMessageId);
+        return builder.create();
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+            @Element PulsarRecord pulsarRecord,
+            RestrictionTracker<OffsetRange, Long> tracker,
+            OutputReceiver<PulsarRecord> output) throws IOException {
+
+        long startOffset = tracker.currentRestriction().getFrom();
+        //long expectedOffset = startOffset;
+        MessageId startMessageId = (startOffset != 0) ?
+                        MessageIdUtils.getMessageId(startOffset) : MessageId.earliest;
+
+        //TODO: if topic is partitioned need to create n readers for n topic-partitions
+        try(Reader<byte[]> reader = newReader(client, startMessageId)) {

Review comment:
       Can Pulsar change the number of partitions dynamically?
   If not, it seems to make sense that each partition either be part of the PulsarSourceDescriptor or the restriction itself so this way each SDF instance that is executing reads from one and only one partition. This allows runners to read from all the partitions in parallel on different workers allowing for better performance.
   
   Initial splitting can be used to enumerate all the partitions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] MarcoRob commented on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
MarcoRob commented on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-989039738


   > What is the status of the PR? How can I help?
   > 
   > CC @MarcoRob @lukecwik
   
   Hi @genert 
   I am working on the tests for this PR, but you can help me taking a look at the initial implementation? Maybe there is something that I am missing?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ac4ad75) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **decrease** coverage by `0.27%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   - Coverage   83.78%   83.51%   -0.28%     
   ==========================================
     Files         439      445       +6     
     Lines       59219    61371    +2152     
   ==========================================
   + Hits        49618    51254    +1636     
   - Misses       9601    10117     +516     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `62.72% <0.00%> (-13.66%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `87.80% <0.00%> (-7.32%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/experimental/spannerio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2V4cGVyaW1lbnRhbC9zcGFubmVyaW8ucHk=) | `82.52% <0.00%> (-5.69%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [...thon/apache\_beam/runners/worker/operation\_specs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9uX3NwZWNzLnB5) | `40.67% <0.00%> (-4.90%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `82.91% <0.00%> (-4.57%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/bigquery\_read\_internal.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3JlYWRfaW50ZXJuYWwucHk=) | `53.92% <0.00%> (-4.55%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | ... and [112 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...ac4ad75](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-926194094


   # [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#15572](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (818e954) into [master](https://codecov.io/gh/apache/beam/commit/90c854e97787c19cd5b94034d37c5319317567a8?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (90c854e) will **decrease** coverage by `0.26%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/15572/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #15572      +/-   ##
   ==========================================
   - Coverage   83.78%   83.52%   -0.27%     
   ==========================================
     Files         439      445       +6     
     Lines       59219    61385    +2166     
   ==========================================
   + Hits        49618    51272    +1654     
   - Misses       9601    10113     +512     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `62.72% <0.00%> (-13.66%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/experimental/spannerio.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2V4cGVyaW1lbnRhbC9zcGFubmVyaW8ucHk=) | `82.52% <0.00%> (-5.69%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.50% <0.00%> (-5.23%)` | :arrow_down: |
   | [...thon/apache\_beam/runners/worker/operation\_specs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9uX3NwZWNzLnB5) | `40.67% <0.00%> (-4.90%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `82.91% <0.00%> (-4.57%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/bigquery\_read\_internal.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3JlYWRfaW50ZXJuYWwucHk=) | `53.92% <0.00%> (-4.55%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/recording\_manager.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9yZWNvcmRpbmdfbWFuYWdlci5weQ==) | `96.55% <0.00%> (-2.42%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `94.02% <0.00%> (-2.24%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/sideinputs.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zaWRlaW5wdXRzLnB5) | `92.15% <0.00%> (-1.73%)` | :arrow_down: |
   | [sdks/python/apache\_beam/testing/test\_stream.py](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbS5weQ==) | `91.08% <0.00%> (-1.31%)` | :arrow_down: |
   | ... and [112 more](https://codecov.io/gh/apache/beam/pull/15572/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [90c854e...818e954](https://codecov.io/gh/apache/beam/pull/15572?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #15572: [BEAM-8218] PulsarIO (not completed, still working on), looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #15572:
URL: https://github.com/apache/beam/pull/15572#discussion_r728520246



##########
File path: sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java
##########
@@ -0,0 +1,185 @@
+package org.apache.beam.sdk.io.pulsar;
+
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@DoFn.UnboundedPerElement
+public class PulsarIO<K, V> extends DoFn<PulsarSource, PulsarRecord<K,V>> {
+
+    private PulsarClient client;
+    private String topic;
+    private List<String> topics;
+    private PulsarAdmin admin;
+
+    private String clientUrl;
+    private String adminUrl;
+
+
+    public void setServiceUrl(String clientUrl, String adminUrl) throws PulsarClientException {
+       this.clientUrl = clientUrl;
+       this.adminUrl = adminUrl;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    private void initPulsarClients() throws PulsarClientException {
+        if(this.adminUrl == null && this.clientUrl == null) {
+            this.adminUrl = PulsarIOUtils.SERVICE_HTTP_URL;
+            this.clientUrl = PulsarIOUtils.SERVICE_URL;
+        }
+        this.client = PulsarClient.builder()
+                .serviceUrl(clientUrl)
+                .build();
+
+        //TODO fix auth for admin connection
+        boolean tlsAllowInsecureConnection = false;
+        String tlsTrustCertsFilePath = null;
+        this.admin = PulsarAdmin.builder()
+                // .authentication(authPluginClassName,authParams)
+                .serviceHttpUrl(adminUrl)
+                .tlsTrustCertsFilePath(tlsTrustCertsFilePath)
+                .allowTlsInsecureConnection(tlsAllowInsecureConnection)
+                .build();
+    }
+
+    private void closePulsarClients() throws PulsarClientException {
+        this.admin.close();
+        this.client.close();
+    }
+
+    // Open connection to Pulsar clients
+    @Setup
+    public void setup() throws Exception {
+        this.initPulsarClients();
+    }
+    // Close connection to Pulsar clients
+    @Teardown
+    public void teardown() throws Exception {
+        this.closePulsarClients();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction(@Element PulsarSource pulsarSource) {
+        // Reading a topic from starting point with offset 0
+        long startOffset = 0;
+        if(pulsarSource.getStartOffset() != null) {
+            startOffset = pulsarSource.getStartOffset();
+        }
+
+        return new OffsetRange(startOffset, Long.MAX_VALUE);
+    }
+
+    /*
+    It may define a DoFn.GetSize method or ensure that the RestrictionTracker implements
+    RestrictionTracker.HasProgress. Poor auto-scaling of workers and/or splitting may result
+     if size or progress is an inaccurate representation of work.
+     See DoFn.GetSize and RestrictionTracker.HasProgress for further details.
+     */
+    @GetSize
+    public double getSize(@Element PulsarSource pulsarSource, @Restriction OffsetRange offsetRange) {
+        //TODO improve getsize estiamate, check pulsar stats to improve get size estimate
+        // https://pulsar.apache.org/docs/en/admin-api-topics/#get-stats
+        double estimateRecords = restrictionTracker(pulsarSource, offsetRange).getProgress().getWorkRemaining();
+        return estimateRecords;
+    }
+
+    private Reader<byte[]> newReader(PulsarClient client, MessageId startMessageId) throws PulsarClientException {
+        ReaderBuilder<byte[]> builder = client.newReader().topic(topic).startMessageId(startMessageId);
+        return builder.create();
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+            @Element PulsarRecord pulsarRecord,
+            RestrictionTracker<OffsetRange, Long> tracker,
+            OutputReceiver<PulsarRecord> output) throws IOException {
+
+        long startOffset = tracker.currentRestriction().getFrom();
+        //long expectedOffset = startOffset;
+        MessageId startMessageId = (startOffset != 0) ?
+                        MessageIdUtils.getMessageId(startOffset) : MessageId.earliest;
+
+        //TODO: if topic is partitioned need to create n readers for n topic-partitions
+        try(Reader<byte[]> reader = newReader(client, startMessageId)) {

Review comment:
       For an initial version it would be best to tell users that they can't change the number of partitions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] MarcoRob commented on pull request #15572: [BEAM-8218] WIP, PulsarIO, looking forward for initial feedback

Posted by GitBox <gi...@apache.org>.
MarcoRob commented on pull request #15572:
URL: https://github.com/apache/beam/pull/15572#issuecomment-1023547897


   Hi folks,
   
   I am closing this PR since I finished the first version of PulsarIO. Here you can find the new PR created with the latest changes commited, [PR-16634](https://github.com/apache/beam/pull/16634)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org