Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/22 16:05:34 UTC

[GitHub] [beam] y1chi opened a new pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

y1chi opened a new pull request #11916:
URL: https://github.com/apache/beam/pull/11916


   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on pull request #11916: [BEAM-10189] Add ValueState user state to python sdk

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-638541628


   Last time someone started adding this feature, we decided to call it ReadModifyWrite state. 





[GitHub] [beam] y1chi commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-647623292


   > > @mxm Seems like flink runner is sending cache token per state, instead of one user state token per bundle. This will cause python sdk to fail
   > > https://github.com/apache/beam/blob/d5c3391119061581b13d8b647b5c1f423a39d4b2/sdks/python/apache_beam/runners/worker/sdk_worker.py#L893
   > > 
   > > is this a known issue and being worked on?
   > 
   > The Flink Runner only sends one cache token per worker and application attempt. Please see
   > 
   > https://github.com/apache/beam/blob/f3d5fc4b9fdbfdf943bb4d57d16a4dcb022923dc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java#L400
   
   It looks like the fn-execution core module is trying to add a cache token for each state key?
   https://github.com/apache/beam/blob/2fd785dedb979a248e63c6385f978fd18fd2fbc4/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java#L272





[GitHub] [beam] y1chi edited a comment on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
y1chi edited a comment on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-646900931


   @mxm It seems the Flink runner is sending a cache token per state, instead of one user state token per bundle. This will cause the Python SDK to fail:
   https://github.com/apache/beam/blob/d5c3391119061581b13d8b647b5c1f423a39d4b2/sdks/python/apache_beam/runners/worker/sdk_worker.py#L893
   
   Is this a known issue, and is it being worked on?





[GitHub] [beam] robertwb commented on pull request #11916: [BEAM-10189] Add ValueState user state to python sdk

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-639157754


   Yes, the plan was to consider changing Java too, though that's harder due to backwards compatibility issues. 





[GitHub] [beam] y1chi commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-646701032


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] y1chi commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-646457367


   retest this please





[GitHub] [beam] mxm commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-650265410


   My pleasure!





[GitHub] [beam] y1chi commented on pull request #11916: [BEAM-10189] Add ValueState user state to python sdk

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-638950281


   > Last time someone started adding this feature, we decided to call it ReadModifyWrite state.
   
   The Java SDK still calls this ValueState; ReadModifyWriteState is only used in beam_runner_api. If we want to call it ReadModifyWriteState in Python, should we change the Java SDK as well?





[GitHub] [beam] y1chi commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-647616682


   > > @mxm Seems like flink runner is sending cache token per state, instead of one user state token per bundle. This will cause python sdk to fail
   > > https://github.com/apache/beam/blob/d5c3391119061581b13d8b647b5c1f423a39d4b2/sdks/python/apache_beam/runners/worker/sdk_worker.py#L893
   > > 
   > > is this a known issue and being worked on?
   > 
   > The Flink Runner only sends one cache token per worker and application attempt. Please see
   > 
   > https://github.com/apache/beam/blob/f3d5fc4b9fdbfdf943bb4d57d16a4dcb022923dc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java#L400
   





[GitHub] [beam] angoenka merged pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
angoenka merged pull request #11916:
URL: https://github.com/apache/beam/pull/11916


   





[GitHub] [beam] mxm commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-647981883


   Thanks for the stacktrace, that helped to figure out what's going on here. The issue is only present in batch mode where Flink does not use its own memory backend but uses Beam's `InMemoryBagUserStateFactory`.
   
   We have to adapt the implementation to return the same cache token for all `InMemorySingleKeyBagState`, see: https://github.com/apache/beam/blob/63d51f5c4f89a4c881243d9d43be2ac138b1254b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/InMemoryBagUserStateFactory.java#L60 
   https://github.com/apache/beam/blob/63d51f5c4f89a4c881243d9d43be2ac138b1254b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/InMemoryBagUserStateFactory.java#L116
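
The fix described here can be sketched roughly as follows. This is a Python illustration of the idea only, not the Java code in `InMemoryBagUserStateFactory`; the class names `SharedTokenStateFactory` and `SingleKeyBagState` are hypothetical. The assumption is that the factory creates one cache token up front and hands that same token to every per-key state object it creates.

```python
import uuid
from typing import Dict, Hashable, List, Set


class SingleKeyBagState:
    """Hypothetical per-key bag state that reports its factory's token."""

    def __init__(self, cache_token: bytes) -> None:
        self.cache_token = cache_token
        self.values: List[object] = []

    def append(self, value: object) -> None:
        self.values.append(value)


class SharedTokenStateFactory:
    """Hypothetical factory that shares one cache token across all keys."""

    def __init__(self) -> None:
        # One token for the lifetime of the factory, not one per state.
        self._cache_token = uuid.uuid4().bytes
        self._states: Dict[Hashable, SingleKeyBagState] = {}

    def for_key(self, key: Hashable) -> SingleKeyBagState:
        # Create the per-key state lazily, always with the shared token.
        if key not in self._states:
            self._states[key] = SingleKeyBagState(self._cache_token)
        return self._states[key]

    def cache_tokens(self) -> Set[bytes]:
        # Distinct tokens the SDK would see across all states.
        return {state.cache_token for state in self._states.values()}
```

With this shape, however many states are declared, the set returned by `cache_tokens()` never holds more than one entry, which is what the SDK-side check expects.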





[GitHub] [beam] y1chi commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-650257300


   > Glad to see we were able to unblock the changes here!
   
   Thanks for the fix!





[GitHub] [beam] y1chi commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-647696671


   > It is true that the fn protocol supports one cache token per handler (e.g. user state or side input handler). Those handlers do not change for the lifetime of the application. I'm still trying to understand what the problem is. Cache tokens have been working fine so far. Could you provide some logs or test cases which show that there is a problem?
   
   Last time I checked with @lukecwik, he mentioned that the SDK expects one global cache token per bundle for all user states, and one cache token per side input.
   
   The problem is that we apparently can't declare more than one user state in a stateful DoFn; otherwise the SDK fails. One example is the test_pardo_state_only_test I'm trying to update in this PR:
   
   ```
   09:13:30 [flink-runner-job-invoker] ERROR org.apache.beam.runners.jobsubmission.JobInvocation - Error during job invocation test_pardo_state_only_1592842407.43_aafef3e7-73a3-4a63-be4c-4be6c6fc7b7d.
   09:13:30 java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
   09:13:30 	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
   09:13:30 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
   09:13:30 	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:864)
   09:13:30 	at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator$BatchTranslationContext.execute(FlinkBatchPortablePipelineTranslator.java:194)
   09:13:30 	at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:116)
   09:13:30 	at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:83)
   09:13:30 	at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:83)
   09:13:30 	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
   09:13:30 	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
   09:13:30 	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
   09:13:30 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   09:13:30 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   09:13:30 	at java.lang.Thread.run(Thread.java:748)
   09:13:30 Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
   09:13:30 	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
   09:13:30 	at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.lambda$getJobExecutionResult$2(PerJobMiniClusterFactory.java:175)
   09:13:30 	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
   09:13:30 	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
   09:13:30 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
   09:13:30 	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
   09:13:30 	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
   09:13:30 	at akka.dispatch.OnComplete.internal(Future.scala:264)
   09:13:30 	at akka.dispatch.OnComplete.internal(Future.scala:261)
   09:13:30 	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
   09:13:30 	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
   09:13:30 	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
   09:13:30 	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
   09:13:30 	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
   09:13:30 	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
   09:13:30 	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
   09:13:30 	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
   09:13:30 	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
   09:13:30 	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
   09:13:30 	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
   09:13:30 	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
   09:13:30 	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
   09:13:30 	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
   09:13:30 	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
   09:13:30 	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
   09:13:30 	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
   09:13:30 	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
   09:13:30 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
   09:13:30 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
   09:13:30 	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   09:13:30 	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   09:13:30 	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   09:13:30 	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
   09:13:30 Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
   09:13:30 	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
   09:13:30 	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
   09:13:30 	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
   09:13:30 	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
   09:13:30 	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
   09:13:30 	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
   09:13:30 	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
   09:13:30 	at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
   09:13:30 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   09:13:30 	at java.lang.reflect.Method.invoke(Method.java:498)
   09:13:30 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
   09:13:30 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
   09:13:30 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
   09:13:30 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
   09:13:30 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
   09:13:30 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
   09:13:30 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
   09:13:30 	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
   09:13:30 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
   09:13:30 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   09:13:30 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   09:13:30 	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
   09:13:30 	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
   09:13:30 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
   09:13:30 	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
   09:13:30 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
   09:13:30 	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
   09:13:30 	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
   09:13:30 	... 4 more
   09:13:30 Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
   09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 247, in _execute
   09:13:30     response = task()
   09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 304, in <lambda>
   09:13:30     lambda: self.create_worker().do_instruction(request), request)
   09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 473, in do_instruction
   09:13:30     getattr(request, request_type), request.instruction_id)
   09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 505, in process_bundle
   09:13:30     instruction_id, request.cache_tokens):
   09:13:30   File "/usr/lib/python2.7/contextlib.py", line 17, in __enter__
   09:13:30     return self.gen.next()
   09:13:30   File "apache_beam/runners/worker/sdk_worker.py", line 894, in process_instruction_id
   09:13:30     assert not user_state_cache_token
   09:13:30 AssertionError
   ```
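
The assertion at the bottom of the trace can be illustrated with a minimal sketch. This is not the actual `sdk_worker.py` code: `CacheToken` and `split_cache_tokens` are hypothetical names, and the logic only assumes what this thread describes, namely that the SDK accepts at most one user-state cache token per bundle plus one token per side input.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Tuple


@dataclass(frozen=True)
class CacheToken:
    """Hypothetical stand-in for a cache token sent with a bundle request."""
    token: bytes
    # None marks a user-state token; otherwise the id of a side input.
    side_input_id: Optional[str] = None


def split_cache_tokens(
    tokens: Iterable[CacheToken],
) -> Tuple[Optional[bytes], Dict[str, bytes]]:
    """Separates the single user-state token from per-side-input tokens."""
    user_state_token: Optional[bytes] = None
    side_input_tokens: Dict[str, bytes] = {}
    for cache_token in tokens:
        if cache_token.side_input_id is None:
            # A second user-state token is rejected, mirroring the
            # AssertionError shown in the trace.
            if user_state_token is not None:
                raise AssertionError("more than one user state cache token")
            user_state_token = cache_token.token
        else:
            side_input_tokens[cache_token.side_input_id] = cache_token.token
    return user_state_token, side_input_tokens
```

Under this model, a runner that attaches one token per declared state would fail as soon as a DoFn declares two states, which matches the behavior reported above.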
   
   
   





[GitHub] [beam] ajamato commented on pull request #11916: [BEAM-10189] Add ValueState user state to python sdk

Posted by GitBox <gi...@apache.org>.
ajamato commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-646361385


   @committer This is approved. Could someone commit this please?





[GitHub] [beam] y1chi commented on a change in pull request #11916: [BEAM-10189] Add ValueState user state to python sdk

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #11916:
URL: https://github.com/apache/beam/pull/11916#discussion_r438941603



##########
File path: sdks/python/apache_beam/runners/direct/direct_userstate.py
##########
@@ -25,6 +25,7 @@
 from apache_beam.transforms import userstate
 from apache_beam.transforms.trigger import _ListStateTag
 from apache_beam.transforms.trigger import _SetStateTag
+from apache_beam.transforms.trigger import _ValueStateTag

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/transforms/userstate_test.py
##########
@@ -452,6 +459,40 @@ def clear_values(self, bag_state=beam.DoFn.StateParam(BAG_STATE)):
 
     self.assertEqual(['extra'], StatefulDoFnOnDirectRunnerTest.all_records)
 
+  def test_simple_read_modify_write_stateful_dofn(self):
+    class SimpleTestReadModifyWriteStatefulDoFn(DoFn):
+      VALUE_STATE = ReadModifyWriteStateSpec('value', StrUtf8Coder())
+
+      def process(self, element, last_element=DoFn.StateParam(VALUE_STATE)):
+        last_element.write('%s:%s' % element)
+        yield last_element.read()
+
+    with TestPipeline() as p:
+      (
+          p | beam.Create([('a', 1), ('b', 3), ('c', 5)])
+          | beam.ParDo(SimpleTestReadModifyWriteStatefulDoFn())
+          | beam.ParDo(self.record_dofn()))
+    self.assertEqual(['a:1', 'b:3', 'c:5'],
+                     StatefulDoFnOnDirectRunnerTest.all_records)
+
+  def test_clearing_read_modify_write_state(self):
+    class SimpleClearingReadModifyWriteStatefulDoFn(DoFn):
+      VALUE_STATE = ReadModifyWriteStateSpec('value', VarIntCoder())
+
+      def process(self, element, last_element=DoFn.StateParam(VALUE_STATE)):
+        last_element.write(element[1])

Review comment:
       Done.







[GitHub] [beam] angoenka commented on pull request #11916: [BEAM-10189] Add ValueState user state to python sdk

Posted by GitBox <gi...@apache.org>.
angoenka commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-642900361


   Retest this please





[GitHub] [beam] mxm commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-647999424


   I've created a PR which fixes the problem: https://github.com/apache/beam/pull/12062





[GitHub] [beam] y1chi commented on pull request #11916: [BEAM-10189] Add ValueState user state to python sdk

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-639789507


   > Yes, the plan was to consider changing Java too, though that's harder due to backwards compatibility issues.
   
   Renamed to ReadModifyWriteState.





[GitHub] [beam] y1chi commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-646900931


   @mxm It seems the Flink runner is sending a cache token per state, instead of one user state token per bundle. This will cause the Python SDK to fail (https://github.com/apache/beam/blob/d5c3391119061581b13d8b647b5c1f423a39d4b2/sdks/python/apache_beam/runners/worker/sdk_worker.py#L893). Is this a known issue, and is it being worked on?
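
For illustration, the failure mode described in this comment can be sketched in plain Python. This is a simplified, hypothetical model and not the code in sdk_worker.py: `CacheToken` and `pick_user_state_token` are made-up names, and the only assumption taken from the comment is that the SDK expects at most one user state cache token per bundle.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class CacheToken:
  """Hypothetical stand-in for a cache token sent by the runner."""
  kind: str  # 'user_state' or 'side_input'
  token: bytes


def pick_user_state_token(tokens: Iterable[CacheToken]) -> Optional[bytes]:
  """Returns the single user state token, or None if there is none.

  Raises ValueError if more than one user state token is present, which
  models the per-bundle expectation described in the comment.
  """
  user_state_token = None
  for cache_token in tokens:
    if cache_token.kind != 'user_state':
      continue  # Side input tokens are ignored in this sketch.
    if user_state_token is not None:
      raise ValueError('expected at most one user state cache token')
    user_state_token = cache_token.token
  return user_state_token
```

Under this model, a bundle carrying one user state token passes, while a runner that sends one token per state cell trips the check on the second token.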





[GitHub] [beam] robertwb commented on a change in pull request #11916: [BEAM-10189] Add ValueState user state to python sdk

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11916:
URL: https://github.com/apache/beam/pull/11916#discussion_r438296283



##########
File path: sdks/python/apache_beam/transforms/userstate_test.py
##########
@@ -452,6 +459,40 @@ def clear_values(self, bag_state=beam.DoFn.StateParam(BAG_STATE)):
 
     self.assertEqual(['extra'], StatefulDoFnOnDirectRunnerTest.all_records)
 
+  def test_simple_read_modify_write_stateful_dofn(self):
+    class SimpleTestReadModifyWriteStatefulDoFn(DoFn):
+      VALUE_STATE = ReadModifyWriteStateSpec('value', StrUtf8Coder())
+
+      def process(self, element, last_element=DoFn.StateParam(VALUE_STATE)):
+        last_element.write('%s:%s' % element)
+        yield last_element.read()
+
+    with TestPipeline() as p:
+      (
+          p | beam.Create([('a', 1), ('b', 3), ('c', 5)])
+          | beam.ParDo(SimpleTestReadModifyWriteStatefulDoFn())
+          | beam.ParDo(self.record_dofn()))
+    self.assertEqual(['a:1', 'b:3', 'c:5'],
+                     StatefulDoFnOnDirectRunnerTest.all_records)
+
+  def test_clearing_read_modify_write_state(self):
+    class SimpleClearingReadModifyWriteStatefulDoFn(DoFn):
+      VALUE_STATE = ReadModifyWriteStateSpec('value', VarIntCoder())
+
+      def process(self, element, last_element=DoFn.StateParam(VALUE_STATE)):
+        last_element.write(element[1])

Review comment:
       To make this a stronger test, maybe also write something at the end of `process` and try reading it at the very beginning.
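
One way to read this suggestion, sketched without Beam so that it runs standalone. `FakeReadModifyWriteState` and `process` below are hypothetical stand-ins, not part of the Beam API. The assumptions are that the state exposes `read` and `write` as in the test above, and that `read` returns `None` when nothing has been stored.

```python
class FakeReadModifyWriteState:
  """Hypothetical in-memory stand-in for a single-value state cell."""
  def __init__(self):
    self._value = None

  def read(self):
    # Assumption: an empty cell reads as None.
    return self._value

  def write(self, value):
    self._value = value


def process(element, last_element):
  """Reads what the previous call stored, then stores for the next call."""
  previous = last_element.read()  # Read at the very beginning.
  yield (element, previous)
  last_element.write(element)  # Write at the end of process.


state = FakeReadModifyWriteState()
results = []
for element in [1, 3, 5]:
  # extend() exhausts the generator, so the trailing write runs too.
  results.extend(process(element, state))
```

The sketch shares one cell across all elements. In Beam, user state is scoped per key and window, so an equivalent test would need several elements with the same key for a later call to observe an earlier write.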

##########
File path: sdks/python/apache_beam/runners/direct/direct_userstate.py
##########
@@ -25,6 +25,7 @@
 from apache_beam.transforms import userstate
 from apache_beam.transforms.trigger import _ListStateTag
 from apache_beam.transforms.trigger import _SetStateTag
+from apache_beam.transforms.trigger import _ValueStateTag

Review comment:
       Should this be renamed as well? 







[GitHub] [beam] mxm commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-647685479


   It is true that the Fn protocol supports one cache token per handler (e.g. user state or side input handler). Those handlers do not change for the lifetime of the application. I'm still trying to understand what the problem is. Cache tokens have been working fine so far. Could you provide some logs or test cases which show that there is a problem?





[GitHub] [beam] mxm commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-649529229


   Run Python PreCommit





[GitHub] [beam] mxm commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-650034085


   Glad to see we were able to unblock the changes here!





[GitHub] [beam] y1chi removed a comment on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
y1chi removed a comment on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-647616682


   > > @mxm Seems like flink runner is sending cache token per state, instead of one user state token per bundle. This will cause python sdk to fail
   > > https://github.com/apache/beam/blob/d5c3391119061581b13d8b647b5c1f423a39d4b2/sdks/python/apache_beam/runners/worker/sdk_worker.py#L893
   > > 
   > > is this a known issue and being worked on?
   > 
   > The Flink Runner only sends one cache token per worker and application attempt. Please see
   > 
   > https://github.com/apache/beam/blob/f3d5fc4b9fdbfdf943bb4d57d16a4dcb022923dc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java#L400
   





[GitHub] [beam] mxm commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-646974996


   > @mxm Seems like flink runner is sending cache token per state, instead of one user state token per bundle. This will cause python sdk to fail
   > 
   > https://github.com/apache/beam/blob/d5c3391119061581b13d8b647b5c1f423a39d4b2/sdks/python/apache_beam/runners/worker/sdk_worker.py#L893
   > is this a known issue and being worked on?
   
   The Flink Runner only sends one cache token per worker and application attempt. Please see https://github.com/apache/beam/blob/f3d5fc4b9fdbfdf943bb4d57d16a4dcb022923dc/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java#L400





[GitHub] [beam] y1chi closed pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
y1chi closed pull request #11916:
URL: https://github.com/apache/beam/pull/11916


   





[GitHub] [beam] mxm commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-649529077


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] y1chi commented on pull request #11916: [BEAM-10189] Add ValueState user state to python sdk

Posted by GitBox <gi...@apache.org>.
y1chi commented on pull request #11916:
URL: https://github.com/apache/beam/pull/11916#issuecomment-638528628


   R: @angoenka @robertwb 

