Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/14 00:07:15 UTC

[GitHub] [beam] nehsyc opened a new pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

nehsyc opened a new pull request #12578:
URL: https://github.com/apache/beam/pull/12578


   This is to support the GroupIntoBatches transform with runner-determined sharding in Dataflow. To mitigate the limited parallelism introduced by the implicit grouping on keys, we allow the state and timers associated with each input key to be shardable.
   
   This PR modifies the state and work caching in the Dataflow Java runner so that the state of each shard of a key is tracked separately, preparing the runner for a shardable GroupIntoBatches.
   
   See https://s.apache.org/sharded-group-into-batches for more details.
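A minimal sketch of what "tracked separately" means, assuming a simplified `ShardedKey` that pairs a user key with a runner-assigned sharding key. In the worker the key is a protobuf `ByteString` and the shard is a `long` (the diff constructs it as `new ShardedKey(workItem.getKey(), workItem.getShardingKey())`); here the key is a `String` so the example has no dependencies, and `ShardedStateDemo.countPerShard` is a hypothetical helper. Because equality covers both fields, two shards of the same user key land in separate map entries.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Simplified, hypothetical stand-in for the worker's ShardedKey (user key + sharding key). */
final class ShardedKey {
  private final String key; // user key; a ByteString in the real worker
  private final long shardingKey; // runner-assigned shard of that key

  ShardedKey(String key, long shardingKey) {
    this.key = Objects.requireNonNull(key);
    this.shardingKey = shardingKey;
  }

  // Value semantics over BOTH fields, so "key1" on shard 1 and "key1" on shard 2 differ.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ShardedKey)) {
      return false;
    }
    ShardedKey other = (ShardedKey) o;
    return shardingKey == other.shardingKey && key.equals(other.key);
  }

  @Override
  public int hashCode() {
    return Objects.hash(key, shardingKey);
  }

  @Override
  public String toString() {
    return key + "/" + shardingKey;
  }
}

final class ShardedStateDemo {
  /** Hypothetical per-shard state: counts the elements seen by each (key, shard) pair. */
  static Map<ShardedKey, Integer> countPerShard(String[] keys, long[] shards) {
    Map<ShardedKey, Integer> state = new HashMap<>();
    for (int i = 0; i < keys.length; i++) {
      state.merge(new ShardedKey(keys[i], shards[i]), 1, Integer::sum);
    }
    return state;
  }
}
```

For example, `countPerShard(new String[] {"key1", "key1", "key1"}, new long[] {1, 2, 1})` yields two entries: `key1/1` with count 2 and `key1/2` with count 1.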
   
   R: @boyuanzz
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] nehsyc commented on a change in pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

Posted by GitBox <gi...@apache.org>.
nehsyc commented on a change in pull request #12578:
URL: https://github.com/apache/beam/pull/12578#discussion_r472389332



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -357,7 +358,7 @@ public void invalidateCache() {
         }
       }
       activeReader = null;
-      stateCache.invalidate(key);
+      stateCache.invalidate(key, getWork().getShardingKey());

Review comment:
       Good catch! Fixed.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -2193,7 +2235,7 @@ public boolean activateWork(ByteString key, Work work) {
     }
 
     /** Marks the work for a the given key as complete. Schedules queued work for the key if any. */
-    public void completeWork(ByteString key, long workToken) {
+    public void completeWork(ShardedKey key, long workToken) {

Review comment:
       Done

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -2167,7 +2209,7 @@ public MapTask getMapTask() {
     }
 
     /** Mark the given key and work as active. */
-    public boolean activateWork(ByteString key, Work work) {
+    public boolean activateWork(ShardedKey key, Work work) {

Review comment:
       Done.
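The `activateWork`/`completeWork` contract in the hunks above (per the Javadoc, completing work "schedules queued work for the key if any") can be modelled roughly as below. This is a hypothetical, simplified sketch, not the worker's `ComputationState`: it tracks plain work tokens, ignores duplicate-work detection and executor scheduling, and takes the key type as a type parameter. Keying it by a sharded key is what lets two shards of one user key make progress concurrently instead of queueing behind each other.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of per-key work activation: at most one work item per key is
 * active, and later items for the same key wait in a FIFO queue behind it.
 */
final class ActiveWorkTracker<K> {
  private final Map<K, Deque<Long>> activeWork = new HashMap<>();

  /** Returns true if the work should start now, false if it was queued behind active work. */
  synchronized boolean activateWork(K key, long workToken) {
    Deque<Long> queue = activeWork.computeIfAbsent(key, k -> new ArrayDeque<>());
    queue.addLast(workToken);
    return queue.size() == 1;
  }

  /**
   * Completes the active work for the key and returns the next queued token to schedule, or null
   * if nothing else is queued for that key.
   */
  synchronized Long completeWork(K key, long workToken) {
    Deque<Long> queue = activeWork.get(key);
    if (queue == null || queue.isEmpty()) {
      throw new IllegalStateException(
          String.format("No active state for key %s, expected token %s", key, workToken));
    }
    long activeToken = queue.peekFirst();
    if (activeToken != workToken) {
      throw new IllegalStateException(
          String.format("Token mismatch for key %s: %s and %s", key, activeToken, workToken));
    }
    queue.removeFirst();
    if (queue.isEmpty()) {
      activeWork.remove(key); // no more work for this key; drop its entry
      return null;
    }
    return queue.peekFirst();
  }
}
```

For example, with `String` keys standing in for sharded keys, activating token 10 on `key1/1` and token 20 on `key1/2` both return `true`, while a second token on `key1/1` returns `false` until `completeWork("key1/1", 10L)` hands it back as the next token to run.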







[GitHub] [beam] lukecwik commented on a change in pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

lukecwik commented on a change in pull request #12578:
URL: https://github.com/apache/beam/pull/12578#discussion_r474844750



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -2205,20 +2238,18 @@ public void completeWork(ByteString key, long workToken) {
           throw new NullPointerException(
               String.format(
                   "No active state for key %s, expected token %s",
-                  TextFormat.escapeBytes(key), workToken));
+                  shardedKey.toString(), workToken));
         }
         if (completedWork.getWorkItem().getWorkToken() != workToken) {
           throw new IllegalStateException(
               String.format(
                   "Token mismatch for key %s: %s and %s",
-                  TextFormat.escapeBytes(key),
-                  completedWork.getWorkItem().getWorkToken(),
-                  workToken));
+                  shardedKey.toString(), completedWork.getWorkItem().getWorkToken(), workToken));

Review comment:
       ```suggestion
                     shardedKey, completedWork.getWorkItem().getWorkToken(), workToken));
   ```
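The suggestion works because the `%s` conversion in `String.format` already calls the argument's `toString()`, and for a null argument it prints `null` where an explicit `shardedKey.toString()` would throw. A small, dependency-free check, using a hypothetical `FormatKey` class in place of the worker's `ShardedKey` and a hypothetical `TokenMismatchMessage.of` helper that builds the message from the diff:

```java
/** Hypothetical key with a readable toString(), standing in for the worker's ShardedKey. */
final class FormatKey {
  private final String key;
  private final long shardingKey;

  FormatKey(String key, long shardingKey) {
    this.key = key;
    this.shardingKey = shardingKey;
  }

  @Override
  public String toString() {
    return "ShardedKey{key=" + key + ", shardingKey=" + shardingKey + "}";
  }
}

final class TokenMismatchMessage {
  /** Builds the message from the diff; %s converts the key itself, so no toString() call. */
  static String of(Object shardedKey, long activeToken, long workToken) {
    return String.format(
        "Token mismatch for key %s: %s and %s", shardedKey, activeToken, workToken);
  }
}
```

Passing the key directly gives the same text as passing `key.toString()`, and degrades to `null` instead of a `NullPointerException` if the key is ever missing.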







[GitHub] [beam] boyuanzz commented on pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

boyuanzz commented on pull request #12578:
URL: https://github.com/apache/beam/pull/12578#issuecomment-675044503


   Please delete the unused file `stale_outputs_checked`.





[GitHub] [beam] nehsyc commented on pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

nehsyc commented on pull request #12578:
URL: https://github.com/apache/beam/pull/12578#issuecomment-673769264


   R: @boyuanzz





[GitHub] [beam] lukecwik commented on pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

lukecwik commented on pull request #12578:
URL: https://github.com/apache/beam/pull/12578#issuecomment-678413747


   Run Java PreCommit





[GitHub] [beam] lukecwik commented on pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

lukecwik commented on pull request #12578:
URL: https://github.com/apache/beam/pull/12578#issuecomment-676798983


   Not sure how much we want to invest in clean-up here. There are data structures that are effectively copies of each other, and the methods should take an object that represents the computation and key instead of requiring the individual parameters to be exploded out. We would likely get a good saving in memory usage, since we would be passing around a single object in a lot of these places, and we would also save some CPU since we wouldn't be creating and destroying these objects all over the place.
   MetricTrackingWindmillServerStub.KeyAndComputation == ReaderCache.CacheKey == WindmillStateCache.ComputationKey
   There are some other types which are subtypes or supertypes of the ones above.





[GitHub] [beam] boyuanzz commented on a change in pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

boyuanzz commented on a change in pull request #12578:
URL: https://github.com/apache/beam/pull/12578#discussion_r471805132



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -357,7 +358,7 @@ public void invalidateCache() {
         }
       }
       activeReader = null;
-      stateCache.invalidate(key);
+      stateCache.invalidate(key, getWork().getShardingKey());

Review comment:
       the cachedReader is also keyed: https://github.com/apache/beam/blob/88acc5267f759d81e9836a9db17b9e0ee521c785/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java#L338-L340
   Should it also be keyed by shardedKey?
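To see why keying the reader cache by the shard as well matters: if entries were keyed only by computation and user key, two shards of the same key would share one slot, so caching a reader for shard 2 would overwrite shard 1's reader, and invalidating either shard would drop both. A minimal sketch, assuming a hypothetical `ReaderCacheKey` (computation id, user key, sharding key) and plain `String` values in place of `UnboundedReader`s; it uses a `HashMap` rather than the Guava cache with expiry and a removal listener that `ReaderCache` builds.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical cache key: computation id, user key and sharding key. */
final class ReaderCacheKey {
  private final String computationId;
  private final String key;
  private final long shardingKey;

  ReaderCacheKey(String computationId, String key, long shardingKey) {
    this.computationId = computationId;
    this.key = key;
    this.shardingKey = shardingKey;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ReaderCacheKey)) {
      return false;
    }
    ReaderCacheKey other = (ReaderCacheKey) o;
    return shardingKey == other.shardingKey
        && computationId.equals(other.computationId)
        && key.equals(other.key);
  }

  @Override
  public int hashCode() {
    return Objects.hash(computationId, key, shardingKey);
  }

  @Override
  public String toString() {
    return computationId + "-" + key + "-" + shardingKey;
  }
}

/** Hypothetical reader cache whose entries (String stand-ins for readers) are scoped per shard. */
final class ShardedReaderCache {
  private final Map<ReaderCacheKey, String> readers = new HashMap<>();

  void put(ReaderCacheKey cacheKey, String reader) {
    readers.put(cacheKey, reader);
  }

  String get(ReaderCacheKey cacheKey) {
    return readers.get(cacheKey);
  }

  /** Drops only the reader cached for this exact (computation, key, shard). */
  void invalidate(ReaderCacheKey cacheKey) {
    readers.remove(cacheKey);
  }
}
```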

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -2167,7 +2209,7 @@ public MapTask getMapTask() {
     }
 
     /** Mark the given key and work as active. */
-    public boolean activateWork(ByteString key, Work work) {
+    public boolean activateWork(ShardedKey key, Work work) {

Review comment:
       `shardedKey`?  

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -2193,7 +2235,7 @@ public boolean activateWork(ByteString key, Work work) {
     }
 
     /** Marks the work for a the given key as complete. Schedules queued work for the key if any. */
-    public void completeWork(ByteString key, long workToken) {
+    public void completeWork(ShardedKey key, long workToken) {

Review comment:
       `shardedKey`?







[GitHub] [beam] nehsyc commented on pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

nehsyc commented on pull request #12578:
URL: https://github.com/apache/beam/pull/12578#issuecomment-677870209


   > Not sure how much we want to invest in clean-up here. There are data structures that are effectively copies of each other, and the methods should take an object that represents the computation and key instead of requiring the individual parameters to be exploded out. We would likely get a good saving in memory usage, since we would be passing around a single object in a lot of these places, and we would also save some CPU since we wouldn't be creating and destroying these objects all over the place.
   > MetricTrackingWindmillServerStub.KeyAndComputation == ReaderCache.CacheKey == WindmillStateCache.ComputationKey
   > There are some other types which are subtypes or supertypes of the ones above.
   
   Sounds good to deduplicate. Would it make sense to have a separate PR for clean-up?
   





[GitHub] [beam] lukecwik commented on pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

lukecwik commented on pull request #12578:
URL: https://github.com/apache/beam/pull/12578#issuecomment-675127178


   R: @lukecwik 





[GitHub] [beam] nehsyc commented on a change in pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

nehsyc commented on a change in pull request #12578:
URL: https://github.com/apache/beam/pull/12578#discussion_r474233274



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
##########
@@ -59,7 +59,41 @@
     }
   }
 
-  private final Cache<KV<String, ByteString>, CacheEntry> cache;
+  private static class CacheKey {

Review comment:
       Done.







[GitHub] [beam] lukecwik edited a comment on pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

lukecwik edited a comment on pull request #12578:
URL: https://github.com/apache/beam/pull/12578#issuecomment-675127178


   R: @lukecwik I have a queue of reviews; I'll get to this in the next day or so.





[GitHub] [beam] lukecwik merged pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

lukecwik merged pull request #12578:
URL: https://github.com/apache/beam/pull/12578


   





[GitHub] [beam] lukecwik commented on pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

lukecwik commented on pull request #12578:
URL: https://github.com/apache/beam/pull/12578#issuecomment-678413620


   > > Not sure how much we want to invest in clean-up here. There are data structures that are effectively copies of each other, and the methods should take an object that represents the computation and key instead of requiring the individual parameters to be exploded out. We would likely get a good saving in memory usage, since we would be passing around a single object in a lot of these places, and we would also save some CPU since we wouldn't be creating and destroying these objects all over the place.
   > > MetricTrackingWindmillServerStub.KeyAndComputation == ReaderCache.CacheKey == WindmillStateCache.ComputationKey
   > > There are some other types which are subtypes or supertypes of the ones above.
   > 
   > Sounds good to deduplicate. Would it make sense to have a separate PR for clean-up?
   
   Yes, a separate PR sounds best.





[GitHub] [beam] lukecwik commented on a change in pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

lukecwik commented on a change in pull request #12578:
URL: https://github.com/apache/beam/pull/12578#discussion_r473396947



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
##########
@@ -59,7 +59,41 @@
     }
   }
 
-  private final Cache<KV<String, ByteString>, CacheEntry> cache;
+  private static class CacheKey {

Review comment:
       Use `@AutoValue`

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
##########
@@ -72,25 +106,21 @@
         CacheBuilder.newBuilder()
             .expireAfterWrite(cacheDuration.getMillis(), TimeUnit.MILLISECONDS)
             .removalListener(
-                (RemovalNotification<KV<String, ByteString>, CacheEntry> notification) -> {
+                (RemovalNotification<CacheKey, CacheEntry> notification) -> {
                   if (notification.getCause() != RemovalCause.EXPLICIT) {
-                    LOG.info("Closing idle reader for {}", keyToString(notification.getKey()));
+                    LOG.info("Closing idle reader for {}", notification.getKey().toString());
                     closeReader(notification.getKey(), notification.getValue());
                   }
                 })
             .build();
   }
 
-  private static String keyToString(KV<String, ByteString> key) {
-    return key.getKey() + "-" + key.getValue().toStringUtf8();
-  }
-
   /** Close the reader and log a warning if close fails. */
-  private void closeReader(KV<String, ByteString> key, CacheEntry entry) {
+  private void closeReader(CacheKey key, CacheEntry entry) {
     try {
       entry.reader.close();
     } catch (IOException e) {
-      LOG.warn("Failed to close UnboundedReader for {}", keyToString(key), e);
+      LOG.warn("Failed to close UnboundedReader for {}", key.toString(), e);

Review comment:
       ```suggestion
         LOG.warn("Failed to close UnboundedReader for {}", key, e);
   ```

##########
File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
##########
@@ -2131,8 +2215,50 @@ public void testActiveWork() throws Exception {
     Mockito.verifyNoMoreInteractions(mockExecutor);
   }
 
+  @Test
+  public void testActiveWorkForShardedKeys() throws Exception {
+    BoundedQueueExecutor mockExecutor = Mockito.mock(BoundedQueueExecutor.class);
+    StreamingDataflowWorker.ComputationState computationState =
+        new StreamingDataflowWorker.ComputationState(
+            "computation",
+            defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            mockExecutor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1_shard1 = new ShardedKey(ByteString.copyFromUtf8("key1"), 1);

Review comment:
       ```suggestion
       ShardedKey key1Shard1 = new ShardedKey(ByteString.copyFromUtf8("key1"), 1);
   ```

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -1095,14 +1098,46 @@ public void run() {
             }
           }
         };
-    if (!computationState.activateWork(workItem.getKey(), work)) {
+    if (!computationState.activateWork(
+        new ShardedKey(workItem.getKey(), workItem.getShardingKey()), work)) {
       // Free worker if the work was not activated.
       // This can happen if it's duplicate work or some other reason.
       sdkHarnessRegistry.completeWork(worker);
     }
   }
 
+  static class ShardedKey {

Review comment:
       Use `@AutoValue`

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
##########
@@ -72,25 +106,21 @@
         CacheBuilder.newBuilder()
             .expireAfterWrite(cacheDuration.getMillis(), TimeUnit.MILLISECONDS)
             .removalListener(
-                (RemovalNotification<KV<String, ByteString>, CacheEntry> notification) -> {
+                (RemovalNotification<CacheKey, CacheEntry> notification) -> {
                   if (notification.getCause() != RemovalCause.EXPLICIT) {
-                    LOG.info("Closing idle reader for {}", keyToString(notification.getKey()));
+                    LOG.info("Closing idle reader for {}", notification.getKey().toString());

Review comment:
       No need for `toString()`, since `{}` already converts the argument to a string for you.
   ```suggestion
                       LOG.info("Closing idle reader for {}", notification.getKey());
   ```
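The `{}` point can be illustrated with a hypothetical, simplified logger. This is not SLF4J's implementation; it only mimics two documented behaviours of SLF4J parameterized logging: each `{}` is replaced with the argument's string form, and the message is only formatted after the logger has decided the level is enabled. The second behaviour is why passing `key` rather than `key.toString()` can also save work when INFO is off. `PlaceholderLogger` and `CountingKey` are illustrative names.

```java
/**
 * Hypothetical, simplified stand-in for an SLF4J-style logger (not SLF4J's code). It replaces
 * each "{}" with String.valueOf(arg) and skips formatting entirely when the level is disabled.
 */
final class PlaceholderLogger {
  private final boolean infoEnabled;
  private String lastMessage; // last formatted message, kept only for inspection

  PlaceholderLogger(boolean infoEnabled) {
    this.infoEnabled = infoEnabled;
  }

  void info(String pattern, Object... args) {
    if (!infoEnabled) {
      return; // arguments are never converted to strings
    }
    StringBuilder out = new StringBuilder();
    int argIndex = 0;
    int from = 0;
    int at;
    while ((at = pattern.indexOf("{}", from)) >= 0 && argIndex < args.length) {
      out.append(pattern, from, at).append(String.valueOf(args[argIndex++]));
      from = at + 2; // skip past the "{}" placeholder
    }
    out.append(pattern.substring(from));
    lastMessage = out.toString();
  }

  String lastMessage() {
    return lastMessage;
  }
}

/** Key whose toString() calls are counted, to show when the conversion actually happens. */
final class CountingKey {
  int toStringCalls = 0;

  @Override
  public String toString() {
    toStringCalls++;
    return "computation-key1-1";
  }
}
```

With INFO disabled, `info("Closing idle reader for {}", key)` never calls `key.toString()`, whereas `info("Closing idle reader for {}", key.toString())` pays for the conversion before the level check.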

##########
File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
##########
@@ -2131,8 +2215,50 @@ public void testActiveWork() throws Exception {
     Mockito.verifyNoMoreInteractions(mockExecutor);
   }
 
+  @Test
+  public void testActiveWorkForShardedKeys() throws Exception {
+    BoundedQueueExecutor mockExecutor = Mockito.mock(BoundedQueueExecutor.class);
+    StreamingDataflowWorker.ComputationState computationState =
+        new StreamingDataflowWorker.ComputationState(
+            "computation",
+            defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            mockExecutor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1_shard1 = new ShardedKey(ByteString.copyFromUtf8("key1"), 1);
+    ShardedKey key1_shard2 = new ShardedKey(ByteString.copyFromUtf8("key1"), 2);

Review comment:
       ```suggestion
       ShardedKey key1Shard2 = new ShardedKey(ByteString.copyFromUtf8("key1"), 2);
   ```







[GitHub] [beam] nehsyc commented on a change in pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

nehsyc commented on a change in pull request #12578:
URL: https://github.com/apache/beam/pull/12578#discussion_r474234594



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java
##########
@@ -72,25 +106,21 @@
         CacheBuilder.newBuilder()
             .expireAfterWrite(cacheDuration.getMillis(), TimeUnit.MILLISECONDS)
             .removalListener(
-                (RemovalNotification<KV<String, ByteString>, CacheEntry> notification) -> {
+                (RemovalNotification<CacheKey, CacheEntry> notification) -> {
                   if (notification.getCause() != RemovalCause.EXPLICIT) {
-                    LOG.info("Closing idle reader for {}", keyToString(notification.getKey()));
+                    LOG.info("Closing idle reader for {}", notification.getKey().toString());

Review comment:
       Done.
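   The hunk above changes the reader cache key from `KV<String, ByteString>` to a `CacheKey` type and logs that key directly. The diff does not show the fields of `CacheKey`. The sketch below assumes they are the computation id, the user key, and the shard id, and gives the class a readable `toString()` so it can go straight into the log message. The real `ReaderCache` uses Guava's `CacheBuilder` with `expireAfterWrite` and a removal listener. This JDK-only version replaces that with two `HashMap`s, a manual `expire` call, and a `List<String>` standing in for the logger. All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ReaderCacheSketch {
  // Hypothetical composite key: computation id + user key + shard id.
  static final class CacheKey {
    final String computationId;
    final String key;
    final long shardingKey;

    CacheKey(String computationId, String key, long shardingKey) {
      this.computationId = computationId;
      this.key = key;
      this.shardingKey = shardingKey;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof CacheKey)) {
        return false;
      }
      CacheKey other = (CacheKey) o;
      return shardingKey == other.shardingKey
          && computationId.equals(other.computationId)
          && key.equals(other.key);
    }

    @Override
    public int hashCode() {
      return Objects.hash(computationId, key, shardingKey);
    }

    // A readable toString lets the key go straight into a log message.
    @Override
    public String toString() {
      return computationId + "-" + key + "-" + shardingKey;
    }
  }

  private final Map<CacheKey, String> readers = new HashMap<>();
  private final Map<CacheKey, Long> writeTimes = new HashMap<>();
  private final long expireAfterMillis;
  // Stands in for the SLF4J logger.
  final List<String> log = new ArrayList<>();

  ReaderCacheSketch(long expireAfterMillis) {
    this.expireAfterMillis = expireAfterMillis;
  }

  void put(CacheKey k, String reader, long nowMillis) {
    readers.put(k, reader);
    writeTimes.put(k, nowMillis);
  }

  // Explicit removal is not logged, mirroring the RemovalCause.EXPLICIT check.
  void invalidate(CacheKey k) {
    readers.remove(k);
    writeTimes.remove(k);
  }

  // Expires entries written at least expireAfterMillis ago and logs them as idle.
  void expire(long nowMillis) {
    Iterator<Map.Entry<CacheKey, Long>> it = writeTimes.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<CacheKey, Long> e = it.next();
      if (nowMillis - e.getValue() >= expireAfterMillis) {
        CacheKey k = e.getKey();
        readers.remove(k);
        it.remove();
        log.add("Closing idle reader for " + k);
      }
    }
  }

  int size() {
    return readers.size();
  }

  public static void main(String[] args) {
    ReaderCacheSketch cache = new ReaderCacheSketch(1000);
    CacheKey shard1 = new CacheKey("computation", "key1", 1);
    CacheKey shard2 = new CacheKey("computation", "key1", 2);
    cache.put(shard1, "reader-1", 0);
    cache.put(shard2, "reader-2", 500);
    cache.invalidate(shard2); // explicit removal, not logged
    cache.expire(1500); // shard1 has been idle for 1500 ms
    System.out.println(cache.size());
    System.out.println(cache.log);
  }
}
```

   Running `main` prints `0` and then `[Closing idle reader for computation-key1-1]`. SLF4J already calls `toString()` on `{}` arguments, so a `toString()` override on the key type is enough. No separate `keyToString` helper or explicit `.toString()` call is needed.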

##########
File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
##########
@@ -2131,8 +2215,50 @@ public void testActiveWork() throws Exception {
     Mockito.verifyNoMoreInteractions(mockExecutor);
   }
 
+  @Test
+  public void testActiveWorkForShardedKeys() throws Exception {
+    BoundedQueueExecutor mockExecutor = Mockito.mock(BoundedQueueExecutor.class);
+    StreamingDataflowWorker.ComputationState computationState =
+        new StreamingDataflowWorker.ComputationState(
+            "computation",
+            defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            mockExecutor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1_shard1 = new ShardedKey(ByteString.copyFromUtf8("key1"), 1);

Review comment:
       Done.

##########
File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
##########
@@ -2131,8 +2215,50 @@ public void testActiveWork() throws Exception {
     Mockito.verifyNoMoreInteractions(mockExecutor);
   }
 
+  @Test
+  public void testActiveWorkForShardedKeys() throws Exception {
+    BoundedQueueExecutor mockExecutor = Mockito.mock(BoundedQueueExecutor.class);
+    StreamingDataflowWorker.ComputationState computationState =
+        new StreamingDataflowWorker.ComputationState(
+            "computation",
+            defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))),
+            mockExecutor,
+            ImmutableMap.of(),
+            null);
+
+    ShardedKey key1_shard1 = new ShardedKey(ByteString.copyFromUtf8("key1"), 1);
+    ShardedKey key1_shard2 = new ShardedKey(ByteString.copyFromUtf8("key1"), 2);

Review comment:
       Done.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
##########
@@ -1095,14 +1098,46 @@ public void run() {
             }
           }
         };
-    if (!computationState.activateWork(workItem.getKey(), work)) {
+    if (!computationState.activateWork(
+        new ShardedKey(workItem.getKey(), workItem.getShardingKey()), work)) {
       // Free worker if the work was not activated.
       // This can happen if it's duplicate work or some other reason.
       sdkHarnessRegistry.completeWork(worker);
     }
   }
 
+  static class ShardedKey {

Review comment:
       Done.
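   The hunk above switches `activateWork` from the bare `workItem.getKey()` to a `ShardedKey` built from the key and `workItem.getShardingKey()`. Under the old keying, two shards of `key1` would have shared one active-work queue. The sketch below shows the per-shard queuing this enables. It is a simplified, hypothetical model rather than the runner's `ComputationState`:

   * a `String` replaces `ByteString`;
   * `Work` carries only a work token;
   * an `executed` list stands in for handing work to the executor;
   * the whole method is `synchronized` for brevity.

   Under these assumptions, the first work item for a shard runs immediately, later items for the same shard wait in that shard's queue, and a repeated work token is rejected. The `false` return corresponds to the "duplicate work" case noted in the diff comment.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ActiveWorkSketch {
  // Hypothetical stand-in for ShardedKey (String replaces ByteString).
  static final class ShardedKey {
    final String key;
    final long shardingKey;

    ShardedKey(String key, long shardingKey) {
      this.key = key;
      this.shardingKey = shardingKey;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof ShardedKey)) {
        return false;
      }
      ShardedKey other = (ShardedKey) o;
      return shardingKey == other.shardingKey && key.equals(other.key);
    }

    @Override
    public int hashCode() {
      return Objects.hash(key, shardingKey);
    }
  }

  // Hypothetical work item identified only by its work token.
  static final class Work {
    final long workToken;

    Work(long workToken) {
      this.workToken = workToken;
    }
  }

  private final Map<ShardedKey, Deque<Work>> activeWork = new HashMap<>();
  // Tokens handed to the executor; stands in for BoundedQueueExecutor.execute.
  final List<Long> executed = new ArrayList<>();

  // Returns false for duplicate work (token already queued for this shard).
  synchronized boolean activateWork(ShardedKey shardedKey, Work work) {
    Deque<Work> queue = activeWork.get(shardedKey);
    if (queue == null) {
      // No active work for this shard: queue it and run it now.
      queue = new ArrayDeque<>();
      queue.addLast(work);
      activeWork.put(shardedKey, queue);
      executed.add(work.workToken);
      return true;
    }
    for (Work queued : queue) {
      if (queued.workToken == work.workToken) {
        return false;
      }
    }
    // Same shard is busy: wait behind the active item.
    queue.addLast(work);
    return true;
  }

  public static void main(String[] args) {
    ActiveWorkSketch state = new ActiveWorkSketch();
    ShardedKey key1Shard1 = new ShardedKey("key1", 1);
    ShardedKey key1Shard2 = new ShardedKey("key1", 2);

    System.out.println(state.activateWork(key1Shard1, new Work(1))); // runs now
    System.out.println(state.activateWork(key1Shard2, new Work(2))); // other shard, runs now
    System.out.println(state.activateWork(key1Shard1, new Work(3))); // queued behind token 1
    System.out.println(state.activateWork(key1Shard1, new Work(1))); // duplicate token
    System.out.println(state.executed);
  }
}
```

   Running `main` prints `true`, `true`, `true`, `false`, and then `[1, 2]`. Only the first item on each shard reaches the executor, so the two shards of `key1` proceed in parallel.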







[GitHub] [beam] nehsyc commented on pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

Posted by GitBox <gi...@apache.org>.
nehsyc commented on pull request #12578:
URL: https://github.com/apache/beam/pull/12578#issuecomment-678462038


   Run Spotless PreCommit





[GitHub] [beam] boyuanzz commented on pull request #12578: [BEAM-10703] Prepare Dataflow Java runner for shardable states

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12578:
URL: https://github.com/apache/beam/pull/12578#issuecomment-675046358


   cc: @lukecwik, @kennknowles 

