You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2018/02/05 10:59:07 UTC

[GitHub] flink pull request #5339: [FLINK-8493] [flip6] Integrate queryable state wit...

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5339#discussion_r165939136
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java ---
    @@ -205,32 +201,32 @@ private void executeActionAsync(
     			return cachedFuture;
     		}
     
    -		LOG.debug("Retrieving location for state={} of job={} from the job manager.", jobId, queryableStateName);
    -
    -		final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
    -		lookupCache.put(cacheKey, location);
    -		return proxy.getJobManagerFuture().thenComposeAsync(
    -				jobManagerGateway -> {
    -					final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
    -					jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
    -							.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
    -							.onComplete(new OnComplete<KvStateLocation>() {
    -
    -								@Override
    -								public void onComplete(Throwable failure, KvStateLocation loc) throws Throwable {
    -									if (failure != null) {
    -										if (failure instanceof FlinkJobNotFoundException) {
    -											// if the jobId was wrong, remove the entry from the cache.
    -											lookupCache.remove(cacheKey);
    -										}
    -										location.completeExceptionally(failure);
    -									} else {
    -										location.complete(loc);
    -									}
    -								}
    -							}, Executors.directExecutionContext());
    -					return location;
    -				}, queryExecutor);
    +		final KvStateLocationOracle kvStateLocationOracle = proxy.getKvStateLocationOracle(jobId);
    +
    +		if (kvStateLocationOracle != null) {
    +			LOG.debug("Retrieving location for state={} of job={} from the key-value state location oracle.", jobId, queryableStateName);
    +			final CompletableFuture<KvStateLocation> location = new CompletableFuture<>();
    +			lookupCache.put(cacheKey, location);
    +
    +			kvStateLocationOracle
    +				.requestKvStateLocation(jobId, queryableStateName)
    +				.whenComplete(
    +					(KvStateLocation kvStateLocation, Throwable throwable) -> {
    +						if (throwable != null) {
    +							if (ExceptionUtils.stripCompletionException(throwable) instanceof FlinkJobNotFoundException) {
    +								// if the jobId was wrong, remove the entry from the cache.
    +								lookupCache.remove(cacheKey);
    +							}
    +							location.completeExceptionally(throwable);
    +						} else {
    +							location.complete(kvStateLocation);
    +						}
    +					});
    +
    +			return location;
    +		} else {
    +			return FutureUtils.completedExceptionally(new UnknownJobManagerException());
    --- End diff --
    
    I would be up for changing the name of this exception, as now it is an `UnknownJobManagerException`. The role of the job manager now is played by the oracle. I understand that this may change user-facing behavior, and that `UnknowOracleException` is a bit cryptic, but I suppose we can find something more adequate. And even if we leave it like this for now, we should open a JIRA for the future.


---