You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/04/22 20:40:46 UTC

[10/17] flink git commit: [hotfix] [docs] Minor cleanup of Java example code for AsyncFunctions

[hotfix] [docs] Minor cleanup of Java example code for AsyncFunctions

This closes #5848


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/18bba6bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/18bba6bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/18bba6bc

Branch: refs/heads/master
Commit: 18bba6bc9b6d552f69d1f1b17cd2d91248017872
Parents: 2de321e
Author: Ken Krugler <ke...@transpac.com>
Authored: Sat Apr 14 12:57:22 2018 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Apr 22 16:28:35 2018 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/asyncio.md | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/18bba6bc/docs/dev/stream/operators/asyncio.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/asyncio.md b/docs/dev/stream/operators/asyncio.md
index 702d2ae..6457074 100644
--- a/docs/dev/stream/operators/asyncio.md
+++ b/docs/dev/stream/operators/asyncio.md
@@ -104,17 +104,26 @@ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, Stri
     }
 
     @Override
-    public void asyncInvoke(final String str, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
+    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
 
         // issue the asynchronous request, receive a future for result
-        Future<String> resultFuture = client.query(str);
+        final Future<String> result = client.query(key);
 
         // set the callback to be executed once the request by the client is complete
         // the callback simply forwards the result to the result future
-        resultFuture.thenAccept( (String result) -> {
-
-            resultFuture.complete(Collections.singleton(new Tuple2<>(str, result)));
-         
+        CompletableFuture.supplyAsync(new Supplier<String>() {
+
+            @Override
+            public String get() {
+                try {
+                    return result.get();
+                } catch (InterruptedException | ExecutionException e) {
+                    // Real code should handle this failure explicitly, e.g. resultFuture.completeExceptionally(e).
+                    return null;
+                }
+            }
+        }).thenAccept( (String dbResult) -> {
+            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
         });
     }
 }