You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:15 UTC
[10/16] flink git commit: [FLINK-6078] Remove CuratorFramework#close
calls from ZooKeeper based HA services
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-libraries/flink-python/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/resources/log4j-test.properties b/flink-libraries/flink-python/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..18b51cc
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/resources/log4j-test.properties
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger=OFF, console
+
+# -----------------------------------------------------------------------------
+# Console (use 'console')
+# -----------------------------------------------------------------------------
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# -----------------------------------------------------------------------------
+# File (use 'file')
+# -----------------------------------------------------------------------------
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.file=${log.dir}/${mvn.forkNumber}.log
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-libraries/flink-python/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/resources/logback-test.xml b/flink-libraries/flink-python/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..1d64d46
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/resources/logback-test.xml
@@ -0,0 +1,42 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] [%X{sourceThread} - %X{akkaSource}] %-5level %logger{60} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+ <!-- The following loggers are disabled during tests, because many tests deliberately
+ throw errors to test failure scenarios. Logging those would flood the log. -->
+ <!---->
+ <logger name="org.apache.flink.runtime.operators.DataSinkTask" level="OFF"/>
+ <logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>
+ <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
+ <logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>
+ <logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>
+ <logger name="org.apache.flink.runtime.testingUtils" level="OFF"/>
+ <logger name="org.apache.flink.runtime.executiongraph.ExecutionGraph" level="OFF"/>
+ <logger name="org.apache.flink.runtime.jobmanager.EventCollector" level="OFF"/>
+ <logger name="org.apache.flink.runtime.instance.InstanceManager" level="OFF"/>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 0c12745..5513df4 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -48,17 +48,17 @@ import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.mesos.Protos;
@@ -67,7 +67,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
-import scala.Some;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
@@ -206,6 +205,7 @@ public class MesosApplicationMasterRunner {
ScheduledExecutorService futureExecutor = null;
ExecutorService ioExecutor = null;
MesosServices mesosServices = null;
+ HighAvailabilityServices highAvailabilityServices = null;
try {
// ------- (1) load and parse / validate all configurations -------
@@ -295,6 +295,12 @@ public class MesosApplicationMasterRunner {
// 3) Resource Master for Mesos
// 4) Process reapers for the JobManager and Resource Master
+ // 0: the high availability services, shared by the JobManager, web monitor and resource master
+ highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ config,
+ ioExecutor,
+ HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+
// 1: the JobManager
LOG.debug("Starting JobManager actor");
@@ -304,8 +310,9 @@ public class MesosApplicationMasterRunner {
actorSystem,
futureExecutor,
ioExecutor,
- new Some<>(JobMaster.JOB_MANAGER_NAME),
- Option.<String>empty(),
+ highAvailabilityServices,
+ Option.apply(JobMaster.JOB_MANAGER_NAME),
+ Option.apply(JobMaster.ARCHIVE_NAME),
getJobManagerClass(),
getArchivistClass())._1();
@@ -313,7 +320,12 @@ public class MesosApplicationMasterRunner {
// 2: the web monitor
LOG.debug("Starting Web Frontend");
- webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, LOG);
+ webMonitor = BootstrapTools.startWebMonitorIfConfigured(
+ config,
+ highAvailabilityServices,
+ actorSystem,
+ jobManager,
+ LOG);
if(webMonitor != null) {
final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/");
mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm());
@@ -327,17 +339,12 @@ public class MesosApplicationMasterRunner {
config,
ioExecutor);
- // we need the leader retrieval service here to be informed of new
- // leader session IDs, even though there can be only one leader ever
- LeaderRetrievalService leaderRetriever =
- LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
-
Props resourceMasterProps = MesosFlinkResourceManager.createActorProps(
getResourceManagerClass(),
config,
mesosConfig,
workerStore,
- leaderRetriever,
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
taskManagerParameters,
taskManagerContainerSpec,
artifactServer,
@@ -435,6 +442,14 @@ public class MesosApplicationMasterRunner {
LOG.error("Failed to stop the artifact server", t);
}
+ if (highAvailabilityServices != null) {
+ try {
+ highAvailabilityServices.close();
+ } catch (Throwable t) {
+ LOG.error("Could not properly stop the high availability services.");
+ }
+ }
+
org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
AkkaUtils.getTimeout(config).toMillis(),
TimeUnit.MILLISECONDS,
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 7915e8a..7fe5db5 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -43,11 +43,12 @@ import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.messages.*;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.TestLogger;
import org.apache.mesos.SchedulerDriver;
import org.apache.mesos.Protos;
@@ -142,11 +143,10 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
/**
* The context fixture.
*/
- static class Context extends JavaTestKit {
+ static class Context extends JavaTestKit implements AutoCloseable {
// mocks
public ActorGateway jobManager;
- public LeaderRetrievalService retrievalService;
public MesosConfiguration mesosConfig;
public MesosWorkerStore workerStore;
public MesosArtifactResolver artifactResolver;
@@ -163,6 +163,8 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
public Protos.TaskID task2 = Protos.TaskID.newBuilder().setValue("taskmanager-00002").build();
public Protos.TaskID task3 = Protos.TaskID.newBuilder().setValue("taskmanager-00003").build();
+ private final TestingHighAvailabilityServices highAvailabilityServices;
+
/**
* Create mock RM dependencies.
*/
@@ -170,8 +172,19 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
super(system);
try {
- jobManager = TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
- retrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager.actor());
+ jobManager = TestingUtils.createForwardingActor(
+ system,
+ getTestActor(),
+ HighAvailabilityServices.DEFAULT_LEADER_ID,
+ Option.<String>empty());
+
+ highAvailabilityServices = new TestingHighAvailabilityServices();
+
+ highAvailabilityServices.setJobMasterLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID,
+ new TestingLeaderRetrievalService(
+ jobManager.path(),
+ HighAvailabilityServices.DEFAULT_LEADER_ID));
// scheduler driver
schedulerDriver = mock(SchedulerDriver.class);
@@ -206,7 +219,14 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
TestActorRef<TestingMesosFlinkResourceManager> resourceManagerRef =
TestActorRef.create(system, MesosFlinkResourceManager.createActorProps(
TestingMesosFlinkResourceManager.class,
- config, mesosConfig, workerStore, retrievalService, tmParams, containerSpecification, artifactResolver, LOG));
+ config,
+ mesosConfig,
+ workerStore,
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+ tmParams,
+ containerSpecification,
+ artifactResolver,
+ LOG));
resourceManagerInstance = resourceManagerRef.underlyingActor();
resourceManager = new AkkaActorGateway(resourceManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
@@ -235,6 +255,11 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
.setLaunch(Protos.Offer.Operation.Launch.newBuilder().addAllTaskInfos(Arrays.asList(taskInfo))
).build();
}
+
+ @Override
+ public void close() throws Exception {
+ highAvailabilityServices.closeAndCleanupAllData();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index e80c509..f31c932 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -103,6 +104,12 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
jobGraph.addVertex(task);
+ final Configuration config = new Configuration();
+
+ final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+ config,
+ TestingUtils.defaultExecutor());
+
ActorGateway jobManger = null;
ActorGateway taskManager = null;
@@ -125,13 +132,17 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
testActorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- new Configuration());
+ config,
+ highAvailabilityServices);
- final Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
taskManager = TestingUtils.createTaskManager(
- testActorSystem, jobManger, config, true, true);
+ testActorSystem,
+ highAvailabilityServices,
+ config,
+ true,
+ true);
final ActorGateway jm = jobManger;
@@ -264,6 +275,8 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
TestingUtils.stopActor(jobManger);
TestingUtils.stopActor(taskManager);
+ highAvailabilityServices.closeAndCleanupAllData();
+
for (Buffer buf : buffers) {
buf.recycle();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 5463384..b67e735 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -89,6 +90,12 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
jobGraph.addVertex(task);
+ final Configuration config = new Configuration();
+
+ final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+ config,
+ TestingUtils.defaultExecutor());
+
ActorGateway jobManger = null;
ActorGateway taskManager = null;
@@ -97,13 +104,17 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
testActorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- new Configuration());
+ config,
+ highAvailabilityServices);
- final Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
taskManager = TestingUtils.createTaskManager(
- testActorSystem, jobManger, config, true, true);
+ testActorSystem,
+ highAvailabilityServices,
+ config,
+ true,
+ true);
final ActorGateway jm = jobManger;
@@ -182,6 +193,8 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger {
} finally {
TestingUtils.stopActor(jobManger);
TestingUtils.stopActor(taskManager);
+
+ highAvailabilityServices.closeAndCleanupAllData();
}
}};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 8af1f46..5ccfe90 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderelection.TestingListener;
@@ -33,7 +35,6 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
-import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.webmonitor.files.MimeTypes;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.util.TestLogger;
@@ -49,8 +50,6 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -127,7 +126,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
ActorSystem[] jobManagerSystem = new ActorSystem[2];
WebRuntimeMonitor[] webMonitor = new WebRuntimeMonitor[2];
- List<LeaderRetrievalService> leaderRetrievalServices = new ArrayList<>();
+ HighAvailabilityServices highAvailabilityServices = null;
try (TestingServer zooKeeper = new TestingServer()) {
final Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
@@ -141,15 +140,20 @@ public class WebRuntimeMonitorITCase extends TestLogger {
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
+ highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+ config,
+ TestingUtils.defaultExecutor());
+
for (int i = 0; i < jobManagerSystem.length; i++) {
jobManagerSystem[i] = AkkaUtils.createActorSystem(new Configuration(),
new Some<>(new Tuple2<String, Object>("localhost", 0)));
}
for (int i = 0; i < webMonitor.length; i++) {
- LeaderRetrievalService lrs = ZooKeeperUtils.createLeaderRetrievalService(config);
- leaderRetrievalServices.add(lrs);
- webMonitor[i] = new WebRuntimeMonitor(config, lrs, jobManagerSystem[i]);
+ webMonitor[i] = new WebRuntimeMonitor(
+ config,
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+ jobManagerSystem[i]);
}
ActorRef[] jobManager = new ActorRef[2];
@@ -164,6 +168,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
jobManagerSystem[i],
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
+ highAvailabilityServices,
JobManager.class,
MemoryArchivist.class)._1();
@@ -171,8 +176,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
webMonitor[i].start(jobManagerAddress[i]);
}
- LeaderRetrievalService lrs = ZooKeeperUtils.createLeaderRetrievalService(config);
- leaderRetrievalServices.add(lrs);
+ LeaderRetrievalService lrs = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
TestingListener leaderListener = new TestingListener();
lrs.start(leaderListener);
@@ -247,6 +251,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
assertTrue(response.getContent().contains("\"taskmanagers\":1") ||
response.getContent().contains("\"taskmanagers\":0"));
+ } finally {
+ lrs.stop();
}
}
finally {
@@ -260,8 +266,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
monitor.stop();
}
- for (LeaderRetrievalService lrs : leaderRetrievalServices) {
- lrs.stop();
+ if (highAvailabilityServices != null) {
+ highAvailabilityServices.closeAndCleanupAllData();
}
}
}
@@ -462,7 +468,8 @@ public class WebRuntimeMonitorITCase extends TestLogger {
WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
config,
- flink.createLeaderRetrievalService(),
+ flink.highAvailabilityServices().getJobManagerLeaderRetriever(
+ HighAvailabilityServices.DEFAULT_JOB_ID),
jmActorSystem);
webMonitor.start(jobManagerAddress);
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 481559b..d60b26f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -55,7 +55,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
@Override
public void stop() {
- client.close();
+ // Nothing to do
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 76d6d86..b570383 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -34,9 +34,9 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.SerializedThrowable;
@@ -96,14 +96,14 @@ public class JobClient {
public static JobListeningContext submitJob(
ActorSystem actorSystem,
Configuration config,
- LeaderRetrievalService leaderRetrievalService,
+ HighAvailabilityServices highAvailabilityServices,
JobGraph jobGraph,
FiniteDuration timeout,
boolean sysoutLogUpdates,
ClassLoader classLoader) {
checkNotNull(actorSystem, "The actorSystem must not be null.");
- checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
+ checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
checkNotNull(jobGraph, "The jobGraph must not be null.");
checkNotNull(timeout, "The timeout must not be null.");
@@ -112,7 +112,7 @@ public class JobClient {
// update messages, watches for disconnect between client and JobManager, ...
Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
- leaderRetrievalService,
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
timeout,
sysoutLogUpdates,
config);
@@ -125,11 +125,12 @@ public class JobClient {
new Timeout(AkkaUtils.INF_TIMEOUT()));
return new JobListeningContext(
- jobGraph.getJobID(),
- submissionFuture,
- jobClientActor,
- timeout,
- classLoader);
+ jobGraph.getJobID(),
+ submissionFuture,
+ jobClientActor,
+ timeout,
+ classLoader,
+ highAvailabilityServices);
}
@@ -142,7 +143,7 @@ public class JobClient {
ActorGateway jobManagerGateWay,
Configuration configuration,
ActorSystem actorSystem,
- LeaderRetrievalService leaderRetrievalService,
+ HighAvailabilityServices highAvailabilityServices,
FiniteDuration timeout,
boolean sysoutLogUpdates) {
@@ -150,14 +151,14 @@ public class JobClient {
checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
checkNotNull(configuration, "The configuration must not be null.");
checkNotNull(actorSystem, "The actorSystem must not be null.");
- checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
+ checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
checkNotNull(timeout, "The timeout must not be null.");
// we create a proxy JobClientActor that deals with all communication with
// the JobManager. It forwards the job attachments, checks the success/failure responses, logs
// update messages, watches for disconnect between client and JobManager, ...
Props jobClientActorProps = JobAttachmentClientActor.createActorProps(
- leaderRetrievalService,
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
timeout,
sysoutLogUpdates);
@@ -169,12 +170,13 @@ public class JobClient {
new Timeout(AkkaUtils.INF_TIMEOUT()));
return new JobListeningContext(
- jobID,
- attachmentFuture,
- jobClientActor,
- timeout,
- actorSystem,
- configuration);
+ jobID,
+ attachmentFuture,
+ jobClientActor,
+ timeout,
+ actorSystem,
+ configuration,
+ highAvailabilityServices);
}
/**
@@ -357,20 +359,19 @@ public class JobClient {
*
* @param actorSystem The actor system that performs the communication.
* @param config The cluster wide configuration.
- * @param leaderRetrievalService Leader retrieval service which used to find the current leading
- * JobManager
+ * @param highAvailabilityServices Service factory for high availability services
* @param jobGraph JobGraph describing the Flink job
* @param timeout Timeout for futures
* @param sysoutLogUpdates prints log updates to system out if true
* @param classLoader The class loader for deserializing the results
* @return The job execution result
- * @throws org.apache.flink.runtime.client.JobExecutionException Thrown if the job
+ * @throws JobExecutionException Thrown if the job
* execution fails.
*/
public static JobExecutionResult submitJobAndWait(
ActorSystem actorSystem,
Configuration config,
- LeaderRetrievalService leaderRetrievalService,
+ HighAvailabilityServices highAvailabilityServices,
JobGraph jobGraph,
FiniteDuration timeout,
boolean sysoutLogUpdates,
@@ -379,7 +380,7 @@ public class JobClient {
JobListeningContext jobListeningContext = submitJob(
actorSystem,
config,
- leaderRetrievalService,
+ highAvailabilityServices,
jobGraph,
timeout,
sysoutLogUpdates,
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index a0bf97d..793041f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -167,7 +167,8 @@ public abstract class JobClientActor extends FlinkUntypedActor implements Leader
JobManagerActorRef msg = (JobManagerActorRef) message;
connectToJobManager(msg.jobManager());
- logAndPrintMessage("Connected to JobManager at " + msg.jobManager());
+ logAndPrintMessage("Connected to JobManager at " + msg.jobManager() +
+ " with leader session id " + leaderSessionID + '.');
connectedToJobManager();
}
@@ -326,8 +327,6 @@ public abstract class JobClientActor extends FlinkUntypedActor implements Leader
getContext().unwatch(jobManager);
}
- LOG.info("Connected to new JobManager {}.", jobManager.path());
-
this.jobManager = jobManager;
getContext().watch(jobManager);
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
index b944ba8..fe8c34c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
@@ -22,6 +22,7 @@ import akka.actor.ActorSystem;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.slf4j.Logger;
@@ -47,6 +48,9 @@ public final class JobListeningContext {
/** Timeout used Asks */
private final FiniteDuration timeout;
+ /** Service factory for high availability services */
+ private final HighAvailabilityServices highAvailabilityServices;
+
/** ActorSystem for leader retrieval */
private ActorSystem actorSystem;
/** Flink configuration for initializing the BlobService */
@@ -59,16 +63,18 @@ public final class JobListeningContext {
* Constructor to use when the class loader is available.
*/
public JobListeningContext(
- JobID jobID,
- Future<Object> jobResultFuture,
- ActorRef jobClientActor,
- FiniteDuration timeout,
- ClassLoader classLoader) {
+ JobID jobID,
+ Future<Object> jobResultFuture,
+ ActorRef jobClientActor,
+ FiniteDuration timeout,
+ ClassLoader classLoader,
+ HighAvailabilityServices highAvailabilityServices) {
this.jobID = checkNotNull(jobID);
this.jobResultFuture = checkNotNull(jobResultFuture);
this.jobClientActor = checkNotNull(jobClientActor);
this.timeout = checkNotNull(timeout);
this.classLoader = checkNotNull(classLoader);
+ this.highAvailabilityServices = checkNotNull(highAvailabilityServices, "highAvailabilityServices");
}
/**
@@ -80,13 +86,15 @@ public final class JobListeningContext {
ActorRef jobClientActor,
FiniteDuration timeout,
ActorSystem actorSystem,
- Configuration configuration) {
+ Configuration configuration,
+ HighAvailabilityServices highAvailabilityServices) {
this.jobID = checkNotNull(jobID);
this.jobResultFuture = checkNotNull(jobResultFuture);
this.jobClientActor = checkNotNull(jobClientActor);
this.timeout = checkNotNull(timeout);
this.actorSystem = checkNotNull(actorSystem);
this.configuration = checkNotNull(configuration);
+ this.highAvailabilityServices = checkNotNull(highAvailabilityServices, "highAvailabilityServices");
}
/**
@@ -135,7 +143,7 @@ public final class JobListeningContext {
private ActorGateway getJobManager() throws JobRetrievalException {
try {
return LeaderRetrievalUtils.retrieveLeaderGateway(
- LeaderRetrievalUtils.createLeaderRetrievalService(configuration, true),
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
actorSystem,
AkkaUtils.getLookupTimeout(configuration));
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index e356d2b..9bcaa18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -29,8 +29,8 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.NetUtils;
@@ -165,17 +165,20 @@ public class BootstrapTools {
/**
* Starts the web frontend.
+ *
* @param config The Flink config.
+ * @param highAvailabilityServices Service factory for high availability services
* @param actorSystem The ActorSystem to start the web frontend in.
* @param logger Logger for log output
* @return WebMonitor instance.
* @throws Exception
*/
public static WebMonitor startWebMonitorIfConfigured(
- Configuration config,
- ActorSystem actorSystem,
- ActorRef jobManager,
- Logger logger) throws Exception {
+ Configuration config,
+ HighAvailabilityServices highAvailabilityServices,
+ ActorSystem actorSystem,
+ ActorRef jobManager,
+ Logger logger) throws Exception {
// this ensures correct values are present in the web frontend
@@ -186,8 +189,8 @@ public class BootstrapTools {
if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
logger.info("Starting JobManager Web Frontend");
- LeaderRetrievalService leaderRetrievalService =
- LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
+ LeaderRetrievalService leaderRetrievalService =
+ highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
// start the web frontend. we need to load this dynamically
// because it is not in the same project/dependencies
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 3de75d5..69a65dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -56,6 +56,13 @@ public interface HighAvailabilityServices extends AutoCloseable {
*/
UUID DEFAULT_LEADER_ID = new UUID(0, 0);
+ /**
+ * Job ID identifying the legacy (pre-FLIP-6) JobManager in the
+ * {@link HighAvailabilityServices}, e.g. when calling {@code getJobManagerLeaderRetriever}. With
+ * FLIP-6 every JobManager will have a distinct JobID assigned.
+ */
+ JobID DEFAULT_JOB_ID = new JobID(0L, 0L);
+
// ------------------------------------------------------------------------
// Services
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index 106be5a..c9e2957 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -115,7 +115,7 @@ public class HighAvailabilityServicesUtils {
if (port <= 0 || port >= 65536) {
throw new ConfigurationException("Invalid value for '" + JobManagerOptions.PORT +
"' (port of the JobManager actor system) : " + port +
- ". it must be great than 0 and less than 65536.");
+ ". it must be greater than 0 and less than 65536.");
}
return Tuple2.of(hostname, port);
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 2a8af37..2f5cd25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -484,6 +484,15 @@ public class JobGraph implements Serializable {
}
/**
+ * Gets the list of user jar paths assigned to this job graph.
+ *
+ * @return The internal list of user jar paths (the field itself, not a copy)
+ */
+ public List<Path> getUserJars() {
+ return userJars;
+ }
+
+ /**
* Adds the BLOB referenced by the key to the JobGraph's dependencies.
*
* @param key
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index aaafa76..2552088 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -143,11 +143,13 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
if (isRunning) {
jobGraphListener = null;
- pathCache.close();
-
- client.close();
-
- isRunning = false;
+ try {
+ pathCache.close();
+ } catch (Exception e) {
+ throw new Exception("Could not properly stop the ZooKeeperSubmittedJobGraphStore.", e);
+ } finally {
+ isRunning = false;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index 0fa6a9e..21e2ca1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.leaderelection;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
@@ -48,6 +49,8 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionService.class);
+ private final Object lock = new Object();
+
/** Client to the ZooKeeper quorum */
private final CuratorFramework client;
@@ -62,12 +65,12 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
private UUID issuedLeaderSessionID;
- private UUID confirmedLeaderSessionID;
+ private volatile UUID confirmedLeaderSessionID;
/** The leader contender which applies for leadership */
private volatile LeaderContender leaderContender;
- private final Object lock = new Object();
+ private volatile boolean running;
private final ConnectionStateListener listener = new ConnectionStateListener() {
@Override
@@ -84,11 +87,17 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
* @param leaderPath ZooKeeper node path for the node which stores the current leader information
*/
public ZooKeeperLeaderElectionService(CuratorFramework client, String latchPath, String leaderPath) {
- this.client = client;
- this.leaderPath = leaderPath;
+ this.client = Preconditions.checkNotNull(client, "CuratorFramework client");
+ this.leaderPath = Preconditions.checkNotNull(leaderPath, "leaderPath");
leaderLatch = new LeaderLatch(client, latchPath);
cache = new NodeCache(client, leaderPath);
+
+ issuedLeaderSessionID = null;
+ confirmedLeaderSessionID = null;
+ leaderContender = null;
+
+ running = false;
}
/**
@@ -105,32 +114,56 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
Preconditions.checkNotNull(contender, "Contender must not be null.");
Preconditions.checkState(leaderContender == null, "Contender was already set.");
- LOG.info("Starting ZooKeeperLeaderElectionService.");
+ LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);
+
+ synchronized (lock) {
- leaderContender = contender;
+ leaderContender = contender;
- leaderLatch.addListener(this);
- leaderLatch.start();
+ leaderLatch.addListener(this);
+ leaderLatch.start();
- cache.getListenable().addListener(this);
- cache.start();
+ cache.getListenable().addListener(this);
+ cache.start();
- client.getConnectionStateListenable().addListener(listener);
+ client.getConnectionStateListenable().addListener(listener);
+
+ running = true;
+ }
}
@Override
public void stop() throws Exception{
- LOG.info("Stopping ZooKeeperLeaderElectionService.");
+ synchronized (lock) {
+ if (!running) {
+ return;
+ }
+
+ running = false;
+ confirmedLeaderSessionID = null;
+ issuedLeaderSessionID = null;
+ }
+
+ LOG.info("Stopping ZooKeeperLeaderElectionService {}.", this);
client.getConnectionStateListenable().removeListener(listener);
- cache.close();
- leaderLatch.close();
- client.close();
+ Exception exception = null;
- synchronized (lock) {
- confirmedLeaderSessionID = null;
- issuedLeaderSessionID = null;
+ try {
+ cache.close();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+
+ try {
+ leaderLatch.close();
+ } catch(Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+
+ if (exception != null) {
+ throw new Exception("Could not properly stop the ZooKeeperLeaderElectionService.", exception);
}
}
@@ -148,9 +181,14 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
if(leaderLatch.hasLeadership()) {
// check if this is an old confirmation call
synchronized (lock) {
- if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
- confirmedLeaderSessionID = leaderSessionID;
- writeLeaderInformation(confirmedLeaderSessionID);
+ if (running) {
+ if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
+ confirmedLeaderSessionID = leaderSessionID;
+ writeLeaderInformation(confirmedLeaderSessionID);
+ }
+ } else {
+ LOG.debug("Ignoring the leader session Id {} confirmation, since the " +
+ "ZooKeeperLeaderElectionService has already been stopped.", leaderSessionID);
}
}
} else {
@@ -167,31 +205,41 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
@Override
public void isLeader() {
synchronized (lock) {
- issuedLeaderSessionID = UUID.randomUUID();
- confirmedLeaderSessionID = null;
+ if (running) {
+ issuedLeaderSessionID = UUID.randomUUID();
+ confirmedLeaderSessionID = null;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Grant leadership to contender {} with session ID {}.",
+ leaderContender.getAddress(),
+ issuedLeaderSessionID);
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Grant leadership to contender {} with session ID {}.",
- leaderContender.getAddress(),
- issuedLeaderSessionID);
+ leaderContender.grantLeadership(issuedLeaderSessionID);
+ } else {
+ LOG.debug("Ignoring the grant leadership notification since the service has " +
+ "already been stopped.");
}
-
- leaderContender.grantLeadership(issuedLeaderSessionID);
}
}
@Override
public void notLeader() {
synchronized (lock) {
- issuedLeaderSessionID = null;
- confirmedLeaderSessionID = null;
+ if (running) {
+ issuedLeaderSessionID = null;
+ confirmedLeaderSessionID = null;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Revoke leadership of {}.", leaderContender.getAddress());
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Revoke leadership of {}.", leaderContender.getAddress());
+ }
- leaderContender.revokeLeadership();
+ leaderContender.revokeLeadership();
+ } else {
+ LOG.debug("Ignoring the revoke leadership notification since the service " +
+ "has already been stopped.");
+ }
}
}
@@ -201,53 +249,57 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
// leaderSessionID is null if the leader contender has not yet confirmed the session ID
if (leaderLatch.hasLeadership()) {
synchronized (lock) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Leader node changed while {} is the leader with session ID {}.",
- leaderContender.getAddress(),
- confirmedLeaderSessionID);
- }
-
- if (confirmedLeaderSessionID != null) {
- ChildData childData = cache.getCurrentData();
+ if (running) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Leader node changed while {} is the leader with session ID {}.",
+ leaderContender.getAddress(),
+ confirmedLeaderSessionID);
+ }
- if (childData == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Writing leader information into empty node by {}.",
- leaderContender.getAddress());
- }
- writeLeaderInformation(confirmedLeaderSessionID);
- } else {
- byte[] data = childData.getData();
+ if (confirmedLeaderSessionID != null) {
+ ChildData childData = cache.getCurrentData();
- if (data == null || data.length == 0) {
- // the data field seems to be empty, rewrite information
+ if (childData == null) {
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Writing leader information into node with empty data field by {}.",
+ "Writing leader information into empty node by {}.",
leaderContender.getAddress());
}
writeLeaderInformation(confirmedLeaderSessionID);
} else {
- ByteArrayInputStream bais = new ByteArrayInputStream(data);
- ObjectInputStream ois = new ObjectInputStream(bais);
-
- String leaderAddress = ois.readUTF();
- UUID leaderSessionID = (UUID) ois.readObject();
+ byte[] data = childData.getData();
- if (!leaderAddress.equals(this.leaderContender.getAddress()) ||
- (leaderSessionID == null || !leaderSessionID.equals(confirmedLeaderSessionID))) {
- // the data field does not correspond to the expected leader information
+ if (data == null || data.length == 0) {
+ // the data field seems to be empty, rewrite information
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Correcting leader information by {}.",
+ "Writing leader information into node with empty data field by {}.",
leaderContender.getAddress());
}
writeLeaderInformation(confirmedLeaderSessionID);
+ } else {
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
+ ObjectInputStream ois = new ObjectInputStream(bais);
+
+ String leaderAddress = ois.readUTF();
+ UUID leaderSessionID = (UUID) ois.readObject();
+
+ if (!leaderAddress.equals(this.leaderContender.getAddress()) ||
+ (leaderSessionID == null || !leaderSessionID.equals(confirmedLeaderSessionID))) {
+ // the data field does not correspond to the expected leader information
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Correcting leader information by {}.",
+ leaderContender.getAddress());
+ }
+ writeLeaderInformation(confirmedLeaderSessionID);
+ }
}
}
}
+ } else {
+ LOG.debug("Ignoring node change notification since the service has already been stopped.");
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
index f74fb1a..7f18fd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Objects;
import java.util.UUID;
@@ -43,6 +44,8 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
private static final Logger LOG = LoggerFactory.getLogger(
ZooKeeperLeaderRetrievalService.class);
+ private final Object lock = new Object();
+
/** Connection to the used ZooKeeper quorum */
private final CuratorFramework client;
@@ -53,8 +56,11 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
private volatile LeaderRetrievalListener leaderListener;
private String lastLeaderAddress;
+
private UUID lastLeaderSessionID;
+ private volatile boolean running;
+
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
@@ -69,8 +75,14 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
* @param retrievalPath Path of the ZooKeeper node which contains the leader information
*/
public ZooKeeperLeaderRetrievalService(CuratorFramework client, String retrievalPath) {
- this.client = client;
+ this.client = Preconditions.checkNotNull(client, "CuratorFramework client");
this.cache = new NodeCache(client, retrievalPath);
+
+ this.leaderListener = null;
+ this.lastLeaderAddress = null;
+ this.lastLeaderSessionID = null;
+
+ running = false;
}
@Override
@@ -81,65 +93,87 @@ public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService,
LOG.info("Starting ZooKeeperLeaderRetrievalService.");
- leaderListener = listener;
+ synchronized (lock) {
+ leaderListener = listener;
+
+ cache.getListenable().addListener(this);
+ cache.start();
- cache.getListenable().addListener(this);
- cache.start();
+ client.getConnectionStateListenable().addListener(connectionStateListener);
- client.getConnectionStateListenable().addListener(connectionStateListener);
+ running = true;
+ }
}
@Override
public void stop() throws Exception {
LOG.info("Stopping ZooKeeperLeaderRetrievalService.");
+ synchronized (lock) {
+ if (!running) {
+ return;
+ }
+
+ running = false;
+ }
+
client.getConnectionStateListenable().removeListener(connectionStateListener);
- cache.close();
+ try {
+ cache.close();
+ } catch (IOException e) {
+ throw new Exception("Could not properly stop the ZooKeeperLeaderRetrievalService.", e);
+ }
}
@Override
public void nodeChanged() throws Exception {
- try {
- LOG.debug("Leader node has changed.");
-
- ChildData childData = cache.getCurrentData();
-
- String leaderAddress;
- UUID leaderSessionID;
-
- if (childData == null) {
- leaderAddress = null;
- leaderSessionID = null;
- } else {
- byte[] data = childData.getData();
-
- if (data == null || data.length == 0) {
- leaderAddress = null;
- leaderSessionID = null;
- } else {
- ByteArrayInputStream bais = new ByteArrayInputStream(data);
- ObjectInputStream ois = new ObjectInputStream(bais);
-
- leaderAddress = ois.readUTF();
- leaderSessionID = (UUID) ois.readObject();
+ synchronized (lock) {
+ if (running) {
+ try {
+ LOG.debug("Leader node has changed.");
+
+ ChildData childData = cache.getCurrentData();
+
+ String leaderAddress;
+ UUID leaderSessionID;
+
+ if (childData == null) {
+ leaderAddress = null;
+ leaderSessionID = null;
+ } else {
+ byte[] data = childData.getData();
+
+ if (data == null || data.length == 0) {
+ leaderAddress = null;
+ leaderSessionID = null;
+ } else {
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
+ ObjectInputStream ois = new ObjectInputStream(bais);
+
+ leaderAddress = ois.readUTF();
+ leaderSessionID = (UUID) ois.readObject();
+ }
+ }
+
+ if (!(Objects.equals(leaderAddress, lastLeaderAddress) &&
+ Objects.equals(leaderSessionID, lastLeaderSessionID))) {
+ LOG.debug(
+ "New leader information: Leader={}, session ID={}.",
+ leaderAddress,
+ leaderSessionID);
+
+ lastLeaderAddress = leaderAddress;
+ lastLeaderSessionID = leaderSessionID;
+ leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
+ }
+ } catch (Exception e) {
+ leaderListener.handleError(new Exception("Could not handle node changed event.", e));
+ throw e;
}
+ } else {
+ LOG.debug("Ignoring node change notification since the service has already been stopped.");
}
-
- if(!(Objects.equals(leaderAddress, lastLeaderAddress) &&
- Objects.equals(leaderSessionID, lastLeaderSessionID))) {
- LOG.debug(
- "New leader information: Leader={}, session ID={}.",
- leaderAddress,
- leaderSessionID);
-
- lastLeaderAddress = leaderAddress;
- lastLeaderSessionID = leaderSessionID;
- leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
- }
- } catch (Exception e) {
- leaderListener.handleError(new Exception("Could not handle node changed event.", e));
- throw e;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
new file mode 100644
index 0000000..07178fc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Mini cluster to run the old JobManager code without embedded high availability services. This
+ * class has been implemented because the normal {@link FlinkMiniCluster} has been changed to use
+ * the {@link HighAvailabilityServices}. With this change we can no longer use the
+ * {@link org.apache.flink.api.java.RemoteEnvironment} to connect against the
+ * {@link FlinkMiniCluster}, because the remote environment cannot retrieve the current leader
+ * session id.
+ */
+public class StandaloneMiniCluster {
+
+ private static final String LOCAL_HOSTNAME = "localhost";
+
+ private final Configuration configuration;
+
+ private final ActorSystem actorSystem;
+
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ private final HighAvailabilityServices highAvailabilityServices;
+
+ private final FiniteDuration timeout;
+
+ private final int port;
+
+ public StandaloneMiniCluster(Configuration configuration) throws Exception {
+ this.configuration = Preconditions.checkNotNull(configuration);
+
+ timeout = AkkaUtils.getTimeout(configuration);
+
+ actorSystem = JobManager.startActorSystem(
+ configuration,
+ LOCAL_HOSTNAME,
+ 0);
+
+ port = configuration.getInteger(JobManagerOptions.PORT);
+
+ scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+ highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ configuration,
+ Executors.directExecutor(),
+ HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
+
+ JobManager.startJobManagerActors(
+ configuration,
+ actorSystem,
+ scheduledExecutorService,
+ scheduledExecutorService,
+ highAvailabilityServices,
+ JobManager.class,
+ MemoryArchivist.class);
+
+ ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(
+ configuration,
+ ResourceID.generate(),
+ actorSystem,
+ highAvailabilityServices,
+ LOCAL_HOSTNAME,
+ Option.<String>empty(),
+ true,
+ TaskManager.class);
+
+ Future<Object> registrationFuture = Patterns.ask(
+ taskManager,
+ TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
+ timeout.toMillis());
+
+ Await.ready(registrationFuture, timeout);
+ }
+
+ public String getHostname() {
+ return LOCAL_HOSTNAME;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public void close() throws Exception {
+ Exception exception = null;
+
+ actorSystem.shutdown();
+ actorSystem.awaitTermination();
+
+ try {
+ highAvailabilityServices.closeAndCleanupAllData();
+ } catch (Exception e) {
+ exception = e;
+ }
+
+ scheduledExecutorService.shutdownNow();
+
+ try {
+ scheduledExecutorService.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+
+ if (exception != null) {
+ throw exception;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index f9cf01d..9b05273 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory;
import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory;
@@ -38,7 +39,6 @@ import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
import org.apache.flink.runtime.query.netty.UnknownKvStateID;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,14 +102,16 @@ public class QueryableStateClient {
* system and another for the network client.
*
* @param config Configuration to use.
+ * @param highAvailabilityServices Service factory for high availability services
* @throws Exception Failures are forwarded
*/
- public QueryableStateClient(Configuration config) throws Exception {
+ public QueryableStateClient(
+ Configuration config,
+ HighAvailabilityServices highAvailabilityServices) throws Exception {
Preconditions.checkNotNull(config, "Configuration");
// Create a leader retrieval service
- LeaderRetrievalService leaderRetrievalService = LeaderRetrievalUtils
- .createLeaderRetrievalService(config, true);
+ LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
// Get the ask timeout
String askTimeoutString = config.getString(
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index eab4de8..8789eed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -112,10 +112,12 @@ public class AkkaRpcServiceUtils {
/**
*
- * @param hostname The hostname or address where the target RPC service is listening.
- * @param port The port where the target RPC service is listening.
+ * @param hostname The hostname or address where the target RPC service is listening.
+ * @param port The port where the target RPC service is listening.
* @param endpointName The name of the RPC endpoint.
- * @param config The configuration from which to deduce further settings.
+ * @param addressResolution Whether to try address resolution of the given hostname or not.
+ * This allows failing fast if the hostname cannot be resolved.
+ * @param config The configuration from which to deduce further settings.
*
* @return The RPC URL of the specified RPC endpoint.
*/
@@ -143,10 +145,12 @@ public class AkkaRpcServiceUtils {
/**
*
- * @param hostname The hostname or address where the target RPC service is listening.
- * @param port The port where the target RPC service is listening.
+ * @param hostname The hostname or address where the target RPC service is listening.
+ * @param port The port where the target RPC service is listening.
* @param endpointName The name of the RPC endpoint.
- * @param akkaProtocol True, if security/encryption is enabled, false otherwise.
+ * @param addressResolution Whether to try address resolution of the given hostname or not.
+ * This allows failing fast if the hostname cannot be resolved.
+ * @param akkaProtocol True, if security/encryption is enabled, false otherwise.
*
* @return The RPC URL of the specified RPC endpoint.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 05749c4..073c52b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.net.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,57 +50,6 @@ import java.util.UUID;
public class LeaderRetrievalUtils {
private static final Logger LOG = LoggerFactory.getLogger(LeaderRetrievalUtils.class);
-
- /**
- * Creates a {@link LeaderRetrievalService} based on the provided {@link Configuration} object.
- *
- * @param configuration Configuration containing the settings for the {@link LeaderRetrievalService}
- * @param resolveInitialHostName If true, resolves the initial hostname
- * @return The {@link LeaderRetrievalService} specified in the configuration object
- * @throws Exception
- */
- public static LeaderRetrievalService createLeaderRetrievalService(
- Configuration configuration,
- boolean resolveInitialHostName)
- throws Exception {
-
- HighAvailabilityMode highAvailabilityMode = getRecoveryMode(configuration);
-
- switch (highAvailabilityMode) {
- case NONE:
- return StandaloneUtils.createLeaderRetrievalService(configuration, resolveInitialHostName, null);
- case ZOOKEEPER:
- return ZooKeeperUtils.createLeaderRetrievalService(configuration);
- default:
- throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
- }
- }
-
- /**
- * Creates a {@link LeaderRetrievalService} that either uses the distributed leader election
- * configured in the configuration, or, in standalone mode, the given actor reference.
- *
- * @param configuration Configuration containing the settings for the {@link LeaderRetrievalService}
- * @param standaloneRef Actor reference to be used in standalone mode.
- *
- * @return The {@link LeaderRetrievalService} specified in the configuration object
- * @throws Exception
- */
- public static LeaderRetrievalService createLeaderRetrievalService(
- Configuration configuration, ActorRef standaloneRef) throws Exception {
-
- HighAvailabilityMode highAvailabilityMode = getRecoveryMode(configuration);
-
- switch (highAvailabilityMode) {
- case NONE:
- String akkaUrl = standaloneRef.path().toSerializationFormat();
- return new StandaloneLeaderRetrievalService(akkaUrl);
- case ZOOKEEPER:
- return ZooKeeperUtils.createLeaderRetrievalService(configuration);
- default:
- throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
- }
- }
/**
* Retrieves the current leader gateway using the given {@link LeaderRetrievalService}. If the
@@ -173,6 +121,21 @@ public class LeaderRetrievalUtils {
}
}
+ /**
+ * Retrieves the current leader session id of the component identified by the given leader
+ * retrieval service.
+ *
+ * @param leaderRetrievalService Leader retrieval service to be used for the leader retrieval
+ * @param timeout Timeout for the leader retrieval
+ * @return The leader session id of the retrieved leader
+ * @throws LeaderRetrievalException if the leader retrieval operation fails (including a timeout)
+ */
+ public static UUID retrieveLeaderSessionId(
+ LeaderRetrievalService leaderRetrievalService,
+ FiniteDuration timeout) throws LeaderRetrievalException {
+ return retrieveLeaderConnectionInfo(leaderRetrievalService, timeout).getLeaderSessionID();
+ }
+
public static InetAddress findConnectingAddress(
LeaderRetrievalService leaderRetrievalService,
Time timeout) throws LeaderRetrievalException {
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
index ed485d0..f6f08e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
@@ -135,6 +135,10 @@ public class SerializedThrowable extends Exception implements Serializable {
return cached;
}
+ /**
+ * Returns the class name of the original error that this instance holds.
+ *
+ * @return the stored original error class name
+ */
+ public String getOriginalErrorClassName() {
+ return this.originalErrorClassName;
+ }
+
// ------------------------------------------------------------------------
// Override the behavior of Throwable
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ddd6a99a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 1b73dc7..9ade5ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -159,18 +159,6 @@ public class ZooKeeperUtils {
/**
* Creates a {@link ZooKeeperLeaderRetrievalService} instance.
*
- * @param configuration {@link Configuration} object containing the configuration values
- * @return {@link ZooKeeperLeaderRetrievalService} instance.
- */
- public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
- Configuration configuration) throws Exception {
- final CuratorFramework client = startCuratorFramework(configuration);
- return createLeaderRetrievalService(client, configuration);
- }
-
- /**
- * Creates a {@link ZooKeeperLeaderRetrievalService} instance.
- *
* @param client The {@link CuratorFramework} ZooKeeper client to use
* @param configuration {@link Configuration} object containing the configuration values
* @return {@link ZooKeeperLeaderRetrievalService} instance.
@@ -207,21 +195,6 @@ public class ZooKeeperUtils {
}
/**
- * Creates a {@link ZooKeeperLeaderElectionService} instance and a new {@link
- * CuratorFramework} client.
- *
- * @param configuration {@link Configuration} object containing the configuration values
- * @return {@link ZooKeeperLeaderElectionService} instance.
- */
- public static ZooKeeperLeaderElectionService createLeaderElectionService(
- Configuration configuration) throws Exception {
-
- CuratorFramework client = startCuratorFramework(configuration);
-
- return createLeaderElectionService(client, configuration);
- }
-
- /**
* Creates a {@link ZooKeeperLeaderElectionService} instance.
*
* @param client The {@link CuratorFramework} ZooKeeper client to use