Posted to commits@flink.apache.org by se...@apache.org on 2019/07/10 09:36:01 UTC

[flink] branch master updated (32d3737 -> be794ac)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 32d3737  [hotfix][python] Update the package name from pyflink to apache-flink (#9028)
     new 0944531  [FLINK-13067][docs] Fix broken links to contributing docs
     new 85a919f  [hotfix][runtime] Remove unused DependencyVisitor
     new 7f5d583  [FLINK-13057][state] Correct comments in ListState class
     new fa8f931  [hotfix] Remove incorrect doc comments from RocksDBMapState
     new b5f2303  [hotfix][runtime] Fix test cases that use unknown resource profiles in slot offers.
     new d5520a3  [FLINK-12763][runtime] Requests slots with ResourceProfiles that are converted from ResourceSpecs.
     new 973afee  [FLINK-12763][runtime] SlotManager fails unfulfillable slot requests if it is set to do so.
     new 887b8d9  [FLINK-12763][runtime] Yarn/MesosResourceManager do not start new worker when requested resource profile cannot be satisfied.
     new d99877b  [FLINK-12763][runtime] Yarn/MesosResourceManager set SlotManager to fail unfulfillable requests on started.
     new ea032ae  [FLINK-12763][runtime] Introduce a start-up period to standalone cluster that after this period StandaloneResourceManager set SlotManager to fail unfulfillable requests.
     new 15d8cd5  [hotfix] [runtime] Remove obsolete Exception from JobLeaderIdService signature
     new 1a06e71  [hotfix] [tests] Factor out MockResourceManagerRuntimeServices to make them reusable in different tests
     new faacb55  [FLINK-12763][runtime] Use a timeout of zero to indicate no startup period
     new 1d01ed8  [FLINK-12763][runtime] Factor out SlotManager tests for eager failing of unfulfillable requests
     new caf89b1  [FLINK-12763][runtime] Remove eagerly rejected pending slot requests from SlotManager
     new be794ac  [hotfix][runtime] Minor cleanup in SlotManager

The 16 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/CONTRIBUTING.md                            |   4 +-
 .github/PULL_REQUEST_TEMPLATE.md                   |   2 +-
 .../generated/resource_manager_configuration.html  |   5 +
 docs/dev/libs/gelly/index.md                       |   2 +-
 docs/dev/libs/gelly/index.zh.md                    |   2 +-
 docs/dev/libs/ml/contribution_guide.md             |   4 +-
 docs/dev/libs/ml/contribution_guide.zh.md          |   4 +-
 docs/dev/projectsetup/java_api_quickstart.md       |   4 +-
 docs/dev/projectsetup/java_api_quickstart.zh.md    |   4 +-
 docs/dev/projectsetup/scala_api_quickstart.md      |   4 +-
 docs/dev/projectsetup/scala_api_quickstart.zh.md   |   4 +-
 docs/dev/table/connect.md                          |   2 +-
 docs/dev/table/connect.zh.md                       |   2 +-
 docs/index.md                                      |   4 +-
 docs/index.zh.md                                   |   4 +-
 docs/internals/components.md                       |   2 +-
 docs/internals/components.zh.md                    |   2 +-
 docs/redirects/example_quickstart.md               |   2 +-
 docs/redirects/filesystems.md                      |   4 +-
 docs/redirects/setup_quickstart.md                 |   2 +-
 docs/redirects/windows.md                          |   2 +-
 .../apache/flink/api/common/state/ListState.java   |  14 +-
 .../flink/configuration/ConfigurationUtils.java    |  11 +
 .../configuration/ResourceManagerOptions.java      |  17 ++
 .../clusterframework/MesosResourceManager.java     |   4 +
 .../clusterframework/types/ResourceProfile.java    |   2 +-
 .../executiongraph/AccessExecutionJobVertex.java   |   8 +
 .../executiongraph/ArchivedExecutionJobVertex.java |  11 +
 .../flink/runtime/executiongraph/Execution.java    |   3 +-
 .../runtime/executiongraph/ExecutionJobVertex.java |   9 +
 .../runtime/executiongraph/ExecutionVertex.java    |   5 +
 .../resourcemanager/JobLeaderIdService.java        |   2 +-
 .../runtime/resourcemanager/ResourceManager.java   |   8 +
 .../resourcemanager/StandaloneResourceManager.java |  21 +-
 .../StandaloneResourceManagerFactory.java          |   7 +-
 .../resourcemanager/slotmanager/SlotManager.java   |  60 +++-
 .../flink/runtime/util/DependencyVisitor.java      | 321 ---------------------
 .../executiongraph/ExecutionGraphRestartTest.java  |   2 +-
 .../jobmanager/scheduler/SchedulerTestBase.java    |   2 +-
 .../StandaloneResourceManagerWithUUIDFactory.java  |   7 +-
 .../resourcemanager/ResourceManagerHATest.java     |   3 +-
 .../ResourceManagerJobMasterTest.java              |   3 +-
 .../ResourceManagerTaskExecutorTest.java           |   3 +-
 .../StandaloneResourceManagerTest.java             | 119 ++++++++
 .../SlotManagerFailUnfulfillableTest.java          | 211 ++++++++++++++
 .../slotmanager/SlotManagerTest.java               |   0
 .../utils/MockResourceManagerRuntimeServices.java  |  78 +++++
 .../SubtaskExecutionAttemptDetailsHandlerTest.java |   2 +
 .../utils/ArchivedExecutionJobVertexBuilder.java   |   2 +
 .../contrib/streaming/state/RocksDBMapState.java   |   4 -
 .../org/apache/flink/yarn/YarnResourceManager.java |   8 +-
 .../apache/flink/yarn/YarnResourceManagerTest.java |  48 +--
 52 files changed, 637 insertions(+), 423 deletions(-)
 mode change 100644 => 100755 flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
 mode change 100644 => 100755 flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
 mode change 100644 => 100755 flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
 create mode 100755 flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
 create mode 100755 flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
 mode change 100644 => 100755 flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 create mode 100755 flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java


[flink] 13/16: [FLINK-12763][runtime] Use a timeout of zero to indicate no startup period

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit faacb556544eb8a8e39ca75422efc1547effc6da
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jul 9 16:46:18 2019 +0200

    [FLINK-12763][runtime] Use a timeout of zero to indicate no startup period
    
    This allows users to configure a behavior similar to previous versions.
---
 .../flink/configuration/ConfigurationUtils.java    |   2 +-
 .../resourcemanager/StandaloneResourceManager.java |  17 +--
 .../resourcemanager/slotmanager/SlotManager.java   |   4 +
 .../StandaloneResourceManagerTest.java             | 119 +++++++++++++++++++++
 4 files changed, 135 insertions(+), 7 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index d130b80..2fe2f37 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -116,7 +116,7 @@ public class ConfigurationUtils {
 	public static Time getStandaloneClusterStartupPeriodTime(Configuration configuration) {
 		final Time timeout;
 		long standaloneClusterStartupPeriodTime = configuration.getLong(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME);
-		if (standaloneClusterStartupPeriodTime > 0) {
+		if (standaloneClusterStartupPeriodTime >= 0) {
 			timeout = Time.milliseconds(standaloneClusterStartupPeriodTime);
 		} else {
 			timeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
old mode 100644
new mode 100755
index dfbfec1..bc0e8df
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
+	/** The duration of the startup period. A duration of zero means there is no startup period. */
 	private final Time startupPeriodTime;
 
 	public StandaloneResourceManager(
@@ -79,12 +80,16 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
 	@Override
 	protected void initialize() throws ResourceManagerException {
-		getRpcService().getScheduledExecutor().schedule(
-			() -> getMainThreadExecutor().execute(
-				() -> setFailUnfulfillableRequest(true)),
-			startupPeriodTime.toMilliseconds(),
-			TimeUnit.MILLISECONDS
-		);
+		final long startupPeriodMillis = startupPeriodTime.toMilliseconds();
+
+		if (startupPeriodMillis > 0) {
+			getRpcService().getScheduledExecutor().schedule(
+				() -> getMainThreadExecutor().execute(
+					() -> setFailUnfulfillableRequest(true)),
+				startupPeriodMillis,
+				TimeUnit.MILLISECONDS
+			);
+		}
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
old mode 100644
new mode 100755
index 320612a..72a77f1
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -192,6 +192,10 @@ public class SlotManager implements AutoCloseable {
 		return pendingSlots.size();
 	}
 
+	public boolean isFailingUnfulfillableRequest() {
+		return failUnfulfillableRequest;
+	}
+
 	@VisibleForTesting
 	int getNumberAssignedPendingTaskManagerSlots() {
 		return (int) pendingSlots.values().stream().filter(slot -> slot.getAssignedPendingSlotRequest() != null).count();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
new file mode 100755
index 0000000..4c78fca
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.UUID;
+import java.util.function.Supplier;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the Standalone Resource Manager.
+ */
+public class StandaloneResourceManagerTest {
+
+	@ClassRule
+	public static final TestingRpcServiceResource RPC_SERVICE = new TestingRpcServiceResource();
+
+	private static final Time TIMEOUT = Time.seconds(10);
+
+	private final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+
+	@Test
+	public void testStartupPeriod() throws Exception {
+		final Tuple2<StandaloneResourceManager, SlotManager> managers = createResourceManagerAndSlotManager(Time.milliseconds(1));
+		final StandaloneResourceManager rm = managers.f0;
+		final SlotManager sm = managers.f1;
+
+		assertHappensUntil(sm::isFailingUnfulfillableRequest, Deadline.fromNow(Duration.ofSeconds(10)));
+
+		rm.close();
+	}
+
+	@Test
+	public void testNoStartupPeriod() throws Exception {
+		final Tuple2<StandaloneResourceManager, SlotManager> managers = createResourceManagerAndSlotManager(Time.milliseconds(-1));
+		final StandaloneResourceManager rm = managers.f0;
+		final SlotManager sm = managers.f1;
+
+		// startup includes initialization and granting leadership, so by the time we are
+		// here, the initialization method scheduling the startup period will have been executed.
+
+		assertFalse(fatalErrorHandler.hasExceptionOccurred());
+		assertFalse(sm.isFailingUnfulfillableRequest());
+
+		rm.close();
+	}
+
+	private StandaloneResourceManager createResourceManager(Time startupPeriod) throws Exception {
+		return createResourceManagerAndSlotManager(startupPeriod).f0;
+	}
+
+	private Tuple2<StandaloneResourceManager, SlotManager> createResourceManagerAndSlotManager(
+			Time startupPeriod) throws Exception {
+
+		final MockResourceManagerRuntimeServices rmServices = new MockResourceManagerRuntimeServices(
+			RPC_SERVICE.getTestingRpcService(),
+			TIMEOUT);
+
+		final StandaloneResourceManager rm = new StandaloneResourceManager(
+			rmServices.rpcService,
+			UUID.randomUUID().toString(),
+			ResourceID.generate(),
+			rmServices.highAvailabilityServices,
+			rmServices.heartbeatServices,
+			rmServices.slotManager,
+			rmServices.metricRegistry,
+			rmServices.jobLeaderIdService,
+			new ClusterInformation("localhost", 1234),
+			fatalErrorHandler,
+			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+			startupPeriod);
+
+		rm.start();
+		rmServices.grantLeadership();
+
+		return new Tuple2<>(rm, rmServices.slotManager);
+	}
+
+	private static void assertHappensUntil(Supplier<Boolean> condition, Deadline until) throws InterruptedException {
+		while (!condition.get()) {
+			if (!until.hasTimeLeft()) {
+				fail("condition was not fulfilled before the deadline");
+			}
+			Thread.sleep(2);
+		}
+	}
+}

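Note on the commit above: the two conditions it touches can be read together as a small decision table. The sketch below is a minimal illustration, not Flink code; the class name `StartupPeriodSketch`, both method names, and the 300000 ms slot request timeout are assumptions made for the example. It mirrors the `>= 0` branch in `ConfigurationUtils` and the `> 0` guard in `StandaloneResourceManager.initialize()`.

```java
/** Illustrative only: the names in this class are hypothetical and not part of Flink. */
final class StartupPeriodSketch {

    /**
     * Mirrors the branch changed in ConfigurationUtils: a configured value >= 0
     * is used as-is, a negative value falls back to the slot request timeout.
     */
    static long resolveStartupPeriodMillis(long configuredMillis, long slotRequestTimeoutMillis) {
        return configuredMillis >= 0 ? configuredMillis : slotRequestTimeoutMillis;
    }

    /**
     * Mirrors the guard added to initialize(): only a positive period schedules
     * the switch to failing unfulfillable requests.
     */
    static boolean schedulesFailUnfulfillable(long startupPeriodMillis) {
        return startupPeriodMillis > 0;
    }

    public static void main(String[] args) {
        long slotRequestTimeoutMillis = 300_000L; // assumed value for the example
        for (long configured : new long[] {-1L, 0L, 5_000L}) {
            long resolved = resolveStartupPeriodMillis(configured, slotRequestTimeoutMillis);
            System.out.println(configured + " -> " + resolved
                + " ms, schedules switch: " + schedulesFailUnfulfillable(resolved));
        }
    }
}
```

Under these assumptions, a configured value of zero resolves to zero and never schedules the switch, which matches the "no startup period" behavior named in the commit subject. The test in the diff passes `Time.milliseconds(-1)` straight to the constructor, bypassing `ConfigurationUtils`, so the `> 0` guard covers that case as well.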

[flink] 08/16: [FLINK-12763][runtime] Yarn/MesosResourceManager do not start new worker when requested resource profile cannot be satisfied.

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 887b8d97c78ac0ada200ca904719bdd5e59a3bed
Author: Xintong Song <to...@gmail.com>
AuthorDate: Sat Jul 6 17:58:42 2019 +0800

    [FLINK-12763][runtime] Yarn/MesosResourceManager do not start new worker when requested resource profile cannot be satisfied.
---
 .../flink/mesos/runtime/clusterframework/MesosResourceManager.java | 3 +++
 .../src/main/java/org/apache/flink/yarn/YarnResourceManager.java   | 7 +++----
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index e4ef99a..6a53935 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -437,6 +437,9 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 
 	@Override
 	public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
+		if (!slotsPerWorker.iterator().next().isMatching(resourceProfile)) {
+			return Collections.emptyList();
+		}
 		LOG.info("Starting a new worker.");
 		try {
 			// generate new workers into persistent state and launch associated actors
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index f5e6c99..2e980d9 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -310,11 +310,10 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	@Override
 	public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
-		Preconditions.checkArgument(
-			ResourceProfile.UNKNOWN.equals(resourceProfile),
-			"The YarnResourceManager does not support custom ResourceProfiles yet. It assumes that all containers have the same resources.");
+		if (!slotsPerWorker.iterator().next().isMatching(resourceProfile)) {
+			return Collections.emptyList();
+		}
 		requestYarnContainer();
-
 		return slotsPerWorker;
 	}
 

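Note on the commit above: both resource managers now return an empty collection instead of starting a worker when the requested profile cannot be served by the slots a worker would offer. The following is a simplified, self-contained sketch of that pattern. `WorkerStartSketch` and `SimpleProfile` are hypothetical stand-ins, and the matching rule (at least as many CPU cores and as much memory as required) is an assumption for illustration, not the actual `ResourceProfile.isMatching` implementation.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Illustrative only: a stand-in for a resource manager, not Flink code. */
final class WorkerStartSketch {

    /** Hypothetical, simplified profile covering CPU cores and memory only. */
    static final class SimpleProfile {
        final double cpuCores;
        final int memoryMb;

        SimpleProfile(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }

        /** Assumed semantics: this profile can serve a request that is no larger than itself. */
        boolean isMatching(SimpleProfile required) {
            return cpuCores >= required.cpuCores && memoryMb >= required.memoryMb;
        }
    }

    private final List<SimpleProfile> slotsPerWorker;
    private int requestedWorkers = 0;

    WorkerStartSketch(List<SimpleProfile> slotsPerWorker) {
        this.slotsPerWorker = slotsPerWorker;
    }

    /** Returns the slots a new worker will offer, or an empty collection if none could match. */
    Collection<SimpleProfile> startNewWorker(SimpleProfile required) {
        // All slots of a worker are assumed identical, so checking the first one is enough.
        if (!slotsPerWorker.iterator().next().isMatching(required)) {
            return Collections.emptyList();
        }
        requestedWorkers++; // stands in for requesting a container or launching a task
        return slotsPerWorker;
    }

    int getRequestedWorkers() {
        return requestedWorkers;
    }

    public static void main(String[] args) {
        WorkerStartSketch rm = new WorkerStartSketch(
            Collections.singletonList(new SimpleProfile(1.0, 1024)));
        System.out.println(rm.startNewWorker(new SimpleProfile(4.0, 8192)).size());
        System.out.println(rm.startNewWorker(new SimpleProfile(1.0, 512)).size());
    }
}
```

Returning an empty collection rather than throwing lets the caller decide what to do with a request that no worker can satisfy, which is presumably what the SlotManager commits in this series build on.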

[flink] 12/16: [hotfix] [tests] Factor out MockResourceManagerRuntimeServices to make them reusable in different tests

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a06e71aaef727fd03a69094bb0ab1efe3fe7e11
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jul 9 15:36:08 2019 +0200

    [hotfix] [tests] Factor out MockResourceManagerRuntimeServices to make them reusable in different tests
---
 .../utils/MockResourceManagerRuntimeServices.java  | 78 ++++++++++++++++++++++
 .../apache/flink/yarn/YarnResourceManagerTest.java | 48 +------------
 2 files changed, 80 insertions(+), 46 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java
new file mode 100755
index 0000000..0b1a9bf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/MockResourceManagerRuntimeServices.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.utils;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Mock services needed by the resource manager.
+ */
+public class MockResourceManagerRuntimeServices {
+
+	public final RpcService rpcService;
+	public final Time timeout;
+	public final TestingHighAvailabilityServices highAvailabilityServices;
+	public final HeartbeatServices heartbeatServices;
+	public final MetricRegistry metricRegistry;
+	public final TestingLeaderElectionService rmLeaderElectionService;
+	public final JobLeaderIdService jobLeaderIdService;
+	public final SlotManager slotManager;
+
+	public MockResourceManagerRuntimeServices(RpcService rpcService, Time timeout) {
+		this.rpcService = checkNotNull(rpcService);
+		this.timeout = checkNotNull(timeout);
+		highAvailabilityServices = new TestingHighAvailabilityServices();
+		rmLeaderElectionService = new TestingLeaderElectionService();
+		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+		heartbeatServices = new TestingHeartbeatServices();
+		metricRegistry = NoOpMetricRegistry.INSTANCE;
+		slotManager = SlotManagerBuilder.newBuilder()
+			.setScheduledExecutor(new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()))
+			.setTaskManagerRequestTimeout(Time.seconds(10))
+			.setSlotRequestTimeout(Time.seconds(10))
+			.setTaskManagerTimeout(Time.minutes(1))
+			.build();
+		jobLeaderIdService = new JobLeaderIdService(
+			highAvailabilityServices,
+			rpcService.getScheduledExecutor(),
+			Time.minutes(5L));
+	}
+
+	public void grantLeadership() throws Exception {
+		UUID rmLeaderSessionId = UUID.randomUUID();
+		rmLeaderElectionService.isLeader(rmLeaderSessionId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+	}
+}
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index df3fc7f..19b2f67 100755
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -30,17 +30,12 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -48,7 +43,7 @@ import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
+import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -57,7 +52,6 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
-import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.RunnableWithException;
@@ -92,10 +86,8 @@ import java.nio.file.Files;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
@@ -270,7 +262,7 @@ public class YarnResourceManagerTest extends TestLogger {
 
 		Context(Configuration configuration) throws  Exception {
 			rpcService = new TestingRpcService();
-			rmServices = new MockResourceManagerRuntimeServices();
+			rmServices = new MockResourceManagerRuntimeServices(rpcService, TIMEOUT);
 
 			// resource manager
 			rmResourceID = ResourceID.generate();
@@ -295,42 +287,6 @@ public class YarnResourceManagerTest extends TestLogger {
 		}
 
 		/**
-		 * Mock services needed by the resource manager.
-		 */
-		class MockResourceManagerRuntimeServices {
-
-			private final TestingHighAvailabilityServices highAvailabilityServices;
-			private final HeartbeatServices heartbeatServices;
-			private final MetricRegistry metricRegistry;
-			private final TestingLeaderElectionService rmLeaderElectionService;
-			private final JobLeaderIdService jobLeaderIdService;
-			private final SlotManager slotManager;
-
-			MockResourceManagerRuntimeServices() throws Exception {
-				highAvailabilityServices = new TestingHighAvailabilityServices();
-				rmLeaderElectionService = new TestingLeaderElectionService();
-				highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
-				heartbeatServices = new TestingHeartbeatServices();
-				metricRegistry = NoOpMetricRegistry.INSTANCE;
-				slotManager = SlotManagerBuilder.newBuilder()
-					.setScheduledExecutor(new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()))
-					.setTaskManagerRequestTimeout(Time.seconds(10))
-					.setSlotRequestTimeout(Time.seconds(10))
-					.setTaskManagerTimeout(Time.minutes(1))
-					.build();
-				jobLeaderIdService = new JobLeaderIdService(
-						highAvailabilityServices,
-						rpcService.getScheduledExecutor(),
-						Time.minutes(5L));
-			}
-
-			void grantLeadership() throws Exception {
-				UUID rmLeaderSessionId = UUID.randomUUID();
-				rmLeaderElectionService.isLeader(rmLeaderSessionId).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
-			}
-		}
-
-		/**
 		 * Start the resource manager and grant leadership to it.
 		 */
 		void startResourceManager() throws Exception {


[flink] 03/16: [FLINK-13057][state] Correct comments in ListState class

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7f5d5838ec2d5c1bceb8f6acf76bb53e04aa3d8f
Author: hequn8128 <ch...@gmail.com>
AuthorDate: Sun Jul 7 21:25:10 2019 +0800

    [FLINK-13057][state] Correct comments in ListState class
    
    This closes #8994
---
 .../java/org/apache/flink/api/common/state/ListState.java  | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
index 254dc1d..1e2caf6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
@@ -27,10 +27,16 @@ import java.util.List;
  * The state is accessed and modified by user functions, and checkpointed consistently
  * by the system as part of the distributed snapshots.
  *
- * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
- * automatically supplied by the system, so the function always sees the value mapped to the
- * key of the current element. That way, the system can handle stream and state partitioning
- * consistently together.
+ * <p>The state can be a keyed list state or an operator list state.
+ *
+ * <p>When it is a keyed list state, it is accessed by functions applied on a {@code KeyedStream}.
+ * The key is automatically supplied by the system, so the function always sees the value mapped
+ * to the key of the current element. That way, the system can handle stream and state
+ * partitioning consistently together.
+ *
+ * <p>When it is an operator list state, the list is a collection of state items that are
+ * independent from each other and eligible for redistribution across operator instances in case
+ * of changed operator parallelism.
  *
  * @param <T> Type of values that this list state keeps.
  */

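Note on the commit above: the corrected Javadoc says that the items of an operator list state are independent of each other and may be redistributed when operator parallelism changes. The sketch below illustrates one possible way such a redistribution could look, using a plain round-robin assignment. It is an assumption-laden illustration: `OperatorListRedistributionSketch` and `redistribute` are hypothetical names, and the scheme shown is not claimed to be the one Flink uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: round-robin redistribution of independent list state items. */
final class OperatorListRedistributionSketch {

    /**
     * Collects the items held by the old subtasks and deals them out, one at a
     * time, to the subtasks of the new parallelism.
     */
    static <T> List<List<T>> redistribute(List<List<T>> oldSubtaskStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<T> subtaskState : oldSubtaskStates) {
            for (T item : subtaskState) {
                result.get(next).add(item);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> old = Arrays.asList(
            Arrays.asList("a", "b"),
            Arrays.asList("c", "d"));
        // Scaling from 2 to 3 subtasks prints [[a, d], [b], [c]]
        System.out.println(redistribute(old, 3));
    }
}
```

The point of the sketch is only that no item depends on which subtask it lands on, which is what makes this kind of state safe to reshuffle. A keyed list state, by contrast, stays bound to its key.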

[flink] 04/16: [hotfix] Remove incorrect doc comments from RocksDBMapState

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fa8f9318cb6b8fe6da1f9f219816733d92c55113
Author: Yun Tang <my...@live.com>
AuthorDate: Fri Jul 5 16:08:42 2019 +0800

    [hotfix] Remove incorrect doc comments from RocksDBMapState
    
    This closes #8999
---
 .../org/apache/flink/contrib/streaming/state/RocksDBMapState.java     | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 4ca935f..64ce823 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -56,10 +56,6 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 /**
  * {@link MapState} implementation that stores state in RocksDB.
  *
- * <p>{@link RocksDBStateBackend} must ensure that we set the
- * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
- * we use the {@code merge()} call.
- *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <UK> The type of the keys in the map state.


[flink] 10/16: [FLINK-12763][runtime] Introduce a start-up period to standalone cluster that after this period StandaloneResourceManager set SlotManager to fail unfulfillable requests.

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ea032aef599dc33b3c48edf0d183f65c7ff57492
Author: Xintong Song <to...@gmail.com>
AuthorDate: Sat Jul 6 18:36:04 2019 +0800

    [FLINK-12763][runtime] Introduce a start-up period to standalone cluster that after this period StandaloneResourceManager set SlotManager to fail unfulfillable requests.
---
 .../generated/resource_manager_configuration.html       |  5 +++++
 .../apache/flink/configuration/ConfigurationUtils.java  | 11 +++++++++++
 .../flink/configuration/ResourceManagerOptions.java     | 17 +++++++++++++++++
 .../resourcemanager/StandaloneResourceManager.java      | 16 ++++++++++++++--
 .../StandaloneResourceManagerFactory.java               |  7 ++++++-
 .../StandaloneResourceManagerWithUUIDFactory.java       |  7 ++++++-
 .../runtime/resourcemanager/ResourceManagerHATest.java  |  3 ++-
 .../resourcemanager/ResourceManagerJobMasterTest.java   |  3 ++-
 .../ResourceManagerTaskExecutorTest.java                |  3 ++-
 9 files changed, 65 insertions(+), 7 deletions(-)

diff --git a/docs/_includes/generated/resource_manager_configuration.html b/docs/_includes/generated/resource_manager_configuration.html
index 3448aba..0bd1db6 100644
--- a/docs/_includes/generated/resource_manager_configuration.html
+++ b/docs/_includes/generated/resource_manager_configuration.html
@@ -33,6 +33,11 @@
             <td>Defines the network port to connect to for communication with the resource manager. By default, the port of the JobManager, because the same ActorSystem is used. Its not possible to use this configuration key to define port ranges.</td>
         </tr>
         <tr>
+            <td><h5>resourcemanager.standalone.start-up-time</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>Time in milliseconds of the start-up period of a standalone cluster. During this time, resource manager of the standalone cluster expects new task executors to be registered, and will not fail slot requests that can not be satisfied by any current registered slots. After this time, it will fail pending and new coming requests immediately that can not be satisfied by registered slots. If not set, 'slotmanager.request-timeout' will be used by default.</td>
+        </tr>
+        <tr>
             <td><h5>resourcemanager.taskmanager-timeout</h5></td>
             <td style="word-wrap: break-word;">30000</td>
             <td>The timeout for an idle task manager to be released.</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 7f63353..d130b80 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -113,6 +113,17 @@ public class ConfigurationUtils {
 		return splitPaths(configValue);
 	}
 
+	public static Time getStandaloneClusterStartupPeriodTime(Configuration configuration) {
+		final Time timeout;
+		long standaloneClusterStartupPeriodTime = configuration.getLong(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME);
+		if (standaloneClusterStartupPeriodTime > 0) {
+			timeout = Time.milliseconds(standaloneClusterStartupPeriodTime);
+		} else {
+			timeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
+		}
+		return timeout;
+	}
+
 	/**
 	 * Creates a new {@link Configuration} from the given {@link Properties}.
 	 *
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
index ce82fad..af8e43d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -86,6 +86,23 @@ public class ResourceManagerOptions {
 		.withDescription("The timeout for a slot request to be discarded.");
 
 	/**
+	 * Time in milliseconds of the start-up period of a standalone cluster.
+	 * During this time, resource manager of the standalone cluster expects new task executors to be registered, and
+	 * will not fail slot requests that can not be satisfied by any current registered slots.
+	 * After this time, it will fail pending and new coming requests immediately that can not be satisfied by registered
+	 * slots.
+	 * If not set, {@link #SLOT_REQUEST_TIMEOUT} will be used by default.
+	 */
+	public static final ConfigOption<Long> STANDALONE_CLUSTER_STARTUP_PERIOD_TIME = ConfigOptions
+		.key("resourcemanager.standalone.start-up-time")
+		.defaultValue(-1L)
+		.withDescription("Time in milliseconds of the start-up period of a standalone cluster. During this time, "
+			+ "resource manager of the standalone cluster expects new task executors to be registered, and will not "
+			+ "fail slot requests that can not be satisfied by any current registered slots. After this time, it will "
+			+ "fail pending and new coming requests immediately that can not be satisfied by registered slots. If not "
+			+ "set, 'slotmanager.request-timeout' will be used by default.");
+
+	/**
 	 * The timeout for an idle task manager to be released, in milliseconds.
 	 * @deprecated Use {@link #TASK_MANAGER_TIMEOUT}.
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 9b409b3..dfbfec1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -30,11 +31,13 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A standalone implementation of the resource manager. Used when the system is started in
@@ -44,6 +47,8 @@ import java.util.Collections;
  */
 public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
+	private final Time startupPeriodTime;
+
 	public StandaloneResourceManager(
 			RpcService rpcService,
 			String resourceManagerEndpointId,
@@ -55,7 +60,8 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 			JobLeaderIdService jobLeaderIdService,
 			ClusterInformation clusterInformation,
 			FatalErrorHandler fatalErrorHandler,
-			JobManagerMetricGroup jobManagerMetricGroup) {
+			JobManagerMetricGroup jobManagerMetricGroup,
+			Time startupPeriodTime) {
 		super(
 			rpcService,
 			resourceManagerEndpointId,
@@ -68,11 +74,17 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 			clusterInformation,
 			fatalErrorHandler,
 			jobManagerMetricGroup);
+		this.startupPeriodTime = Preconditions.checkNotNull(startupPeriodTime);
 	}
 
 	@Override
 	protected void initialize() throws ResourceManagerException {
-		// nothing to initialize
+		getRpcService().getScheduledExecutor().schedule(
+			() -> getMainThreadExecutor().execute(
+				() -> setFailUnfulfillableRequest(true)),
+			startupPeriodTime.toMilliseconds(),
+			TimeUnit.MILLISECONDS
+		);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
index a1c9b8b..26b371f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -54,6 +56,8 @@ public enum StandaloneResourceManagerFactory implements ResourceManagerFactory<R
 			highAvailabilityServices,
 			rpcService.getScheduledExecutor());
 
+		final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration);
+
 		return new StandaloneResourceManager(
 			rpcService,
 			getEndpointId(),
@@ -65,6 +69,7 @@ public enum StandaloneResourceManagerFactory implements ResourceManagerFactory<R
 			resourceManagerRuntimeServices.getJobLeaderIdService(),
 			clusterInformation,
 			fatalErrorHandler,
-			jobManagerMetricGroup);
+			jobManagerMetricGroup,
+			standaloneClusterStartupPeriodTime);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/StandaloneResourceManagerWithUUIDFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/StandaloneResourceManagerWithUUIDFactory.java
index 59da157..4719c4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/StandaloneResourceManagerWithUUIDFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/StandaloneResourceManagerWithUUIDFactory.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.runtime.minicluster;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -60,6 +62,8 @@ public enum StandaloneResourceManagerWithUUIDFactory implements ResourceManagerF
 			highAvailabilityServices,
 			rpcService.getScheduledExecutor());
 
+		final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration);
+
 		return new StandaloneResourceManager(
 			rpcService,
 			generateEndpointIdWithUUID(),
@@ -71,6 +75,7 @@ public enum StandaloneResourceManagerWithUUIDFactory implements ResourceManagerF
 			resourceManagerRuntimeServices.getJobLeaderIdService(),
 			clusterInformation,
 			fatalErrorHandler,
-			jobManagerMetricGroup);
+			jobManagerMetricGroup,
+			standaloneClusterStartupPeriodTime);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index c0b2f3c..08106a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -91,7 +91,8 @@ public class ResourceManagerHATest extends TestLogger {
 				resourceManagerRuntimeServices.getJobLeaderIdService(),
 				new ClusterInformation("localhost", 1234),
 				testingFatalErrorHandler,
-				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup()) {
+				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+				Time.minutes(5L)) {
 
 				@Override
 				public void revokeLeadership() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index bbcff1e..3002452 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -152,7 +152,8 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			jobLeaderIdService,
 			new ClusterInformation("localhost", 1234),
 			testingFatalErrorHandler,
-			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+			Time.minutes(5L));
 
 		resourceManager.start();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 63d8245..39938b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -162,7 +162,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 				jobLeaderIdService,
 				new ClusterInformation("localhost", 1234),
 				fatalErrorHandler,
-				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+				Time.minutes(5L));
 
 		resourceManager.start();
 

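Note on the fallback rule in getStandaloneClusterStartupPeriodTime above: a positive configured value is used as the start-up period, and any other value (including the default of -1) falls back to the slot request timeout. The sketch below restates that rule in plain Java without Flink types. The class name, the use of java.time.Duration and the 300000 ms timeout value are assumptions made for illustration only.

```java
import java.time.Duration;

/** Hypothetical illustration only; mirrors the fallback rule without Flink classes. */
final class StartupPeriodSketch {

    /**
     * Returns the configured start-up period if it is positive,
     * otherwise the slot request timeout.
     */
    static Duration resolveStartupPeriod(long startupPeriodMillis, long slotRequestTimeoutMillis) {
        return startupPeriodMillis > 0
            ? Duration.ofMillis(startupPeriodMillis)
            : Duration.ofMillis(slotRequestTimeoutMillis);
    }

    public static void main(String[] args) {
        // Illustrative slot request timeout; not read from any real configuration.
        long slotRequestTimeoutMillis = 300_000L;
        System.out.println(resolveStartupPeriod(-1L, slotRequestTimeoutMillis));
        System.out.println(resolveStartupPeriod(10_000L, slotRequestTimeoutMillis));
    }
}
```

In the commit itself the resolved period is then passed to StandaloneResourceManager, which schedules setFailUnfulfillableRequest(true) to run once the period has elapsed.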

[flink] 14/16: [FLINK-12763][runtime] Factor out SlotManager tests for eager failing of unfulfillable requests

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d01ed8f9e410ed894a1e4be0a96d4c2ce59e748
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jul 9 19:21:02 2019 +0200

    [FLINK-12763][runtime] Factor out SlotManager tests for eager failing of unfulfillable requests
    
    Turning the one monolithic test into multiple targeted tests makes it easier to test specific
    scenarios and makes the tests easier to read.
---
 .../resourcemanager/slotmanager/SlotManager.java   |   4 +
 .../SlotManagerFailUnfulfillableTest.java          | 211 +++++++++++++++++++++
 .../slotmanager/SlotManagerTest.java               |  59 ------
 3 files changed, 215 insertions(+), 59 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 72a77f1..977c3f4 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -192,6 +192,10 @@ public class SlotManager implements AutoCloseable {
 		return pendingSlots.size();
 	}
 
+	public int getNumberPendingSlotRequests() {
+		return pendingSlotRequests.size();
+	}
+
 	public boolean isFailingUnfulfillableRequest() {
 		return failUnfulfillableRequest;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
new file mode 100755
index 0000000..73a081f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for setting the SlotManager to eagerly fail unfulfillable requests.
+ */
+public class SlotManagerFailUnfulfillableTest extends TestLogger {
+
+	@Test
+	public void testTurnOnKeepsPendingFulfillableRequests() throws Exception {
+		// setup
+		final ResourceProfile availableProfile = new ResourceProfile(2.0, 100);
+		final ResourceProfile fulfillableProfile = new ResourceProfile(1.0, 100);
+
+		final SlotManager slotManager = createSlotManagerNotStartingNewTMs();
+		registerFreeSlot(slotManager, availableProfile);
+
+		slotManager.registerSlotRequest(slotRequest(fulfillableProfile));
+		slotManager.registerSlotRequest(slotRequest(fulfillableProfile));
+
+		// test
+		slotManager.setFailUnfulfillableRequest(true);
+
+		// assert
+		assertEquals(1, slotManager.getNumberPendingSlotRequests());
+	}
+
+	@Test
+	public void testTurnOnCancelsPendingUnFulfillableRequests() throws Exception {
+		// setup
+		final ResourceProfile availableProfile = new ResourceProfile(2.0, 100);
+		final ResourceProfile unfulfillableProfile = new ResourceProfile(1.0, 200);
+
+		final List<AllocationID> allocationFailures = new ArrayList<>();
+		final SlotManager slotManager = createSlotManagerNotStartingNewTMs(allocationFailures);
+		registerFreeSlot(slotManager, availableProfile);
+
+		// test
+		final SlotRequest request = slotRequest(unfulfillableProfile);
+		slotManager.registerSlotRequest(request);
+		slotManager.setFailUnfulfillableRequest(true);
+
+		// assert
+		assertEquals(1, allocationFailures.size());
+		assertEquals(request.getAllocationId(), allocationFailures.get(0));
+//		assertEquals(0, slotManager.getNumberPendingSlotRequests()); // BUG, to be fixed in follow-up commit
+	}
+
+	@Test
+	public void testTurnOnKeepsRequestsWithStartingTMs() throws Exception {
+		// setup
+		final ResourceProfile availableProfile = new ResourceProfile(2.0, 100);
+		final ResourceProfile newTmProfile = new ResourceProfile(2.0, 200);
+
+		final SlotManager slotManager = createSlotManagerStartingNewTMs();
+		registerFreeSlot(slotManager, availableProfile);
+
+		// test
+		slotManager.registerSlotRequest(slotRequest(newTmProfile));
+		slotManager.setFailUnfulfillableRequest(true);
+
+		// assert
+		assertEquals(1, slotManager.getNumberPendingSlotRequests());
+	}
+
+	@Test
+	public void testFulfillableRequestsKeepPendingWhenOn() throws Exception {
+		// setup
+		final ResourceProfile availableProfile = new ResourceProfile(2.0, 100);
+
+		final SlotManager slotManager = createSlotManagerNotStartingNewTMs();
+		registerFreeSlot(slotManager, availableProfile);
+		slotManager.setFailUnfulfillableRequest(true);
+
+		// test
+		slotManager.registerSlotRequest(slotRequest(availableProfile));
+		slotManager.registerSlotRequest(slotRequest(availableProfile));
+
+		// assert
+		assertEquals(1, slotManager.getNumberPendingSlotRequests());
+	}
+
+	@Test
+	public void testUnfulfillableRequestsFailWhenOn() throws Exception {
+		// setup
+		final ResourceProfile availableProfile = new ResourceProfile(2.0, 100);
+		final ResourceProfile unfulfillableProfile = new ResourceProfile(2.0, 200);
+
+		final List<AllocationID> notifiedAllocationFailures = new ArrayList<>();
+		final SlotManager slotManager = createSlotManagerNotStartingNewTMs(notifiedAllocationFailures);
+		registerFreeSlot(slotManager, availableProfile);
+		slotManager.setFailUnfulfillableRequest(true);
+
+		// test
+		try {
+			slotManager.registerSlotRequest(slotRequest(unfulfillableProfile));
+			fail("this should cause an exception");
+		}
+		catch (SlotManagerException ignored) {}
+
+		// assert
+		assertEquals(0, notifiedAllocationFailures.size());
+		assertEquals(0, slotManager.getNumberPendingSlotRequests());
+	}
+
+	@Test
+	public void testStartingTmKeepsSlotPendingWhenOn() throws Exception {
+		// setup
+		final ResourceProfile availableProfile = new ResourceProfile(2.0, 100);
+		final ResourceProfile newTmProfile = new ResourceProfile(2.0, 200);
+
+		final SlotManager slotManager = createSlotManagerStartingNewTMs();
+		registerFreeSlot(slotManager, availableProfile);
+		slotManager.setFailUnfulfillableRequest(true);
+
+		// test
+		slotManager.registerSlotRequest(slotRequest(newTmProfile));
+
+		// assert
+		assertEquals(1, slotManager.getNumberPendingSlotRequests());
+	}
+
+	// ------------------------------------------------------------------------
+	//  helper
+	// ------------------------------------------------------------------------
+
+	private static SlotManager createSlotManagerNotStartingNewTMs() {
+		return createSlotManager(new ArrayList<>(), false);
+	}
+
+	private static SlotManager createSlotManagerNotStartingNewTMs(List<AllocationID> notifiedAllocationFailures) {
+		return createSlotManager(notifiedAllocationFailures, false);
+	}
+
+	private static SlotManager createSlotManagerStartingNewTMs() {
+		return createSlotManager(new ArrayList<>(), true);
+	}
+
+	private static SlotManager createSlotManager(
+			List<AllocationID> notifiedAllocationFailures,
+			boolean startNewTMs) {
+
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceFunction((resourceProfile) -> startNewTMs ?
+							Collections.singleton(resourceProfile) :
+							Collections.emptyList())
+			.setNotifyAllocationFailureConsumer(tuple3 -> notifiedAllocationFailures.add(tuple3.f1))
+			.build();
+
+		SlotManager slotManager = SlotManagerBuilder.newBuilder().build();
+		slotManager.start(ResourceManagerId.generate(), Executors.directExecutor(), resourceManagerActions);
+
+		return slotManager;
+	}
+
+	private static void registerFreeSlot(SlotManager slotManager, ResourceProfile slotProfile) {
+		final ResourceID resourceID = ResourceID.generate();
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
+
+		final SlotReport slotReport = new SlotReport(
+			Collections.singleton(new SlotStatus(new SlotID(resourceID, 0), slotProfile)));
+
+		slotManager.registerTaskManager(taskExecutorConnection, slotReport);
+	}
+
+	private static SlotRequest slotRequest(ResourceProfile profile) {
+		return new SlotRequest(new JobID(), new AllocationID(), profile, "foobar");
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
old mode 100644
new mode 100755
index 07427d4..c358866
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -58,7 +58,6 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -1487,64 +1486,6 @@ public class SlotManagerTest extends TestLogger {
 		}
 	}
 
-	/**
-	 * Tests that SlotManager fails unfulfillable slot requests properly
-	 */
-	@Test
-	public void testFailUnfulfillableSlotRequests() throws Exception {
-		final JobID jobId = new JobID();
-		final ResourceProfile registeredSlotFulfillableProfile = new ResourceProfile(2.0, 100);
-		final ResourceProfile pendingSlotFulfillableProfile = new ResourceProfile(1.0, 200);
-		final ResourceProfile unfulfillableProfile = new ResourceProfile(2.0, 200);
-
-		final List<AllocationID> notifiedAllocationFailures = new ArrayList<>();
-		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
-		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
-			.setAllocateResourceFunction((resourceProfile) ->
-				pendingSlotFulfillableProfile.isMatching(resourceProfile) ?
-					Collections.singleton(pendingSlotFulfillableProfile) : Collections.emptyList())
-			.setNotifyAllocationFailureConsumer(tuple3 -> notifiedAllocationFailures.add(tuple3.f1)).build();
-
-		final ResourceID resourceID = ResourceID.generate();
-		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
-		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
-		final SlotReport slotReport =
-			new SlotReport(Collections.singleton(new SlotStatus(new SlotID(resourceID, 0), registeredSlotFulfillableProfile)));
-
-		try (final SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
-			slotManager.registerTaskManager(taskExecutorConnection, slotReport);
-
-			// initially, no request should fail
-			SlotRequest slotRequest1 = new SlotRequest(jobId, new AllocationID(), registeredSlotFulfillableProfile, "foobar");
-			SlotRequest slotRequest2 = new SlotRequest(jobId, new AllocationID(), pendingSlotFulfillableProfile, "foobar");
-			SlotRequest slotRequest3 = new SlotRequest(jobId, new AllocationID(), unfulfillableProfile, "foobar");
-			assertTrue(slotManager.registerSlotRequest(slotRequest1));
-			assertTrue(slotManager.registerSlotRequest(slotRequest2));
-			assertTrue(slotManager.registerSlotRequest(slotRequest3));
-			assertEquals(0, notifiedAllocationFailures.size());
-
-			// set fail unfulfillable request, pending request 3 should fail
-			slotManager.setFailUnfulfillableRequest(true);
-			assertEquals(1, notifiedAllocationFailures.size());
-			assertEquals(slotRequest3.getAllocationId(), notifiedAllocationFailures.get(0));
-
-			// request again, request 3 should fail
-			slotRequest1 = new SlotRequest(jobId, new AllocationID(), registeredSlotFulfillableProfile, "foobar");
-			slotRequest2 = new SlotRequest(jobId, new AllocationID(), pendingSlotFulfillableProfile, "foobar");
-			slotRequest3 = new SlotRequest(jobId, new AllocationID(), unfulfillableProfile, "foobar");
-			assertTrue(slotManager.registerSlotRequest(slotRequest1));
-			assertTrue(slotManager.registerSlotRequest(slotRequest2));
-			Exception exception = null;
-			try {
-				slotManager.registerSlotRequest(slotRequest3);
-			} catch (Exception e) {
-				exception = e;
-			}
-			assertNotNull(exception);
-			assertEquals(1, notifiedAllocationFailures.size());
-		}
-	}
-
 	private static FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> convert(FunctionWithException<ResourceProfile, Integer, ResourceManagerException> function) {
 		return (ResourceProfile resourceProfile) -> {
 			final int slots = function.apply(resourceProfile);

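Note on the behaviour exercised by SlotManagerFailUnfulfillableTest above: once eager failing is switched on, pending requests that no registered slot can satisfy are failed, and new requests of that kind are rejected with an exception. The following is a deliberately simplified plain-Java model of that idea. All names are hypothetical, slot allocation and task managers that are still starting are ignored, and failed requests are dropped from the pending list here (the commit notes that the real SlotManager does not yet do so at this point).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical, simplified model; not Flink's SlotManager. */
final class EagerFailSketch {

    /** A resource profile reduced to CPU cores and memory. */
    static final class Profile {
        final double cpuCores;
        final int memoryMb;

        Profile(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }

        /** True if this profile offers at least what the required profile asks for. */
        boolean canFulfill(Profile required) {
            return cpuCores >= required.cpuCores && memoryMb >= required.memoryMb;
        }
    }

    private final List<Profile> registeredSlots = new ArrayList<>();
    private final List<Profile> pendingRequests = new ArrayList<>();
    private final List<Profile> failedRequests = new ArrayList<>();
    private boolean failUnfulfillable;

    void registerSlot(Profile slot) {
        registeredSlots.add(slot);
    }

    /** Rejects the request immediately if eager failing is on and no slot matches. */
    void registerRequest(Profile request) {
        if (failUnfulfillable && !isFulfillable(request)) {
            throw new IllegalStateException("no registered slot can fulfill this request");
        }
        pendingRequests.add(request);
    }

    /** Turning the flag on also fails pending requests that no slot can fulfill. */
    void setFailUnfulfillable(boolean enabled) {
        failUnfulfillable = enabled;
        if (!enabled) {
            return;
        }
        Iterator<Profile> it = pendingRequests.iterator();
        while (it.hasNext()) {
            Profile request = it.next();
            if (!isFulfillable(request)) {
                it.remove();
                failedRequests.add(request);
            }
        }
    }

    int pendingCount() {
        return pendingRequests.size();
    }

    int failedCount() {
        return failedRequests.size();
    }

    private boolean isFulfillable(Profile request) {
        for (Profile slot : registeredSlots) {
            if (slot.canFulfill(request)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        EagerFailSketch manager = new EagerFailSketch();
        manager.registerSlot(new Profile(2.0, 100));
        manager.registerRequest(new Profile(1.0, 100)); // fulfillable
        manager.registerRequest(new Profile(1.0, 200)); // needs more memory than any slot has
        manager.setFailUnfulfillable(true);
        System.out.println("pending=" + manager.pendingCount() + ", failed=" + manager.failedCount());
    }
}
```

The profile values match those used in the tests above; everything else in the sketch is an assumption.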

[flink] 02/16: [hotfix][runtime] Remove unused DependencyVisitor

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 85a919f298136400f9354f8ddd853f836c6cf155
Author: Aitozi <10...@qq.com>
AuthorDate: Wed Jul 3 18:27:20 2019 +0800

    [hotfix][runtime] Remove unused DependencyVisitor
    
    This closes #8970
---
 .../flink/runtime/util/DependencyVisitor.java      | 321 ---------------------
 1 file changed, 321 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
deleted file mode 100644
index 193ab02..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DependencyVisitor.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.shaded.asm6.org.objectweb.asm.AnnotationVisitor;
-import org.apache.flink.shaded.asm6.org.objectweb.asm.ClassVisitor;
-import org.apache.flink.shaded.asm6.org.objectweb.asm.Opcodes;
-import org.apache.flink.shaded.asm6.org.objectweb.asm.FieldVisitor;
-import org.apache.flink.shaded.asm6.org.objectweb.asm.MethodVisitor;
-import org.apache.flink.shaded.asm6.org.objectweb.asm.Type;
-import org.apache.flink.shaded.asm6.org.objectweb.asm.TypePath;
-import org.apache.flink.shaded.asm6.org.objectweb.asm.Label;
-import org.apache.flink.shaded.asm6.org.objectweb.asm.signature.SignatureReader;
-import org.apache.flink.shaded.asm6.org.objectweb.asm.signature.SignatureVisitor;
-
-import java.util.HashSet;
-import java.util.Set;
-
-
-/**
- * This class tracks class dependency with ASM visitors.
- * The tutorial could be found here http://asm.ow2.org/doc/tutorial-asm-2.0.html
- *
- */
-public class DependencyVisitor extends ClassVisitor {
-
-	private Set<String> packages = new HashSet<String>();
-
-	private Set<String> nameSpace = new HashSet<String>();
-
-	public Set<String> getPackages() {
-		return packages;
-	}
-
-
-	public DependencyVisitor(int api) {
-		super(api);
-	}
-
-	@Override
-	public void visit(int version, int access, String name, String signature, String superName, String[] interfaces) {
-		if (signature == null) {
-			addInternalName(superName);
-			addInternalNames(interfaces);
-		} else {
-			addSignature(signature);
-		}
-	}
-
-	@Override
-	public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
-		addDesc(desc);
-		return new AnnotationVisitorImpl(Opcodes.ASM6);
-	}
-
-	@Override
-	public FieldVisitor visitField(int access, String name, String desc, String signature, Object value) {
-		if (signature == null) {
-			addDesc(desc);
-		} else {
-			addTypeSignature(signature);
-		}
-		if (value instanceof Type) {
-			addType((Type) value);
-		}
-		return new FieldVisitorImpl(Opcodes.ASM6);
-	}
-
-	@Override
-	public MethodVisitor visitMethod(int access, String name, String desc, String signature, String[] exceptions) {
-		if (signature == null) {
-			addMethodDesc(desc);
-		} else {
-			addSignature(signature);
-		}
-		addInternalNames(exceptions);
-		return new MethodVisitorImpl(Opcodes.ASM6);
-	}
-
-	// ---------------------------------------------------------------------------------------------
-
-	public void addNameSpace(Set<String> names) {
-		for (String name : names) {
-			this.nameSpace.add(name.replace('.', '/'));
-		}
-	}
-
-	private boolean checkUserDefine(String name) {
-		String[] ns = {};
-		ns = nameSpace.toArray(ns);
-
-		for (String s : ns) {
-			if (name.startsWith(s)) {
-				return true;
-			}
-		}
-		return false;
-	}
-
-	private void addName(final String name) {
-		if (checkUserDefine(name)) {
-			packages.add(name);
-		}
-	}
-
-	private void addInternalName(String name) {
-		addType(Type.getObjectType(name));
-	}
-
-	private void addInternalNames(String[] names) {
-		for (int i = 0; names != null && i < names.length; i++) {
-			addInternalName(names[i]);
-		}
-	}
-
-	private void addDesc(String desc) {
-		addType(Type.getType(desc));
-	}
-
-	private void addMethodDesc(String desc) {
-		addType(Type.getReturnType(desc));
-		Type[] types = Type.getArgumentTypes(desc);
-		for (int i = 0; i < types.length; i++) {
-			addType(types[i]);
-		}
-	}
-
-	private void addType(Type t) {
-		switch (t.getSort()) {
-			case Type.ARRAY:
-				addType(t.getElementType());
-				break;
-			case Type.OBJECT:
-				addName(t.getInternalName());
-				break;
-		}
-	}
-
-	private void addSignature(String signature) {
-		if (signature != null) {
-			new SignatureReader(signature).accept(new SignatureVisitorImpl(Opcodes.ASM6));
-		}
-	}
-
-	private void addTypeSignature(String signature) {
-		if (signature != null) {
-			new SignatureReader(signature).acceptType(new SignatureVisitorImpl(Opcodes.ASM6));
-		}
-	}
-
-	public class MethodVisitorImpl extends MethodVisitor {
-
-		public MethodVisitorImpl(int api) {
-			super(api);
-		}
-
-		@Override
-		public AnnotationVisitor visitParameterAnnotation(int parameter, String desc, boolean visible) {
-			addDesc(desc);
-			return new AnnotationVisitorImpl(Opcodes.ASM6);
-		}
-
-		@Override
-		public void visitTypeInsn(int opcode, String type) {
-			addType(Type.getObjectType(type));
-		}
-
-		@Override
-		public void visitFieldInsn(int opcode, String owner, String name, String desc) {
-			addInternalName(owner);
-			addDesc(desc);
-		}
-
-		@Override
-		public void visitMethodInsn(int opcode, String owner, String name, String desc) {
-			addInternalName(owner);
-			addMethodDesc(desc);
-		}
-
-		@Override
-		public void visitMethodInsn(int opcode, String owner, String name, String desc, boolean itf) {
-			addInternalName(owner);
-			addMethodDesc(desc);
-		}
-
-		@Override
-		public void visitLdcInsn(Object cst) {
-			if (cst instanceof Type) {
-				addType((Type) cst);
-			}
-		}
-
-		@Override
-		public void visitMultiANewArrayInsn(String desc, int dims) {
-			addDesc(desc);
-		}
-
-		@Override
-		public void visitLocalVariable(String name, String desc, String signature, Label start, Label end, int index) {
-			if (signature == null) {
-				addDesc(desc);
-			} else {
-				addTypeSignature(signature);
-			}
-		}
-
-		@Override
-		public AnnotationVisitor visitAnnotationDefault() {
-			return new AnnotationVisitorImpl(Opcodes.ASM6);
-		}
-
-		@Override
-		public AnnotationVisitor visitAnnotation(String desc, boolean visible) {
-			addDesc(desc);
-			return new AnnotationVisitorImpl(Opcodes.ASM6);
-		}
-
-		@Override
-		public void visitTryCatchBlock(Label start, Label end, Label handler, String type) {
-			if (type != null) {
-				addInternalName(type);
-			}
-		}
-	}
-
-	public class AnnotationVisitorImpl extends AnnotationVisitor {
-		public AnnotationVisitorImpl(int api) {
-			super(api);
-		}
-
-		@Override
-		public void visit(String name, Object value) {
-			if (value instanceof Type) {
-				addType((Type) value);
-			}
-		}
-
-		@Override
-		public void visitEnum(String name, String desc, String value) {
-			addDesc(desc);
-		}
-
-		@Override
-		public AnnotationVisitor visitAnnotation(String name, String desc) {
-			addDesc(desc);
-			return this;
-		}
-
-		@Override
-		public AnnotationVisitor visitArray(String name) {
-			return this;
-		}
-	}
-
-	public class FieldVisitorImpl extends FieldVisitor {
-
-		public FieldVisitorImpl(int api) {
-			super(api);
-		}
-
-		@Override
-		public AnnotationVisitor visitTypeAnnotation(int typeRef, TypePath typePath, String desc, boolean visible) {
-			addDesc(desc);
-			return new AnnotationVisitorImpl(Opcodes.ASM6);
-		}
-	}
-
-	public class SignatureVisitorImpl extends SignatureVisitor {
-
-		private String signatureClassName;
-
-		private boolean newParameter = false;
-
-		public SignatureVisitorImpl(int api) {
-			super(api);
-		}
-
-		@Override
-		public SignatureVisitor visitParameterType() {
-			this.newParameter = true;
-			return this;
-		}
-
-		@Override
-		public SignatureVisitor visitReturnType() {
-			this.newParameter = true;
-			return this;
-		}
-
-		@Override
-		public void visitClassType(String name) {
-			if (signatureClassName == null || this.newParameter) {
-				signatureClassName = name;
-				newParameter = false;
-			}
-			addInternalName(name);
-		}
-
-		@Override
-		public void visitInnerClassType(String name) {
-			signatureClassName = signatureClassName + "$" + name;
-			addInternalName(signatureClassName);
-		}
-	}
-}


[flink] 15/16: [FLINK-12763][runtime] Remove eagerly rejected pending slot requests from SlotManager

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit caf89b18ad351964af4bf6999dfb21ee3887fc55
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jul 9 19:24:17 2019 +0200

    [FLINK-12763][runtime] Remove eagerly rejected pending slot requests from SlotManager
---
 .../apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java   | 1 +
 .../resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java   | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 977c3f4..bea588b 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -488,6 +488,7 @@ public class SlotManager implements AutoCloseable {
 					continue;
 				}
 				if (!isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile())) {
+					slotRequestIterator.remove();
 					resourceActions.notifyAllocationFailure(
 						pendingSlotRequest.getJobId(),
 						pendingSlotRequest.getAllocationId(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
index 73a081f..cda07fe 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFailUnfulfillableTest.java
@@ -84,7 +84,7 @@ public class SlotManagerFailUnfulfillableTest extends TestLogger {
 		// assert
 		assertEquals(1, allocationFailures.size());
 		assertEquals(request.getAllocationId(), allocationFailures.get(0));
-//		assertEquals(0, slotManager.getNumberPendingSlotRequests()); // BUG, to be fixed in follow-up commit
+		assertEquals(0, slotManager.getNumberPendingSlotRequests());
 	}
 
 	@Test
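
Note on the one-line fix above: the request has to be dropped through the iterator that is currently walking the pending-request map. The following is a minimal sketch of that pattern, not Flink's SlotManager. The class name, the generic map and the predicate are hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical stand-in for pending-request bookkeeping; not Flink's SlotManager. */
class PendingRequestSweep {

    /**
     * Removes every pending entry whose value the predicate rejects and
     * returns the keys of the removed entries in iteration order.
     */
    static <K, V> List<K> failAndRemove(Map<K, V> pending, Predicate<? super V> isFulfillable) {
        List<K> failed = new ArrayList<>();
        Iterator<Map.Entry<K, V>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, V> entry = it.next();
            if (!isFulfillable.test(entry.getValue())) {
                // Read the key before removing: an entry's behavior is unspecified
                // once its mapping has been removed from the backing map.
                K key = entry.getKey();
                // Iterator.remove() drops the current entry safely; calling
                // pending.remove(key) here could throw ConcurrentModificationException.
                it.remove();
                failed.add(key);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Map<String, Integer> pending = new LinkedHashMap<>();
        pending.put("a", 100);
        pending.put("b", 300);
        pending.put("c", 50);
        // Assumed rule for this sketch: requests above 100 (MB) cannot be fulfilled.
        List<String> failed = failAndRemove(pending, mem -> mem <= 100);
        System.out.println(failed + " " + pending.keySet()); // prints "[b] [a, c]"
    }
}
```

Without the removal, the failed request would still be counted as pending, which is what the re-enabled assertion in SlotManagerFailUnfulfillableTest checks.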


[flink] 07/16: [FLINK-12763][runtime] SlotManager fails unfulfillable slot requests if it is set to do so.

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 973afee77ea6b22a286889b4d46cc750cd11e617
Author: Xintong Song <to...@gmail.com>
AuthorDate: Sat Jul 6 16:50:59 2019 +0800

    [FLINK-12763][runtime] SlotManager fails unfulfillable slot requests if it is set to do so.
---
 .../resourcemanager/slotmanager/SlotManager.java   | 47 +++++++++++++++++
 .../slotmanager/SlotManagerTest.java               | 59 ++++++++++++++++++++++
 2 files changed, 106 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index d85aec5..320612a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -123,6 +123,14 @@ public class SlotManager implements AutoCloseable {
 	/** Release task executor only when each produced result partition is either consumed or failed. */
 	private final boolean waitResultConsumedBeforeRelease;
 
+	/**
+	 * If true, fail unfulfillable slot requests immediately. Otherwise, allow unfulfillable request to pend.
+	 *
+	 * A slot request is considered unfulfillable if it cannot be fulfilled by neither a slot that is already registered
+	 * (including allocated ones) nor a pending slot that the {@link ResourceActions} can allocate.
+	 * */
+	private boolean failUnfulfillableRequest = false;
+
 	public SlotManager(
 			ScheduledExecutor scheduledExecutor,
 			Time taskManagerRequestTimeout,
@@ -462,6 +470,28 @@ public class SlotManager implements AutoCloseable {
 		}
 	}
 
+	public void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
+		if (!this.failUnfulfillableRequest && failUnfulfillableRequest) {
+			// fail unfulfillable pending requests
+			Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();
+			while (slotRequestIterator.hasNext()) {
+				PendingSlotRequest pendingSlotRequest = slotRequestIterator.next().getValue();
+				if (pendingSlotRequest.getAssignedPendingTaskManagerSlot() != null) {
+					continue;
+				}
+				if (!isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile())) {
+					resourceActions.notifyAllocationFailure(
+						pendingSlotRequest.getJobId(),
+						pendingSlotRequest.getAllocationId(),
+						new ResourceManagerException("Could not fulfill slot request " + pendingSlotRequest.getAllocationId() + ". "
+							+ "Requested resource profile (" + pendingSlotRequest.getResourceProfile() + ") is unfulfillable.")
+					);
+				}
+			}
+		}
+		this.failUnfulfillableRequest = failUnfulfillableRequest;
+	}
+
 	// ---------------------------------------------------------------------------------------------
 	// Behaviour methods
 	// ---------------------------------------------------------------------------------------------
@@ -720,6 +750,14 @@ public class SlotManager implements AutoCloseable {
 			}
 
 			pendingTaskManagerSlotOptional.ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot));
+			if (!pendingTaskManagerSlotOptional.isPresent()) {
+				// request can not be fulfilled by any free slot or pending slot that can be allocated,
+				// check whether it can be fulfilled by allocated slots
+				boolean fulfillable = isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile());
+				if (!fulfillable && failUnfulfillableRequest) {
+					throw new ResourceManagerException("Requested resource profile (" + pendingSlotRequest.getResourceProfile() + ") is unfulfillable.");
+				}
+			}
 		}
 	}
 
@@ -733,6 +771,15 @@ public class SlotManager implements AutoCloseable {
 		return Optional.empty();
 	}
 
+	private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile) {
+		for (TaskManagerSlot slot : slots.values()) {
+			if (slot.getResourceProfile().isMatching(resourceProfile)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
 	private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException {
 		final Collection<ResourceProfile> requestedSlots = resourceActions.allocateResource(resourceProfile);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index c358866..07427d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -58,6 +58,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -1486,6 +1487,64 @@ public class SlotManagerTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that SlotManager fails unfulfillable slot requests properly
+	 */
+	@Test
+	public void testFailUnfulfillableSlotRequests() throws Exception {
+		final JobID jobId = new JobID();
+		final ResourceProfile registeredSlotFulfillableProfile = new ResourceProfile(2.0, 100);
+		final ResourceProfile pendingSlotFulfillableProfile = new ResourceProfile(1.0, 200);
+		final ResourceProfile unfulfillableProfile = new ResourceProfile(2.0, 200);
+
+		final List<AllocationID> notifiedAllocationFailures = new ArrayList<>();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+		final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder()
+			.setAllocateResourceFunction((resourceProfile) ->
+				pendingSlotFulfillableProfile.isMatching(resourceProfile) ?
+					Collections.singleton(pendingSlotFulfillableProfile) : Collections.emptyList())
+			.setNotifyAllocationFailureConsumer(tuple3 -> notifiedAllocationFailures.add(tuple3.f1)).build();
+
+		final ResourceID resourceID = ResourceID.generate();
+		final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
+		final SlotReport slotReport =
+			new SlotReport(Collections.singleton(new SlotStatus(new SlotID(resourceID, 0), registeredSlotFulfillableProfile)));
+
+		try (final SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
+			slotManager.registerTaskManager(taskExecutorConnection, slotReport);
+
+			// initially, no request should fail
+			SlotRequest slotRequest1 = new SlotRequest(jobId, new AllocationID(), registeredSlotFulfillableProfile, "foobar");
+			SlotRequest slotRequest2 = new SlotRequest(jobId, new AllocationID(), pendingSlotFulfillableProfile, "foobar");
+			SlotRequest slotRequest3 = new SlotRequest(jobId, new AllocationID(), unfulfillableProfile, "foobar");
+			assertTrue(slotManager.registerSlotRequest(slotRequest1));
+			assertTrue(slotManager.registerSlotRequest(slotRequest2));
+			assertTrue(slotManager.registerSlotRequest(slotRequest3));
+			assertEquals(0, notifiedAllocationFailures.size());
+
+			// set fail unfulfillable request, pending request 3 should fail
+			slotManager.setFailUnfulfillableRequest(true);
+			assertEquals(1, notifiedAllocationFailures.size());
+			assertEquals(slotRequest3.getAllocationId(), notifiedAllocationFailures.get(0));
+
+			// request again, request 3 should fail
+			slotRequest1 = new SlotRequest(jobId, new AllocationID(), registeredSlotFulfillableProfile, "foobar");
+			slotRequest2 = new SlotRequest(jobId, new AllocationID(), pendingSlotFulfillableProfile, "foobar");
+			slotRequest3 = new SlotRequest(jobId, new AllocationID(), unfulfillableProfile, "foobar");
+			assertTrue(slotManager.registerSlotRequest(slotRequest1));
+			assertTrue(slotManager.registerSlotRequest(slotRequest2));
+			Exception exception = null;
+			try {
+				slotManager.registerSlotRequest(slotRequest3);
+			} catch (Exception e) {
+				exception = e;
+			}
+			assertNotNull(exception);
+			assertEquals(1, notifiedAllocationFailures.size());
+		}
+	}
+
 	private static FunctionWithException<ResourceProfile, Collection<ResourceProfile>, ResourceManagerException> convert(FunctionWithException<ResourceProfile, Integer, ResourceManagerException> function) {
 		return (ResourceProfile resourceProfile) -> {
 			final int slots = function.apply(resourceProfile);
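
Note on the fulfillability rule in the commit above: a request counts as unfulfillable only if neither a registered slot nor a slot that could still be allocated covers the requested profile. The sketch below illustrates that rule under simplifying assumptions. SimpleProfile is a hypothetical two-field profile (Flink's ResourceProfile has more dimensions), canFulfill approximates the matching check, and IllegalStateException stands in for ResourceManagerException. The numbers mirror the profiles used in testFailUnfulfillableSlotRequests.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the eager-fail rule; not Flink's SlotManager. */
class FulfillabilityCheck {

    /** Simplified profile with CPU cores and memory only (an assumption of this sketch). */
    static final class SimpleProfile {
        final double cpuCores;
        final int memoryMb;

        SimpleProfile(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }

        /** True if this profile offers at least what {@code required} asks for in every dimension. */
        boolean canFulfill(SimpleProfile required) {
            return cpuCores >= required.cpuCores && memoryMb >= required.memoryMb;
        }
    }

    static boolean isFulfillableByAny(Iterable<SimpleProfile> slots, SimpleProfile required) {
        for (SimpleProfile slot : slots) {
            if (slot.canFulfill(required)) {
                return true;
            }
        }
        return false;
    }

    /** Throws only when failFast is set and no registered or allocatable slot can cover the request. */
    static void register(
            List<SimpleProfile> registered,
            List<SimpleProfile> allocatable,
            SimpleProfile required,
            boolean failFast) {
        boolean fulfillable =
            isFulfillableByAny(registered, required) || isFulfillableByAny(allocatable, required);
        if (!fulfillable && failFast) {
            throw new IllegalStateException("Requested resource profile is unfulfillable.");
        }
    }

    public static void main(String[] args) {
        List<SimpleProfile> registered = Collections.singletonList(new SimpleProfile(2.0, 100));
        List<SimpleProfile> allocatable = Collections.singletonList(new SimpleProfile(1.0, 200));

        register(registered, allocatable, new SimpleProfile(2.0, 100), true); // covered by the registered slot
        register(registered, allocatable, new SimpleProfile(1.0, 200), true); // covered by an allocatable slot
        try {
            register(registered, allocatable, new SimpleProfile(2.0, 200), true);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With failFast set to false the third request would simply stay pending, which corresponds to the first phase of the test, where no allocation failure is reported.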


[flink] 09/16: [FLINK-12763][runtime] Yarn/MesosResourceManager set SlotManager to fail unfulfillable requests on started.

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d99877baf701cb5cdf2493465eb7174d8f1c6d4a
Author: Xintong Song <to...@gmail.com>
AuthorDate: Sat Jul 6 18:06:17 2019 +0800

    [FLINK-12763][runtime] Yarn/MesosResourceManager set SlotManager to fail unfulfillable requests on started.
---
 .../mesos/runtime/clusterframework/MesosResourceManager.java      | 1 +
 .../org/apache/flink/runtime/resourcemanager/ResourceManager.java | 8 ++++++++
 .../src/main/java/org/apache/flink/yarn/YarnResourceManager.java  | 1 +
 3 files changed, 10 insertions(+)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 6a53935..ad0ed58 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -201,6 +201,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 		final ContaineredTaskManagerParameters containeredTaskManagerParameters = taskManagerParameters.containeredParameters();
 		this.slotsPerWorker = updateTaskManagerConfigAndCreateWorkerSlotProfiles(
 			flinkConfig, containeredTaskManagerParameters.taskManagerTotalMemoryMB(), containeredTaskManagerParameters.numSlots());
+		setFailUnfulfillableRequest(true);
 	}
 
 	protected ActorRef createSelfActor() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index b845d1d..3ee321f 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -1062,6 +1062,14 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	 */
 	public abstract boolean stopWorker(WorkerType worker);
 
+	/**
+	 * Set {@link SlotManager} whether to fail unfulfillable slot requests.
+	 * @param failUnfulfillableRequest whether to fail unfulfillable requests
+	 */
+	protected void setFailUnfulfillableRequest(boolean failUnfulfillableRequest) {
+		slotManager.setFailUnfulfillableRequest(failUnfulfillableRequest);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Static utility classes
 	// ------------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 2e980d9..1bb29cb 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -185,6 +185,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
 
 		this.slotsPerWorker = updateTaskManagerConfigAndCreateWorkerSlotProfiles(flinkConfig, defaultTaskManagerMemoryMB, numberOfTaskSlots);
+		setFailUnfulfillableRequest(true);
 	}
 
 	protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceManagerClient(
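
Note on the wiring in the commit above: the base class exposes a protected setter that forwards to the component owning the flag, and the subclasses that can start new workers switch it on in their constructors. A minimal sketch of that delegation follows. All class names here are hypothetical and none of them are Flink types.

```java
/** Hypothetical sketch of the delegation pattern; not Flink's ResourceManager hierarchy. */
class DelegationSketch {

    /** Stand-in for the component that owns the flag. */
    static class RequestTracker {
        private boolean failUnfulfillable = false;

        void setFailUnfulfillable(boolean failUnfulfillable) {
            this.failUnfulfillable = failUnfulfillable;
        }

        boolean failsUnfulfillable() {
            return failUnfulfillable;
        }
    }

    /** Base manager: subclasses decide whether unfulfillable requests fail eagerly. */
    abstract static class BaseManager {
        private final RequestTracker tracker = new RequestTracker();

        protected void setFailUnfulfillable(boolean failUnfulfillable) {
            tracker.setFailUnfulfillable(failUnfulfillable);
        }

        boolean failsUnfulfillable() {
            return tracker.failsUnfulfillable();
        }
    }

    /** A manager that can start workers on demand enables eager failing right away. */
    static class OnDemandManager extends BaseManager {
        OnDemandManager() {
            setFailUnfulfillable(true);
        }
    }

    /** A manager with externally started workers keeps the default (requests may pend). */
    static class FixedManager extends BaseManager {
    }

    public static void main(String[] args) {
        System.out.println(new OnDemandManager().failsUnfulfillable()); // prints "true"
        System.out.println(new FixedManager().failsUnfulfillable());    // prints "false"
    }
}
```

Keeping the setter protected leaves the decision with each deployment-specific subclass, so callers outside the hierarchy cannot flip the behavior.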


[flink] 06/16: [FLINK-12763][runtime] Requests slots with ResourceProfiles that are converted from ResourceSpecs.

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d5520a3207fb6ea9a9505a903cf24d0014d75545
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu Jul 4 23:24:44 2019 +0800

    [FLINK-12763][runtime] Requests slots with ResourceProfiles that are converted from ResourceSpecs.
---
 .../flink/runtime/clusterframework/types/ResourceProfile.java |  2 +-
 .../runtime/executiongraph/AccessExecutionJobVertex.java      |  8 ++++++++
 .../runtime/executiongraph/ArchivedExecutionJobVertex.java    | 11 +++++++++++
 .../org/apache/flink/runtime/executiongraph/Execution.java    |  3 +--
 .../flink/runtime/executiongraph/ExecutionJobVertex.java      |  9 +++++++++
 .../apache/flink/runtime/executiongraph/ExecutionVertex.java  |  5 +++++
 .../job/SubtaskExecutionAttemptDetailsHandlerTest.java        |  2 ++
 .../legacy/utils/ArchivedExecutionJobVertexBuilder.java       |  2 ++
 8 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 21eb9b9..0d9eca3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -325,7 +325,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 			'}';
 	}
 
-	static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) {
+	public static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) {
 		Map<String, Resource> copiedExtendedResources = new HashMap<>(resourceSpec.getExtendedResources());
 
 		return new ResourceProfile(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
index 43b5889..523eac8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -47,6 +48,13 @@ public interface AccessExecutionJobVertex {
 	int getMaxParallelism();
 
 	/**
+	 * Returns the resource profile for this job vertex.
+	 *
+	 * @return resource profile for this job vertex.
+	 */
+	ResourceProfile getResourceProfile();
+
+	/**
 	 * Returns the {@link JobVertexID} for this job vertex.
 	 *
 	 * @return JobVertexID for this job vertex.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
index 6b54760..cc2c7fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -38,6 +39,8 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser
 
 	private final int maxParallelism;
 
+	private final ResourceProfile resourceProfile;
+
 	private final StringifiedAccumulatorResult[] archivedUserAccumulators;
 
 	public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex) {
@@ -52,6 +55,7 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser
 		this.name = jobVertex.getJobVertex().getName();
 		this.parallelism = jobVertex.getParallelism();
 		this.maxParallelism = jobVertex.getMaxParallelism();
+		this.resourceProfile = jobVertex.getResourceProfile();
 	}
 
 	public ArchivedExecutionJobVertex(
@@ -60,12 +64,14 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser
 			String name,
 			int parallelism,
 			int maxParallelism,
+			ResourceProfile resourceProfile,
 			StringifiedAccumulatorResult[] archivedUserAccumulators) {
 		this.taskVertices = taskVertices;
 		this.id = id;
 		this.name = name;
 		this.parallelism = parallelism;
 		this.maxParallelism = maxParallelism;
+		this.resourceProfile = resourceProfile;
 		this.archivedUserAccumulators = archivedUserAccumulators;
 	}
 
@@ -89,6 +95,11 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser
 	}
 
 	@Override
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
+	}
+
+	@Override
 	public JobVertexID getJobVertexId() {
 		return id;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 4939e61..46e1e7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -577,7 +576,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 							slotRequestId,
 							toSchedule,
 							new SlotProfile(
-								ResourceProfile.UNKNOWN,
+								vertex.getResourceProfile(),
 								preferredLocations,
 								previousAllocationIDs,
 								allPreviousExecutionGraphAllocationIds),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 06d597b..ba9b2e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
@@ -127,6 +128,8 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 	private int maxParallelism;
 
+	private final ResourceProfile resourceProfile;
+
 	/**
 	 * Either store a serialized task information, which is for all sub tasks the same,
 	 * or the permanent blob key of the offloaded task information BLOB containing
@@ -193,6 +196,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		}
 
 		this.parallelism = numTaskVertices;
+		this.resourceProfile = ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), 0);
 
 		this.taskVertices = new ExecutionVertex[numTaskVertices];
 		this.operatorIDs = Collections.unmodifiableList(jobVertex.getOperatorIDs());
@@ -334,6 +338,11 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		return maxParallelism;
 	}
 
+	@Override
+	public ResourceProfile getResourceProfile() {
+		return resourceProfile;
+	}
+
 	public boolean isMaxParallelismConfigured() {
 		return maxParallelismConfigured;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 349aaff..9848c8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -227,6 +228,10 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		return this.jobVertex.getMaxParallelism();
 	}
 
+	public ResourceProfile getResourceProfile() {
+		return this.jobVertex.getResourceProfile();
+	}
+
 	@Override
 	public int getParallelSubtaskIndex() {
 		return this.subTaskIndex;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
index ff0ba4a..e452cc7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecution;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
@@ -108,6 +109,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
 			"test",
 			1,
 			1,
+			ResourceProfile.UNKNOWN,
 			emptyAccumulators);
 
 		// Change some fields so we can make it different from other sub tasks.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java
index 814c4db..d8493ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.handler.legacy.utils;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -78,6 +79,7 @@ public class ArchivedExecutionJobVertexBuilder {
 			name != null ? name : "task_" + RANDOM.nextInt(),
 			parallelism,
 			maxParallelism,
+			ResourceProfile.UNKNOWN,
 			archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0]
 		);
 	}


[flink] 16/16: [hotfix][runtime] Minor cleanup in SlotManager

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit be794ac078360cf82435dca550f8690ca7abdca2
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jul 9 19:32:15 2019 +0200

    [hotfix][runtime] Minor cleanup in SlotManager
    
    Avoid checking whether slots would be fulfillable unless necessary
---
 .../runtime/resourcemanager/slotmanager/SlotManager.java     | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index bea588b..71f3df6 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -758,13 +758,15 @@ public class SlotManager implements AutoCloseable {
 				pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
 			}
 
-			pendingTaskManagerSlotOptional.ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot));
-			if (!pendingTaskManagerSlotOptional.isPresent()) {
+			if (pendingTaskManagerSlotOptional.isPresent()) {
+				assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlotOptional.get());
+			}
+			else {
 				// request can not be fulfilled by any free slot or pending slot that can be allocated,
 				// check whether it can be fulfilled by allocated slots
-				boolean fulfillable = isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile());
-				if (!fulfillable && failUnfulfillableRequest) {
-					throw new ResourceManagerException("Requested resource profile (" + pendingSlotRequest.getResourceProfile() + ") is unfulfillable.");
+				if (failUnfulfillableRequest && !isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile())) {
+					throw new ResourceManagerException("Requested resource profile (" +
+						pendingSlotRequest.getResourceProfile() + ") is unfulfillable.");
 				}
 			}
 		}
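
The reordering in the hunk above relies on Java's short-circuit `&&`: with the flag tested first, the fulfillability check is skipped whenever eager failing is disabled. A minimal sketch of that effect follows. It is not the SlotManager code; the class `ShortCircuitSketch`, the counter `CHECKS`, and the integer "slot sizes" are hypothetical stand-ins chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in, not Flink code: slots and requests are plain ints.
final class ShortCircuitSketch {
    // Counts how often the (assumed expensive) check actually runs.
    static final AtomicInteger CHECKS = new AtomicInteger();

    // Stand-in for isFulfillableByRegisteredSlots: scans all registered slots.
    static boolean isFulfillable(int requested, int[] registeredSlots) {
        CHECKS.incrementAndGet();
        for (int slot : registeredSlots) {
            if (slot >= requested) {
                return true;
            }
        }
        return false;
    }

    // Old shape: the check always runs, even when the flag is off.
    static boolean mustFailEager(boolean failUnfulfillable, int requested, int[] slots) {
        boolean fulfillable = isFulfillable(requested, slots);
        return !fulfillable && failUnfulfillable;
    }

    // New shape: the flag is tested first, so && skips the check when it is off.
    static boolean mustFailLazy(boolean failUnfulfillable, int requested, int[] slots) {
        return failUnfulfillable && !isFulfillable(requested, slots);
    }

    public static void main(String[] args) {
        int[] slots = {1, 2};
        mustFailEager(false, 4, slots);
        System.out.println("eager checks: " + CHECKS.getAndSet(0)); // eager checks: 1
        mustFailLazy(false, 4, slots);
        System.out.println("lazy checks: " + CHECKS.getAndSet(0));  // lazy checks: 0
    }
}
```

Both shapes return the same result for every input; only the number of times the check runs differs.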


[flink] 05/16: [hotfix][runtime] Fix test cases that use unknown resource profiles in slot offers.

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b5f23036a42a3e3f1c3588f7d9e8650522688113
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu Jul 4 23:33:17 2019 +0800

    [hotfix][runtime] Fix test cases that use unknown resource profiles in slot offers.
    
    The test cases should not use ResourceProfile#UNKNOWN in slot offers.
      - ResourceProfile#UNKNOWN is used for slot requests whose resource needs are not specified.
      - ResourceProfile#ANY is used for task manager slots that can match any slot request.
    These cases haven't been failing because so far slot requests always have unknown resource profiles.
---
 .../apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java  | 2 +-
 .../apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 70df983..c5f1f82 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -751,7 +751,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		final List<SlotOffer> slotOffers = new ArrayList<>(NUM_TASKS);
 		for (int i = 0; i < numSlots; i++) {
 			final AllocationID allocationId = new AllocationID();
-			final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
+			final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.ANY);
 			slotOffers.add(slotOffer);
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index 873793f..9c8f2d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -160,7 +160,7 @@ public class SchedulerTestBase extends TestLogger {
 				final SlotOffer slotOffer = new SlotOffer(
 					new AllocationID(),
 					i,
-					ResourceProfile.UNKNOWN);
+					ResourceProfile.ANY);
 
 				slotOffers.add(slotOffer);
 			}
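
The commit message above notes that the tests only passed because every request had an unknown profile. A simplified model may make that concrete. It assumes that UNKNOWN is a sentinel below any real requirement, that ANY is a sentinel above any real requirement, and that matching is a plain "offered >= requested" comparison. The class `ProfileSketch`, its single `memoryMb` field, and `canSatisfy` are hypothetical and are not Flink's `ResourceProfile` API.

```java
// Hypothetical single-dimension model, not Flink's ResourceProfile.
final class ProfileSketch {
    final int memoryMb;

    // Assumed sentinel for "request did not specify its needs".
    static final ProfileSketch UNKNOWN = new ProfileSketch(-1);
    // Assumed sentinel for "slot that can serve any request".
    static final ProfileSketch ANY = new ProfileSketch(Integer.MAX_VALUE);

    ProfileSketch(int memoryMb) {
        this.memoryMb = memoryMb;
    }

    // A slot satisfies a request if it offers at least the requested amount.
    boolean canSatisfy(ProfileSketch request) {
        return memoryMb >= request.memoryMb;
    }

    public static void main(String[] args) {
        ProfileSketch specific = new ProfileSketch(512);
        // An UNKNOWN slot offer happens to match an UNKNOWN request ...
        System.out.println(UNKNOWN.canSatisfy(UNKNOWN));  // true
        // ... but not a request with concrete needs.
        System.out.println(UNKNOWN.canSatisfy(specific)); // false
        // An ANY slot offer matches both.
        System.out.println(ANY.canSatisfy(UNKNOWN));      // true
        System.out.println(ANY.canSatisfy(specific));     // true
    }
}
```

Under these assumptions, a test that offers UNKNOWN slots starts failing as soon as requests carry concrete profiles, which is why the offers in the hunks above switch to ANY.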


[flink] 01/16: [FLINK-13067][docs] Fix broken links to contributing docs

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 09445316bcc991ef262c74063e41af8eaa836ed8
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue Jul 2 21:33:04 2019 -0500

    [FLINK-13067][docs] Fix broken links to contributing docs
    
    This closes #8964
---
 .github/CONTRIBUTING.md                          | 4 ++--
 .github/PULL_REQUEST_TEMPLATE.md                 | 2 +-
 docs/dev/libs/gelly/index.md                     | 2 +-
 docs/dev/libs/gelly/index.zh.md                  | 2 +-
 docs/dev/libs/ml/contribution_guide.md           | 4 ++--
 docs/dev/libs/ml/contribution_guide.zh.md        | 4 ++--
 docs/dev/projectsetup/java_api_quickstart.md     | 4 ++--
 docs/dev/projectsetup/java_api_quickstart.zh.md  | 4 ++--
 docs/dev/projectsetup/scala_api_quickstart.md    | 4 ++--
 docs/dev/projectsetup/scala_api_quickstart.zh.md | 4 ++--
 docs/dev/table/connect.md                        | 2 +-
 docs/dev/table/connect.zh.md                     | 2 +-
 docs/index.md                                    | 4 ++--
 docs/index.zh.md                                 | 4 ++--
 docs/internals/components.md                     | 2 +-
 docs/internals/components.zh.md                  | 2 +-
 docs/redirects/example_quickstart.md             | 2 +-
 docs/redirects/filesystems.md                    | 4 ++--
 docs/redirects/setup_quickstart.md               | 2 +-
 docs/redirects/windows.md                        | 2 +-
 20 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/.github/CONTRIBUTING.md b/.github/CONTRIBUTING.md
index ac0b373..be9453e 100644
--- a/.github/CONTRIBUTING.md
+++ b/.github/CONTRIBUTING.md
@@ -6,6 +6,6 @@ To make the process smooth for the project *committers* (those who review and ac
 
 ## Contribution Guidelines
 
-Please check out the [How to Contribute guide](http://flink.apache.org/how-to-contribute.html) to understand how contributions are made. 
-A detailed explanation can be found in our [Contribute Code Guide](http://flink.apache.org/contribute-code.html) which also contains a list of coding guidelines that you should follow.
+Please check out the [How to Contribute guide](http://flink.apache.org/contributing/how-to-contribute.html) to understand how contributions are made. 
+A detailed explanation can be found in our [Contribute Code Guide](http://flink.apache.org/contributing/contribute-code.html) which also contains a list of coding guidelines that you should follow.
 For pull requests, there is a [check list](PULL_REQUEST_TEMPLATE.md) with criteria taken from the How to Contribute Guide and the Coding Guidelines.
diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index a017732..5157da5 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -12,7 +12,7 @@
 
   - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
   
-  - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices).
+  - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](https://flink.apache.org/contributing/contribute-code.html#open-a-pull-request).
 
   - Each pull request should address only one issue, not mix up code from multiple issues.
   
diff --git a/docs/dev/libs/gelly/index.md b/docs/dev/libs/gelly/index.md
index 97b91b8..e1ea26d 100644
--- a/docs/dev/libs/gelly/index.md
+++ b/docs/dev/libs/gelly/index.md
@@ -135,6 +135,6 @@ wget -O - http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.g
 
 Please submit feature requests and report issues on the user [mailing list](https://flink.apache.org/community.html#mailing-lists)
 or [Flink Jira](https://issues.apache.org/jira/browse/FLINK). We welcome suggestions for new algorithms and features as
-well as [code contributions](https://flink.apache.org/contribute-code.html).
+well as [code contributions](https://flink.apache.org/contributing/contribute-code.html).
 
 {% top %}
diff --git a/docs/dev/libs/gelly/index.zh.md b/docs/dev/libs/gelly/index.zh.md
index 4d69411..d45ac27 100644
--- a/docs/dev/libs/gelly/index.zh.md
+++ b/docs/dev/libs/gelly/index.zh.md
@@ -135,6 +135,6 @@ wget -O - http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.g
 
 Please submit feature requests and report issues on the user [mailing list](https://flink.apache.org/community.html#mailing-lists)
 or [Flink Jira](https://issues.apache.org/jira/browse/FLINK). We welcome suggestions for new algorithms and features as
-well as [code contributions](https://flink.apache.org/contribute-code.html).
+well as [code contributions](https://flink.apache.org/contributing/contribute-code.html).
 
 {% top %}
diff --git a/docs/dev/libs/ml/contribution_guide.md b/docs/dev/libs/ml/contribution_guide.md
index c28fdaa..65f6b3f 100644
--- a/docs/dev/libs/ml/contribution_guide.md
+++ b/docs/dev/libs/ml/contribution_guide.md
@@ -31,7 +31,7 @@ The following document describes how to contribute to FlinkML.
 
 ## Getting Started
 
-In order to get started first read Flink's [contribution guide](http://flink.apache.org/how-to-contribute.html).
+In order to get started first read Flink's [contribution guide](http://flink.apache.org/contributing/how-to-contribute.html).
 Everything from this guide also applies to FlinkML.
 
 ## Pick a Topic
@@ -103,6 +103,6 @@ See `docs/_include/latex_commands.html` for the complete list of predefined late
 ## Contributing
 
 Once you have implemented the algorithm with adequate test coverage and added documentation, you are ready to open a pull request.
-Details of how to open a pull request can be found [here](http://flink.apache.org/how-to-contribute.html#contributing-code--documentation).
+Details of how to open a pull request can be found [here](http://flink.apache.org/contributing/how-to-contribute.html).
 
 {% top %}
diff --git a/docs/dev/libs/ml/contribution_guide.zh.md b/docs/dev/libs/ml/contribution_guide.zh.md
index c28fdaa..65f6b3f 100644
--- a/docs/dev/libs/ml/contribution_guide.zh.md
+++ b/docs/dev/libs/ml/contribution_guide.zh.md
@@ -31,7 +31,7 @@ The following document describes how to contribute to FlinkML.
 
 ## Getting Started
 
-In order to get started first read Flink's [contribution guide](http://flink.apache.org/how-to-contribute.html).
+In order to get started first read Flink's [contribution guide](http://flink.apache.org/contributing/how-to-contribute.html).
 Everything from this guide also applies to FlinkML.
 
 ## Pick a Topic
@@ -103,6 +103,6 @@ See `docs/_include/latex_commands.html` for the complete list of predefined late
 ## Contributing
 
 Once you have implemented the algorithm with adequate test coverage and added documentation, you are ready to open a pull request.
-Details of how to open a pull request can be found [here](http://flink.apache.org/how-to-contribute.html#contributing-code--documentation).
+Details of how to open a pull request can be found [here](http://flink.apache.org/contributing/how-to-contribute.html).
 
 {% top %}
diff --git a/docs/dev/projectsetup/java_api_quickstart.md b/docs/dev/projectsetup/java_api_quickstart.md
index fd2eef5..2b27fa0 100644
--- a/docs/dev/projectsetup/java_api_quickstart.md
+++ b/docs/dev/projectsetup/java_api_quickstart.md
@@ -336,7 +336,7 @@ can run the application from the JAR file without additionally specifying the ma
 Write your application!
 
 If you are writing a streaming application and you are looking for inspiration what to write,
-take a look at the [Stream Processing Application Tutorial]({{ site.baseurl }}/tutorials/datastream_api.html#writing-a-flink-program).
+take a look at the [Stream Processing Application Tutorial]({{ site.baseurl }}/getting-started/tutorials/datastream_api.html#writing-a-flink-program).
 
 If you are writing a batch processing application and you are looking for inspiration what to write,
 take a look at the [Batch Application Examples]({{ site.baseurl }}/dev/batch/examples.html).
@@ -345,7 +345,7 @@ For a complete overview over the APIs, have a look at the
 [DataStream API]({{ site.baseurl }}/dev/datastream_api.html) and
 [DataSet API]({{ site.baseurl }}/dev/batch/index.html) sections.
 
-[Here]({{ site.baseurl }}/tutorials/local_setup.html) you can find out how to run an application outside the IDE on a local cluster.
+[Here]({{ site.baseurl }}/getting-started/tutorials/local_setup.html) you can find out how to run an application outside the IDE on a local cluster.
 
 If you have any trouble, ask on our
 [Mailing List](http://mail-archives.apache.org/mod_mbox/flink-user/).
diff --git a/docs/dev/projectsetup/java_api_quickstart.zh.md b/docs/dev/projectsetup/java_api_quickstart.zh.md
index 43b5e2d..653fab4 100644
--- a/docs/dev/projectsetup/java_api_quickstart.zh.md
+++ b/docs/dev/projectsetup/java_api_quickstart.zh.md
@@ -323,7 +323,7 @@ __注意:__ 如果你使用其他类而不是 *StreamingJob* 作为应用程
 开始编写应用!
 
 如果你准备编写流处理应用,正在寻找灵感来写什么,
-可以看看[流处理应用程序教程]({{ site.baseurl }}/zh/tutorials/datastream_api.html#writing-a-flink-program)
+可以看看[流处理应用程序教程]({{ site.baseurl }}/zh/getting-started/tutorials/datastream_api.html#writing-a-flink-program)
 
 如果你准备编写批处理应用,正在寻找灵感来写什么,
 可以看看[批处理应用程序示例]({{ site.baseurl }}/zh/dev/batch/examples.html)
@@ -332,7 +332,7 @@ __注意:__ 如果你使用其他类而不是 *StreamingJob* 作为应用程
 [DataStream API]({{ site.baseurl }}/zh/dev/datastream_api.html) 和
 [DataSet API]({{ site.baseurl }}/zh/dev/batch/index.html) 章节。
 
-在[这里]({{ site.baseurl }}/zh/tutorials/local_setup.html),你可以找到如何在 IDE 之外的本地集群中运行应用程序。
+在[这里]({{ site.baseurl }}/zh/getting-started/tutorials/local_setup.html),你可以找到如何在 IDE 之外的本地集群中运行应用程序。
 
 如果你有任何问题,请发信至我们的[邮箱列表](http://mail-archives.apache.org/mod_mbox/flink-user/),我们很乐意提供帮助。
 
diff --git a/docs/dev/projectsetup/scala_api_quickstart.md b/docs/dev/projectsetup/scala_api_quickstart.md
index da38eb7..cd0fc09 100644
--- a/docs/dev/projectsetup/scala_api_quickstart.md
+++ b/docs/dev/projectsetup/scala_api_quickstart.md
@@ -212,7 +212,7 @@ can run time application from the JAR file without additionally specifying the m
 Write your application!
 
 If you are writing a streaming application and you are looking for inspiration what to write,
-take a look at the [Stream Processing Application Tutorial]({{ site.baseurl }}/tutorials/datastream_api.html#writing-a-flink-program)
+take a look at the [Stream Processing Application Tutorial]({{ site.baseurl }}/getting-started/tutorials/datastream_api.html#writing-a-flink-program)
 
 If you are writing a batch processing application and you are looking for inspiration what to write,
 take a look at the [Batch Application Examples]({{ site.baseurl }}/dev/batch/examples.html)
@@ -221,7 +221,7 @@ For a complete overview over the APIa, have a look at the
 [DataStream API]({{ site.baseurl }}/dev/datastream_api.html) and
 [DataSet API]({{ site.baseurl }}/dev/batch/index.html) sections.
 
-[Here]({{ site.baseurl }}/tutorials/local_setup.html) you can find out how to run an application outside the IDE on a local cluster.
+[Here]({{ site.baseurl }}/getting-started/tutorials/local_setup.html) you can find out how to run an application outside the IDE on a local cluster.
 
 If you have any trouble, ask on our
 [Mailing List](http://mail-archives.apache.org/mod_mbox/flink-user/).
diff --git a/docs/dev/projectsetup/scala_api_quickstart.zh.md b/docs/dev/projectsetup/scala_api_quickstart.zh.md
index 015c011..187f295 100644
--- a/docs/dev/projectsetup/scala_api_quickstart.zh.md
+++ b/docs/dev/projectsetup/scala_api_quickstart.zh.md
@@ -204,7 +204,7 @@ __注意:__ 如果你使用其他类而不是 *StreamingJob* 作为应用程序
 开始编写你的应用!
 
 如果你准备编写流处理应用,正在寻找灵感来写什么,
-可以看看[流处理应用程序教程]({{ site.baseurl }}/zh/tutorials/datastream_api.html#writing-a-flink-program)
+可以看看[流处理应用程序教程]({{ site.baseurl }}/zh/getting-started/tutorials/datastream_api.html#writing-a-flink-program)
 
 如果你准备编写批处理应用,正在寻找灵感来写什么,
 可以看看[批处理应用程序示例]({{ site.baseurl }}/zh/dev/batch/examples.html)
@@ -213,7 +213,7 @@ __注意:__ 如果你使用其他类而不是 *StreamingJob* 作为应用程序
 [DataStream API]({{ site.baseurl }}/zh/dev/datastream_api.html) 和
 [DataSet API]({{ site.baseurl }}/zh/dev/batch/index.html) 部分。
 
-在[这里]({{ site.baseurl }}/zh/tutorials/local_setup.html),你可以找到如何在IDE外的本地集群中运行应用程序。
+在[这里]({{ site.baseurl }}/zh/getting-started/tutorials/local_setup.html),你可以找到如何在IDE外的本地集群中运行应用程序。
 
 如果你有任何问题,请发信至我们的[邮箱列表](http://mail-archives.apache.org/mod_mbox/flink-user/)。
 我们很乐意提供帮助。
diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index e365ee6..6e6c08a 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -656,7 +656,7 @@ connector:
 
 The file system connector itself is included in Flink and does not require an additional dependency. A corresponding format needs to be specified for reading and writing rows from and to a file system.
 
-<span class="label label-danger">Attention</span> Make sure to include [Flink File System specific dependencies]({{ site.baseurl }}/ops/filesystems.html).
+<span class="label label-danger">Attention</span> Make sure to include [Flink File System specific dependencies]({{ site.baseurl }}/ops/filesystems/index.html).
 
 <span class="label label-danger">Attention</span> File system sources and sinks for streaming are only experimental. In the future, we will support actual streaming use cases, i.e., directory monitoring and bucket output.
 
diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md
index a685f72..c8398c2 100644
--- a/docs/dev/table/connect.zh.md
+++ b/docs/dev/table/connect.zh.md
@@ -656,7 +656,7 @@ connector:
 
 The file system connector itself is included in Flink and does not require an additional dependency. A corresponding format needs to be specified for reading and writing rows from and to a file system.
 
-<span class="label label-danger">Attention</span> Make sure to include [Flink File System specific dependencies]({{ site.baseurl }}/ops/filesystems.html).
+<span class="label label-danger">Attention</span> Make sure to include [Flink File System specific dependencies]({{ site.baseurl }}/ops/filesystems/index.html).
 
 <span class="label label-danger">Attention</span> File system sources and sinks for streaming are only experimental. In the future, we will support actual streaming use cases, i.e., directory monitoring and bucket output.
 
diff --git a/docs/index.md b/docs/index.md
index 35388ec..b0a74de 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -34,8 +34,8 @@ Apache Flink is an open source platform for distributed stream and batch data pr
 - **Concepts**: Start with the basic concepts of Flink's [Dataflow Programming Model](concepts/programming-model.html) and [Distributed Runtime Environment](concepts/runtime.html). This will help you understand other parts of the documentation, including the setup and programming guides. We recommend you read these sections first.
 
 - **Tutorials**: 
-  * [Implement and run a DataStream application](./tutorials/datastream_api.html)
-  * [Setup a local Flink cluster](./tutorials/local_setup.html)
+  * [Implement and run a DataStream application](./getting-started/tutorials/datastream_api.html)
+  * [Setup a local Flink cluster](./getting-started/tutorials/local_setup.html)
 
 - **Programming Guides**: You can read our guides about [basic API concepts](dev/api_concepts.html) and the [DataStream API](dev/datastream_api.html) or the [DataSet API](dev/batch/index.html) to learn how to write your first Flink programs.
 
diff --git a/docs/index.zh.md b/docs/index.zh.md
index bd2ac40..5fccc1f 100644
--- a/docs/index.zh.md
+++ b/docs/index.zh.md
@@ -32,8 +32,8 @@ Apache Flink 是一个分布式流批一体化的开源平台。Flink 的核心
 
 - **概念**: 从 Flink 的 [数据流编程模型](concepts/programming-model.html) 和 [分布式执行环境](concepts/runtime.html) 开始了解最基本的概念。这能帮助你理解本文档的其他部分,包括如何搭建环境,进行程序编写等。建议你首先阅读此部分。
 - **教程**:
-  * [实现并运行一个 DataStream 作业](./tutorials/datastream_api.html)
-  * [搭建一个本地 Flink 集群](./tutorials/local_setup.html)
+  * [实现并运行一个 DataStream 作业](./getting-started/tutorials/datastream_api.html)
+  * [搭建一个本地 Flink 集群](./getting-started/tutorials/local_setup.html)
 
 - **编程指南**: 你可以从 [基本 API 概念](dev/api_concepts.html), [DataStream API](dev/datastream_api.html) 以及 [DataSet API](dev/batch/index.html) 着手学习如何编写你的第一个 Flink 作业。
 
diff --git a/docs/internals/components.md b/docs/internals/components.md
index d94fcf0..c4e3270 100644
--- a/docs/internals/components.md
+++ b/docs/internals/components.md
@@ -53,7 +53,7 @@ You can click on the components in the figure to learn more.
 <area id="datastream" title="DataStream API" href="{{ site.baseurl }}/dev/datastream_api.html" shape="rect" coords="64,177,379,255" />
 <area id="dataset" title="DataSet API" href="{{ site.baseurl }}/dev/batch/index.html" shape="rect" coords="382,177,697,255" />
 <area id="runtime" title="Runtime" href="{{ site.baseurl }}/concepts/runtime.html" shape="rect" coords="63,257,700,335" />
-<area id="local" title="Local" href="{{ site.baseurl }}/tutorials/local_setup.html" shape="rect" coords="62,337,275,414" />
+<area id="local" title="Local" href="{{ site.baseurl }}/getting-started/tutorials/local_setup.html" shape="rect" coords="62,337,275,414" />
 <area id="cluster" title="Cluster" href="{{ site.baseurl }}/ops/deployment/cluster_setup.html" shape="rect" coords="273,336,486,413" />
 <area id="cloud" title="Cloud" href="{{ site.baseurl }}/ops/deployment/gce_setup.html" shape="rect" coords="485,336,700,414" />
 </map>
diff --git a/docs/internals/components.zh.md b/docs/internals/components.zh.md
index a3b608e..15a0f2c 100644
--- a/docs/internals/components.zh.md
+++ b/docs/internals/components.zh.md
@@ -53,7 +53,7 @@ You can click on the components in the figure to learn more.
 <area id="datastream" title="DataStream API" href="{{ site.baseurl }}/dev/datastream_api.html" shape="rect" coords="64,177,379,255" />
 <area id="dataset" title="DataSet API" href="{{ site.baseurl }}/dev/batch/index.html" shape="rect" coords="382,177,697,255" />
 <area id="runtime" title="Runtime" href="{{ site.baseurl }}/concepts/runtime.html" shape="rect" coords="63,257,700,335" />
-<area id="local" title="Local" href="{{ site.baseurl }}/tutorials/local_setup.html" shape="rect" coords="62,337,275,414" />
+<area id="local" title="Local" href="{{ site.baseurl }}/getting-started/tutorials/local_setup.html" shape="rect" coords="62,337,275,414" />
 <area id="cluster" title="Cluster" href="{{ site.baseurl }}/ops/deployment/cluster_setup.html" shape="rect" coords="273,336,486,413" />
 <area id="cloud" title="Cloud" href="{{ site.baseurl }}/ops/deployment/gce_setup.html" shape="rect" coords="485,336,700,414" />
 </map>
diff --git a/docs/redirects/example_quickstart.md b/docs/redirects/example_quickstart.md
index 8795bed..d2736c4 100644
--- a/docs/redirects/example_quickstart.md
+++ b/docs/redirects/example_quickstart.md
@@ -1,7 +1,7 @@
 ---
 title: "DataStream API Tutorial"
 layout: redirect
-redirect: /tutorials/datastream_api.html
+redirect: /getting-started/tutorials/datastream_api.html
 permalink: /quickstart/run_example_quickstart.html
 ---
 <!--
diff --git a/docs/redirects/filesystems.md b/docs/redirects/filesystems.md
index 4aad652..3cffda3 100644
--- a/docs/redirects/filesystems.md
+++ b/docs/redirects/filesystems.md
@@ -1,8 +1,8 @@
 ---
 title: "File Systems"
 layout: redirect
-redirect: /ops/filesystems.html
-permalink: /ops/filesystems/index.html
+redirect: /ops/filesystems/index.html
+permalink: /ops/filesystems.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/redirects/setup_quickstart.md b/docs/redirects/setup_quickstart.md
index 13da913..70c5f1b 100644
--- a/docs/redirects/setup_quickstart.md
+++ b/docs/redirects/setup_quickstart.md
@@ -1,7 +1,7 @@
 ---
 title: "Local Setup Tutorial"
 layout: redirect
-redirect: /tutorials/local_setup.html
+redirect: /getting-started/tutorials/local_setup.html
 permalink: /quickstart/setup_quickstart.html
 ---
 <!--
diff --git a/docs/redirects/windows.md b/docs/redirects/windows.md
index b769552..415e425 100644
--- a/docs/redirects/windows.md
+++ b/docs/redirects/windows.md
@@ -1,7 +1,7 @@
 ---
 title: "Running Flink on Windows"
 layout: redirect
-redirect: /tutorials/flink_on_windows.html
+redirect: /getting-started/tutorials/flink_on_windows.html
 permalink: /start/flink_on_windows.html
 ---
 <!--


[flink] 11/16: [hotfix] [runtime] Remove obsolete Exception from JobLeaderIdService signature

Posted by se...@apache.org.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 15d8cd5458a7a940cef16760b465abee4f20a06c
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jul 9 16:18:42 2019 +0200

    [hotfix] [runtime] Remove obsolete Exception from JobLeaderIdService signature
---
 .../org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
old mode 100644
new mode 100755
index 994db34..98bc66b
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -68,7 +68,7 @@ public class JobLeaderIdService {
 	public JobLeaderIdService(
 			HighAvailabilityServices highAvailabilityServices,
 			ScheduledExecutor scheduledExecutor,
-			Time jobTimeout) throws Exception {
+			Time jobTimeout) {
 		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices, "highAvailabilityServices");
 		this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor, "scheduledExecutor");
 		this.jobTimeout = Preconditions.checkNotNull(jobTimeout, "jobTimeout");