You are viewing a plain text version of this content. The link to its canonical version was lost in this plain-text conversion.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/10 12:46:05 UTC
[1/6] flink git commit: [hotfix] Add retrieval of key sets to DualKeyMap
Repository: flink
Updated Branches:
refs/heads/master 51a278778 -> 4afd445b3
[hotfix] Add retrieval of key sets to DualKeyMap
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff670946
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff670946
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff670946
Branch: refs/heads/master
Commit: ff67094671e51679ea79e8e518f18348923feb06
Parents: 057edf9
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jan 9 14:11:20 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 12:30:30 2018 +0100
----------------------------------------------------------------------
.../runtime/jobmaster/slotpool/DualKeyMap.java | 9 +++
.../jobmaster/slotpool/DualKeyMapTest.java | 60 ++++++++++++++++++++
2 files changed, 69 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ff670946/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
index 04b3ca6..38bcd49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
@@ -24,6 +24,7 @@ import java.util.AbstractCollection;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.Set;
/**
* Map which stores values under two different indices.
@@ -125,6 +126,14 @@ public class DualKeyMap<A, B, V> {
return vs;
}
+ public Set<A> keySetA() {
+ return aMap.keySet();
+ }
+
+ public Set<B> keySetB() {
+ return bMap.keySet();
+ }
+
public void clear() {
aMap.clear();
bMap.clear();
http://git-wip-us.apache.org/repos/asf/flink/blob/ff670946/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMapTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMapTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMapTest.java
new file mode 100644
index 0000000..1500d24
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMapTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for the {@link DualKeyMap}.
+ */
+public class DualKeyMapTest extends TestLogger {
+
+ @Test
+ public void testKeySets() {
+ final Random random = new Random();
+ final int capacity = 10;
+ final Set<Tuple2<Integer, Integer>> keys = new HashSet<>(capacity);
+
+ for (int i = 0; i < capacity; i++) {
+ int keyA = random.nextInt();
+ int keyB = random.nextInt();
+ keys.add(Tuple2.of(keyA, keyB));
+ }
+
+ final DualKeyMap<Integer, Integer, String> dualKeyMap = new DualKeyMap<>(capacity);
+
+ for (Tuple2<Integer, Integer> key : keys) {
+ dualKeyMap.put(key.f0, key.f1, "foobar");
+ }
+
+ assertThat(dualKeyMap.keySetA(), Matchers.equalTo(keys.stream().map(t -> t.f0).collect(Collectors.toSet())));
+ assertThat(dualKeyMap.keySetB(), Matchers.equalTo(keys.stream().map(t -> t.f1).collect(Collectors.toSet())));
+ }
+}
[2/6] flink git commit: [FLINK-7910] [tests] Generalize Test(Stream)Environment to use JobExecutor
Posted by tr...@apache.org.
[FLINK-7910] [tests] Generalize Test(Stream)Environment to use JobExecutor
This commit introduces the JobExecutor interface, which abstracts the actual mini cluster
away from the Test(Stream)Environment. By letting both the Flip-6 MiniCluster and the
FlinkMiniCluster implement this interface, we can run all test-base jobs either on the
Flip-6 mini cluster or on the current mini cluster.
This closes #4897.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/057edf9e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/057edf9e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/057edf9e
Branch: refs/heads/master
Commit: 057edf9e28b656401b985069ebcc428ab55e5fed
Parents: 51a2787
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jan 8 18:23:27 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 12:30:30 2018 +0100
----------------------------------------------------------------------
.../flink/runtime/minicluster/JobExecutor.java | 38 ++++++++++++++++++++
.../flink/runtime/minicluster/MiniCluster.java | 5 +--
.../runtime/minicluster/FlinkMiniCluster.scala | 14 +++++++-
.../runtime/minicluster/MiniClusterITCase.java | 2 +-
.../Flip6LocalStreamEnvironment.java | 2 +-
.../streaming/util/TestStreamEnvironment.java | 31 ++++++++--------
.../apache/flink/test/util/TestEnvironment.java | 36 ++++++++++---------
7 files changed, 91 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/057edf9e/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutor.java
new file mode 100644
index 0000000..4991c2f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * Interface for {@link JobGraph} executors.
+ */
+public interface JobExecutor {
+
+ /**
+ * Run the given job and block until its execution result can be returned.
+ *
+ * @param jobGraph to execute
+ * @return Execution result of the executed job
+ * @throws JobExecutionException if the job failed to execute
+ */
+ JobExecutionResult executeJobBlocking(final JobGraph jobGraph) throws JobExecutionException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/057edf9e/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2bbd2c7..2598a60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -58,7 +58,7 @@ import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
-public class MiniCluster {
+public class MiniCluster implements JobExecutor {
private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
@@ -448,7 +448,8 @@ public class MiniCluster {
* @throws JobExecutionException Thrown if anything went amiss during initial job launch,
* or if the job terminally failed.
*/
- public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
+ @Override
+ public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
checkNotNull(job, "job is null");
MiniClusterJobDispatcher dispatcher;
http://git-wip-us.apache.org/repos/asf/flink/blob/057edf9e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 5554061..cc8ae5f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -64,7 +64,8 @@ abstract class FlinkMiniCluster(
val userConfiguration: Configuration,
val highAvailabilityServices: HighAvailabilityServices,
val useSingleActorSystem: Boolean)
- extends LeaderRetrievalListener {
+ extends LeaderRetrievalListener
+ with JobExecutor {
protected val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
@@ -701,4 +702,15 @@ abstract class FlinkMiniCluster(
FlinkUserCodeClassLoaders.parentFirst(urls, parentClassLoader)
}
+
+ /**
+ * Run the given job and block until its execution result can be returned.
+ *
+ * @param jobGraph to execute
+ * @return Execution result of the executed job
+ * @throws JobExecutionException if the job failed to execute
+ */
+ override def executeJobBlocking(jobGraph: JobGraph) = {
+ submitJobAndWait(jobGraph, false)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/057edf9e/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index f822e0c..6a4f678 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -99,7 +99,7 @@ public class MiniClusterITCase extends TestLogger {
private static void executeJob(MiniCluster miniCluster) throws Exception {
JobGraph job = getSimpleJob();
- miniCluster.runJobBlocking(job);
+ miniCluster.executeJobBlocking(job);
}
private static JobGraph getSimpleJob() throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/057edf9e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index 8720e7a..9d4f2a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -107,7 +107,7 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
try {
miniCluster.start();
- return miniCluster.runJobBlocking(jobGraph);
+ return miniCluster.executeJobBlocking(jobGraph);
}
finally {
transformations.clear();
http://git-wip-us.apache.org/repos/asf/flink/blob/057edf9e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 86159a5..61615b8 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.util;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.JobExecutor;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
@@ -37,20 +38,20 @@ import java.util.Collections;
*/
public class TestStreamEnvironment extends StreamExecutionEnvironment {
- /** The mini cluster in which this environment executes its jobs. */
- private final LocalFlinkMiniCluster miniCluster;
+ /** The job executor to use to execute environment's jobs. */
+ private final JobExecutor jobExecutor;
private final Collection<Path> jarFiles;
private final Collection<URL> classPaths;
public TestStreamEnvironment(
- LocalFlinkMiniCluster miniCluster,
+ JobExecutor jobExecutor,
int parallelism,
Collection<Path> jarFiles,
Collection<URL> classPaths) {
- this.miniCluster = Preconditions.checkNotNull(miniCluster);
+ this.jobExecutor = Preconditions.checkNotNull(jobExecutor);
this.jarFiles = Preconditions.checkNotNull(jarFiles);
this.classPaths = Preconditions.checkNotNull(classPaths);
@@ -58,9 +59,9 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
}
public TestStreamEnvironment(
- LocalFlinkMiniCluster miniCluster,
+ JobExecutor jobExecutor,
int parallelism) {
- this(miniCluster, parallelism, Collections.<Path>emptyList(), Collections.<URL>emptyList());
+ this(jobExecutor, parallelism, Collections.emptyList(), Collections.emptyList());
}
@Override
@@ -75,7 +76,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
jobGraph.setClasspaths(new ArrayList<>(classPaths));
- return miniCluster.submitJobAndWait(jobGraph, false);
+ return jobExecutor.executeJobBlocking(jobGraph);
}
// ------------------------------------------------------------------------
@@ -85,13 +86,13 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
* the given cluster with the given default parallelism and the specified jar files and class
* paths.
*
- * @param cluster The test cluster to run the test program on.
+ * @param jobExecutor The executor to execute the jobs on
* @param parallelism The default parallelism for the test programs.
* @param jarFiles Additional jar files to execute the job with
* @param classpaths Additional class paths to execute the job with
*/
public static void setAsContext(
- final LocalFlinkMiniCluster cluster,
+ final JobExecutor jobExecutor,
final int parallelism,
final Collection<Path> jarFiles,
final Collection<URL> classpaths) {
@@ -100,7 +101,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
@Override
public StreamExecutionEnvironment createExecutionEnvironment() {
return new TestStreamEnvironment(
- cluster,
+ jobExecutor,
parallelism,
jarFiles,
classpaths);
@@ -114,15 +115,15 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
* Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
* the given cluster with the given default parallelism.
*
- * @param cluster The test cluster to run the test program on.
+ * @param jobExecutor The executor to execute the jobs on
* @param parallelism The default parallelism for the test programs.
*/
- public static void setAsContext(final LocalFlinkMiniCluster cluster, final int parallelism) {
+ public static void setAsContext(final JobExecutor jobExecutor, final int parallelism) {
setAsContext(
- cluster,
+ jobExecutor,
parallelism,
- Collections.<Path>emptyList(),
- Collections.<URL>emptyList());
+ Collections.emptyList(),
+ Collections.emptyList());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/057edf9e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index ac5b17d..1d82f87 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
@@ -30,6 +31,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.JobExecutor;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.Preconditions;
@@ -44,7 +46,7 @@ import java.util.Collections;
*/
public class TestEnvironment extends ExecutionEnvironment {
- private final LocalFlinkMiniCluster miniCluster;
+ private final JobExecutor jobExecutor;
private final Collection<Path> jarFiles;
@@ -53,12 +55,12 @@ public class TestEnvironment extends ExecutionEnvironment {
private TestEnvironment lastEnv;
public TestEnvironment(
- LocalFlinkMiniCluster miniCluster,
+ JobExecutor jobExecutor,
int parallelism,
boolean isObjectReuseEnabled,
Collection<Path> jarFiles,
Collection<URL> classPaths) {
- this.miniCluster = Preconditions.checkNotNull(miniCluster);
+ this.jobExecutor = Preconditions.checkNotNull(jobExecutor);
this.jarFiles = Preconditions.checkNotNull(jarFiles);
this.classPaths = Preconditions.checkNotNull(classPaths);
@@ -77,15 +79,15 @@ public class TestEnvironment extends ExecutionEnvironment {
}
public TestEnvironment(
- LocalFlinkMiniCluster executor,
+ JobExecutor executor,
int parallelism,
boolean isObjectReuseEnabled) {
this(
executor,
parallelism,
isObjectReuseEnabled,
- Collections.<Path>emptyList(),
- Collections.<URL>emptyList());
+ Collections.emptyList(),
+ Collections.emptyList());
}
@Override
@@ -115,7 +117,7 @@ public class TestEnvironment extends ExecutionEnvironment {
jobGraph.setClasspaths(new ArrayList<>(classPaths));
- this.lastJobExecutionResult = miniCluster.submitJobAndWait(jobGraph, false);
+ this.lastJobExecutionResult = jobExecutor.executeJobBlocking(jobGraph);
return this.lastJobExecutionResult;
}
@@ -130,7 +132,7 @@ public class TestEnvironment extends ExecutionEnvironment {
private OptimizedPlan compileProgram(String jobName) {
Plan p = createProgramPlan(jobName);
- Optimizer pc = new Optimizer(new DataStatistics(), this.miniCluster.configuration());
+ Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
return pc.compile(p);
}
@@ -138,7 +140,7 @@ public class TestEnvironment extends ExecutionEnvironment {
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
- lastEnv = new TestEnvironment(miniCluster, getParallelism(), getConfig().isObjectReuseEnabled());
+ lastEnv = new TestEnvironment(jobExecutor, getParallelism(), getConfig().isObjectReuseEnabled());
return lastEnv;
}
};
@@ -153,13 +155,13 @@ public class TestEnvironment extends ExecutionEnvironment {
* environment executes the given jobs on a Flink mini cluster with the given default
* parallelism and the additional jar files and class paths.
*
- * @param miniCluster The mini cluster on which to execute the jobs
+ * @param jobExecutor The executor to run the jobs on
* @param parallelism The default parallelism
* @param jarFiles Additional jar files to execute the job with
* @param classPaths Additional class paths to execute the job with
*/
public static void setAsContext(
- final LocalFlinkMiniCluster miniCluster,
+ final JobExecutor jobExecutor,
final int parallelism,
final Collection<Path> jarFiles,
final Collection<URL> classPaths) {
@@ -168,7 +170,7 @@ public class TestEnvironment extends ExecutionEnvironment {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
return new TestEnvironment(
- miniCluster,
+ jobExecutor,
parallelism,
false,
jarFiles,
@@ -185,15 +187,15 @@ public class TestEnvironment extends ExecutionEnvironment {
* environment executes the given jobs on a Flink mini cluster with the given default
* parallelism and the additional jar files and class paths.
*
- * @param miniCluster The mini cluster on which to execute the jobs
+ * @param jobExecutor The executor to run the jobs on
* @param parallelism The default parallelism
*/
- public static void setAsContext(final LocalFlinkMiniCluster miniCluster, final int parallelism) {
+ public static void setAsContext(final JobExecutor jobExecutor, final int parallelism) {
setAsContext(
- miniCluster,
+ jobExecutor,
parallelism,
- Collections.<Path>emptyList(),
- Collections.<URL>emptyList());
+ Collections.emptyList(),
+ Collections.emptyList());
}
public static void unsetAsContext() {
[6/6] flink git commit: [hotfix] Refactor JobMasterTest to avoid using Mockito
Posted by tr...@apache.org.
[hotfix] Refactor JobMasterTest to avoid using Mockito
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4afd445b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4afd445b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4afd445b
Branch: refs/heads/master
Commit: 4afd445b36f3218d23e1c44e73868a4b12c9be52
Parents: 91f2a8d
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jan 9 16:44:59 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 13:45:31 2018 +0100
----------------------------------------------------------------------
.../flink/runtime/jobmaster/JobMasterTest.java | 140 +++++++++++--------
.../utils/TestingResourceManagerGateway.java | 23 +++
.../TestingTaskExecutorGateway.java | 26 +++-
3 files changed, 127 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4afd445b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 1f94419..e4f9fc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -18,17 +18,19 @@
package org.apache.flink.runtime.jobmaster;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -37,32 +39,27 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.category.Flip6;
import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import java.net.InetAddress;
import java.net.URL;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import static org.mockito.Mockito.*;
+import static org.hamcrest.MatcherAssert.assertThat;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(BlobLibraryCacheManager.class)
@Category(Flip6.class)
public class JobMasterTest extends TestLogger {
@@ -75,7 +72,7 @@ public class JobMasterTest extends TestLogger {
null,
null);
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
- haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
+ haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
final String jobManagerAddress = "jm";
@@ -83,9 +80,14 @@ public class JobMasterTest extends TestLogger {
final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
final String taskManagerAddress = "tm";
- final ResourceID tmResourceId = new ResourceID(taskManagerAddress);
- final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
- final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
+
+ final CompletableFuture<ResourceID> heartbeatResourceIdFuture = new CompletableFuture<>();
+ final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
+
+ taskExecutorGateway.setHeartbeatJobManagerConsumer(heartbeatResourceIdFuture::complete);
+ taskExecutorGateway.setDisconnectJobManagerConsumer(tuple -> disconnectedJobManagerFuture.complete(tuple.f0));
final TestingRpcService rpc = new TestingRpcService();
rpc.registerGateway(taskManagerAddress, taskExecutorGateway);
@@ -93,7 +95,7 @@ public class JobMasterTest extends TestLogger {
final long heartbeatInterval = 1L;
final long heartbeatTimeout = 5L;
- final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+ final ScheduledExecutor scheduledExecutor = rpc.getScheduledExecutor();
final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
final JobGraph jobGraph = new JobGraph();
@@ -115,10 +117,10 @@ public class JobMasterTest extends TestLogger {
blobServer,
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
new String[0]),
- mock(RestartStrategyFactory.class),
+ new NoRestartStrategy.NoRestartStrategyFactory(),
testingTimeout,
null,
- mock(OnCompletionActions.class),
+ new NoOpOnCompletionActions(),
testingFatalErrorHandler,
FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()),
null,
@@ -138,29 +140,15 @@ public class JobMasterTest extends TestLogger {
// wait for the completion of the registration
registrationResponse.get();
- ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
- verify(scheduledExecutor, times(1)).scheduleAtFixedRate(
- heartbeatRunnableCaptor.capture(),
- eq(0L),
- eq(heartbeatInterval),
- eq(TimeUnit.MILLISECONDS));
-
- Runnable heartbeatRunnable = heartbeatRunnableCaptor.getValue();
-
- ArgumentCaptor<Runnable> timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
- verify(scheduledExecutor, timeout(testingTimeout.toMilliseconds())).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
-
- Runnable timeoutRunnable = timeoutRunnableCaptor.getValue();
+ System.out.println("foobar");
- // run the first heartbeat request
- heartbeatRunnable.run();
+ final ResourceID heartbeatResourceId = heartbeatResourceIdFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- verify(taskExecutorGateway, times(1)).heartbeatFromJobManager(eq(jmResourceId));
+ assertThat(heartbeatResourceId, Matchers.equalTo(jmResourceId));
- // run the timeout runnable to simulate a heartbeat timeout
- timeoutRunnable.run();
+ final JobID disconnectedJobManager = disconnectedJobManagerFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- verify(taskExecutorGateway, timeout(testingTimeout.toMilliseconds())).disconnectJobManager(eq(jobGraph.getJobID()), any(TimeoutException.class));
+ assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID()));
// check if a concurrent error occurred
testingFatalErrorHandler.rethrowError();
@@ -185,24 +173,33 @@ public class JobMasterTest extends TestLogger {
null,
null);
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
- haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
+ haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
+
+ final TestingRpcService rpc = new TestingRpcService();
final long heartbeatInterval = 1L;
final long heartbeatTimeout = 5L;
- final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+ final ScheduledExecutor scheduledExecutor = rpc.getScheduledExecutor();
final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
- final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
- when(resourceManagerGateway.registerJobManager(
- any(JobMasterId.class),
- any(ResourceID.class),
- anyString(),
- any(JobID.class),
- any(Time.class)
- )).thenReturn(CompletableFuture.completedFuture(new JobMasterRegistrationSuccess(
- heartbeatInterval, resourceManagerId, rmResourceId)));
+ final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(
+ resourceManagerId,
+ rmResourceId,
+ heartbeatInterval,
+ "localhost",
+ "localhost");
+
+ final CompletableFuture<Tuple3<JobMasterId, ResourceID, JobID>> jobManagerRegistrationFuture = new CompletableFuture<>();
+ final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>();
+
+ resourceManagerGateway.setRegisterJobManagerConsumer(tuple -> jobManagerRegistrationFuture.complete(
+ Tuple3.of(
+ tuple.f0,
+ tuple.f1,
+ tuple.f3)));
+
+ resourceManagerGateway.setDisconnectJobManagerConsumer(tuple -> disconnectedJobManagerFuture.complete(tuple.f0));
- final TestingRpcService rpc = new TestingRpcService();
rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
@@ -224,10 +221,10 @@ public class JobMasterTest extends TestLogger {
blobServer,
FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
new String[0]),
- mock(RestartStrategyFactory.class),
+ new NoRestartStrategy.NoRestartStrategyFactory(),
testingTimeout,
null,
- mock(OnCompletionActions.class),
+ new NoOpOnCompletionActions(),
testingFatalErrorHandler,
FlinkUserCodeClassLoaders.parentFirst(new URL[0], JobMasterTest.class.getClassLoader()),
null,
@@ -242,15 +239,18 @@ public class JobMasterTest extends TestLogger {
rmLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerId.toUUID());
// register job manager success will trigger monitor heartbeat target between jm and rm
- verify(resourceManagerGateway, timeout(testingTimeout.toMilliseconds())).registerJobManager(
- eq(jobMasterId),
- eq(jmResourceId),
- anyString(),
- eq(jobGraph.getJobID()),
- any(Time.class));
+ final Tuple3<JobMasterId, ResourceID, JobID> registrationInformation = jobManagerRegistrationFuture.get(
+ testingTimeout.toMilliseconds(),
+ TimeUnit.MILLISECONDS);
+
+ assertThat(registrationInformation.f0, Matchers.equalTo(jobMasterId));
+ assertThat(registrationInformation.f1, Matchers.equalTo(jmResourceId));
+ assertThat(registrationInformation.f2, Matchers.equalTo(jobGraph.getJobID()));
+
+ final JobID disconnectedJobManager = disconnectedJobManagerFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
// heartbeat timeout should trigger disconnect JobManager from ResourceManager
- verify(resourceManagerGateway, timeout(heartbeatTimeout * 50L)).disconnectJobManager(eq(jobGraph.getJobID()), any(TimeoutException.class));
+ assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID()));
// check if a concurrent error occurred
testingFatalErrorHandler.rethrowError();
@@ -260,5 +260,25 @@ public class JobMasterTest extends TestLogger {
}
}
+ /**
+ * No op implementation of {@link OnCompletionActions}.
+ */
+ private static final class NoOpOnCompletionActions implements OnCompletionActions {
+
+ @Override
+ public void jobFinished(JobExecutionResult result) {
+
+ }
+
+ @Override
+ public void jobFailed(Throwable cause) {
+
+ }
+
+ @Override
+ public void jobFinishedByOther() {
+
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4afd445b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index 0f8b5fa..dc1635a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -68,6 +69,10 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
private volatile Consumer<SlotRequest> requestSlotConsumer;
+ private volatile Consumer<Tuple4<JobMasterId, ResourceID, String, JobID>> registerJobManagerConsumer;
+
+ private volatile Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer;
+
public TestingResourceManagerGateway() {
this(
ResourceManagerId.generate(),
@@ -105,8 +110,22 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
this.requestSlotConsumer = slotRequestConsumer;
}
+ public void setRegisterJobManagerConsumer(Consumer<Tuple4<JobMasterId, ResourceID, String, JobID>> registerJobManagerConsumer) {
+ this.registerJobManagerConsumer = registerJobManagerConsumer;
+ }
+
+ public void setDisconnectJobManagerConsumer(Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer) {
+ this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
+ }
+
@Override
public CompletableFuture<RegistrationResponse> registerJobManager(JobMasterId jobMasterId, ResourceID jobMasterResourceId, String jobMasterAddress, JobID jobId, Time timeout) {
+ final Consumer<Tuple4<JobMasterId, ResourceID, String, JobID>> currentConsumer = registerJobManagerConsumer;
+
+ if (currentConsumer != null) {
+ currentConsumer.accept(Tuple4.of(jobMasterId, jobMasterResourceId, jobMasterAddress, jobId));
+ }
+
return CompletableFuture.completedFuture(
new JobMasterRegistrationSuccess(
heartbeatInterval,
@@ -191,7 +210,11 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
@Override
public void disconnectJobManager(JobID jobId, Exception cause) {
+ final Consumer<Tuple2<JobID, Throwable>> currentConsumer = disconnectJobManagerConsumer;
+ if (currentConsumer != null) {
+ currentConsumer.accept(Tuple2.of(jobId, cause));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/4afd445b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index c6334c5..aa6c872 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.util.Preconditions;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
/**
* Simple {@link TaskExecutorGateway} implementation for testing purposes.
@@ -43,6 +45,10 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
private final String hostname;
+ private volatile Consumer<ResourceID> heartbeatJobManagerConsumer;
+
+ private volatile Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer;
+
public TestingTaskExecutorGateway() {
this("foobar:1234", "foobar");
}
@@ -52,6 +58,14 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
this.hostname = Preconditions.checkNotNull(hostname);
}
+ public void setHeartbeatJobManagerConsumer(Consumer<ResourceID> heartbeatJobManagerConsumer) {
+ this.heartbeatJobManagerConsumer = heartbeatJobManagerConsumer;
+ }
+
+ public void setDisconnectJobManagerConsumer(Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer) {
+ this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
+ }
+
@Override
public CompletableFuture<Acknowledge> requestSlot(SlotID slotId, JobID jobId, AllocationID allocationId, String targetAddress, ResourceManagerId resourceManagerId, Time timeout) {
return CompletableFuture.completedFuture(Acknowledge.get());
@@ -94,7 +108,11 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
@Override
public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
- // noop
+ final Consumer<ResourceID> currentHeartbeatJobManagerConsumer = heartbeatJobManagerConsumer;
+
+ if (currentHeartbeatJobManagerConsumer != null) {
+ currentHeartbeatJobManagerConsumer.accept(heartbeatOrigin);
+ }
}
@Override
@@ -104,7 +122,11 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
@Override
public void disconnectJobManager(JobID jobId, Exception cause) {
- // noop
+ final Consumer<Tuple2<JobID, Throwable>> currentDisconnectJobManagerConsumer = disconnectJobManagerConsumer;
+
+ if (currentDisconnectJobManagerConsumer != null) {
+ currentDisconnectJobManagerConsumer.accept(Tuple2.of(jobId, cause));
+ }
}
@Override
[4/6] flink git commit: [hotfix] Enable checkpointing RPC calls
Posted by tr...@apache.org.
[hotfix] Enable checkpointing RPC calls
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79854697
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79854697
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79854697
Branch: refs/heads/master
Commit: 79854697d130b9650b2aa679d263b239a2c10521
Parents: 9541afd
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 1 15:10:46 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 13:45:31 2018 +0100
----------------------------------------------------------------------
.../flink/runtime/jobmaster/RpcTaskManagerGateway.java | 13 ++++++++-----
.../runtime/taskexecutor/TaskExecutorGateway.java | 4 ++--
2 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/79854697/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index 83b8999..f1d991d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
@@ -37,7 +38,7 @@ import org.apache.flink.util.Preconditions;
import java.util.concurrent.CompletableFuture;
/**
- * Implementation of the {@link TaskManagerGateway} for Flink's RPC system
+ * Implementation of the {@link TaskManagerGateway} for Flink's RPC system.
*/
public class RpcTaskManagerGateway implements TaskManagerGateway {
@@ -112,14 +113,16 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
@Override
public void notifyCheckpointComplete(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
-// taskExecutorGateway.notifyCheckpointComplete(executionAttemptID, jobId, checkpointId, timestamp);
- throw new UnsupportedOperationException("Operation is not yet supported.");
+ taskExecutorGateway.confirmCheckpoint(executionAttemptID, checkpointId, timestamp);
}
@Override
public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
-// taskExecutorGateway.triggerCheckpoint(executionAttemptID, jobId, checkpointId, timestamp);
- throw new UnsupportedOperationException("Operation is not yet supported.");
+ taskExecutorGateway.triggerCheckpoint(
+ executionAttemptID,
+ checkpointId,
+ timestamp,
+ checkpointOptions);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/79854697/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 3dc80b6..482d6fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -37,12 +37,12 @@ import org.apache.flink.runtime.taskmanager.Task;
import java.util.concurrent.CompletableFuture;
/**
- * {@link TaskExecutor} RPC gateway interface
+ * {@link TaskExecutor} RPC gateway interface.
*/
public interface TaskExecutorGateway extends RpcGateway {
/**
- * Requests a slot from the TaskManager
+ * Requests a slot from the TaskManager.
*
* @param slotId slot id for the request
* @param jobId for which to request a slot
[5/6] flink git commit: [hotfix] Add JavaDocs to OnCompletionActions
Posted by tr...@apache.org.
[hotfix] Add JavaDocs to OnCompletionActions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91f2a8d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91f2a8d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91f2a8d9
Branch: refs/heads/master
Commit: 91f2a8d9b6bafae5855254977d1f38e1b89f2389
Parents: 7985469
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jan 9 09:28:34 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 13:45:31 2018 +0100
----------------------------------------------------------------------
.../runtime/jobmanager/OnCompletionActions.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/91f2a8d9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
index 25a2a66..17167f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -20,11 +20,28 @@ package org.apache.flink.runtime.jobmanager;
import org.apache.flink.api.common.JobExecutionResult;
+/**
+ * Interface for completion actions once a Flink job has reached
+ * a terminal state.
+ */
public interface OnCompletionActions {
+ /**
+ * Job finished successfully.
+ *
+ * @param result of the job execution
+ */
void jobFinished(JobExecutionResult result);
+ /**
+ * Job failed with the given exception.
+ *
+ * @param cause of the job failure
+ */
void jobFailed(Throwable cause);
+ /**
+ * Job was finished by another JobMaster.
+ */
void jobFinishedByOther();
}
[3/6] flink git commit: [FLINK-8389] [flip6] Release all slots upon
closing of JobManager
Posted by tr...@apache.org.
[FLINK-8389] [flip6] Release all slots upon closing of JobManager
This closes #5265.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9541afd2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9541afd2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9541afd2
Branch: refs/heads/master
Commit: 9541afd2c53fc55cee7f0f45b4e16377803f1388
Parents: ff67094
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 1 15:02:09 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 13:45:19 2018 +0100
----------------------------------------------------------------------
.../slots/ActorTaskManagerGateway.java | 6 +
.../jobmanager/slots/TaskManagerGateway.java | 15 ++
.../flink/runtime/jobmaster/JobMaster.java | 90 +++++++----
.../runtime/jobmaster/JobMasterGateway.java | 3 +-
.../jobmaster/RpcTaskManagerGateway.java | 9 ++
.../runtime/jobmaster/slotpool/SlotPool.java | 126 ++++++++++-----
.../jobmaster/slotpool/SlotPoolGateway.java | 4 +-
.../runtime/taskexecutor/JobLeaderService.java | 3 +-
.../runtime/taskexecutor/TaskExecutor.java | 137 +++++++----------
.../taskexecutor/TaskExecutorGateway.java | 13 ++
.../runtime/taskexecutor/slot/TaskSlot.java | 30 ++--
.../taskexecutor/slot/TaskSlotTable.java | 26 ++--
.../runtime/taskexecutor/slot/TimerService.java | 9 +-
.../ExecutionGraphRestartTest.java | 14 +-
.../utils/SimpleAckingTaskManagerGateway.java | 39 +++--
.../scheduler/DummyScheduledUnit.java | 34 +++++
.../jobmaster/slotpool/SlotPoolTest.java | 152 +++++++++++++------
.../TestingTaskExecutorGateway.java | 5 +
.../src/test/resources/log4j-test.properties | 38 +++++
19 files changed, 498 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
index dc5d8c0..6b88752 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -218,6 +219,11 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
return requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
}
+ @Override
+ public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+ throw new UnsupportedOperationException("The old TaskManager does not support freeing slots");
+ }
+
private CompletableFuture<TransientBlobKey> requestTaskManagerLog(TaskManagerMessages.RequestTaskManagerLog request, Time timeout) {
Preconditions.checkNotNull(request);
Preconditions.checkNotNull(timeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index 682441a..b2aca32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.StackTrace;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.runtime.rpc.RpcTimeout;
import java.util.concurrent.CompletableFuture;
@@ -186,4 +188,17 @@ public interface TaskManagerGateway {
* @return Future blob key under which the task manager stdout file has been stored
*/
CompletableFuture<TransientBlobKey> requestTaskManagerStdout(final Time timeout);
+
+ /**
+ * Frees the slot with the given allocation ID.
+ *
+ * @param allocationId identifying the slot to free
+ * @param cause of the freeing operation
+ * @param timeout for the operation
+ * @return Future acknowledge which is returned once the slot has been freed
+ */
+ CompletableFuture<Acknowledge> freeSlot(
+ final AllocationID allocationId,
+ final Throwable cause,
+ @RpcTimeout final Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7a2844d..b81a8c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -55,8 +55,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -65,6 +63,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -96,6 +96,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
@@ -110,13 +111,16 @@ import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -140,8 +144,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
// ------------------------------------------------------------------------
- private final JobMasterGateway selfGateway;
-
private final ResourceID resourceId;
/** Logical representation of the job. */
@@ -226,9 +228,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
@Nullable String restAddress,
@Nullable String metricQueryServicePath) throws Exception {
- super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME));
+ super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));
- selfGateway = getSelfGateway(JobMasterGateway.class);
+ final JobMasterGateway selfGateway = getSelfGateway(JobMasterGateway.class);
this.resourceId = checkNotNull(resourceId);
this.jobGraph = checkNotNull(jobGraph);
@@ -362,13 +364,47 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
*/
@Override
public void postStop() throws Exception {
+ log.info("Stopping the JobMaster for job " + jobGraph.getName() + '(' + jobGraph.getJobID() + ").");
+
+ // disconnect from all registered TaskExecutors
+ final Set<ResourceID> taskManagerResourceIds = new HashSet<>(registeredTaskManagers.keySet());
+ final FlinkException cause = new FlinkException("Stopping JobMaster for job " + jobGraph.getName() +
+ '(' + jobGraph.getJobID() + ").");
+
+ for (ResourceID taskManagerResourceId : taskManagerResourceIds) {
+ disconnectTaskManager(taskManagerResourceId, cause);
+ }
+
taskManagerHeartbeatManager.stop();
resourceManagerHeartbeatManager.stop();
// make sure there is a graceful exit
suspendExecution(new Exception("JobManager is shutting down."));
- super.postStop();
+ // shut down will internally release all registered slots
+ slotPool.shutDown();
+ CompletableFuture<Boolean> terminationFuture = slotPool.getTerminationFuture();
+
+ Exception exception = null;
+
+ // wait for the slot pool shut down
+ try {
+ terminationFuture.get(rpcTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ exception = e;
+ }
+
+ try {
+ super.postStop();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+
+ if (exception != null) {
+ throw exception;
+ }
+
+ log.info("Stopped the JobMaster for job " + jobGraph.getName() + '(' + jobGraph.getJobID() + ").");
}
//----------------------------------------------------------------------------------------------
@@ -507,9 +543,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
}
@Override
- public void disconnectTaskManager(final ResourceID resourceID, final Exception cause) {
+ public CompletableFuture<Acknowledge> disconnectTaskManager(final ResourceID resourceID, final Exception cause) {
taskManagerHeartbeatManager.unmonitorTarget(resourceID);
- slotPoolGateway.releaseTaskManager(resourceID);
+ CompletableFuture<Acknowledge> releaseFuture = slotPoolGateway.releaseTaskManager(resourceID);
Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = registeredTaskManagers.remove(resourceID);
@@ -517,6 +553,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
taskManagerConnection.f1.disconnectJobManager(jobGraph.getJobID(), cause);
}
+ return releaseFuture;
}
// TODO: This method needs a leader session ID
@@ -537,14 +574,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
checkpointState);
if (checkpointCoordinator != null) {
- getRpcService().execute(new Runnable() {
- @Override
- public void run() {
- try {
- checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
- } catch (Throwable t) {
- log.warn("Error while processing checkpoint acknowledgement message");
- }
+ getRpcService().execute(() -> {
+ try {
+ checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
+ } catch (Throwable t) {
+ log.warn("Error while processing checkpoint acknowledgement message");
}
});
} else {
@@ -1138,12 +1172,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
@Override
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- resourceManagerResourceID = success.getResourceManagerResourceId();
- establishResourceManagerConnection(success);
- }
+ runAsync(() -> {
+ resourceManagerResourceID = success.getResourceManagerResourceId();
+ establishResourceManagerConnection(success);
});
}
@@ -1209,15 +1240,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
@Override
public void notifyHeartbeatTimeout(final ResourceID resourceId) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
+ runAsync(() -> {
+ log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
- closeResourceManagerConnection(
- new TimeoutException(
- "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
- }
+ closeResourceManagerConnection(
+ new TimeoutException(
+ "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
});
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 09d995e..0d896ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -131,8 +131,9 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
*
* @param resourceID identifying the TaskManager to disconnect
* @param cause for the disconnection of the TaskManager
+ * @return Future acknowledge which is completed once the JobMaster has been disconnected from the TaskManager
*/
- void disconnectTaskManager(ResourceID resourceID, Exception cause);
+ CompletableFuture<Acknowledge> disconnectTaskManager(ResourceID resourceID, Exception cause);
/**
* Disconnects the resource manager from the job manager because of the given cause.
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index 849a163..83b8999 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -132,4 +133,12 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
// return taskExecutorGateway.requestTaskManagerStdout(timeout);
throw new UnsupportedOperationException("Operation is not yet supported.");
}
+
+ @Override
+ public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+ return taskExecutorGateway.freeSlot(
+ allocationId,
+ cause,
+ timeout);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 996e445..a56335c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -51,9 +51,6 @@ import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.annotation.Nullable;
import java.util.Collection;
@@ -89,16 +86,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedSlotActions {
- /** The log for the pool - shared also with the internal classes. */
- static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
-
// ------------------------------------------------------------------------
private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = Time.minutes(5);
private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = Time.minutes(10);
- private static final Time DEFAULT_RM_REQUEST_TIMEOUT = Time.seconds(10);
+ private static final Time DEFAULT_TIMEOUT = Time.seconds(10);
// ------------------------------------------------------------------------
@@ -121,8 +115,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
/** The requests that are waiting for the resource manager to be connected. */
private final HashMap<SlotRequestId, PendingRequest> waitingForResourceManager;
- /** Timeout for request calls to the ResourceManager. */
- private final Time resourceManagerRequestsTimeout;
+ /** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */
+ private final Time timeout;
/** Timeout for allocation round trips (RM -> launch TM -> offer slot). */
private final Time resourceManagerAllocationTimeout;
@@ -143,8 +137,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
// ------------------------------------------------------------------------
public SlotPool(RpcService rpcService, JobID jobId) {
- this(rpcService, jobId, SystemClock.getInstance(),
- DEFAULT_SLOT_REQUEST_TIMEOUT, DEFAULT_RM_ALLOCATION_TIMEOUT, DEFAULT_RM_REQUEST_TIMEOUT);
+ this(
+ rpcService,
+ jobId,
+ SystemClock.getInstance(),
+ DEFAULT_SLOT_REQUEST_TIMEOUT,
+ DEFAULT_RM_ALLOCATION_TIMEOUT,
+ DEFAULT_TIMEOUT);
}
public SlotPool(
@@ -159,10 +158,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
this.jobId = checkNotNull(jobId);
this.clock = checkNotNull(clock);
- this.resourceManagerRequestsTimeout = checkNotNull(resourceManagerRequestTimeout);
+ this.timeout = checkNotNull(resourceManagerRequestTimeout);
this.resourceManagerAllocationTimeout = checkNotNull(resourceManagerAllocationTimeout);
- this.registeredTaskManagers = new HashSet<>();
+ this.registeredTaskManagers = new HashSet<>(16);
this.allocatedSlots = new AllocatedSlots();
this.availableSlots = new AvailableSlots();
this.pendingRequests = new DualKeyMap<>(16);
@@ -204,6 +203,25 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
}
}
+ @Override
+ public void postStop() throws Exception {
+ // cancel all pending allocations
+ Set<AllocationID> allocationIds = pendingRequests.keySetB();
+
+ for (AllocationID allocationId : allocationIds) {
+ resourceManagerGateway.cancelSlotRequest(allocationId);
+ }
+
+ // release all registered slots by releasing the corresponding TaskExecutors
+ for (ResourceID taskManagerResourceId : registeredTaskManagers) {
+ releaseTaskManagerInternal(taskManagerResourceId);
+ }
+
+ clear();
+
+ super.postStop();
+ }
+
/**
* Suspends this pool, meaning it has lost its authority to accept and distribute slots.
*/
@@ -220,9 +238,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
// Clear (but not release!) the available slots. The TaskManagers should re-register them
// at the new leader JobManager/SlotPool
- availableSlots.clear();
- allocatedSlots.clear();
- pendingRequests.clear();
+ clear();
}
// ------------------------------------------------------------------------
@@ -644,7 +660,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
Preconditions.checkNotNull(resourceManagerGateway);
Preconditions.checkNotNull(pendingRequest);
- LOG.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
+ log.info("Requesting slot with profile {} from resource manager (request = {}).", pendingRequest.getResourceProfile(), pendingRequest.getSlotRequestId());
final AllocationID allocationId = new AllocationID();
@@ -660,7 +676,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
jobMasterId,
new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
- resourceManagerRequestsTimeout);
+ timeout);
CompletableFuture<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync(
(Acknowledge value) -> {
@@ -695,8 +711,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
request.getAllocatedSlotFuture().completeExceptionally(new NoResourceAvailableException(
"No pooled slot available and request to ResourceManager for new slot failed", failure));
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unregistered slot request {} failed.", slotRequestID, failure);
+ if (log.isDebugEnabled()) {
+ log.debug("Unregistered slot request {} failed.", slotRequestID, failure);
}
}
}
@@ -710,7 +726,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
private void stashRequestWaitingForResourceManager(final PendingRequest pendingRequest) {
- LOG.info("Cannot serve slot request, no ResourceManager connected. " +
+ log.info("Cannot serve slot request, no ResourceManager connected. " +
"Adding as pending request {}", pendingRequest.getSlotRequestId());
waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest);
@@ -720,7 +736,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
public void run() {
checkTimeoutRequestWaitingForResourceManager(pendingRequest.getSlotRequestId());
}
- }, resourceManagerRequestsTimeout);
+ }, timeout);
}
private void checkTimeoutRequestWaitingForResourceManager(SlotRequestId slotRequestId) {
@@ -802,7 +818,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
Preconditions.checkNotNull(e);
if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
- LOG.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
+ log.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
}
}
@@ -833,13 +849,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
if (pendingRequest != null) {
- LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
+ log.debug("Fulfilling pending request [{}] early with returned slot [{}]",
pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
} else {
- LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
+ log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
}
}
@@ -932,14 +948,14 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
final AllocationID allocationID = slotOffer.getAllocationId();
if (!registeredTaskManagers.contains(resourceID)) {
- LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
+ log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
slotOffer.getAllocationId(), taskManagerLocation);
return CompletableFuture.completedFuture(false);
}
// check whether we have already using this slot
if (allocatedSlots.contains(allocationID) || availableSlots.contains(allocationID)) {
- LOG.debug("Received repeated offer for slot [{}]. Ignoring.", allocationID);
+ log.debug("Received repeated offer for slot [{}]. Ignoring.", allocationID);
// return true here so that the sender will get a positive acknowledgement to the retry
// and mark the offering as a success
@@ -1003,7 +1019,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
failPendingRequest(pendingRequest, cause);
}
else if (availableSlots.tryRemove(allocationID)) {
- LOG.debug("Failed available slot [{}] with ", allocationID, cause);
+ log.debug("Failed available slot [{}] with ", allocationID, cause);
}
else {
AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
@@ -1013,7 +1029,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
allocatedSlot.releasePayload(cause);
}
else {
- LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
+ log.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
}
}
// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
@@ -1041,21 +1057,48 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
* Unregister TaskManager from this pool, all the related slots will be released and tasks be canceled. Called
* when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore.
*
- * @param resourceID The id of the TaskManager
+ * @param resourceId The id of the TaskManager
*/
@Override
- public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceID) {
- if (registeredTaskManagers.remove(resourceID)) {
- availableSlots.removeAllForTaskManager(resourceID);
+ public CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId) {
+ if (registeredTaskManagers.remove(resourceId)) {
+ releaseTaskManagerInternal(resourceId);
+ }
- final Set<AllocatedSlot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
- for (AllocatedSlot allocatedSlot : allocatedSlotsForResource) {
- allocatedSlot.releasePayload(new FlinkException("TaskManager " + resourceID + " was released."));
- }
+ // ------------------------------------------------------------------------
+ // Internal methods
+ // ------------------------------------------------------------------------
+
+ private void releaseTaskManagerInternal(final ResourceID resourceId) {
+ final FlinkException cause = new FlinkException("Releasing TaskManager " + resourceId + '.');
+
+ final Set<AllocatedSlot> removedSlots = new HashSet<>(allocatedSlots.removeSlotsForTaskManager(resourceId));
+
+ for (AllocatedSlot allocatedSlot : removedSlots) {
+ allocatedSlot.releasePayload(cause);
}
- return CompletableFuture.completedFuture(Acknowledge.get());
+ removedSlots.addAll(availableSlots.removeAllForTaskManager(resourceId));
+
+ for (AllocatedSlot removedSlot : removedSlots) {
+ TaskManagerGateway taskManagerGateway = removedSlot.getTaskManagerGateway();
+ taskManagerGateway.freeSlot(removedSlot.getAllocationId(), cause, timeout);
+ }
+ }
+
+ /**
+ * Clear the internal state of the SlotPool.
+ */
+ private void clear() {
+ availableSlots.clear();
+ allocatedSlots.clear();
+ pendingRequests.clear();
+ waitingForResourceManager.clear();
+ registeredTaskManagers.clear();
+ slotSharingManagers.clear();
}
// ------------------------------------------------------------------------
@@ -1363,8 +1406,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
* Remove all available slots come from specified TaskManager.
*
* @param taskManager The id of the TaskManager
+ * @return The set of removed slots for the given TaskManager
*/
- void removeAllForTaskManager(final ResourceID taskManager) {
+ Set<AllocatedSlot> removeAllForTaskManager(final ResourceID taskManager) {
// remove from the by-TaskManager view
final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.remove(taskManager);
@@ -1381,6 +1425,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS
if (slotsForHost.isEmpty()) {
availableSlotsByHost.remove(host);
}
+
+ return slotsForTm;
+ } else {
+ return Collections.emptySet();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
index 7a627b4..0df6262 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
@@ -84,10 +84,10 @@ public interface SlotPoolGateway extends AllocatedSlotActions, RpcGateway {
/**
* Releases a TaskExecutor with the given {@link ResourceID} from the {@link SlotPool}.
*
- * @param resourceID identifying the TaskExecutor which shall be released from the SlotPool
+ * @param resourceId identifying the TaskExecutor which shall be released from the SlotPool
* @return Future acknowledge which is completed after the TaskExecutor has been released
*/
- CompletableFuture<Acknowledge> releaseTaskManager(ResourceID resourceID);
+ CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId);
/**
* Offers a slot to the {@link SlotPool}. The slot offer can be accepted or
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 20dcfa9..77737e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -356,8 +356,7 @@ public class JobLeaderService {
* Retrying registration for the job manager <--> task manager connection.
*/
private static final class JobManagerRetryingRegistration
- extends RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess>
- {
+ extends RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> {
private final String taskManagerRpcAddress;
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index a348948..5577472 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -113,34 +113,34 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
public static final String TASK_MANAGER_NAME = "taskmanager";
- /** The connection information of this task manager */
+ /** The connection information of this task manager. */
private final TaskManagerLocation taskManagerLocation;
- /** Max blob port which is accepted */
+ /** Max blob port which is accepted. */
public static final int MAX_BLOB_PORT = 65536;
- /** The access to the leader election and retrieval services */
+ /** The access to the leader election and retrieval services. */
private final HighAvailabilityServices haServices;
- /** The task manager configuration */
+ /** The task manager configuration. */
private final TaskManagerConfiguration taskManagerConfiguration;
- /** The I/O manager component in the task manager */
+ /** The I/O manager component in the task manager. */
private final IOManager ioManager;
- /** The memory manager component in the task manager */
+ /** The memory manager component in the task manager. */
private final MemoryManager memoryManager;
- /** The network component in the task manager */
+ /** The network component in the task manager. */
private final NetworkEnvironment networkEnvironment;
- /** The heartbeat manager for job manager in the task manager */
+ /** The heartbeat manager for job manager in the task manager. */
private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
- /** The heartbeat manager for resource manager in the task manager */
+ /** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, SlotReport> resourceManagerHeartbeatManager;
- /** The fatal error handler to use in case of a fatal error */
+ /** The fatal error handler to use in case of a fatal error. */
private final FatalErrorHandler fatalErrorHandler;
private final TaskManagerMetricGroup taskManagerMetricGroup;
@@ -673,6 +673,13 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
return CompletableFuture.completedFuture(Acknowledge.get());
}
+ @Override
+ public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+ freeSlotInternal(allocationId, cause);
+
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
+
// ----------------------------------------------------------------------
// Disconnection RPCs
// ----------------------------------------------------------------------
@@ -820,7 +827,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
// We encountered an exception. Free the slots and return them to the RM.
for (SlotOffer reservedSlot: reservedSlots) {
- freeSlot(reservedSlot.getAllocationId(), throwable);
+ freeSlotInternal(reservedSlot.getAllocationId(), throwable);
}
}
} else {
@@ -834,7 +841,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
final Exception e = new Exception("The slot was rejected by the JobManager.");
for (SlotOffer rejectedSlot : reservedSlots) {
- freeSlot(rejectedSlot.getAllocationId(), e);
+ freeSlotInternal(rejectedSlot.getAllocationId(), e);
}
} else {
// discard the response since there is a new leader for the job
@@ -898,20 +905,24 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
// 1. fail tasks running under this JobID
Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
+ final FlinkException failureCause = new FlinkException("JobManager responsible for " + jobId +
+ " lost the leadership.", cause);
+
while (tasks.hasNext()) {
- tasks.next().failExternally(new Exception("JobManager responsible for " + jobId +
- " lost the leadership."));
+ tasks.next().failExternally(failureCause);
}
// 2. Move the active slots to state allocated (possible to time out again)
Iterator<AllocationID> activeSlots = taskSlotTable.getActiveSlots(jobId);
+ final FlinkException freeingCause = new FlinkException("Slot could not be marked inactive.");
+
while (activeSlots.hasNext()) {
AllocationID activeSlot = activeSlots.next();
try {
if (!taskSlotTable.markSlotInactive(activeSlot, taskManagerConfiguration.getTimeout())) {
- freeSlot(activeSlot, new Exception("Slot could not be marked inactive."));
+ freeSlotInternal(activeSlot, freeingCause);
}
} catch (SlotNotFoundException e) {
log.debug("Could not mark the slot {} inactive.", jobId, e);
@@ -1017,8 +1028,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
private void updateTaskExecutionState(
final JobMasterGateway jobMasterGateway,
- final TaskExecutionState taskExecutionState)
- {
+ final TaskExecutionState taskExecutionState) {
final ExecutionAttemptID executionAttemptID = taskExecutionState.getID();
CompletableFuture<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(taskExecutionState);
@@ -1065,7 +1075,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
}
}
- private void freeSlot(AllocationID allocationId, Throwable cause) {
+ private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
Preconditions.checkNotNull(allocationId);
try {
@@ -1085,8 +1095,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
}
}
- private void freeSlot(AllocationID allocationId) {
- freeSlot(allocationId, new Exception("The slot " + allocationId + " is beeing freed."));
+ private void freeSlotInternal(AllocationID allocationId) {
+ freeSlotInternal(allocationId, new Exception("The slot " + allocationId + " is being freed."));
}
private void timeoutSlot(AllocationID allocationId, UUID ticket) {
@@ -1094,7 +1104,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
Preconditions.checkNotNull(ticket);
if (taskSlotTable.isValidTimeout(allocationId, ticket)) {
- freeSlot(allocationId, new Exception("The slot " + allocationId + " has timed out."));
+ freeSlotInternal(allocationId, new Exception("The slot " + allocationId + " has timed out."));
} else {
log.debug("Received an invalid timeout for allocation id {} with ticket {}.", allocationId, ticket);
}
@@ -1196,14 +1206,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) {
log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId);
- runAsync(new Runnable() {
- @Override
- public void run() {
- closeJobManagerConnection(
- jobId,
- new Exception("Job leader for job id " + jobId + " lost leadership."));
- }
- });
+ runAsync(() ->
+ closeJobManagerConnection(
+ jobId,
+ new Exception("Job leader for job id " + jobId + " lost leadership.")));
}
@Override
@@ -1219,12 +1225,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
final ResourceID resourceManagerId = success.getResourceManagerId();
runAsync(
- new Runnable() {
- @Override
- public void run() {
- establishResourceManagerConnection(resourceManagerId);
- }
- }
+ () -> establishResourceManagerConnection(resourceManagerId)
);
}
@@ -1243,12 +1244,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Override
public void notifyFinalState(final ExecutionAttemptID executionAttemptID) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID);
- }
- });
+ runAsync(() -> unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID));
}
@Override
@@ -1263,12 +1259,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Override
public void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- TaskExecutor.this.failTask(executionAttemptID, cause);
- }
- });
+ runAsync(() -> TaskExecutor.this.failTask(executionAttemptID, cause));
}
@Override
@@ -1281,22 +1272,12 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Override
public void freeSlot(final AllocationID allocationId) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- TaskExecutor.this.freeSlot(allocationId);
- }
- });
+ runAsync(() -> TaskExecutor.this.freeSlotInternal(allocationId));
}
@Override
public void timeoutSlot(final AllocationID allocationId, final UUID ticket) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- TaskExecutor.this.timeoutSlot(allocationId, ticket);
- }
- });
+ runAsync(() -> TaskExecutor.this.timeoutSlot(allocationId, ticket));
}
}
@@ -1304,19 +1285,16 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Override
public void notifyHeartbeatTimeout(final ResourceID resourceID) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
-
- if (jobManagerConnections.containsKey(resourceID)) {
- JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
-
- if (jobManagerConnection != null) {
- closeJobManagerConnection(
- jobManagerConnection.getJobID(),
- new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
- }
+ runAsync(() -> {
+ log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
+
+ if (jobManagerConnections.containsKey(resourceID)) {
+ JobManagerConnection jobManagerConnection = jobManagerConnections.get(resourceID);
+
+ if (jobManagerConnection != null) {
+ closeJobManagerConnection(
+ jobManagerConnection.getJobID(),
+ new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
}
}
});
@@ -1337,15 +1315,12 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Override
public void notifyHeartbeatTimeout(final ResourceID resourceId) {
- runAsync(new Runnable() {
- @Override
- public void run() {
- log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
-
- closeResourceManagerConnection(
- new TimeoutException(
- "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
- }
+ runAsync(() -> {
+ log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
+
+ closeResourceManagerConnection(
+ new TimeoutException(
+ "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
});
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index ee0f69d..3dc80b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -162,4 +162,17 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param cause for the disconnection from the ResourceManager
*/
void disconnectResourceManager(Exception cause);
+
+ /**
+ * Frees the slot with the given allocation ID.
+ *
+ * @param allocationId identifying the slot to free
+ * @param cause of the freeing operation
+ * @param timeout for the operation
+ * @return Future acknowledge which is completed once the slot has been freed
+ */
+ CompletableFuture<Acknowledge> freeSlot(
+ final AllocationID allocationId,
+ final Throwable cause,
+ @RpcTimeout final Time timeout);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index 6f5230c..411aa94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -38,34 +38,34 @@ import java.util.Map;
* <li>Allocated - The slot has been allocated for a job.</li>
* <li>Active - The slot is in active use by a job manager which is the leader of the allocating job.</li>
* </ul>
- * <p>
- * A task slot can only be allocated if it is in state free. An allocated task slot can transition
+ *
+ * <p>A task slot can only be allocated if it is in state free. An allocated task slot can transition
* to state active.
- *<p>
- * An active slot allows to add tasks from the respective job and with the correct allocation id.
+ *
+ * <p>An active slot allows adding tasks from the respective job and with the correct allocation id.
* An active slot can be marked as inactive which sets the state back to allocated.
- * <p>
- * An allocated or active slot can only be freed if it is empty. If it is not empty, then it's state
+ *
+ * <p>An allocated or active slot can only be freed if it is empty. If it is not empty, then its state
* can be set to releasing indicating that it can be freed once it becomes empty.
*/
public class TaskSlot {
- /** Index of the task slot */
+ /** Index of the task slot. */
private final int index;
- /** Resource characteristics for this slot */
+ /** Resource characteristics for this slot. */
private final ResourceProfile resourceProfile;
- /** Tasks running in this slot */
+ /** Tasks running in this slot. */
private final Map<ExecutionAttemptID, Task> tasks;
- /** State of this slot */
+ /** State of this slot. */
private TaskSlotState state;
- /** Job id to which the slot has been allocated; null if not allocated */
+ /** Job id to which the slot has been allocated; null if not allocated. */
private JobID jobId;
- /** Allocation id of this slot; null if not allocated */
+ /** Allocation id of this slot; null if not allocated. */
private AllocationID allocationId;
TaskSlot(final int index, final ResourceProfile resourceProfile) {
@@ -151,7 +151,7 @@ public class TaskSlot {
* task with the same execution attempt id added to the task slot. In this case, the method
* returns true. Otherwise the task slot is left unchanged and false is returned.
*
- * In case that the task slot state is not active an {@link IllegalStateException} is thrown.
+ * <p>In case that the task slot state is not active an {@link IllegalStateException} is thrown.
* In case that the task's job id and allocation id don't match with the job id and allocation
* id for which the task slot has been allocated, an {@link IllegalArgumentException} is thrown.
*
@@ -199,7 +199,7 @@ public class TaskSlot {
* or is already allocated/active for the given job and allocation id, then the method returns
* true. Otherwise it returns false.
*
- * A slot can only be allocated if it's current state is free.
+ * <p>A slot can only be allocated if its current state is free.
*
* @param newJobId to allocate the slot for
* @param newAllocationId to identify the slot allocation
@@ -230,7 +230,7 @@ public class TaskSlot {
/**
* Mark this slot as active. A slot can only be marked active if it's in state allocated.
*
- * The method returns true if the slot was set to active. Otherwise it returns false.
+ * <p>The method returns true if the slot was set to active. Otherwise it returns false.
*
* @return True if the new state of the slot is active; otherwise false
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 1384336..62101e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,35 +48,35 @@ import java.util.UUID;
/**
* Container for multiple {@link TaskSlot} instances. Additionally, it maintains multiple indices
* for faster access to tasks and sets of allocated slots.
- * <p>
- * The task slot table automatically registers timeouts for allocated slots which cannot be assigned
+ *
+ * <p>The task slot table automatically registers timeouts for allocated slots which cannot be assigned
* to a job manager.
- * <p>
- * Before the task slot table can be used, it must be started via the {@link #start} method.
+ *
+ * <p>Before the task slot table can be used, it must be started via the {@link #start} method.
*/
public class TaskSlotTable implements TimeoutListener<AllocationID> {
private static final Logger LOG = LoggerFactory.getLogger(TaskSlotTable.class);
- /** Timer service used to time out allocated slots */
+ /** Timer service used to time out allocated slots. */
private final TimerService<AllocationID> timerService;
- /** The list of all task slots */
+ /** The list of all task slots. */
private final List<TaskSlot> taskSlots;
- /** Mapping from allocation id to task slot */
+ /** Mapping from allocation id to task slot. */
private final Map<AllocationID, TaskSlot> allocationIDTaskSlotMap;
- /** Mapping from execution attempt id to task and task slot */
+ /** Mapping from execution attempt id to task and task slot. */
private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
- /** Mapping from job id to allocated slots for a job */
+ /** Mapping from job id to allocated slots for a job. */
private final Map<JobID, Set<AllocationID>> slotsPerJob;
- /** Interface for slot actions, such as freeing them or timing them out */
+ /** Interface for slot actions, such as freeing them or timing them out. */
private SlotActions slotActions;
- /** Whether the table has been started */
+ /** Whether the table has been started. */
private boolean started;
public TaskSlotTable(
@@ -555,7 +556,6 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
private final Task task;
private final TaskSlot taskSlot;
-
private TaskSlotMapping(Task task, TaskSlot taskSlot) {
this.task = Preconditions.checkNotNull(task);
this.taskSlot = Preconditions.checkNotNull(taskSlot);
@@ -675,7 +675,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
}
/**
- * Iterator over all {@link Task} for a given job
+ * Iterator over all {@link Task} for a given job.
*/
private final class TaskIterator implements Iterator<Task> {
private final Iterator<TaskSlot> taskSlotIterator;
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
index 8ec9a2e..84fb9e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.taskexecutor.slot;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,16 +40,16 @@ public class TimerService<K> {
private static final Logger LOG = LoggerFactory.getLogger(TimerService.class);
- /** Executor service for the scheduled timeouts */
+ /** Executor service for the scheduled timeouts. */
private final ScheduledExecutorService scheduledExecutorService;
/** Timeout for the shutdown of the service. */
private final long shutdownTimeout;
- /** Map of currently active timeouts */
+ /** Map of currently active timeouts. */
private final Map<K, Timeout<K>> timeouts;
- /** Listener which is notified about occurring timeouts */
+ /** Listener which is notified about occurring timeouts. */
private TimeoutListener<K> timeoutListener;
public TimerService(
@@ -79,7 +80,7 @@ public class TimerService<K> {
scheduledExecutorService.shutdown();
try {
- if(!scheduledExecutorService.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
+ if (!scheduledExecutorService.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) {
LOG.debug("The scheduled executor service did not properly terminate. Shutting " +
"it down now.");
scheduledExecutorService.shutdownNow();
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 2245a8c..ccaed96 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -599,8 +599,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
WaitForTasks waitForTasks = new WaitForTasks(parallelism);
WaitForTasks waitForTasksCancelled = new WaitForTasks(parallelism);
- taskManagerGateway.setCondition(waitForTasks);
- taskManagerGateway.setCancelCondition(waitForTasksCancelled);
+ taskManagerGateway.setSubmitConsumer(waitForTasks);
+ taskManagerGateway.setCancelConsumer(waitForTasksCancelled);
eg.setScheduleMode(ScheduleMode.EAGER);
eg.scheduleForExecution();
@@ -649,7 +649,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
WaitForTasks waitForTasksAfterRestart = new WaitForTasks(parallelism);
- taskManagerGateway.setCondition(waitForTasksAfterRestart);
+ taskManagerGateway.setSubmitConsumer(waitForTasksAfterRestart);
waitForTasksCancelled.getFuture().get(1000L, TimeUnit.MILLISECONDS);
@@ -685,7 +685,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
final ExecutionGraph eg = createSimpleTestGraph(jid, slots, restartStrategy, vertex);
WaitForTasks waitForTasks = new WaitForTasks(parallelism);
- taskManagerGateway.setCondition(waitForTasks);
+ taskManagerGateway.setSubmitConsumer(waitForTasks);
eg.setScheduleMode(ScheduleMode.EAGER);
eg.scheduleForExecution();
@@ -699,7 +699,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
assertEquals(JobStatus.FAILING, eg.getState());
WaitForTasks waitForTasksRestart = new WaitForTasks(parallelism);
- taskManagerGateway.setCondition(waitForTasksRestart);
+ taskManagerGateway.setSubmitConsumer(waitForTasksRestart);
completeCancellingForAllVertices(eg);
waitUntilJobStatus(eg, JobStatus.RESTARTING, 1000);
@@ -750,7 +750,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
new JobID(), scheduler, new FixedDelayRestartStrategy(Integer.MAX_VALUE, 0), executor, source, sink);
WaitForTasks waitForTasks = new WaitForTasks(parallelism * 2);
- taskManagerGateway.setCondition(waitForTasks);
+ taskManagerGateway.setSubmitConsumer(waitForTasks);
eg.setScheduleMode(ScheduleMode.EAGER);
eg.scheduleForExecution();
@@ -766,7 +766,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
assertEquals(JobStatus.FAILING, eg.getState());
WaitForTasks waitForTasksAfterRestart = new WaitForTasks(parallelism * 2);
- taskManagerGateway.setCondition(waitForTasksAfterRestart);
+ taskManagerGateway.setSubmitConsumer(waitForTasksAfterRestart);
completeCancellingForAllVertices(eg);
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 682705a..628f004 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -20,9 +20,11 @@ package org.apache.flink.runtime.executiongraph.utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -46,21 +48,27 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
private final String address = UUID.randomUUID().toString();
- private Optional<Consumer<ExecutionAttemptID>> optSubmitCondition;
+ private Optional<Consumer<ExecutionAttemptID>> optSubmitConsumer;
- private Optional<Consumer<ExecutionAttemptID>> optCancelCondition;
+ private Optional<Consumer<ExecutionAttemptID>> optCancelConsumer;
+
+ private volatile Consumer<Tuple2<AllocationID, Throwable>> freeSlotConsumer;
public SimpleAckingTaskManagerGateway() {
- optSubmitCondition = Optional.empty();
- optCancelCondition = Optional.empty();
+ optSubmitConsumer = Optional.empty();
+ optCancelConsumer = Optional.empty();
+ }
+
+ public void setSubmitConsumer(Consumer<ExecutionAttemptID> consumer) {
+ optSubmitConsumer = Optional.of(consumer);
}
- public void setCondition(Consumer<ExecutionAttemptID> predicate) {
- optSubmitCondition = Optional.of(predicate);
+ public void setCancelConsumer(Consumer<ExecutionAttemptID> consumer) {
+ optCancelConsumer = Optional.of(consumer);
}
- public void setCancelCondition(Consumer<ExecutionAttemptID> predicate) {
- optCancelCondition = Optional.of(predicate);
+ public void setFreeSlotConsumer(Consumer<Tuple2<AllocationID, Throwable>> consumer) {
+ freeSlotConsumer = consumer;
}
@Override
@@ -92,7 +100,7 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
@Override
public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
- optSubmitCondition.ifPresent(condition -> condition.accept(tdd.getExecutionAttemptId()));
+ optSubmitConsumer.ifPresent(condition -> condition.accept(tdd.getExecutionAttemptId()));
return CompletableFuture.completedFuture(Acknowledge.get());
}
@@ -103,7 +111,7 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
@Override
public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
- optCancelCondition.ifPresent(condition -> condition.accept(executionAttemptID));
+ optCancelConsumer.ifPresent(condition -> condition.accept(executionAttemptID));
return CompletableFuture.completedFuture(Acknowledge.get());
}
@@ -139,4 +147,15 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
public CompletableFuture<TransientBlobKey> requestTaskManagerStdout(Time timeout) {
return FutureUtils.completedExceptionally(new UnsupportedOperationException());
}
+
+ @Override
+ public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+ final Consumer<Tuple2<AllocationID, Throwable>> currentFreeSlotConsumer = freeSlotConsumer;
+
+ if (currentFreeSlotConsumer != null) {
+ currentFreeSlotConsumer.accept(Tuple2.of(allocationId, cause));
+ }
+
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DummyScheduledUnit.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DummyScheduledUnit.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DummyScheduledUnit.java
new file mode 100644
index 0000000..939aece
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DummyScheduledUnit.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/**
+ * Dummy implementation of {@link ScheduledUnit} for testing purposes.
+ */
+public class DummyScheduledUnit extends ScheduledUnit {
+ public DummyScheduledUnit() {
+ super(
+ null,
+ new JobVertexID(),
+ null,
+ null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 707ea00..6a8ef0a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
@@ -44,33 +44,31 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
+import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.RETURNS_MOCKS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
public class SlotPoolTest extends TestLogger {
@@ -84,7 +82,7 @@ public class SlotPoolTest extends TestLogger {
private TaskManagerLocation taskManagerLocation;
- private TaskManagerGateway taskManagerGateway;
+ private SimpleAckingTaskManagerGateway taskManagerGateway;
@Before
public void setUp() throws Exception {
@@ -102,7 +100,10 @@ public class SlotPoolTest extends TestLogger {
@Test
public void testAllocateSimpleSlot() throws Exception {
- ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+ final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
+ resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
+
final SlotPool slotPool = new SlotPool(rpcService, jobId);
try {
@@ -112,17 +113,14 @@ public class SlotPoolTest extends TestLogger {
SlotRequestId requestId = new SlotRequestId();
CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
requestId,
- mock(ScheduledUnit.class),
+ new DummyScheduledUnit(),
DEFAULT_TESTING_PROFILE,
Collections.emptyList(),
true,
timeout);
assertFalse(future.isDone());
- ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
- verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
- final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+ final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
final SlotOffer slotOffer = new SlotOffer(
slotRequest.getAllocationId(),
@@ -136,13 +134,21 @@ public class SlotPoolTest extends TestLogger {
assertTrue(slot.isAlive());
assertEquals(taskManagerLocation, slot.getTaskManagerLocation());
} finally {
- slotPool.shutDown();
+ RpcUtils.terminateRpcEndpoint(slotPool, timeout);
}
}
@Test
public void testAllocationFulfilledByReturnedSlot() throws Exception {
- ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+ final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ final ArrayBlockingQueue<SlotRequest> slotRequestQueue = new ArrayBlockingQueue<>(2);
+
+ resourceManagerGateway.setRequestSlotConsumer(slotRequest -> {
+ while (!slotRequestQueue.offer(slotRequest)) {
+ // noop
+ }
+ });
+
final SlotPool slotPool = new SlotPool(rpcService, jobId);
try {
@@ -151,14 +157,14 @@ public class SlotPoolTest extends TestLogger {
CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(
new SlotRequestId(),
- mock(ScheduledUnit.class),
+ new DummyScheduledUnit(),
DEFAULT_TESTING_PROFILE,
Collections.emptyList(),
true,
timeout);
CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(
new SlotRequestId(),
- mock(ScheduledUnit.class),
+ new DummyScheduledUnit(),
DEFAULT_TESTING_PROFILE,
Collections.emptyList(),
true,
@@ -167,11 +173,11 @@ public class SlotPoolTest extends TestLogger {
assertFalse(future1.isDone());
assertFalse(future2.isDone());
- ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
- verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds()).times(2))
- .requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+ final List<SlotRequest> slotRequests = new ArrayList<>(2);
- final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues();
+ for (int i = 0; i < 2; i++) {
+ slotRequests.add(slotRequestQueue.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+ }
final SlotOffer slotOffer = new SlotOffer(
slotRequests.get(0).getAllocationId(),
@@ -198,13 +204,16 @@ public class SlotPoolTest extends TestLogger {
assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
assertEquals(slot1.getAllocationId(), slot2.getAllocationId());
} finally {
- slotPool.shutDown();
+ RpcUtils.terminateRpcEndpoint(slotPool, timeout);
}
}
@Test
public void testAllocateWithFreeSlot() throws Exception {
- ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+ final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
+ resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
+
final SlotPool slotPool = new SlotPool(rpcService, jobId);
try {
@@ -213,17 +222,14 @@ public class SlotPoolTest extends TestLogger {
CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(
new SlotRequestId(),
- mock(ScheduledUnit.class),
+ new DummyScheduledUnit(),
DEFAULT_TESTING_PROFILE,
Collections.emptyList(),
true,
timeout);
assertFalse(future1.isDone());
- ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
- verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
- final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+ final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
final SlotOffer slotOffer = new SlotOffer(
slotRequest.getAllocationId(),
@@ -240,7 +246,7 @@ public class SlotPoolTest extends TestLogger {
CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(
new SlotRequestId(),
- mock(ScheduledUnit.class),
+ new DummyScheduledUnit(),
DEFAULT_TESTING_PROFILE,
Collections.emptyList(),
true,
@@ -262,7 +268,11 @@ public class SlotPoolTest extends TestLogger {
@Test
public void testOfferSlot() throws Exception {
- ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+ final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
+
+ resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
+
final SlotPool slotPool = new SlotPool(rpcService, jobId);
try {
@@ -271,17 +281,14 @@ public class SlotPoolTest extends TestLogger {
CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot(
new SlotRequestId(),
- mock(ScheduledUnit.class),
+ new DummyScheduledUnit(),
DEFAULT_TESTING_PROFILE,
Collections.emptyList(),
true,
timeout);
assertFalse(future.isDone());
- ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
- verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
- final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+ final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
final SlotOffer slotOffer = new SlotOffer(
slotRequest.getAllocationId(),
@@ -320,7 +327,10 @@ public class SlotPoolTest extends TestLogger {
@Test
public void testReleaseResource() throws Exception {
- ResourceManagerGateway resourceManagerGateway = createResourceManagerGatewayMock();
+ final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ final CompletableFuture<SlotRequest> slotRequestFuture = new CompletableFuture<>();
+
+ resourceManagerGateway.setRequestSlotConsumer(slotRequest -> slotRequestFuture.complete(slotRequest));
final CompletableFuture<Boolean> slotReturnFuture = new CompletableFuture<>();
@@ -347,20 +357,17 @@ public class SlotPoolTest extends TestLogger {
CompletableFuture<LogicalSlot> future1 = slotPoolGateway.allocateSlot(
new SlotRequestId(),
- mock(ScheduledUnit.class),
+ new DummyScheduledUnit(),
DEFAULT_TESTING_PROFILE,
Collections.emptyList(),
true,
timeout);
- ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class);
- verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class));
-
- final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue();
+ final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
CompletableFuture<LogicalSlot> future2 = slotPoolGateway.allocateSlot(
new SlotRequestId(),
- mock(ScheduledUnit.class),
+ new DummyScheduledUnit(),
DEFAULT_TESTING_PROFILE,
Collections.emptyList(),
true,
@@ -527,13 +534,58 @@ public class SlotPoolTest extends TestLogger {
}
}
- private static ResourceManagerGateway createResourceManagerGatewayMock() {
- ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
- when(resourceManagerGateway
- .requestSlot(any(JobMasterId.class), any(SlotRequest.class), any(Time.class)))
- .thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
+ /**
+ * Tests that a SlotPool shutdown releases all registered slots.
+ */
+ @Test
+ public void testShutdownReleasesAllSlots() throws Exception {
+ final SlotPool slotPool = new SlotPool(rpcService, jobId);
+ final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+
+ try {
+ final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway);
+
+ slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID());
+
+ final int numSlotOffers = 2;
+
+ final Collection<SlotOffer> slotOffers = new ArrayList<>(numSlotOffers);
+
+ for (int i = 0; i < numSlotOffers; i++) {
+ slotOffers.add(
+ new SlotOffer(
+ new AllocationID(),
+ i,
+ ResourceProfile.UNKNOWN));
+ }
+
+ final ArrayBlockingQueue<AllocationID> freedSlotQueue = new ArrayBlockingQueue<>(numSlotOffers);
- return resourceManagerGateway;
+ taskManagerGateway.setFreeSlotConsumer(tuple -> {
+ while (!freedSlotQueue.offer(tuple.f0)) {}
+ });
+
+ final CompletableFuture<Collection<SlotOffer>> acceptedSlotOffersFuture = slotPoolGateway.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers);
+
+ final Collection<SlotOffer> acceptedSlotOffers = acceptedSlotOffersFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ assertThat(acceptedSlotOffers, Matchers.equalTo(slotOffers));
+
+ // shut down the slot pool
+ slotPool.shutDown();
+ slotPool.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ // the shut down operation should have freed all registered slots; poll with a timeout so a missing free fails the test instead of hanging it
+ ArrayList<AllocationID> freedSlots = new ArrayList<>(numSlotOffers);
+
+ for (int i = 0; i < numSlotOffers; i++) {
+ freedSlots.add(freedSlotQueue.poll(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+ }
+
+ assertThat(freedSlots, Matchers.containsInAnyOrder(slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
+ } finally {
+ RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+ }
}
private static SlotPoolGateway setupSlotPool(
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 94f325d..c6334c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -113,6 +113,11 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
}
@Override
+ public CompletableFuture<Acknowledge> freeSlot(AllocationID allocationId, Throwable cause, Time timeout) {
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
+
+ @Override
public String getAddress() {
return address;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9541afd2/flink-streaming-scala/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/resources/log4j-test.properties b/flink-streaming-scala/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..7ba1633
--- /dev/null
+++ b/flink-streaming-scala/src/test/resources/log4j-test.properties
@@ -0,0 +1,38 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, console
+
+# -----------------------------------------------------------------------------
+# Console (use 'console')
+# -----------------------------------------------------------------------------
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# -----------------------------------------------------------------------------
+# File (use 'file')
+# -----------------------------------------------------------------------------
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.file=${log.dir}/${mvn.forkNumber}.log
+log4j.appender.file.append=false
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console