Posted to github@beam.apache.org by "AMOOOMA (via GitHub)" <gi...@apache.org> on 2023/04/28 20:22:39 UTC

[GitHub] [beam] AMOOOMA opened a new pull request, #26477: Report source bytes processed for custom sources

AMOOOMA opened a new pull request, #26477:
URL: https://github.com/apache/beam/pull/26477

   Changes needed for an internal project on load balancing.
   The source bytes processed will be passed to load balancing for per-range aggregation. This applies to custom sources only.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #26477: Report source bytes processed for custom sources

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26477:
URL: https://github.com/apache/beam/pull/26477#issuecomment-1531933145

   No reviewers could be found from any of the labels on the PR or in the fallback reviewers list. Check the config file to make sure reviewers are configured.



[GitHub] [beam] kennknowles commented on pull request #26477: Report source bytes processed for custom sources

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on PR #26477:
URL: https://github.com/apache/beam/pull/26477#issuecomment-1531953771

   I guess that leaves me :-)



[GitHub] [beam] kennknowles commented on a diff in pull request #26477: Report source bytes processed for custom sources

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #26477:
URL: https://github.com/apache/beam/pull/26477#discussion_r1182907489


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1400,6 +1401,23 @@ public void close() {
       // Blocks while executing work.
       executionState.getWorkExecutor().execute();
 
+      // Reports source bytes processed to workitemcommitrequest if available.
+      long sourceBytesProcessed = 0;
+      List<ElementCounter> counters =
+          ((DataflowMapTaskExecutor) executionState.getWorkExecutor())
+              .getReadOperation()
+              .receivers[0]
+              .getOutputCounters();
+      for (ElementCounter counter : counters) {
+        try {
+          sourceBytesProcessed =
+              (long) ((OutputObjectAndByteCounter) counter).getByteCount().getAndReset();
+        } catch (Exception e) {
+          // ignore

Review Comment:
   Make the comment explain why we are ignoring it.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1400,6 +1401,23 @@ public void close() {
       // Blocks while executing work.
       executionState.getWorkExecutor().execute();
 
+      // Reports source bytes processed to workitemcommitrequest if available.
+      long sourceBytesProcessed = 0;
+      List<ElementCounter> counters =
+          ((DataflowMapTaskExecutor) executionState.getWorkExecutor())
+              .getReadOperation()
+              .receivers[0]
+              .getOutputCounters();
+      for (ElementCounter counter : counters) {
+        try {
+          sourceBytesProcessed =
+              (long) ((OutputObjectAndByteCounter) counter).getByteCount().getAndReset();

Review Comment:
   Overwrite it each time? Do you expect just one counter to not crash?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1400,6 +1401,23 @@ public void close() {
       // Blocks while executing work.
       executionState.getWorkExecutor().execute();
 
+      // Reports source bytes processed to workitemcommitrequest if available.
+      long sourceBytesProcessed = 0;
+      List<ElementCounter> counters =
+          ((DataflowMapTaskExecutor) executionState.getWorkExecutor())

Review Comment:
   Are there other kinds of work executors? It has been a while since I touched this code, but seeing this unguarded cast makes me need to page it in.
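   A guarded version of the cast could look like the sketch below. `WorkExecutorSketch`, `MapTaskExecutorSketch`, and `GuardedCastSketch` are hypothetical, simplified stand-ins, not the actual Dataflow worker classes. The point is only that an `instanceof` check makes the assumption explicit and falls back to reporting nothing, instead of throwing `ClassCastException` if another executor type ever shows up.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the worker's executor abstraction.
interface WorkExecutorSketch {
  void execute();
}

// Hypothetical stand-in for a map-task executor that tracks source bytes read.
class MapTaskExecutorSketch implements WorkExecutorSketch {
  private final AtomicLong sourceBytes = new AtomicLong();

  @Override
  public void execute() {
    // Pretend the read operation consumed 128 bytes of source data.
    sourceBytes.addAndGet(128);
  }

  // Returns the accumulated byte count and resets it to zero.
  long getAndResetSourceBytes() {
    return sourceBytes.getAndSet(0);
  }
}

class GuardedCastSketch {
  // Returns bytes read by the executor, or 0 if it is not a MapTaskExecutorSketch.
  static long sourceBytesProcessed(WorkExecutorSketch executor) {
    if (executor instanceof MapTaskExecutorSketch) {
      return ((MapTaskExecutorSketch) executor).getAndResetSourceBytes();
    }
    // Other executor kinds do not report source bytes; skip instead of crashing.
    return 0L;
  }
}
```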




[GitHub] [beam] AMOOOMA commented on pull request #26477: Report source bytes processed for custom sources

Posted by "AMOOOMA (via GitHub)" <gi...@apache.org>.
AMOOOMA commented on PR #26477:
URL: https://github.com/apache/beam/pull/26477#issuecomment-1531931781

   assign set of reviewers
   
   The macOS runner test failed because of an environment setup issue unrelated to this PR.



[GitHub] [beam] kennknowles merged pull request #26477: Report source bytes processed for custom sources

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles merged PR #26477:
URL: https://github.com/apache/beam/pull/26477



[GitHub] [beam] github-actions[bot] commented on pull request #26477: Report source bytes processed for custom sources

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26477:
URL: https://github.com/apache/beam/pull/26477#issuecomment-1531918668

   Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`



[GitHub] [beam] kennknowles commented on a diff in pull request #26477: Report source bytes processed for custom sources

Posted by "kennknowles (via GitHub)" <gi...@apache.org>.
kennknowles commented on code in PR #26477:
URL: https://github.com/apache/beam/pull/26477#discussion_r1185260230


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1400,6 +1402,31 @@ public void close() {
       // Blocks while executing work.
       executionState.getWorkExecutor().execute();
 
+      // Reports source bytes processed to workitemcommitrequest if available.
+      try {
+        long sourceBytesProcessed = 0;
+        List<ElementCounter> counters =
+            ((DataflowMapTaskExecutor) executionState.getWorkExecutor())
+                .getReadOperation()
+                .receivers[0]
+                .getOutputCounters();
+        for (ElementCounter counter : counters) {
+          try {
+            Counter<Long, Long> baseCounter = ((OutputObjectAndByteCounter) counter).getByteCount();
+            if (!baseCounter.getName().name().equals(counterName)) continue;
+            sourceBytesProcessed = (long) baseCounter.getAndReset();
+          } catch (Exception e) {
+            // Ignoring because most counter will crash, spamming the logs.

Review Comment:
   What kind of crash? Can you do this without catching all exceptions? For example when looping over counters, just use an `if` statement to check if the counter is the one we want. Or even better would be to put the counters into a map so you can try to fetch the one you are interested in. Or it should be safe to downcast to a dataflow-specific `OutputReceiver` and it can have a specific method for the counter you are interested in.
   
   Overall I just want to avoid this pattern of intentionally throwing an exception, and avoid "catch every exception" blocks. You never know what exceptions might be added in the future and they shouldn't all be caught, except when that is the specific design goal.
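   The first alternative suggested above (a plain check on each counter instead of a catch-all block) can be sketched like this. The counter classes and the `"source-bytes"` name are hypothetical simplifications, not the Dataflow worker's `ElementCounter` / `OutputObjectAndByteCounter` API. Counters of the wrong type or name are skipped with `instanceof` and `equals`, so no exception is thrown or swallowed.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical simplified counter hierarchy; not the real Dataflow worker types.
interface ElementCounterSketch {}

// A named counter that accumulates a byte count.
class ByteCounterSketch implements ElementCounterSketch {
  final String name;
  final AtomicLong bytes = new AtomicLong();

  ByteCounterSketch(String name) {
    this.name = name;
  }
}

// Some other kind of counter that carries no byte count.
class OtherCounterSketch implements ElementCounterSketch {}

class CounterFilterSketch {
  // Finds the byte counter with the given name using plain checks and returns its
  // value after resetting it; returns 0 when no such counter exists.
  static long getAndResetBytes(List<? extends ElementCounterSketch> counters, String counterName) {
    for (ElementCounterSketch counter : counters) {
      if (!(counter instanceof ByteCounterSketch)) {
        continue; // Not a byte counter: skip it rather than failing a cast.
      }
      ByteCounterSketch byteCounter = (ByteCounterSketch) counter;
      if (byteCounter.name.equals(counterName)) {
        return byteCounter.bytes.getAndSet(0);
      }
    }
    return 0L;
  }
}
```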




[GitHub] [beam] AMOOOMA commented on a diff in pull request #26477: Report source bytes processed for custom sources

Posted by "AMOOOMA (via GitHub)" <gi...@apache.org>.
AMOOOMA commented on code in PR #26477:
URL: https://github.com/apache/beam/pull/26477#discussion_r1182951860


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1400,6 +1401,23 @@ public void close() {
       // Blocks while executing work.
       executionState.getWorkExecutor().execute();
 
+      // Reports source bytes processed to workitemcommitrequest if available.
+      long sourceBytesProcessed = 0;
+      List<ElementCounter> counters =
+          ((DataflowMapTaskExecutor) executionState.getWorkExecutor())

Review Comment:
   Yes, there should be only one MapTaskExecutor; I double-checked by searching for classes that extend it.




[GitHub] [beam] AMOOOMA commented on a diff in pull request #26477: Report source bytes processed for custom sources

Posted by "AMOOOMA (via GitHub)" <gi...@apache.org>.
AMOOOMA commented on code in PR #26477:
URL: https://github.com/apache/beam/pull/26477#discussion_r1182950224


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1400,6 +1401,23 @@ public void close() {
       // Blocks while executing work.
       executionState.getWorkExecutor().execute();
 
+      // Reports source bytes processed to workitemcommitrequest if available.
+      long sourceBytesProcessed = 0;
+      List<ElementCounter> counters =
+          ((DataflowMapTaskExecutor) executionState.getWorkExecutor())
+              .getReadOperation()
+              .receivers[0]
+              .getOutputCounters();
+      for (ElementCounter counter : counters) {
+        try {
+          sourceBytesProcessed =
+              (long) ((OutputObjectAndByteCounter) counter).getByteCount().getAndReset();

Review Comment:
   Added a check on the counter name to make sure we get and reset only the intended counter.




[GitHub] [beam] AMOOOMA commented on a diff in pull request #26477: Report source bytes processed for custom sources

Posted by "AMOOOMA (via GitHub)" <gi...@apache.org>.
AMOOOMA commented on code in PR #26477:
URL: https://github.com/apache/beam/pull/26477#discussion_r1182969915


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1400,6 +1401,23 @@ public void close() {
       // Blocks while executing work.
       executionState.getWorkExecutor().execute();
 
+      // Reports source bytes processed to workitemcommitrequest if available.
+      long sourceBytesProcessed = 0;
+      List<ElementCounter> counters =
+          ((DataflowMapTaskExecutor) executionState.getWorkExecutor())

Review Comment:
   Added a try-catch clause that logs an error message, just in case.




[GitHub] [beam] AMOOOMA commented on a diff in pull request #26477: Report source bytes processed for custom sources

Posted by "AMOOOMA (via GitHub)" <gi...@apache.org>.
AMOOOMA commented on code in PR #26477:
URL: https://github.com/apache/beam/pull/26477#discussion_r1185321950


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1400,6 +1402,31 @@ public void close() {
       // Blocks while executing work.
       executionState.getWorkExecutor().execute();
 
+      // Reports source bytes processed to workitemcommitrequest if available.
+      try {
+        long sourceBytesProcessed = 0;
+        List<ElementCounter> counters =
+            ((DataflowMapTaskExecutor) executionState.getWorkExecutor())
+                .getReadOperation()
+                .receivers[0]
+                .getOutputCounters();
+        for (ElementCounter counter : counters) {
+          try {
+            Counter<Long, Long> baseCounter = ((OutputObjectAndByteCounter) counter).getByteCount();
+            if (!baseCounter.getName().name().equals(counterName)) continue;
+            sourceBytesProcessed = (long) baseCounter.getAndReset();
+          } catch (Exception e) {
+            // Ignoring because most counter will crash, spamming the logs.

Review Comment:
   Yeah, that makes sense. I updated OutputReceiver to hold its counters in a map instead of a List for easier access.
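   A minimal sketch of the map-based approach, assuming a simplified receiver that keeps byte counters in a `Map<String, AtomicLong>`. `MapBackedReceiverSketch` and its methods are hypothetical and do not reflect the actual `OutputReceiver` change in the PR; the sketch only shows that a keyed lookup removes the loop, the cast, and the exception handling.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical receiver that indexes its byte counters by name instead of holding a List.
class MapBackedReceiverSketch {
  private final Map<String, AtomicLong> byteCountersByName = new HashMap<>();

  // Registers (or returns the existing) byte counter for the given name.
  AtomicLong byteCounter(String name) {
    return byteCountersByName.computeIfAbsent(name, n -> new AtomicLong());
  }

  // Direct lookup: returns the current count and resets it, or 0 if the name is unknown.
  long getAndResetBytes(String name) {
    AtomicLong counter = byteCountersByName.get(name);
    return counter == null ? 0L : counter.getAndSet(0);
  }
}
```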




[GitHub] [beam] AMOOOMA commented on a diff in pull request #26477: Report source bytes processed for custom sources

Posted by "AMOOOMA (via GitHub)" <gi...@apache.org>.
AMOOOMA commented on code in PR #26477:
URL: https://github.com/apache/beam/pull/26477#discussion_r1182949626


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1400,6 +1401,23 @@ public void close() {
       // Blocks while executing work.
       executionState.getWorkExecutor().execute();
 
+      // Reports source bytes processed to workitemcommitrequest if available.
+      long sourceBytesProcessed = 0;
+      List<ElementCounter> counters =
+          ((DataflowMapTaskExecutor) executionState.getWorkExecutor())
+              .getReadOperation()
+              .receivers[0]
+              .getOutputCounters();
+      for (ElementCounter counter : counters) {
+        try {
+          sourceBytesProcessed =
+              (long) ((OutputObjectAndByteCounter) counter).getByteCount().getAndReset();
+        } catch (Exception e) {
+          // ignore

Review Comment:
   Done.


