You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2016/09/05 08:07:25 UTC

[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/2472

    [FLINK-4361] Introduce Flink's own future abstraction

    Flink's future abstraction whose API is similar to Java 8's CompletableFuture.
    That's in order to ease a future transition to this class once we ditch Java 7.
    The current set of operations comprises:
    
    - isDone to check the completion of the future
    - get/getNow to obtain the future's value
    - cancel to cancel the future (best effort basis)
    - thenApplyAsync to transform the future's value into another value
    - thenAcceptAsync to register a callback for a successful completion of the future
    - exceptionallyAsync to register a callback for an exception completion of the future
    - thenComposeAsync to transform the future's value and flatten the returned future
    - handleAsync to register a callback which is called either with the regular result
    or the exceptional result
    
    Additionally, Flink offers a CompletableFuture which can be completed with a regular
    value or an exception:
    
    - complete/completeExceptionally

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink futures

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2472.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2472
    
----
commit 14023bb0df8e1a263d4082fb0e747a831e82f4cd
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-09-02T19:13:34Z

    [FLINK-4361] Introduce Flink's own future abstraction
    
    Flink's future abstraction whose API is similar to Java 8's CompletableFuture.
    That's in order to ease a future transition to this class once we ditch Java 7.
    The current set of operations comprises:
    
    - isDone to check the completion of the future
    - get/getNow to obtain the future's value
    - cancel to cancel the future (best effort basis)
    - thenApplyAsync to transform the future's value into another value
    - thenAcceptAsync to register a callback for a successful completion of the future
    - exceptionallyAsync to register a callback for an exception completion of the future
    - thenComposeAsync to transform the future's value and flatten the returned future
    - handleAsync to register a callback which is called either with the regular result
    or the exceptional result
    
    Additionally, Flink offers a CompletableFuture which can be completed with a regular
    value or an exception:
    
    - complete/completeExceptionally

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r79656257
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +/**
    + * Flink's basic future abstraction. A future represents an asynchronous operation whose result
    + * will be contained in this instance upon completion.
    + *
    + * @param <T> type of the future's result
    + */
    +public interface Future<T> {
    +
    +	/**
    +	 * Checks if the future has been completed. A future is completed, if the result has been
    +	 * delivered.
    +	 *
    +	 * @return true if the future is completed; otherwise false
    +	 */
    +	boolean isDone();
    --- End diff --
    
    I see. After all, it's a minor naming issue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r79565763
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent.impl;
    +
    +import org.apache.flink.runtime.concurrent.CompletableFuture;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Promise;
    +
    +/**
    + * Implementation of {@link CompletableFuture} which is backed by {@link Promise}.
    + *
    + * @param <T> type of the future's value
    + */
    +public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> {
    +
    +	private final Promise<T> promise;
    +
    +	public FlinkCompletableFuture() {
    +		promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
    +		scalaFuture = promise.future();
    +
    --- End diff --
    
    Good point, will remove it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2472: [FLINK-4361] Introduce Flink's own future abstraction

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2472
  
    The naming is not super nice, I agree. But there is a clear benefit for Java 8 familiar people to stick with something established.
    
    +1 for this from my side.
    
    Next time we get involved in the Java community processes, to make sure they have better names ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r79597647
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +/**
    + * Flink's basic future abstraction. A future represents an asynchronous operation whose result
    + * will be contained in this instance upon completion.
    + *
    + * @param <T> type of the future's result
    + */
    +public interface Future<T> {
    +
    +	/**
    +	 * Checks if the future has been completed. A future is completed, if the result has been
    +	 * delivered.
    +	 *
    +	 * @return true if the future is completed; otherwise false
    +	 */
    +	boolean isDone();
    --- End diff --
    
    Why is this not named `isCompleted()`? That would be analogue to the `complete()` function.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r79565983
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent.impl;
    +
    +import org.apache.flink.runtime.concurrent.CompletableFuture;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Promise;
    +
    +/**
    + * Implementation of {@link CompletableFuture} which is backed by {@link Promise}.
    + *
    + * @param <T> type of the future's value
    + */
    +public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> {
    --- End diff --
    
    True, you're right. Will change it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r79622945
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +/**
    + * Flink's basic future abstraction. A future represents an asynchronous operation whose result
    + * will be contained in this instance upon completion.
    + *
    + * @param <T> type of the future's result
    + */
    +public interface Future<T> {
    +
    +	/**
    +	 * Checks if the future has been completed. A future is completed, if the result has been
    +	 * delivered.
    +	 *
    +	 * @return true if the future is completed; otherwise false
    +	 */
    +	boolean isDone();
    --- End diff --
    
    That's because I sticked to Java 8's `CompletableFuture` implementation where it is named the same.
    
    I'm not super happy with the naming either. But I also see the benefit of sticking to Java 8's `CompletableFuture` interface. This will allow us to easily replace it once we switch to Java 8.
    
    I think we have to decide whether we want to stick to Java 8's `CompletableFuture` or not. In the latter case we can rename other methods as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r79623763
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +/**
    + * Flink's basic future abstraction. A future represents an asynchronous operation whose result
    + * will be contained in this instance upon completion.
    + *
    + * @param <T> type of the future's result
    + */
    +public interface Future<T> {
    +
    +	/**
    +	 * Checks if the future has been completed. A future is completed, if the result has been
    +	 * delivered.
    +	 *
    +	 * @return true if the future is completed; otherwise false
    +	 */
    +	boolean isDone();
    +
    +	/**
    +	 * Tries to cancel the future's operation. Note that not all future operations can be canceled.
    +	 * The result of the cancelling will be returned.
    +	 *
    +	 * @param mayInterruptIfRunning true iff the future operation may be interrupted
    +	 * @return true if the cancelling was successful; otherwise false
    +	 */
    +	boolean cancel(boolean mayInterruptIfRunning);
    +
    +	/**
    +	 * Gets the result value of the future. If the future has not been completed, then this
    +	 * operation will block indefinitely until the result has been delivered.
    +	 *
    +	 * @return the result value
    +	 * @throws CancellationException if the future has been cancelled
    +	 * @throws InterruptedException if the current thread was interrupted while waiting for the result
    +	 * @throws ExecutionException if the future has been completed with an exception
    +	 */
    +	T get() throws InterruptedException, ExecutionException;
    +
    +	/**
    +	 * Gets the result value of the future. If the future has not been done, then this operation
    +	 * will block the given timeout value. If the result has not been delivered within the timeout,
    +	 * then the method throws an {@link TimeoutException}.
    +	 *
    +	 * @param timeout the time to wait for the future to be done
    +	 * @param unit time unit for the timeout argument
    +	 * @return the result value
    +	 * @throws CancellationException if the future has been cancelled
    +	 * @throws InterruptedException if the current thread was interrupted while waiting for the result
    +	 * @throws ExecutionException if the future has been completed with an exception
    +	 * @throws TimeoutException if the future has not been completed within the given timeout
    +	 */
    +	T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    +
    +	/**
    +	 * Gets the value of the future. If the future has not been completed when calling this
    +	 * function, the given value is returned.
    +	 *
    +	 * @param valueIfAbsent value which is returned if the future has not been completed
    +	 * @return value of the future or the given value if the future has not been completed
    +	 * @throws ExecutionException if the future has been completed with an exception
    +	 */
    +	T getNow(T valueIfAbsent) throws ExecutionException;
    +
    +	/**
    +	 * Applies the given function to the value of the future. The result of the apply function is
    +	 * the value of the newly returned future.
    +	 * <p>
    +	 * The apply function is executed asynchronously by the given executor.
    +	 *
    +	 * @param applyFunction function to apply to the future's value
    +	 * @param executor used to execute the given apply function asynchronously
    +	 * @param <R> type of the apply function's return value
    +	 * @return future representing the return value of the given apply function
    +	 */
    +	<R> Future<R> thenApplyAsync(ApplyFunction<? super T, ? extends R> applyFunction, Executor executor);
    +
    +	/**
    +	 * Applies the accept function to the value of the future. Unlike the {@link ApplyFunction}, the
    +	 * {@link AcceptFunction} does not return a value. The returned future, thus, represents only
    +	 * the completion of the accept callback.
    +	 * <p>
    +	 * The accept function is executed asynchronously by the given executor.
    +	 *
    +	 * @param acceptFunction function to apply to the future's value
    +	 * @param executor used to execute the given apply function asynchronously
    +	 * @return future representing the completion of the accept callback
    +	 */
    +	Future<Void> thenAcceptAsync(AcceptFunction<? super T> acceptFunction, Executor executor);
    +
    +	/**
    +	 * Applies the given function to the value of the future if the future has been completed
    +	 * exceptionally. The completing exception is given to the apply function which can return a new
    +	 * value which is the value of the returned future.
    +	 * <p>
    +	 * The apply function is executed asynchronously by the given executor.
    +	 *
    +	 * @param exceptionallyFunction to apply to the future's value if it is an exception
    +	 * @param executor used to execute the given apply function asynchronously
    +	 * @param <R> type of the apply function's return value
    +	 * @return future representing the return value of the given apply function
    +	 */
    +	<R> Future<R> exceptionallyAsync(ApplyFunction<Throwable, ? extends R> exceptionallyFunction, Executor executor);
    +
    +	/**
    +	 * Applies the given function to the value of the future. The apply function can return a future
    --- End diff --
    
    You're right. The "can" should be removed. Will change it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2472: [FLINK-4361] Introduce Flink's own future abstraction

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2472
  
    Thanks for your review @KurtYoung. Will address your comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r77971053
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent.impl;
    +
    +import org.apache.flink.runtime.concurrent.CompletableFuture;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Promise;
    +
    +/**
    + * Implementation of {@link CompletableFuture} which is backed by {@link Promise}.
    + *
    + * @param <T> type of the future's value
    + */
    +public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> {
    --- End diff --
    
    How about letting this class overrides the cancel() methods and treat it as completeExceptionally with CancellationException


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r79599026
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Future.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import java.util.concurrent.CancellationException;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.Executor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +/**
    + * Flink's basic future abstraction. A future represents an asynchronous operation whose result
    + * will be contained in this instance upon completion.
    + *
    + * @param <T> type of the future's result
    + */
    +public interface Future<T> {
    +
    +	/**
    +	 * Checks if the future has been completed. A future is completed, if the result has been
    +	 * delivered.
    +	 *
    +	 * @return true if the future is completed; otherwise false
    +	 */
    +	boolean isDone();
    +
    +	/**
    +	 * Tries to cancel the future's operation. Note that not all future operations can be canceled.
    +	 * The result of the cancelling will be returned.
    +	 *
    +	 * @param mayInterruptIfRunning true iff the future operation may be interrupted
    +	 * @return true if the cancelling was successful; otherwise false
    +	 */
    +	boolean cancel(boolean mayInterruptIfRunning);
    +
    +	/**
    +	 * Gets the result value of the future. If the future has not been completed, then this
    +	 * operation will block indefinitely until the result has been delivered.
    +	 *
    +	 * @return the result value
    +	 * @throws CancellationException if the future has been cancelled
    +	 * @throws InterruptedException if the current thread was interrupted while waiting for the result
    +	 * @throws ExecutionException if the future has been completed with an exception
    +	 */
    +	T get() throws InterruptedException, ExecutionException;
    +
    +	/**
    +	 * Gets the result value of the future. If the future has not been done, then this operation
    +	 * will block the given timeout value. If the result has not been delivered within the timeout,
    +	 * then the method throws an {@link TimeoutException}.
    +	 *
    +	 * @param timeout the time to wait for the future to be done
    +	 * @param unit time unit for the timeout argument
    +	 * @return the result value
    +	 * @throws CancellationException if the future has been cancelled
    +	 * @throws InterruptedException if the current thread was interrupted while waiting for the result
    +	 * @throws ExecutionException if the future has been completed with an exception
    +	 * @throws TimeoutException if the future has not been completed within the given timeout
    +	 */
    +	T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    +
    +	/**
    +	 * Gets the value of the future. If the future has not been completed when calling this
    +	 * function, the given value is returned.
    +	 *
    +	 * @param valueIfAbsent value which is returned if the future has not been completed
    +	 * @return value of the future or the given value if the future has not been completed
    +	 * @throws ExecutionException if the future has been completed with an exception
    +	 */
    +	T getNow(T valueIfAbsent) throws ExecutionException;
    +
    +	/**
    +	 * Applies the given function to the value of the future. The result of the apply function is
    +	 * the value of the newly returned future.
    +	 * <p>
    +	 * The apply function is executed asynchronously by the given executor.
    +	 *
    +	 * @param applyFunction function to apply to the future's value
    +	 * @param executor used to execute the given apply function asynchronously
    +	 * @param <R> type of the apply function's return value
    +	 * @return future representing the return value of the given apply function
    +	 */
    +	<R> Future<R> thenApplyAsync(ApplyFunction<? super T, ? extends R> applyFunction, Executor executor);
    +
    +	/**
    +	 * Applies the accept function to the value of the future. Unlike the {@link ApplyFunction}, the
    +	 * {@link AcceptFunction} does not return a value. The returned future, thus, represents only
    +	 * the completion of the accept callback.
    +	 * <p>
    +	 * The accept function is executed asynchronously by the given executor.
    +	 *
    +	 * @param acceptFunction function to apply to the future's value
    +	 * @param executor used to execute the given apply function asynchronously
    +	 * @return future representing the completion of the accept callback
    +	 */
    +	Future<Void> thenAcceptAsync(AcceptFunction<? super T> acceptFunction, Executor executor);
    +
    +	/**
    +	 * Applies the given function to the value of the future if the future has been completed
    +	 * exceptionally. The completing exception is given to the apply function which can return a new
    +	 * value which is the value of the returned future.
    +	 * <p>
    +	 * The apply function is executed asynchronously by the given executor.
    +	 *
    +	 * @param exceptionallyFunction to apply to the future's value if it is an exception
    +	 * @param executor used to execute the given apply function asynchronously
    +	 * @param <R> type of the apply function's return value
    +	 * @return future representing the return value of the given apply function
    +	 */
    +	<R> Future<R> exceptionallyAsync(ApplyFunction<Throwable, ? extends R> exceptionallyFunction, Executor executor);
    +
    +	/**
    +	 * Applies the given function to the value of the future. The apply function can return a future
    --- End diff --
    
    > can return a future
    
    should be
    
    > must return a future
    
    or
    
    > returns a future
    
    ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2472#discussion_r77970498
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkCompletableFuture.java ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent.impl;
    +
    +import org.apache.flink.runtime.concurrent.CompletableFuture;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Promise;
    +
    +/**
    + * Implementation of {@link CompletableFuture} which is backed by {@link Promise}.
    + *
    + * @param <T> type of the future's value
    + */
    +public class FlinkCompletableFuture<T> extends FlinkFuture<T> implements CompletableFuture<T> {
    +
    +	private final Promise<T> promise;
    +
    +	public FlinkCompletableFuture() {
    +		promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
    +		scalaFuture = promise.future();
    +
    --- End diff --
    
    unnecessary new line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2472: [FLINK-4361] Introduce Flink's own future abstraction

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2472
  
    Thanks for the review @KurtYoung, @mxm and @StephanEwen. I will merge this PR to the flip-6 branch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2472: [FLINK-4361] Introduce Flink's own future abstract...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann closed the pull request at:

    https://github.com/apache/flink/pull/2472


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---