You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/21 14:31:25 UTC

[GitHub] [flink] rmetzger edited a comment on pull request #13162: [FLINK-18685][API / DataStream]JobClient.getAccumulators() blocks until streaming job has finished in local environment

rmetzger edited a comment on pull request #13162:
URL: https://github.com/apache/flink/pull/13162#issuecomment-678321188


   This is the error I'm seeing 
   ```
   java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
   	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
   	at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:707)
   	at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:621)
   	at org.apache.flink.runtime.minicluster.MiniCluster.getExecutionGraph(MiniCluster.java:607)
   	at org.apache.flink.client.program.PerJobMiniClusterFactory$PerJobMiniClusterJobClient.getAccumulators(PerJobMiniClusterFactory.java:182)
   	at org.apache.flink.client.program.PerJobMiniClusterFactoryTest.testJobExecution(PerJobMiniClusterFactoryTest.java:67)
   ```
   
   We could implement the getAccumulator methods as follows
   ```java
   		@Override
   		public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
   			if (miniCluster.isRunning()) {
   				return miniCluster
   					.getExecutionGraph(jobID)
   					.thenApply(AccessExecutionGraph::getAccumulatorsSerialized)
   					.thenApply(accumulators -> {
   						try {
   							return AccumulatorHelper.deserializeAndUnwrapAccumulators(accumulators, classLoader);
   						} catch (Exception e) {
   							throw new CompletionException("Cannot deserialize and unwrap accumulators properly.", e);
   						}
   					});
   			} else {
   				return getJobExecutionResult(classLoader).thenApply(JobExecutionResult::getAllAccumulatorResults);
   			}
   		}
   ```
   
   (Disclaimer: I'm not very familiar with that part of the codebase, I might need to ask another committer for a final review)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org