You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/10 16:27:25 UTC
[1/5] flink git commit: [FLINK-7918] Run AbstractTestBase tests on Flip-6 MiniCluster
Repository: flink
Updated Branches:
refs/heads/master 438e4e374 -> d42759d0a
[FLINK-7918] Run AbstractTestBase tests on Flip-6 MiniCluster
This closes #5095.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98afd1de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98afd1de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98afd1de
Branch: refs/heads/master
Commit: 98afd1de748a6e19db54d2b9ab54f92f0472709c
Parents: 63d4819
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 23 17:02:58 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 17:14:14 2018 +0100
----------------------------------------------------------------------
.../connectors/fs/RollingSinkITCase.java | 2 +
.../runtime/minicluster/JobExecutorService.java | 37 ++++++
.../flink/runtime/minicluster/MiniCluster.java | 17 ++-
.../runtime/minicluster/FlinkMiniCluster.scala | 14 ++-
.../flink/test/util/AbstractTestBase.java | 3 +
.../flink/test/util/MiniClusterResource.java | 120 +++++++++++++++----
pom.xml | 3 +
7 files changed, 169 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/98afd1de/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 78f643f..e1124e4 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -121,6 +121,8 @@ public class RollingSinkITCase extends TestLogger {
new org.apache.flink.configuration.Configuration(),
1,
4));
+
+ miniClusterResource.before();
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/flink/blob/98afd1de/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java
new file mode 100644
index 0000000..03d2447
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface to control {@link JobExecutor}.
+ */
+public interface JobExecutorService extends JobExecutor {
+
+ /**
+ * Terminate the given JobExecutorService.
+ *
+ * <p>This method can be implemented asynchronously. Therefore it returns a future
+ * which is completed once the termination has been done.
+ *
+ * @return Termination future which can also contain an exception if the termination went wrong
+ */
+ CompletableFuture<?> terminate();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/98afd1de/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2598a60..531c1a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -58,7 +59,7 @@ import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
-public class MiniCluster implements JobExecutor {
+public class MiniCluster implements JobExecutorService {
private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
@@ -458,6 +459,10 @@ public class MiniCluster implements JobExecutor {
dispatcher = this.jobDispatcher;
}
+ // we have to allow queued scheduling in Flip-6 mode because we need to request slots
+ // from the ResourceManager
+ job.setAllowQueuedScheduling(true);
+
return dispatcher.runJobBlocking(job);
}
@@ -593,6 +598,16 @@ public class MiniCluster implements JobExecutor {
return priorException;
}
+ @Override
+ public CompletableFuture<?> terminate() {
+ try {
+ shutdown();
+ return CompletableFuture.completedFuture(null);
+ } catch (Exception e) {
+ return FutureUtils.completedExceptionally(e);
+ }
+ }
+
private class TerminatingFatalErrorHandler implements FatalErrorHandler {
private final int index;
http://git-wip-us.apache.org/repos/asf/flink/blob/98afd1de/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index cc8ae5f..44e3a67 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.minicluster
import java.net.{URL, URLClassLoader}
import java.util.UUID
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.{CompletableFuture, Executors, TimeUnit}
import akka.pattern.Patterns.gracefulStop
import akka.pattern.ask
@@ -65,7 +65,7 @@ abstract class FlinkMiniCluster(
val highAvailabilityServices: HighAvailabilityServices,
val useSingleActorSystem: Boolean)
extends LeaderRetrievalListener
- with JobExecutor {
+ with JobExecutorService {
protected val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
@@ -713,4 +713,14 @@ abstract class FlinkMiniCluster(
override def executeJobBlocking(jobGraph: JobGraph) = {
submitJobAndWait(jobGraph, false)
}
+
+ override def terminate() = {
+ try {
+ stop()
+ CompletableFuture.completedFuture(null)
+ } catch {
+ case e: Exception =>
+ FutureUtils.completedExceptionally(e)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/98afd1de/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 65b351d..d73f624 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -19,9 +19,11 @@
package org.apache.flink.test.util;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.testutils.category.OldAndFlip6;
import org.apache.flink.util.FileUtils;
import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import java.io.File;
@@ -54,6 +56,7 @@ import java.io.IOException;
*
* </pre>
*/
+@Category(OldAndFlip6.class)
public abstract class AbstractTestBase extends TestBaseUtils {
private static final int DEFAULT_PARALLELISM = 4;
http://git-wip-us.apache.org/repos/asf/flink/blob/98afd1de/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 20dbebb..69070c6 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -22,15 +22,23 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.JobExecutorService;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
/**
* Starts a Flink mini cluster as a resource and registers the respective
* ExecutionEnvironment and StreamExecutionEnvironment.
@@ -39,16 +47,31 @@ public class MiniClusterResource extends ExternalResource {
private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class);
+ private static final String CODEBASE_KEY = "codebase";
+
+ private static final String FLIP6_CODEBASE = "flip6";
+
private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;
- private LocalFlinkMiniCluster localFlinkMiniCluster;
+ private final MiniClusterType miniClusterType;
+
+ private JobExecutorService jobExecutorService;
private int numberSlots = -1;
private TestEnvironment executionEnvironment;
public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
+ this(
+ miniClusterResourceConfiguration,
+ Objects.equals(FLIP6_CODEBASE, System.getProperty(CODEBASE_KEY)) ? MiniClusterType.FLIP6 : MiniClusterType.OLD);
+ }
+
+ public MiniClusterResource(
+ final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
+ final MiniClusterType miniClusterType) {
this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration);
+ this.miniClusterType = Preconditions.checkNotNull(miniClusterType);
}
public int getNumberSlots() {
@@ -61,37 +84,74 @@ public class MiniClusterResource extends ExternalResource {
@Override
public void before() throws Exception {
- final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
- configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers());
- configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.numberSlotsPerTaskManager);
-
- localFlinkMiniCluster = TestBaseUtils.startCluster(
- configuration,
- true);
+ jobExecutorService = startJobExecutorService(miniClusterType);
numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers();
- executionEnvironment = new TestEnvironment(localFlinkMiniCluster, numberSlots, false);
+ executionEnvironment = new TestEnvironment(jobExecutorService, numberSlots, false);
executionEnvironment.setAsContext();
- TestStreamEnvironment.setAsContext(localFlinkMiniCluster, numberSlots);
+ TestStreamEnvironment.setAsContext(jobExecutorService, numberSlots);
}
@Override
public void after() {
- if (localFlinkMiniCluster != null) {
- try {
- TestBaseUtils.stopCluster(
- localFlinkMiniCluster,
- FutureUtils.toFiniteDuration(miniClusterResourceConfiguration.getShutdownTimeout()));
- } catch (Exception e) {
- LOG.warn("Could not properly shut down the Flink mini cluster.", e);
- }
-
- TestStreamEnvironment.unsetAsContext();
- TestEnvironment.unsetAsContext();
- localFlinkMiniCluster = null;
+
+ TestStreamEnvironment.unsetAsContext();
+ TestEnvironment.unsetAsContext();
+
+ final CompletableFuture<?> terminationFuture = jobExecutorService.terminate();
+
+ try {
+ terminationFuture.get(
+ miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds(),
+ TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ LOG.warn("Could not properly shut down the MiniClusterResource.", e);
}
+
+ jobExecutorService = null;
+ }
+
+ private JobExecutorService startJobExecutorService(MiniClusterType miniClusterType) throws Exception {
+ final JobExecutorService jobExecutorService;
+ switch (miniClusterType) {
+ case OLD:
+ jobExecutorService = startOldMiniCluster();
+ break;
+ case FLIP6:
+ jobExecutorService = startFlip6MiniCluster();
+ break;
+ default:
+ throw new FlinkRuntimeException("Unknown MiniClusterType " + miniClusterType + '.');
+ }
+
+ return jobExecutorService;
+ }
+
+ private JobExecutorService startOldMiniCluster() throws Exception {
+ final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
+ configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers());
+ configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
+
+ return TestBaseUtils.startCluster(
+ configuration,
+ true);
+ }
+
+ @Nonnull
+ private JobExecutorService startFlip6MiniCluster() throws Exception {
+ final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+ .setConfiguration(miniClusterResourceConfiguration.getConfiguration())
+ .setNumTaskManagers(miniClusterResourceConfiguration.getNumberTaskManagers())
+ .setNumSlotsPerTaskManager(miniClusterResourceConfiguration.getNumberSlotsPerTaskManager())
+ .build();
+
+ final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
+
+ miniCluster.start();
+
+ return miniCluster;
}
/**
@@ -144,4 +204,16 @@ public class MiniClusterResource extends ExternalResource {
return shutdownTimeout;
}
}
+
+ // ---------------------------------------------
+ // Enum definitions
+ // ---------------------------------------------
+
+ /**
+ * Type of the mini cluster to start.
+ */
+ public enum MiniClusterType {
+ OLD,
+ FLIP6
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/98afd1de/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6b68932..ee07414 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,6 +129,7 @@ under the License.
<!-- run all groups except flip6 by default -->
<test.groups></test.groups>
<test.excludedGroups>org.apache.flink.testutils.category.Flip6</test.excludedGroups>
+ <codebase>old</codebase>
<!--
Keeping the MiniKDC version fixed instead of taking hadoop version dependency
to support testing Kafka, ZK etc., modules that does not have Hadoop dependency
@@ -604,6 +605,7 @@ under the License.
<test.groups>org.apache.flink.testutils.category.Flip6, org.apache.flink.testutils.category.OldAndFlip6</test.groups>
<!-- clear the excluded groups list -->
<test.excludedGroups></test.excludedGroups>
+ <codebase>flip6</codebase>
</properties>
</profile>
@@ -1131,6 +1133,7 @@ under the License.
<systemPropertyVariables>
<forkNumber>0${surefire.forkNumber}</forkNumber>
<log4j.configuration>${log4j.configuration}</log4j.configuration>
+ <codebase>${codebase}</codebase>
</systemPropertyVariables>
<argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} -XX:+UseG1GC</argLine>
</configuration>
[3/5] flink git commit: [hotfix] [tests] Refactor TypeHintITCase to extend AbstractTestBase
Posted by tr...@apache.org.
[hotfix] [tests] Refactor TypeHintITCase to extend AbstractTestBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/244f03f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/244f03f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/244f03f3
Branch: refs/heads/master
Commit: 244f03f363a6eea709cd45f8e9f495f0ac4eca62
Parents: 98afd1d
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 1 19:19:29 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 17:14:15 2018 +0100
----------------------------------------------------------------------
.../flink/test/operators/TypeHintITCase.java | 371 +++++++++----------
1 file changed, 170 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/244f03f3/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
index b634005..62b5186 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
@@ -32,227 +32,196 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.test.operators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.Test;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
/**
* Integration tests for {@link org.apache.flink.api.common.typeinfo.TypeHint}.
*/
-@RunWith(Parameterized.class)
-public class TypeHintITCase extends JavaProgramTestBase {
+public class TypeHintITCase extends AbstractTestBase {
- private static final int NUM_PROGRAMS = 9;
+ @Test
+ public void testIdentityMapWithMissingTypesAndStringTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
- private final int curProgId;
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
+ .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
+ .returns("Tuple3<Integer, Long, String>");
+ List<Tuple3<Integer, Long, String>> result = identityMapDs.collect();
- public TypeHintITCase(int curProgId) {
- this.curProgId = curProgId;
+ String expectedResult = "(2,2,Hello)\n" +
+ "(3,2,Hello world)\n" +
+ "(1,1,Hi)\n";
+
+ compareResultAsText(result, expectedResult);
}
- @Override
- protected void testProgram() throws Exception {
- TypeHintProgs.runProgram(curProgId);
+ @Test
+ public void testIdentityMapWithMissingTypesAndTypeInformationTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
+ // all following generics get erased during compilation
+ .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
+ .returns(new TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+ List<Tuple3<Integer, Long, String>> result = identityMapDs
+ .collect();
+
+ String expectedResult = "(2,2,Hello)\n" +
+ "(3,2,Hello world)\n" +
+ "(1,1,Hi)\n";
+
+ compareResultAsText(result, expectedResult);
}
- @Parameters
- public static Collection<Object[]> getConfigurations() {
+ @Test
+ public void testFlatMapWithClassTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
- Collection<Object[]> parameters = new ArrayList<>(NUM_PROGRAMS);
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> identityMapDs = ds
+ .flatMap(new FlatMapper<Tuple3<Integer, Long, String>, Integer>())
+ .returns(Integer.class);
+ List<Integer> result = identityMapDs.collect();
- for (int i = 1; i <= NUM_PROGRAMS; i++) {
- parameters.add(new Object[]{i});
- }
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
- return parameters;
+ compareResultAsText(result, expectedResult);
}
- private static class TypeHintProgs {
-
- public static void runProgram(int progId) throws Exception {
- switch(progId) {
- // Test identity map with missing types and string type hint
- case 1: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
- .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
- .returns("Tuple3<Integer, Long, String>");
- List<Tuple3<Integer, Long, String>> result = identityMapDs.collect();
-
- String expectedResult = "(2,2,Hello)\n" +
- "(3,2,Hello world)\n" +
- "(1,1,Hi)\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test identity map with missing types and type information type hint
- case 2: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
- // all following generics get erased during compilation
- .map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
- .returns(new TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
- List<Tuple3<Integer, Long, String>> result = identityMapDs
- .collect();
-
- String expectedResult = "(2,2,Hello)\n" +
- "(3,2,Hello world)\n" +
- "(1,1,Hi)\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test flat map with class type hint
- case 3: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> identityMapDs = ds
- .flatMap(new FlatMapper<Tuple3<Integer, Long, String>, Integer>())
- .returns(Integer.class);
- List<Integer> result = identityMapDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test join with type information type hint
- case 4: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds1
- .join(ds2)
- .where(0)
- .equalTo(0)
- .with(new Joiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test flat join with type information type hint
- case 5: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds1
- .join(ds2)
- .where(0)
- .equalTo(0)
- .with(new FlatJoiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test unsorted group reduce with type information type hint
- case 6: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds
- .groupBy(0)
- .reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test sorted group reduce with type information type hint
- case 7: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds
- .groupBy(0)
- .sortGroup(0, Order.ASCENDING)
- .reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test combine group with type information type hint
- case 8: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds
- .groupBy(0)
- .combineGroup(new GroupCombiner<Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- // Test cogroup with type information type hint
- case 9: {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
- DataSet<Integer> resultDs = ds1
- .coGroup(ds2)
- .where(0)
- .equalTo(0)
- .with(new CoGrouper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
- .returns(BasicTypeInfo.INT_TYPE_INFO);
- List<Integer> result = resultDs.collect();
-
- String expectedResult = "2\n" +
- "3\n" +
- "1\n";
-
- compareResultAsText(result, expectedResult);
- break;
- }
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
- }
+ @Test
+ public void testJoinWithTypeInformationTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> resultDs = ds1
+ .join(ds2)
+ .where(0)
+ .equalTo(0)
+ .with(new Joiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
+ .returns(BasicTypeInfo.INT_TYPE_INFO);
+ List<Integer> result = resultDs.collect();
+
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
+
+ compareResultAsText(result, expectedResult);
+ }
+
+ @Test
+ public void testFlatJoinWithTypeInformationTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> resultDs = ds1
+ .join(ds2)
+ .where(0)
+ .equalTo(0)
+ .with(new FlatJoiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
+ .returns(BasicTypeInfo.INT_TYPE_INFO);
+ List<Integer> result = resultDs.collect();
+
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
+
+ compareResultAsText(result, expectedResult);
+ }
+
+ @Test
+ public void testUnsortedGroupReduceWithTypeInformationTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> resultDs = ds
+ .groupBy(0)
+ .reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
+ .returns(BasicTypeInfo.INT_TYPE_INFO);
+ List<Integer> result = resultDs.collect();
+
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
+
+ compareResultAsText(result, expectedResult);
+ }
+
+ @Test
+ public void testSortedGroupReduceWithTypeInformationTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> resultDs = ds
+ .groupBy(0)
+ .sortGroup(0, Order.ASCENDING)
+ .reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
+ .returns(BasicTypeInfo.INT_TYPE_INFO);
+ List<Integer> result = resultDs.collect();
+
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
+
+ compareResultAsText(result, expectedResult);
+ }
+
+ @Test
+ public void testCombineGroupWithTypeInformationTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> resultDs = ds
+ .groupBy(0)
+ .combineGroup(new GroupCombiner<Tuple3<Integer, Long, String>, Integer>())
+ .returns(BasicTypeInfo.INT_TYPE_INFO);
+ List<Integer> result = resultDs.collect();
+
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
+
+ compareResultAsText(result, expectedResult);
+ }
+
+ @Test
+ public void testCoGroupWithTypeInformationTypeHint() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Integer> resultDs = ds1
+ .coGroup(ds2)
+ .where(0)
+ .equalTo(0)
+ .with(new CoGrouper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
+ .returns(BasicTypeInfo.INT_TYPE_INFO);
+ List<Integer> result = resultDs.collect();
+
+ String expectedResult = "2\n" +
+ "3\n" +
+ "1\n";
+
+ compareResultAsText(result, expectedResult);
}
// --------------------------------------------------------------------------------------------
[2/5] flink git commit: [FLINK-8393] [flip6] Reconnect to last known
JobMaster when connection is lost
Posted by tr...@apache.org.
[FLINK-8393] [flip6] Reconnect to last known JobMaster when connection is lost
In case of a heartbeat timeout or a disconnect call, the TaskExecutor tries to
reconnect to the last known JobMaster location.
This closes #5267.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63d4819e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63d4819e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63d4819e
Branch: refs/heads/master
Commit: 63d4819e197b1df1651157fd8f86c8ca0540d0b1
Parents: 438e4e3
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jan 9 20:37:08 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 17:14:14 2018 +0100
----------------------------------------------------------------------
.../registration/RegisteredRpcConnection.java | 101 ++++++++++++++-----
.../runtime/taskexecutor/JobLeaderService.java | 45 ++++++++-
.../runtime/taskexecutor/TaskExecutor.java | 27 ++++-
.../taskexecutor/slot/TaskSlotTable.java | 13 ++-
.../RegisteredRpcConnectionTest.java | 79 ++++++++++++---
5 files changed, 219 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
index c76bcf8..7d2c35a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -46,6 +47,11 @@ import static org.apache.flink.util.Preconditions.checkState;
*/
public abstract class RegisteredRpcConnection<F extends Serializable, G extends RpcGateway, S extends RegistrationResponse.Success> {
+ private static final AtomicReferenceFieldUpdater<RegisteredRpcConnection, RetryingRegistration> REGISTRATION_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+ RegisteredRpcConnection.class,
+ RetryingRegistration.class,
+ "pendingRegistration");
+
/** The logger for all log messages of this class. */
protected final Logger log;
@@ -59,7 +65,7 @@ public abstract class RegisteredRpcConnection<F extends Serializable, G extends
private final Executor executor;
/** The Registration of this RPC connection. */
- private RetryingRegistration<F, G, S> pendingRegistration;
+ private volatile RetryingRegistration<F, G, S> pendingRegistration;
/** The gateway to register, it's null until the registration is completed. */
private volatile G targetGateway;
@@ -85,27 +91,47 @@ public abstract class RegisteredRpcConnection<F extends Serializable, G extends
checkState(!closed, "The RPC connection is already closed");
checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
- pendingRegistration = checkNotNull(generateRegistration());
- pendingRegistration.startRegistration();
+ final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
- CompletableFuture<Tuple2<G, S>> future = pendingRegistration.getFuture();
+ if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
+ newRegistration.startRegistration();
+ } else {
+ // concurrent start operation
+ newRegistration.cancel();
+ }
+ }
- future.whenCompleteAsync(
- (Tuple2<G, S> result, Throwable failure) -> {
- if (failure != null) {
- if (failure instanceof CancellationException) {
- // we ignore cancellation exceptions because they originate from cancelling
- // the RetryingRegistration
- log.debug("Retrying registration towards {} was cancelled.", targetAddress);
- } else {
- // this future should only ever fail if there is a bug, not if the registration is declined
- onRegistrationFailure(failure);
- }
- } else {
- targetGateway = result.f0;
- onRegistrationSuccess(result.f1);
- }
- }, executor);
+ public boolean tryReconnect() {
+ checkState(isConnected(), "Cannot reconnect to an unknown destination.");
+
+ if (closed) {
+ return false;
+ } else {
+ final RetryingRegistration<F, G, S> currentPendingRegistration = pendingRegistration;
+
+ if (currentPendingRegistration != null) {
+ currentPendingRegistration.cancel();
+ }
+
+ final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
+
+ if (REGISTRATION_UPDATER.compareAndSet(this, currentPendingRegistration, newRegistration)) {
+ newRegistration.startRegistration();
+ } else {
+ // concurrent modification
+ newRegistration.cancel();
+ return false;
+ }
+
+ // double check for concurrent close operations
+ if (closed) {
+ newRegistration.cancel();
+
+ return false;
+ } else {
+ return true;
+ }
+ }
}
/**
@@ -175,13 +201,42 @@ public abstract class RegisteredRpcConnection<F extends Serializable, G extends
}
if (isClosed()) {
- connectionInfo = connectionInfo + " is closed";
+ connectionInfo += " is closed";
} else if (isConnected()){
- connectionInfo = connectionInfo + " is established";
+ connectionInfo += " is established";
} else {
- connectionInfo = connectionInfo + " is connecting";
+ connectionInfo += " is connecting";
}
return connectionInfo;
}
+
+ // ------------------------------------------------------------------------
+ // Internal methods
+ // ------------------------------------------------------------------------
+
+ private RetryingRegistration<F, G, S> createNewRegistration() {
+ RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration());
+
+ CompletableFuture<Tuple2<G, S>> future = newRegistration.getFuture();
+
+ future.whenCompleteAsync(
+ (Tuple2<G, S> result, Throwable failure) -> {
+ if (failure != null) {
+ if (failure instanceof CancellationException) {
+ // we ignore cancellation exceptions because they originate from cancelling
+ // the RetryingRegistration
+ log.debug("Retrying registration towards {} was cancelled.", targetAddress);
+ } else {
+ // this future should only ever fail if there is a bug, not if the registration is declined
+ onRegistrationFailure(failure);
+ }
+ } else {
+ targetGateway = result.f0;
+ onRegistrationSuccess(result.f1);
+ }
+ }, executor);
+
+ return newRegistration;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 77737e1..3b4da4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -205,6 +205,23 @@ public class JobLeaderService {
}
/**
+ * Triggers reconnection to the last known leader of the given job.
+ *
+ * @param jobId specifying the job for which to trigger reconnection
+ */
+ public void reconnect(final JobID jobId) {
+ Preconditions.checkNotNull(jobId, "JobID must not be null.");
+
+ final Tuple2<LeaderRetrievalService, JobManagerLeaderListener> jobLeaderService = jobLeaderServices.get(jobId);
+
+ if (jobLeaderService != null) {
+ jobLeaderService.f1.reconnect();
+ } else {
+ LOG.info("Cannot reconnect to job {} because it is not registered.", jobId);
+ }
+ }
+
+ /**
* Leader listener which tries to establish a connection to a newly detected job leader.
*/
private final class JobManagerLeaderListener implements LeaderRetrievalListener {
@@ -213,7 +230,7 @@ public class JobLeaderService {
private final JobID jobId;
/** Rpc connection to the job leader. */
- private RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
+ private volatile RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
/** State of the listener. */
private volatile boolean stopped;
@@ -237,6 +254,32 @@ public class JobLeaderService {
}
}
+ public void reconnect() {
+ if (stopped) {
+ LOG.debug("Cannot reconnect because the JobManagerLeaderListener has already been stopped.");
+ } else {
+ final RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> currentRpcConnection = rpcConnection;
+
+ if (currentRpcConnection != null) {
+ if (currentRpcConnection.isConnected()) {
+
+ if (currentRpcConnection.tryReconnect()) {
+ // double check for concurrent stop operation
+ if (stopped) {
+ currentRpcConnection.close();
+ }
+ } else {
+ LOG.debug("Could not reconnect to the JobMaster {}.", currentRpcConnection.getTargetAddress());
+ }
+ } else {
+ LOG.debug("Ongoing registration to JobMaster {}.", currentRpcConnection.getTargetAddress());
+ }
+ } else {
+ LOG.debug("Cannot reconnect to an unknown JobMaster.");
+ }
+ }
+ }
+
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderId) {
if (stopped) {
http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 5577472..3c7d1cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -687,6 +687,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Override
public void disconnectJobManager(JobID jobId, Exception cause) {
closeJobManagerConnection(jobId, cause);
+ jobLeaderService.reconnect(jobId);
}
@Override
@@ -1079,16 +1080,34 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
Preconditions.checkNotNull(allocationId);
try {
- int freedSlotIndex = taskSlotTable.freeSlot(allocationId, cause);
+ TaskSlot taskSlot = taskSlotTable.freeSlot(allocationId, cause);
- if (freedSlotIndex != -1 && isConnectedToResourceManager()) {
+ if (taskSlot != null && isConnectedToResourceManager()) {
// the slot was freed. Tell the RM about it
ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
resourceManagerGateway.notifySlotAvailable(
resourceManagerConnection.getRegistrationId(),
- new SlotID(getResourceID(), freedSlotIndex),
+ new SlotID(getResourceID(), taskSlot.getIndex()),
allocationId);
+
+ // check whether we still have allocated slots for the same job
+ final JobID jobId = taskSlot.getJobId();
+ final Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
+
+ if (!tasks.hasNext()) {
+ // we can remove the job from the job leader service
+ try {
+ jobLeaderService.removeJob(jobId);
+ } catch (Exception e) {
+ log.info("Could not remove job {} from JobLeaderService.", jobId, e);
+ }
+
+ closeJobManagerConnection(
+ jobId,
+ new FlinkException("TaskExecutor " + getAddress() +
+ " has no more allocated slots for job " + jobId + '.'));
+ }
}
} catch (SlotNotFoundException e) {
log.debug("Could not free slot for allocation id {}.", allocationId, e);
@@ -1295,6 +1314,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
closeJobManagerConnection(
jobManagerConnection.getJobID(),
new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
+
+ jobLeaderService.reconnect(jobManagerConnection.getJobID());
}
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 62101e7..ab62a86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -33,6 +33,8 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -266,7 +268,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
* @throws SlotNotFoundException if there is no task slot for the given allocation id
* @return The freed TaskSlot. If the TaskSlot cannot be freed then null.
*/
- public int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
+ public TaskSlot freeSlot(AllocationID allocationId) throws SlotNotFoundException {
return freeSlot(allocationId, new Exception("The task slot of this task is being freed."));
}
@@ -278,9 +280,10 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
* @param allocationId identifying the task slot to be freed
* @param cause to fail the tasks with if slot is not empty
* @throws SlotNotFoundException if there is no task slot for the given allocation id
- * @return Index of the freed slot if the slot could be freed; otherwise -1
+ * @return The freed TaskSlot. If the TaskSlot cannot be freed then null.
*/
- public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
+ @Nullable
+ public TaskSlot freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
checkInit();
TaskSlot taskSlot = getTaskSlot(allocationId);
@@ -314,7 +317,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
slotsPerJob.remove(jobId);
}
- return taskSlot.getIndex();
+ return taskSlot;
} else {
// we couldn't free the task slot because it still contains tasks; fail the tasks
// and set the slot state to releasing so that it eventually gets freed
@@ -326,7 +329,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
taskIterator.next().failExternally(cause);
}
- return -1;
+ return null;
}
} else {
throw new SlotNotFoundException(allocationId);
http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
index 19a5756..650a0f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
@@ -27,12 +27,15 @@ import org.junit.Test;
import org.slf4j.LoggerFactory;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
@@ -60,14 +63,14 @@ public class RegisteredRpcConnectionTest extends TestLogger {
connection.start();
//wait for connection established
- Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT);
+ final String actualConnectionId = connection.getConnectionFuture().get();
// validate correct invocation and result
assertTrue(connection.isConnected());
assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress());
assertEquals(leaderId, connection.getTargetLeaderId());
assertEquals(testGateway, connection.getTargetGateway());
- assertEquals(connectionID, connection.getConnectionId());
+ assertEquals(connectionID, actualConnectionId);
}
finally {
testGateway.stop();
@@ -86,8 +89,9 @@ public class RegisteredRpcConnectionTest extends TestLogger {
try {
// gateway that throws an exception upon calls
TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
+ final RuntimeException registrationException = new RuntimeException(connectionFailureMessage);
when(testGateway.registrationCall(any(UUID.class), anyLong())).thenThrow(
- new RuntimeException(connectionFailureMessage));
+ registrationException);
rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
@@ -95,14 +99,18 @@ public class RegisteredRpcConnectionTest extends TestLogger {
connection.start();
//wait for connection failure
- Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT);
+ try {
+ connection.getConnectionFuture().get();
+ fail("expected failure.");
+ } catch (ExecutionException ee) {
+ assertEquals(registrationException, ee.getCause());
+ }
// validate correct invocation and result
assertFalse(connection.isConnected());
assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress());
assertEquals(leaderId, connection.getTargetLeaderId());
assertNull(connection.getTargetGateway());
- assertEquals(connectionFailureMessage, connection.getFailareMessage());
}
finally {
rpcService.stopService();
@@ -137,21 +145,53 @@ public class RegisteredRpcConnectionTest extends TestLogger {
}
}
+ @Test
+ public void testReconnect() throws Exception {
+ final String connectionId1 = "Test RPC Connection ID 1";
+ final String connectionId2 = "Test RPC Connection ID 2";
+ final TestingRpcService rpcService = new TestingRpcService();
+ final String testRpcConnectionEndpointAddress = "<TestRpcConnectionEndpointAddress>";
+ final UUID leaderId = UUID.randomUUID();
+ final TestRegistrationGateway testGateway = new TestRegistrationGateway(
+ new RetryingRegistrationTest.TestRegistrationSuccess(connectionId1),
+ new RetryingRegistrationTest.TestRegistrationSuccess(connectionId2));
+
+ try {
+ rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
+
+ TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService);
+ connection.start();
+
+ final String actualConnectionId1 = connection.getConnectionFuture().get();
+
+ assertEquals(actualConnectionId1, connectionId1);
+
+ assertTrue(connection.tryReconnect());
+
+ final String actualConnectionId2 = connection.getConnectionFuture().get();
+
+ assertEquals(actualConnectionId2, connectionId2);
+ } finally {
+ rpcService.stopService();
+ }
+ }
+
// ------------------------------------------------------------------------
// test RegisteredRpcConnection
// ------------------------------------------------------------------------
private static class TestRpcConnection extends RegisteredRpcConnection<UUID, TestRegistrationGateway, TestRegistrationSuccess> {
- private final RpcService rpcService;
+ private final Object lock = new Object();
- private String connectionId;
+ private final RpcService rpcService;
- private String failureMessage;
+ private CompletableFuture<String> connectionFuture;
public TestRpcConnection(String targetAddress, UUID targetLeaderId, Executor executor, RpcService rpcService) {
super(LoggerFactory.getLogger(RegisteredRpcConnectionTest.class), targetAddress, targetLeaderId, executor);
this.rpcService = rpcService;
+ this.connectionFuture = new CompletableFuture<>();
}
@Override
@@ -161,20 +201,31 @@ public class RegisteredRpcConnectionTest extends TestLogger {
@Override
protected void onRegistrationSuccess(RetryingRegistrationTest.TestRegistrationSuccess success) {
- connectionId = success.getCorrelationId();
+ synchronized (lock) {
+ connectionFuture.complete(success.getCorrelationId());
+ }
}
@Override
protected void onRegistrationFailure(Throwable failure) {
- failureMessage = failure.getMessage();
+ synchronized (lock) {
+ connectionFuture.completeExceptionally(failure);
+ }
}
- public String getConnectionId() {
- return connectionId;
+ @Override
+ public boolean tryReconnect() {
+ synchronized (lock) {
+ connectionFuture.cancel(false);
+ connectionFuture = new CompletableFuture<>();
+ }
+ return super.tryReconnect();
}
- public String getFailareMessage() {
- return failureMessage;
+ public CompletableFuture<String> getConnectionFuture() {
+ synchronized (lock) {
+ return connectionFuture;
+ }
}
}
}
[5/5] flink git commit: [hotfix] [tests] Fix PageRankITCase,
AggregatorsITCase and DataSinkITCase to use fresh result path
Posted by tr...@apache.org.
[hotfix] [tests] Fix PageRankITCase, AggregatorsITCase and DataSinkITCase to use fresh result path
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d42759d0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d42759d0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d42759d0
Branch: refs/heads/master
Commit: d42759d0a17d17adc5af9c26b939d431acba5b08
Parents: a6ee040
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 1 19:20:35 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 17:14:15 2018 +0100
----------------------------------------------------------------------
.../src/main/java/org/apache/flink/core/fs/FileSystem.java | 2 +-
.../org/apache/flink/test/example/java/PageRankITCase.java | 6 +++++-
.../flink/test/iterative/aggregators/AggregatorsITCase.java | 8 +++++---
.../java/org/apache/flink/test/operators/DataSinkITCase.java | 6 +++++-
4 files changed, 16 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d42759d0/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 07a1e76..8698595 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -752,7 +752,7 @@ public abstract class FileSystem {
return true;
} else {
// file may not be overwritten
- throw new IOException("File or directory already exists. Existing files and directories " +
+ throw new IOException("File or directory " + outPath + " already exists. Existing files and directories " +
"are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " +
WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d42759d0/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java
index ee7bf82..2fae57d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java
@@ -33,6 +33,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
+import java.util.UUID;
/**
* Test for {@link PageRank}.
@@ -54,7 +55,10 @@ public class PageRankITCase extends MultipleProgramsTestBase {
@Before
public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
+ final File folder = tempFolder.newFolder();
+ final File resultFile = new File(folder, UUID.randomUUID().toString());
+ resultPath = resultFile.toURI().toString();
+
File verticesFile = tempFolder.newFile();
FileUtils.writeFileUtf8(verticesFile, PageRankData.VERTICES);
http://git-wip-us.apache.org/repos/asf/flink/blob/d42759d0/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index 64ee98a..bd42ac2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -49,6 +49,7 @@ import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.Random;
+import java.util.UUID;
import static org.junit.Assert.assertEquals;
@@ -78,9 +79,10 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
@Before
public void before() throws Exception{
- File tempFile = tempFolder.newFile();
- testPath = tempFile.toString();
- resultPath = tempFile.toURI().toString();
+ final File folder = tempFolder.newFolder();
+ final File resultFile = new File(folder, UUID.randomUUID().toString());
+ testPath = resultFile.toString();
+ resultPath = resultFile.toURI().toString();
}
@After
http://git-wip-us.apache.org/repos/asf/flink/blob/d42759d0/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java
index deb5170..b35e818 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java
@@ -35,7 +35,9 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.BufferedReader;
+import java.io.File;
import java.util.Random;
+import java.util.UUID;
import static org.junit.Assert.assertTrue;
@@ -57,7 +59,9 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
@Before
public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
+ final File folder = tempFolder.newFolder();
+ final File resultFile = new File(folder, UUID.randomUUID().toString());
+ resultPath = resultFile.toURI().toString();
}
@Test
[4/5] flink git commit: [hotfix] [tests] Fix JavaProgramTestBase to
reset MiniClusterResource#TestEnvironment
Posted by tr...@apache.org.
[hotfix] [tests] Fix JavaProgramTestBase to reset MiniClusterResource#TestEnvironment
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6ee040c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6ee040c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6ee040c
Branch: refs/heads/master
Commit: a6ee040c5dbf8b93e94dcbf353e0db897ff8fb29
Parents: 244f03f
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 1 19:20:04 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 17:14:15 2018 +0100
----------------------------------------------------------------------
.../main/java/org/apache/flink/test/util/JavaProgramTestBase.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a6ee040c/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index de536d8..9262ae6 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -210,7 +210,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
e.printStackTrace();
Assert.fail("Error while calling the test program: " + e.getMessage());
} finally {
- CollectionTestEnvironment.unsetAsContext();
+ miniClusterResource.getTestEnvironment().setAsContext();
}
Assert.assertNotNull("The test program never triggered an execution.", this.latestExecutionResult);