You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 13:02:04 UTC
[flink] 01/09: [hotfix] Add LeaderRetrievalUtils#retrieveLeaderConnectionInfo with Time timeout
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bdbd11da9fa8357d924f88d710f0dbfa2c829d3b
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Sep 10 14:42:32 2018 +0200
[hotfix] Add LeaderRetrievalUtils#retrieveLeaderConnectionInfo with Time timeout
---
.../flink/runtime/util/LeaderRetrievalUtils.java | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 4d78335..9c8f7bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -102,6 +103,23 @@ public class LeaderRetrievalUtils {
*/
public static LeaderConnectionInfo retrieveLeaderConnectionInfo(
LeaderRetrievalService leaderRetrievalService,
+ Time timeout) throws LeaderRetrievalException {
+ return retrieveLeaderConnectionInfo(leaderRetrievalService, FutureUtils.toFiniteDuration(timeout));
+ }
+
+ /**
+ * Retrieves the leader akka url and the current leader session ID. The values are stored in a
+ * {@link LeaderConnectionInfo} instance.
+ *
+ * @param leaderRetrievalService Leader retrieval service to retrieve the leader connection
+ * information
+ * @param timeout Timeout when to give up looking for the leader
+ * @return LeaderConnectionInfo containing the leader's akka URL and the current leader session
+ * ID
+ * @throws LeaderRetrievalException
+ */
+ public static LeaderConnectionInfo retrieveLeaderConnectionInfo(
+ LeaderRetrievalService leaderRetrievalService,
FiniteDuration timeout
) throws LeaderRetrievalException {
LeaderConnectionInfoListener listener = new LeaderConnectionInfoListener();