You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/02 16:04:56 UTC
[1/2] flink git commit: [FLINK-7851] [scheduling] Improve scheduling balance by round robin distribution
Repository: flink
Updated Branches:
refs/heads/master ee9027e49 -> 49f690986
[FLINK-7851] [scheduling] Improve scheduling balance by round robin distribution
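Make sure that the value maps in SlotSharingGroupAssignment#availableSlotsPerJid are of type LinkedHashMap, so that pollFromMultiMap can re-insert a polled entry at the back of the map and thereby cycle through the TaskManagers in round-robin order.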
This closes #4839.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9c669d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9c669d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9c669d4
Branch: refs/heads/master
Commit: d9c669d4781f095806013651c1a579eae0ca2650
Parents: ee9027e
Author: Till <ti...@gmail.com>
Authored: Mon Oct 16 16:18:23 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Dec 2 16:39:16 2017 +0100
----------------------------------------------------------------------
.../instance/SlotSharingGroupAssignment.java | 46 ++++++-----
.../SlotSharingGroupAssignmentTest.java | 82 ++++++++++++++++++++
2 files changed, 108 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c669d4/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 7618b18..4371290 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -96,7 +96,7 @@ public class SlotSharingGroupAssignment {
private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
/** The slots available per vertex type (JobVertexId), keyed by TaskManager, to make them locatable */
- private final Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<>();
+ private final Map<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<>();
// --------------------------------------------------------------------------------------------
@@ -233,7 +233,7 @@ public class SlotSharingGroupAssignment {
// can place a task into this slot.
boolean entryForNewJidExists = false;
- for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
+ for (Map.Entry<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
// there is already an entry for this groupID
if (entry.getKey().equals(groupIdForMap)) {
entryForNewJidExists = true;
@@ -246,7 +246,7 @@ public class SlotSharingGroupAssignment {
// make sure an empty entry exists for this group, if no other entry exists
if (!entryForNewJidExists) {
- availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<ResourceID, List<SharedSlot>>());
+ availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<>());
}
return subSlot;
@@ -391,7 +391,7 @@ public class SlotSharingGroupAssignment {
}
// get the available slots for the group
- Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
+ LinkedHashMap<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
if (slotsForGroup == null) {
// we have a new group, so all slots are available
@@ -621,20 +621,26 @@ public class SlotSharingGroupAssignment {
private static SharedSlot pollFromMultiMap(Map<ResourceID, List<SharedSlot>> map) {
Iterator<Map.Entry<ResourceID, List<SharedSlot>>> iter = map.entrySet().iterator();
-
+
while (iter.hasNext()) {
- List<SharedSlot> slots = iter.next().getValue();
-
- if (slots.isEmpty()) {
- iter.remove();
- }
- else if (slots.size() == 1) {
- SharedSlot slot = slots.remove(0);
- iter.remove();
- return slot;
- }
- else {
- return slots.remove(slots.size() - 1);
+ Map.Entry<ResourceID, List<SharedSlot>> slotEntry = iter.next();
+
+ // remove first entry to add it at the back if there are still slots left
+ iter.remove();
+
+ List<SharedSlot> slots = slotEntry.getValue();
+
+ if (!slots.isEmpty()) {
+
+ SharedSlot result = slots.remove(slots.size() - 1);
+
+ if (!slots.isEmpty()) {
+ // reinserts the entry; since it is a LinkedHashMap, we will iterate over this entry
+ // only after having polled from all other entries
+ map.put(slotEntry.getKey(), slots);
+ }
+
+ return result;
}
}
@@ -642,11 +648,11 @@ public class SlotSharingGroupAssignment {
}
private static void removeSlotFromAllEntries(
- Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlots, SharedSlot slot)
- {
+ Map<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>> availableSlots,
+ SharedSlot slot) {
final ResourceID taskManagerId = slot.getTaskManagerID();
- for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry : availableSlots.entrySet()) {
+ for (Map.Entry<AbstractID, LinkedHashMap<ResourceID, List<SharedSlot>>> entry : availableSlots.entrySet()) {
Map<ResourceID, List<SharedSlot>> map = entry.getValue();
List<SharedSlot> list = map.get(taskManagerId);
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c669d4/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
new file mode 100644
index 0000000..dca47d3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignmentTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
+public class SlotSharingGroupAssignmentTest extends TestLogger {
+
+ /**
+ * Tests that slots are allocated in a round robin fashion from the set of available resources.
+ */
+ @Test
+ public void testRoundRobinPolling() throws UnknownHostException {
+ final SlotSharingGroupAssignment slotSharingGroupAssignment = new SlotSharingGroupAssignment();
+ final int numberTaskManagers = 2;
+ final int numberSlots = 2;
+ final JobVertexID sourceId = new JobVertexID();
+ final JobVertexID sinkId = new JobVertexID();
+ final JobID jobId = new JobID();
+
+ for (int i = 0; i < numberTaskManagers; i++) {
+ final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(ResourceID.generate(), InetAddress.getLocalHost(), i + 1000);
+
+ for (int j = 0; j < numberSlots; j++) {
+ final SharedSlot slot = new SharedSlot(
+ jobId,
+ mock(SlotOwner.class),
+ taskManagerLocation,
+ j,
+ mock(TaskManagerGateway.class),
+ slotSharingGroupAssignment);
+
+ slotSharingGroupAssignment.addSharedSlotAndAllocateSubSlot(slot, Locality.UNKNOWN, sourceId);
+ }
+ }
+
+ SimpleSlot allocatedSlot1 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
+ SimpleSlot allocatedSlot2 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
+
+ assertNotEquals(allocatedSlot1.getTaskManagerLocation(), allocatedSlot2.getTaskManagerLocation());
+
+ // let's check that we can still allocate all 4 slots
+ SimpleSlot allocatedSlot3 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
+ assertNotNull(allocatedSlot3);
+
+ SimpleSlot allocatedSlot4 = slotSharingGroupAssignment.getSlotForTask(sinkId, Collections.emptyList());
+ assertNotNull(allocatedSlot4);
+ }
+}
[2/2] flink git commit: [FLINK-8028] Let JobMaster implement RestfulGateway
Posted by tr...@apache.org.
[FLINK-8028] Let JobMaster implement RestfulGateway
This commit lets the JobMaster implement the RestfulGateway. That way,
the JobMaster can be used in combination with the existing REST handlers.
This closes #4986.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49f69098
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49f69098
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49f69098
Branch: refs/heads/master
Commit: 49f6909868c782bc37116b450dde92e02bc3731b
Parents: d9c669d
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 7 14:37:06 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sat Dec 2 16:39:32 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/dispatcher/Dispatcher.java | 35 +--------
.../dispatcher/StandaloneDispatcher.java | 3 +-
.../entrypoint/JobClusterEntrypoint.java | 11 ++-
.../runtime/jobmaster/JobManagerRunner.java | 9 ++-
.../flink/runtime/jobmaster/JobMaster.java | 78 +++++++++++++++++++-
.../runtime/jobmaster/JobMasterException.java | 41 ++++++++++
.../runtime/jobmaster/JobMasterGateway.java | 3 +-
.../messages/webmonitor/ClusterOverview.java | 42 +++++++++++
.../flink/runtime/metrics/MetricRegistry.java | 2 -
.../minicluster/MiniClusterJobDispatcher.java | 3 +-
.../resourcemanager/ResourceOverview.java | 6 ++
.../jobmaster/JobManagerRunnerMockTest.java | 3 +-
.../flink/runtime/jobmaster/JobMasterTest.java | 8 +-
13 files changed, 197 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 1fa0f7e..8a26f95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -298,39 +298,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
return allJobsFuture.thenCombine(
taskManagerOverviewFuture,
- (Collection<JobStatus> allJobsStatus, ResourceOverview resourceOverview) -> {
-
- int numberRunningOrPendingJobs = 0;
- int numberFinishedJobs = 0;
- int numberCancelledJobs = 0;
- int numberFailedJobs = 0;
-
- for (JobStatus status : allJobsStatus) {
- switch (status) {
- case FINISHED:
- numberFinishedJobs++;
- break;
- case FAILED:
- numberFailedJobs++;
- break;
- case CANCELED:
- numberCancelledJobs++;
- break;
- default:
- numberRunningOrPendingJobs++;
- break;
- }
- }
-
- return new ClusterOverview(
- resourceOverview.getNumberTaskManagers(),
- resourceOverview.getNumberRegisteredSlots(),
- resourceOverview.getNumberFreeSlots(),
- numberRunningOrPendingJobs,
- numberFinishedJobs,
- numberCancelledJobs,
- numberFailedJobs);
- });
+ (Collection<JobStatus> allJobsStatus, ResourceOverview resourceOverview) ->
+ ClusterOverview.create(resourceOverview, allJobsStatus));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 3ba681c..c64c883 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -88,6 +88,7 @@ public class StandaloneDispatcher extends Dispatcher {
jobManagerServices,
metricRegistry,
onCompleteActions,
- fatalErrorHandler);
+ fatalErrorHandler,
+ restAddress);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 124c6c6..1c8fb21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -37,6 +37,8 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+
/**
* Base class for per-job cluster entry points.
*/
@@ -80,7 +82,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
jobManagerServices,
heartbeatServices,
metricRegistry,
- this);
+ this,
+ null);
LOG.debug("Starting ResourceManager.");
resourceManager.start();
@@ -97,7 +100,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
JobManagerServices jobManagerServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
- FatalErrorHandler fatalErrorHandler) throws Exception {
+ FatalErrorHandler fatalErrorHandler,
+ @Nullable String restAddress) throws Exception {
JobGraph jobGraph = retrieveJobGraph(configuration);
@@ -111,7 +115,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
jobManagerServices,
metricRegistry,
new TerminatingOnCompleteActions(jobGraph.getJobID()),
- fatalErrorHandler);
+ fatalErrorHandler,
+ restAddress);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index f95b5a0..ed3d43d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -45,6 +45,8 @@ import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -114,7 +116,8 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
final JobManagerServices jobManagerServices,
final MetricRegistry metricRegistry,
final OnCompletionActions toNotifyOnComplete,
- final FatalErrorHandler errorHandler) throws Exception {
+ final FatalErrorHandler errorHandler,
+ @Nullable final String restAddress) throws Exception {
JobManagerMetricGroup jobManagerMetrics = null;
@@ -165,7 +168,9 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
jobManagerMetrics,
this,
this,
- userCodeLoader);
+ userCodeLoader,
+ restAddress,
+ metricRegistry.getMetricQueryServicePath());
this.timeout = jobManagerServices.rpcAskTimeout;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e2fb65f..c4c4445 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -71,9 +71,12 @@ import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
@@ -83,6 +86,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
@@ -107,9 +111,11 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -186,6 +192,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
private final SlotPoolGateway slotPoolGateway;
+ private final CompletableFuture<String> restAddressFuture;
+
+ private final String metricQueryServicePath;
+
// --------- ResourceManager --------
/** Leader retriever service used to locate ResourceManager's address */
@@ -215,7 +225,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
@Nullable JobManagerMetricGroup jobManagerMetricGroup,
OnCompletionActions jobCompletionActions,
FatalErrorHandler errorHandler,
- ClassLoader userCodeLoader) throws Exception {
+ ClassLoader userCodeLoader,
+ @Nullable String restAddress,
+ @Nullable String metricQueryServicePath) throws Exception {
super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME));
@@ -296,6 +308,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
executionGraph.registerJobStatusListener(new JobManagerJobStatusListener());
this.registeredTaskManagers = new HashMap<>(4);
+
+ this.restAddressFuture = Optional.ofNullable(restAddress)
+ .map(CompletableFuture::completedFuture)
+ .orElse(FutureUtils.completedExceptionally(new JobMasterException("The JobMaster has not been started with a REST endpoint.")));
+
+ this.metricQueryServicePath = metricQueryServicePath;
}
//----------------------------------------------------------------------------------------------
@@ -758,6 +776,64 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
}
//----------------------------------------------------------------------------------------------
+ // RestfulGateway RPC methods
+ //----------------------------------------------------------------------------------------------
+
+ @Override
+ public CompletableFuture<String> requestRestAddress(Time timeout) {
+ return restAddressFuture;
+ }
+
+ @Override
+ public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, Time timeout) {
+ if (Objects.equals(jobGraph.getJobID(), jobId)) {
+ return requestArchivedExecutionGraph(timeout);
+ } else {
+ return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+ }
+ }
+
+ @Override
+ public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
+ return requestJobDetails(timeout)
+ .thenApply(
+ jobDetails -> new MultipleJobsDetails(Collections.singleton(jobDetails)));
+ }
+
+ @Override
+ public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
+ final CompletableFuture<ResourceOverview> resourceOverviewFuture;
+ if (resourceManagerConnection != null) {
+ resourceOverviewFuture = resourceManagerConnection.getTargetGateway().requestResourceOverview(timeout);
+ } else {
+ resourceOverviewFuture = CompletableFuture.completedFuture(ResourceOverview.empty());
+ }
+
+ Collection<JobStatus> jobStatuses = Collections.singleton(executionGraph.getState());
+
+ return resourceOverviewFuture.thenApply(
+ (ResourceOverview resourceOverview) -> ClusterOverview.create(resourceOverview, jobStatuses));
+ }
+
+ @Override
+ public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
+ if (metricQueryServicePath != null) {
+ return CompletableFuture.completedFuture(Collections.singleton(metricQueryServicePath));
+ } else {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+ }
+
+ @Override
+ public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+ if (resourceManagerConnection != null) {
+ return resourceManagerConnection.getTargetGateway().requestTaskManagerMetricQueryServicePaths(timeout);
+ } else {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+ }
+
+ //----------------------------------------------------------------------------------------------
// Internal methods
//----------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterException.java
new file mode 100644
index 0000000..df2fa75
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Base class for {@link JobMaster} related exceptions.
+ */
+public class JobMasterException extends FlinkException {
+
+ private static final long serialVersionUID = 414268039380255248L;
+
+ public JobMasterException(String message) {
+ super(message);
+ }
+
+ public JobMasterException(Throwable cause) {
+ super(cause);
+ }
+
+ public JobMasterException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index f469993..ad906c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
import java.net.InetSocketAddress;
import java.util.Collection;
@@ -55,7 +56,7 @@ import java.util.concurrent.CompletableFuture;
/**
* {@link JobMaster} rpc gateway interface
*/
-public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRpcGateway<JobMasterId> {
+public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRpcGateway<JobMasterId>, RestfulGateway {
/**
* Cancels the currently executed job.
http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
index 6913484..69b45b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/ClusterOverview.java
@@ -18,9 +18,15 @@
package org.apache.flink.runtime.messages.webmonitor;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.resourcemanager.ResourceOverview;
+import org.apache.flink.util.Preconditions;
+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Collection;
+
/**
* Response to the {@link RequestStatusOverview} message, carrying a description
* of the Flink cluster status.
@@ -122,4 +128,40 @@ public class ClusterOverview extends JobsOverview {
", numJobsFailed=" + getNumJobsFailed() +
'}';
}
+
+ public static ClusterOverview create(ResourceOverview resourceOverview, Collection<JobStatus> allJobsStatus) {
+ Preconditions.checkNotNull(resourceOverview);
+ Preconditions.checkNotNull(allJobsStatus);
+
+ int numberRunningOrPendingJobs = 0;
+ int numberFinishedJobs = 0;
+ int numberCancelledJobs = 0;
+ int numberFailedJobs = 0;
+
+ for (JobStatus status : allJobsStatus) {
+ switch (status) {
+ case FINISHED:
+ numberFinishedJobs++;
+ break;
+ case FAILED:
+ numberFailedJobs++;
+ break;
+ case CANCELED:
+ numberCancelledJobs++;
+ break;
+ default:
+ numberRunningOrPendingJobs++;
+ break;
+ }
+ }
+
+ return new ClusterOverview(
+ resourceOverview.getNumberTaskManagers(),
+ resourceOverview.getNumberRegisteredSlots(),
+ resourceOverview.getNumberFreeSlots(),
+ numberRunningOrPendingJobs,
+ numberFinishedJobs,
+ numberCancelledJobs,
+ numberFailedJobs);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index e0c2667..782d66a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -27,8 +27,6 @@ import javax.annotation.Nullable;
/**
* Interface for a metric registry.
-
- LOG.debug("Started MetricQueryService under {}.", metricQueryServicePath);
*/
public interface MetricRegistry {
http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 60d9a66..3a1474d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -279,7 +279,8 @@ public class MiniClusterJobDispatcher {
jobManagerServices,
metricRegistry,
onCompletion,
- errorHandler);
+ errorHandler,
+ null);
runners[i].start();
}
catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
index 1b3d5ca..ec95759 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceOverview.java
@@ -27,6 +27,8 @@ public class ResourceOverview implements Serializable {
private static final long serialVersionUID = 7618746920569224557L;
+ private static final ResourceOverview EMPTY_RESOURCE_OVERVIEW = new ResourceOverview(0, 0, 0);
+
private final int numberTaskManagers;
private final int numberRegisteredSlots;
@@ -50,4 +52,8 @@ public class ResourceOverview implements Serializable {
public int getNumberFreeSlots() {
return numberFreeSlots;
}
+
+ public static ResourceOverview empty() {
+ return EMPTY_RESOURCE_OVERVIEW;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index 083d6e9..f94b4be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -117,7 +117,8 @@ public class JobManagerRunnerMockTest extends TestLogger {
JobManagerServices.fromConfiguration(new Configuration(), mock(BlobServer.class)),
new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()),
jobCompletion,
- jobCompletion));
+ jobCompletion,
+ null));
}
@After
http://git-wip-us.apache.org/repos/asf/flink/blob/49f69098/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index b52c08c..02047f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -117,7 +117,9 @@ public class JobMasterTest extends TestLogger {
null,
mock(OnCompletionActions.class),
testingFatalErrorHandler,
- FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()));
+ FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()),
+ null,
+ null);
CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
@@ -224,7 +226,9 @@ public class JobMasterTest extends TestLogger {
null,
mock(OnCompletionActions.class),
testingFatalErrorHandler,
- FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()));
+ FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()),
+ null,
+ null);
CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);