Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2018/02/05 10:59:07 UTC
[GitHub] flink pull request #5339: [FLINK-8493] [flip6] Integrate queryable state wit...
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5339#discussion_r165939136
--- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java ---
@@ -205,32 +201,32 @@ private void executeActionAsync(
return cachedFuture;
}
- LOG.debug("Retrieving location for state={} of job={} from the job manager.", jobId, queryableStateName);
-
- final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
- lookupCache.put(cacheKey, location);
- return proxy.getJobManagerFuture().thenComposeAsync(
- jobManagerGateway -> {
- final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
- jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
- .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
- .onComplete(new OnComplete<KvStateLocation>() {
-
- @Override
- public void onComplete(Throwable failure, KvStateLocation loc) throws Throwable {
- if (failure != null) {
- if (failure instanceof FlinkJobNotFoundException) {
- // if the jobId was wrong, remove the entry from the cache.
- lookupCache.remove(cacheKey);
- }
- location.completeExceptionally(failure);
- } else {
- location.complete(loc);
- }
- }
- }, Executors.directExecutionContext());
- return location;
- }, queryExecutor);
+ final KvStateLocationOracle kvStateLocationOracle = proxy.getKvStateLocationOracle(jobId);
+
+ if (kvStateLocationOracle != null) {
+ LOG.debug("Retrieving location for state={} of job={} from the key-value state location oracle.", jobId, queryableStateName);
+ final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
+ lookupCache.put(cacheKey, location);
+
+ kvStateLocationOracle
+ .requestKvStateLocation(jobId, queryableStateName)
+ .whenComplete(
+ (KvStateLocation kvStateLocation, Throwable throwable) -> {
+ if (throwable != null) {
+ if (ExceptionUtils.stripCompletionException(throwable) instanceof FlinkJobNotFoundException) {
+ // if the jobId was wrong, remove the entry from the cache.
+ lookupCache.remove(cacheKey);
+ }
+ location.completeExceptionally(throwable);
+ } else {
+ location.complete(kvStateLocation);
+ }
+ });
+
+ return location;
+ } else {
+ return FutureUtils.completedExceptionally(new UnknownJobManagerException());
--- End diff --
I would be up for renaming this exception: it is still an `UnknownJobManagerException`, although the role of the job manager is now played by the oracle. I understand that a rename may change user-facing behavior, and that `UnknownOracleException` is a bit cryptic, but I suppose we can find something more adequate. Even if we leave it as is for now, we should open a JIRA to track this for the future.
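
One possible way to rename without breaking users who already catch the old type is to let the new exception extend the old one. The following is only a rough sketch of that idea: `UnknownLocationOracleException` is a hypothetical name, `RenameSketch` and `lookup` are made up for illustration, and the `UnknownJobManagerException` declared here is a simplified stand-in rather than the real Flink class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class RenameSketch {

    /** Simplified stand-in for the existing exception (not the real Flink class). */
    static class UnknownJobManagerException extends Exception {
        UnknownJobManagerException(String message) {
            super(message);
        }
    }

    /** Hypothetical new name; extending the old type keeps existing catch blocks working. */
    static class UnknownLocationOracleException extends UnknownJobManagerException {
        UnknownLocationOracleException() {
            super("No key-value state location oracle is known for the given job.");
        }
    }

    /** Illustrative lookup: fails with the new exception when no oracle is available. */
    static CompletableFuture<String> lookup(boolean oracleKnown) {
        final CompletableFuture<String> location = new CompletableFuture<>();
        if (oracleKnown) {
            location.complete("location");
        } else {
            location.completeExceptionally(new UnknownLocationOracleException());
        }
        return location;
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            lookup(false).get();
        } catch (ExecutionException e) {
            // Code written against the old exception type still matches the new one.
            if (e.getCause() instanceof UnknownJobManagerException) {
                System.out.println("caught: " + e.getCause().getClass().getSimpleName());
            }
        }
    }
}
```

The trade-off is that the old name stays in the type hierarchy, so this would only postpone the cleanup rather than complete it.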
---