You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/10 16:27:25 UTC

[1/5] flink git commit: [FLINK-7918] Run AbstractTestBase tests on Flip-6 MiniCluster

Repository: flink
Updated Branches:
  refs/heads/master 438e4e374 -> d42759d0a


[FLINK-7918] Run AbstractTestBase tests on Flip-6 MiniCluster

This closes #5095.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98afd1de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98afd1de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98afd1de

Branch: refs/heads/master
Commit: 98afd1de748a6e19db54d2b9ab54f92f0472709c
Parents: 63d4819
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 23 17:02:58 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 17:14:14 2018 +0100

----------------------------------------------------------------------
 .../connectors/fs/RollingSinkITCase.java        |   2 +
 .../runtime/minicluster/JobExecutorService.java |  37 ++++++
 .../flink/runtime/minicluster/MiniCluster.java  |  17 ++-
 .../runtime/minicluster/FlinkMiniCluster.scala  |  14 ++-
 .../flink/test/util/AbstractTestBase.java       |   3 +
 .../flink/test/util/MiniClusterResource.java    | 120 +++++++++++++++----
 pom.xml                                         |   3 +
 7 files changed, 169 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/98afd1de/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 78f643f..e1124e4 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -121,6 +121,8 @@ public class RollingSinkITCase extends TestLogger {
 				new org.apache.flink.configuration.Configuration(),
 				1,
 				4));
+
+		miniClusterResource.before();
 	}
 
 	@AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/98afd1de/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java
new file mode 100644
index 0000000..03d2447
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/JobExecutorService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface to control {@link JobExecutor}.
+ */
+public interface JobExecutorService extends JobExecutor {
+
+	/**
+	 * Terminate the given JobExecutorService.
+	 *
+	 * <p>This method can be implemented asynchronously. Therefore it returns a future
+	 * which is completed once the termination has been done.
+	 *
+	 * @return Termination future which can also contain an exception if the termination went wrong
+	 */
+	CompletableFuture<?> terminate();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/98afd1de/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2598a60..531c1a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
@@ -58,7 +59,7 @@ import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
-public class MiniCluster implements JobExecutor {
+public class MiniCluster implements JobExecutorService {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
 
@@ -458,6 +459,10 @@ public class MiniCluster implements JobExecutor {
 			dispatcher = this.jobDispatcher;
 		}
 
+		// we have to allow queued scheduling in Flip-6 mode because we need to request slots
+		// from the ResourceManager
+		job.setAllowQueuedScheduling(true);
+
 		return dispatcher.runJobBlocking(job);
 	}
 
@@ -593,6 +598,16 @@ public class MiniCluster implements JobExecutor {
 		return priorException;
 	}
 
+	@Override
+	public CompletableFuture<?> terminate() {
+		try {
+			shutdown();
+			return CompletableFuture.completedFuture(null);
+		} catch (Exception e) {
+			return FutureUtils.completedExceptionally(e);
+		}
+	}
+
 	private class TerminatingFatalErrorHandler implements FatalErrorHandler {
 
 		private final int index;

http://git-wip-us.apache.org/repos/asf/flink/blob/98afd1de/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index cc8ae5f..44e3a67 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.minicluster
 
 import java.net.{URL, URLClassLoader}
 import java.util.UUID
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.{CompletableFuture, Executors, TimeUnit}
 
 import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
@@ -65,7 +65,7 @@ abstract class FlinkMiniCluster(
     val highAvailabilityServices: HighAvailabilityServices,
     val useSingleActorSystem: Boolean)
   extends LeaderRetrievalListener
-  with JobExecutor {
+  with JobExecutorService {
 
   protected val LOG = LoggerFactory.getLogger(classOf[FlinkMiniCluster])
 
@@ -713,4 +713,14 @@ abstract class FlinkMiniCluster(
   override def executeJobBlocking(jobGraph: JobGraph) = {
     submitJobAndWait(jobGraph, false)
   }
+
+  override def terminate() = {
+    try {
+      stop()
+      CompletableFuture.completedFuture(null)
+    } catch {
+      case e: Exception =>
+        FutureUtils.completedExceptionally(e)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/98afd1de/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 65b351d..d73f624 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -19,9 +19,11 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.testutils.category.OldAndFlip6;
 import org.apache.flink.util.FileUtils;
 
 import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
@@ -54,6 +56,7 @@ import java.io.IOException;
  *
  * </pre>
  */
+@Category(OldAndFlip6.class)
 public abstract class AbstractTestBase extends TestBaseUtils {
 
 	private static final int DEFAULT_PARALLELISM = 4;

http://git-wip-us.apache.org/repos/asf/flink/blob/98afd1de/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 20dbebb..69070c6 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -22,15 +22,23 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.JobExecutorService;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Starts a Flink mini cluster as a resource and registers the respective
  * ExecutionEnvironment and StreamExecutionEnvironment.
@@ -39,16 +47,31 @@ public class MiniClusterResource extends ExternalResource {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class);
 
+	private static final String CODEBASE_KEY = "codebase";
+
+	private static final String FLIP6_CODEBASE = "flip6";
+
 	private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;
 
-	private LocalFlinkMiniCluster localFlinkMiniCluster;
+	private final MiniClusterType miniClusterType;
+
+	private JobExecutorService jobExecutorService;
 
 	private int numberSlots = -1;
 
 	private TestEnvironment executionEnvironment;
 
 	public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
+		this(
+			miniClusterResourceConfiguration,
+			Objects.equals(FLIP6_CODEBASE, System.getProperty(CODEBASE_KEY)) ? MiniClusterType.FLIP6 : MiniClusterType.OLD);
+	}
+
+	public MiniClusterResource(
+			final MiniClusterResourceConfiguration miniClusterResourceConfiguration,
+			final MiniClusterType miniClusterType) {
 		this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration);
+		this.miniClusterType = Preconditions.checkNotNull(miniClusterType);
 	}
 
 	public int getNumberSlots() {
@@ -61,37 +84,74 @@ public class MiniClusterResource extends ExternalResource {
 
 	@Override
 	public void before() throws Exception {
-		final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
 
-		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers());
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.numberSlotsPerTaskManager);
-
-		localFlinkMiniCluster = TestBaseUtils.startCluster(
-			configuration,
-			true);
+		jobExecutorService = startJobExecutorService(miniClusterType);
 
 		numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers();
 
-		executionEnvironment = new TestEnvironment(localFlinkMiniCluster, numberSlots, false);
+		executionEnvironment = new TestEnvironment(jobExecutorService, numberSlots, false);
 		executionEnvironment.setAsContext();
-		TestStreamEnvironment.setAsContext(localFlinkMiniCluster, numberSlots);
+		TestStreamEnvironment.setAsContext(jobExecutorService, numberSlots);
 	}
 
 	@Override
 	public void after() {
-		if (localFlinkMiniCluster != null) {
-			try {
-				TestBaseUtils.stopCluster(
-					localFlinkMiniCluster,
-					FutureUtils.toFiniteDuration(miniClusterResourceConfiguration.getShutdownTimeout()));
-			} catch (Exception e) {
-				LOG.warn("Could not properly shut down the Flink mini cluster.", e);
-			}
-
-			TestStreamEnvironment.unsetAsContext();
-			TestEnvironment.unsetAsContext();
-			localFlinkMiniCluster = null;
+
+		TestStreamEnvironment.unsetAsContext();
+		TestEnvironment.unsetAsContext();
+
+		final CompletableFuture<?> terminationFuture = jobExecutorService.terminate();
+
+		try {
+			terminationFuture.get(
+				miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds(),
+				TimeUnit.MILLISECONDS);
+		} catch (Exception e) {
+			LOG.warn("Could not properly shut down the MiniClusterResource.", e);
 		}
+
+		jobExecutorService = null;
+	}
+
+	private JobExecutorService startJobExecutorService(MiniClusterType miniClusterType) throws Exception {
+		final JobExecutorService jobExecutorService;
+		switch (miniClusterType) {
+			case OLD:
+				jobExecutorService = startOldMiniCluster();
+				break;
+			case FLIP6:
+				jobExecutorService = startFlip6MiniCluster();
+				break;
+			default:
+				throw new FlinkRuntimeException("Unknown MiniClusterType "  + miniClusterType + '.');
+		}
+
+		return jobExecutorService;
+	}
+
+	private JobExecutorService startOldMiniCluster() throws Exception {
+		final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
+		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers());
+		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
+
+		return TestBaseUtils.startCluster(
+			configuration,
+			true);
+	}
+
+	@Nonnull
+	private JobExecutorService startFlip6MiniCluster() throws Exception {
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(miniClusterResourceConfiguration.getConfiguration())
+			.setNumTaskManagers(miniClusterResourceConfiguration.getNumberTaskManagers())
+			.setNumSlotsPerTaskManager(miniClusterResourceConfiguration.getNumberSlotsPerTaskManager())
+			.build();
+
+		final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
+
+		miniCluster.start();
+
+		return miniCluster;
 	}
 
 	/**
@@ -144,4 +204,16 @@ public class MiniClusterResource extends ExternalResource {
 			return shutdownTimeout;
 		}
 	}
+
+	// ---------------------------------------------
+	// Enum definitions
+	// ---------------------------------------------
+
+	/**
+	 * Type of the mini cluster to start.
+	 */
+	public enum MiniClusterType {
+		OLD,
+		FLIP6
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/98afd1de/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6b68932..ee07414 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,6 +129,7 @@ under the License.
 		<!-- run all groups except flip6 by default -->
 		<test.groups></test.groups>
 		<test.excludedGroups>org.apache.flink.testutils.category.Flip6</test.excludedGroups>
+		<codebase>old</codebase>
 		<!--
 			Keeping the MiniKDC version fixed instead of taking hadoop version dependency
 			to support testing Kafka, ZK etc., modules that does not have Hadoop dependency
@@ -604,6 +605,7 @@ under the License.
 				<test.groups>org.apache.flink.testutils.category.Flip6, org.apache.flink.testutils.category.OldAndFlip6</test.groups>
 				<!-- clear the excluded groups list -->
 				<test.excludedGroups></test.excludedGroups>
+				<codebase>flip6</codebase>
 			</properties>
 		</profile>
 
@@ -1131,6 +1133,7 @@ under the License.
 					<systemPropertyVariables>
 						<forkNumber>0${surefire.forkNumber}</forkNumber>
 						<log4j.configuration>${log4j.configuration}</log4j.configuration>
+						<codebase>${codebase}</codebase>
 					</systemPropertyVariables>
 					<argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} -XX:+UseG1GC</argLine>
 				</configuration>


[3/5] flink git commit: [hotfix] [tests] Refactor TypeHintITCase to extend AbstractTestBase

Posted by tr...@apache.org.
[hotfix] [tests] Refactor TypeHintITCase to extend AbstractTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/244f03f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/244f03f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/244f03f3

Branch: refs/heads/master
Commit: 244f03f363a6eea709cd45f8e9f495f0ac4eca62
Parents: 98afd1d
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 1 19:19:29 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 17:14:15 2018 +0100

----------------------------------------------------------------------
 .../flink/test/operators/TypeHintITCase.java    | 371 +++++++++----------
 1 file changed, 170 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/244f03f3/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
index b634005..62b5186 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/TypeHintITCase.java
@@ -32,227 +32,196 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.test.operators.util.CollectionDataSets;
-import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.Collector;
 
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 /**
  * Integration tests for {@link org.apache.flink.api.common.typeinfo.TypeHint}.
  */
-@RunWith(Parameterized.class)
-public class TypeHintITCase extends JavaProgramTestBase {
+public class TypeHintITCase extends AbstractTestBase {
 
-	private static final int NUM_PROGRAMS = 9;
+	@Test
+	public void testIdentityMapWithMissingTypesAndStringTypeHint() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
 
-	private final int curProgId;
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
+			.map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
+			.returns("Tuple3<Integer, Long, String>");
+		List<Tuple3<Integer, Long, String>> result = identityMapDs.collect();
 
-	public TypeHintITCase(int curProgId) {
-		this.curProgId = curProgId;
+		String expectedResult = "(2,2,Hello)\n" +
+			"(3,2,Hello world)\n" +
+			"(1,1,Hi)\n";
+
+		compareResultAsText(result, expectedResult);
 	}
 
-	@Override
-	protected void testProgram() throws Exception {
-		TypeHintProgs.runProgram(curProgId);
+	@Test
+	public void testIdentityMapWithMissingTypesAndTypeInformationTypeHint() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
+			// all following generics get erased during compilation
+			.map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
+			.returns(new TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+		List<Tuple3<Integer, Long, String>> result = identityMapDs
+			.collect();
+
+		String expectedResult = "(2,2,Hello)\n" +
+			"(3,2,Hello world)\n" +
+			"(1,1,Hi)\n";
+
+		compareResultAsText(result, expectedResult);
 	}
 
-	@Parameters
-	public static Collection<Object[]> getConfigurations() {
+	@Test
+	public void testFlatMapWithClassTypeHint() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
 
-		Collection<Object[]> parameters = new ArrayList<>(NUM_PROGRAMS);
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Integer> identityMapDs = ds
+			.flatMap(new FlatMapper<Tuple3<Integer, Long, String>, Integer>())
+			.returns(Integer.class);
+		List<Integer> result = identityMapDs.collect();
 
-		for (int i = 1; i <= NUM_PROGRAMS; i++) {
-			parameters.add(new Object[]{i});
-		}
+		String expectedResult = "2\n" +
+			"3\n" +
+			"1\n";
 
-		return parameters;
+		compareResultAsText(result, expectedResult);
 	}
 
-	private static class TypeHintProgs {
-
-		public static void runProgram(int progId) throws Exception {
-			switch(progId) {
-			// Test identity map with missing types and string type hint
-			case 1: {
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
-						.map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
-						.returns("Tuple3<Integer, Long, String>");
-				List<Tuple3<Integer, Long, String>> result = identityMapDs.collect();
-
-				String expectedResult = "(2,2,Hello)\n" +
-						"(3,2,Hello world)\n" +
-						"(1,1,Hi)\n";
-
-				compareResultAsText(result, expectedResult);
-				break;
-			}
-			// Test identity map with missing types and type information type hint
-			case 2: {
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> identityMapDs = ds
-						// all following generics get erased during compilation
-						.map(new Mapper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>())
-						.returns(new TupleTypeInfo<Tuple3<Integer, Long, String>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
-				List<Tuple3<Integer, Long, String>> result = identityMapDs
-						.collect();
-
-				String expectedResult = "(2,2,Hello)\n" +
-						"(3,2,Hello world)\n" +
-						"(1,1,Hi)\n";
-
-				compareResultAsText(result, expectedResult);
-				break;
-			}
-			// Test flat map with class type hint
-			case 3: {
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Integer> identityMapDs = ds
-						.flatMap(new FlatMapper<Tuple3<Integer, Long, String>, Integer>())
-						.returns(Integer.class);
-				List<Integer> result = identityMapDs.collect();
-
-				String expectedResult = "2\n" +
-						"3\n" +
-						"1\n";
-
-				compareResultAsText(result, expectedResult);
-				break;
-			}
-			// Test join with type information type hint
-			case 4: {
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Integer> resultDs = ds1
-						.join(ds2)
-						.where(0)
-						.equalTo(0)
-						.with(new Joiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
-						.returns(BasicTypeInfo.INT_TYPE_INFO);
-				List<Integer> result = resultDs.collect();
-
-				String expectedResult = "2\n" +
-						"3\n" +
-						"1\n";
-
-				compareResultAsText(result, expectedResult);
-				break;
-			}
-			// Test flat join with type information type hint
-			case 5: {
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Integer> resultDs = ds1
-						.join(ds2)
-						.where(0)
-						.equalTo(0)
-						.with(new FlatJoiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
-						.returns(BasicTypeInfo.INT_TYPE_INFO);
-				List<Integer> result = resultDs.collect();
-
-				String expectedResult = "2\n" +
-						"3\n" +
-						"1\n";
-
-				compareResultAsText(result, expectedResult);
-				break;
-			}
-			// Test unsorted group reduce with type information type hint
-			case 6: {
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Integer> resultDs = ds
-						.groupBy(0)
-						.reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
-						.returns(BasicTypeInfo.INT_TYPE_INFO);
-				List<Integer> result = resultDs.collect();
-
-				String expectedResult = "2\n" +
-						"3\n" +
-						"1\n";
-
-				compareResultAsText(result, expectedResult);
-				break;
-			}
-			// Test sorted group reduce with type information type hint
-			case 7: {
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Integer> resultDs = ds
-						.groupBy(0)
-						.sortGroup(0, Order.ASCENDING)
-						.reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
-						.returns(BasicTypeInfo.INT_TYPE_INFO);
-				List<Integer> result = resultDs.collect();
-
-				String expectedResult = "2\n" +
-						"3\n" +
-						"1\n";
-
-				compareResultAsText(result, expectedResult);
-				break;
-			}
-			// Test combine group with type information type hint
-			case 8: {
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Integer> resultDs = ds
-						.groupBy(0)
-						.combineGroup(new GroupCombiner<Tuple3<Integer, Long, String>, Integer>())
-						.returns(BasicTypeInfo.INT_TYPE_INFO);
-				List<Integer> result = resultDs.collect();
-
-				String expectedResult = "2\n" +
-						"3\n" +
-						"1\n";
-
-				compareResultAsText(result, expectedResult);
-				break;
-			}
-			// Test cogroup with type information type hint
-			case 9: {
-				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
-				DataSet<Integer> resultDs = ds1
-						.coGroup(ds2)
-						.where(0)
-						.equalTo(0)
-						.with(new CoGrouper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
-						.returns(BasicTypeInfo.INT_TYPE_INFO);
-				List<Integer> result = resultDs.collect();
-
-				String expectedResult = "2\n" +
-						"3\n" +
-						"1\n";
-
-				compareResultAsText(result, expectedResult);
-				break;
-			}
-			default:
-				throw new IllegalArgumentException("Invalid program id");
-			}
-		}
+	@Test
+	public void testJoinWithTypeInformationTypeHint() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Integer> resultDs = ds1
+			.join(ds2)
+			.where(0)
+			.equalTo(0)
+			.with(new Joiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
+			.returns(BasicTypeInfo.INT_TYPE_INFO);
+		List<Integer> result = resultDs.collect();
+
+		String expectedResult = "2\n" +
+			"3\n" +
+			"1\n";
+
+		compareResultAsText(result, expectedResult);
+	}
+
+	@Test
+	public void testFlatJoinWithTypeInformationTypeHint() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Integer> resultDs = ds1
+			.join(ds2)
+			.where(0)
+			.equalTo(0)
+			.with(new FlatJoiner<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
+			.returns(BasicTypeInfo.INT_TYPE_INFO);
+		List<Integer> result = resultDs.collect();
+
+		String expectedResult = "2\n" +
+			"3\n" +
+			"1\n";
+
+		compareResultAsText(result, expectedResult);
+	}
+
+	@Test
+	public void testUnsortedGroupReduceWithTypeInformationTypeHint() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Integer> resultDs = ds
+			.groupBy(0)
+			.reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
+			.returns(BasicTypeInfo.INT_TYPE_INFO);
+		List<Integer> result = resultDs.collect();
+
+		String expectedResult = "2\n" +
+			"3\n" +
+			"1\n";
+
+		compareResultAsText(result, expectedResult);
+	}
+
+	@Test
+	public void testSortedGroupReduceWithTypeInformationTypeHint() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Integer> resultDs = ds
+			.groupBy(0)
+			.sortGroup(0, Order.ASCENDING)
+			.reduceGroup(new GroupReducer<Tuple3<Integer, Long, String>, Integer>())
+			.returns(BasicTypeInfo.INT_TYPE_INFO);
+		List<Integer> result = resultDs.collect();
+
+		String expectedResult = "2\n" +
+			"3\n" +
+			"1\n";
+
+		compareResultAsText(result, expectedResult);
+	}
+
+	@Test
+	public void testCombineGroupWithTypeInformationTypeHint() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Integer> resultDs = ds
+			.groupBy(0)
+			.combineGroup(new GroupCombiner<Tuple3<Integer, Long, String>, Integer>())
+			.returns(BasicTypeInfo.INT_TYPE_INFO);
+		List<Integer> result = resultDs.collect();
+
+		String expectedResult = "2\n" +
+			"3\n" +
+			"1\n";
+
+		compareResultAsText(result, expectedResult);
+	}
+
+	@Test
+	public void testCoGroupWithTypeInformationTypeHint() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().disableSysoutLogging();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Integer> resultDs = ds1
+			.coGroup(ds2)
+			.where(0)
+			.equalTo(0)
+			.with(new CoGrouper<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Integer>())
+			.returns(BasicTypeInfo.INT_TYPE_INFO);
+		List<Integer> result = resultDs.collect();
+
+		String expectedResult = "2\n" +
+			"3\n" +
+			"1\n";
+
+		compareResultAsText(result, expectedResult);
 	}
 
 	// --------------------------------------------------------------------------------------------


[2/5] flink git commit: [FLINK-8393] [flip6] Reconnect to last known JobMaster when connection is lost

Posted by tr...@apache.org.
[FLINK-8393] [flip6] Reconnect to last known JobMaster when connection is lost

In case of a heartbeat timeout or a disconnect call, the TaskExecutor tries to
reconnect to the last known JobMaster location.

This closes #5267.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63d4819e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63d4819e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63d4819e

Branch: refs/heads/master
Commit: 63d4819e197b1df1651157fd8f86c8ca0540d0b1
Parents: 438e4e3
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jan 9 20:37:08 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 17:14:14 2018 +0100

----------------------------------------------------------------------
 .../registration/RegisteredRpcConnection.java   | 101 ++++++++++++++-----
 .../runtime/taskexecutor/JobLeaderService.java  |  45 ++++++++-
 .../runtime/taskexecutor/TaskExecutor.java      |  27 ++++-
 .../taskexecutor/slot/TaskSlotTable.java        |  13 ++-
 .../RegisteredRpcConnectionTest.java            |  79 ++++++++++++---
 5 files changed, 219 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
index c76bcf8..7d2c35a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -46,6 +47,11 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public abstract class RegisteredRpcConnection<F extends Serializable, G extends RpcGateway, S extends RegistrationResponse.Success> {
 
+	private static final AtomicReferenceFieldUpdater<RegisteredRpcConnection, RetryingRegistration> REGISTRATION_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+		RegisteredRpcConnection.class,
+		RetryingRegistration.class,
+		"pendingRegistration");
+
 	/** The logger for all log messages of this class. */
 	protected final Logger log;
 
@@ -59,7 +65,7 @@ public abstract class RegisteredRpcConnection<F extends Serializable, G extends
 	private final Executor executor;
 
 	/** The Registration of this RPC connection. */
-	private RetryingRegistration<F, G, S> pendingRegistration;
+	private volatile RetryingRegistration<F, G, S> pendingRegistration;
 
 	/** The gateway to register, it's null until the registration is completed. */
 	private volatile G targetGateway;
@@ -85,27 +91,47 @@ public abstract class RegisteredRpcConnection<F extends Serializable, G extends
 		checkState(!closed, "The RPC connection is already closed");
 		checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
 
-		pendingRegistration = checkNotNull(generateRegistration());
-		pendingRegistration.startRegistration();
+		final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
 
-		CompletableFuture<Tuple2<G, S>> future = pendingRegistration.getFuture();
+		if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
+			newRegistration.startRegistration();
+		} else {
+			// concurrent start operation
+			newRegistration.cancel();
+		}
+	}
 
-		future.whenCompleteAsync(
-			(Tuple2<G, S> result, Throwable failure) -> {
-				if (failure != null) {
-					if (failure instanceof CancellationException) {
-						// we ignore cancellation exceptions because they originate from cancelling
-						// the RetryingRegistration
-						log.debug("Retrying registration towards {} was cancelled.", targetAddress);
-					} else {
-						// this future should only ever fail if there is a bug, not if the registration is declined
-						onRegistrationFailure(failure);
-					}
-				} else {
-					targetGateway = result.f0;
-					onRegistrationSuccess(result.f1);
-				}
-			}, executor);
+	public boolean tryReconnect() {
+		checkState(isConnected(), "Cannot reconnect to an unknown destination.");
+
+		if (closed) {
+			return false;
+		} else {
+			final RetryingRegistration<F, G, S> currentPendingRegistration = pendingRegistration;
+
+			if (currentPendingRegistration != null) {
+				currentPendingRegistration.cancel();
+			}
+
+			final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
+
+			if (REGISTRATION_UPDATER.compareAndSet(this, currentPendingRegistration, newRegistration)) {
+				newRegistration.startRegistration();
+			} else {
+				// concurrent modification
+				newRegistration.cancel();
+				return false;
+			}
+
+			// double check for concurrent close operations
+			if (closed) {
+				newRegistration.cancel();
+
+				return false;
+			} else {
+				return true;
+			}
+		}
 	}
 
 	/**
@@ -175,13 +201,42 @@ public abstract class RegisteredRpcConnection<F extends Serializable, G extends
 		}
 
 		if (isClosed()) {
-			connectionInfo = connectionInfo + " is closed";
+			connectionInfo += " is closed";
 		} else if (isConnected()){
-			connectionInfo = connectionInfo + " is established";
+			connectionInfo += " is established";
 		} else {
-			connectionInfo = connectionInfo + " is connecting";
+			connectionInfo += " is connecting";
 		}
 
 		return connectionInfo;
 	}
+
+	// ------------------------------------------------------------------------
+	//  Internal methods
+	// ------------------------------------------------------------------------
+
+	private RetryingRegistration<F, G, S> createNewRegistration() {
+		RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration());
+
+		CompletableFuture<Tuple2<G, S>> future = newRegistration.getFuture();
+
+		future.whenCompleteAsync(
+			(Tuple2<G, S> result, Throwable failure) -> {
+				if (failure != null) {
+					if (failure instanceof CancellationException) {
+						// we ignore cancellation exceptions because they originate from cancelling
+						// the RetryingRegistration
+						log.debug("Retrying registration towards {} was cancelled.", targetAddress);
+					} else {
+						// this future should only ever fail if there is a bug, not if the registration is declined
+						onRegistrationFailure(failure);
+					}
+				} else {
+					targetGateway = result.f0;
+					onRegistrationSuccess(result.f1);
+				}
+			}, executor);
+
+		return newRegistration;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 77737e1..3b4da4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -205,6 +205,23 @@ public class JobLeaderService {
 	}
 
 	/**
+	 * Triggers reconnection to the last known leader of the given job.
+	 *
+	 * @param jobId specifying the job for which to trigger reconnection
+	 */
+	public void reconnect(final JobID jobId) {
+		Preconditions.checkNotNull(jobId, "JobID must not be null.");
+
+		final Tuple2<LeaderRetrievalService, JobManagerLeaderListener> jobLeaderService = jobLeaderServices.get(jobId);
+
+		if (jobLeaderService != null) {
+			jobLeaderService.f1.reconnect();
+		} else {
+			LOG.info("Cannot reconnect to job {} because it is not registered.", jobId);
+		}
+	}
+
+	/**
 	 * Leader listener which tries to establish a connection to a newly detected job leader.
 	 */
 	private final class JobManagerLeaderListener implements LeaderRetrievalListener {
@@ -213,7 +230,7 @@ public class JobLeaderService {
 		private final JobID jobId;
 
 		/** Rpc connection to the job leader. */
-		private RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
+		private volatile RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
 
 		/** State of the listener. */
 		private volatile boolean stopped;
@@ -237,6 +254,32 @@ public class JobLeaderService {
 			}
 		}
 
+		public void reconnect() {
+			if (stopped) {
+				LOG.debug("Cannot reconnect because the JobManagerLeaderListener has already been stopped.");
+			} else {
+				final RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> currentRpcConnection = rpcConnection;
+
+				if (currentRpcConnection != null) {
+					if (currentRpcConnection.isConnected()) {
+
+						if (currentRpcConnection.tryReconnect()) {
+							// double check for concurrent stop operation
+							if (stopped) {
+								currentRpcConnection.close();
+							}
+						} else {
+							LOG.debug("Could not reconnect to the JobMaster {}.", currentRpcConnection.getTargetAddress());
+						}
+					} else {
+						LOG.debug("Ongoing registration to JobMaster {}.", currentRpcConnection.getTargetAddress());
+					}
+				} else {
+					LOG.debug("Cannot reconnect to an unknown JobMaster.");
+				}
+			}
+		}
+
 		@Override
 		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderId) {
 			if (stopped) {

http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 5577472..3c7d1cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -687,6 +687,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 	@Override
 	public void disconnectJobManager(JobID jobId, Exception cause) {
 		closeJobManagerConnection(jobId, cause);
+		jobLeaderService.reconnect(jobId);
 	}
 
 	@Override
@@ -1079,16 +1080,34 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		Preconditions.checkNotNull(allocationId);
 
 		try {
-			int freedSlotIndex = taskSlotTable.freeSlot(allocationId, cause);
+			TaskSlot taskSlot = taskSlotTable.freeSlot(allocationId, cause);
 
-			if (freedSlotIndex != -1 && isConnectedToResourceManager()) {
+			if (taskSlot != null && isConnectedToResourceManager()) {
 				// the slot was freed. Tell the RM about it
 				ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
 
 				resourceManagerGateway.notifySlotAvailable(
 					resourceManagerConnection.getRegistrationId(),
-					new SlotID(getResourceID(), freedSlotIndex),
+					new SlotID(getResourceID(), taskSlot.getIndex()),
 					allocationId);
+
+				// check whether we still have allocated slots for the same job
+				final JobID jobId = taskSlot.getJobId();
+				final Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
+
+				if (!tasks.hasNext()) {
+					// we can remove the job from the job leader service
+					try {
+						jobLeaderService.removeJob(jobId);
+					} catch (Exception e) {
+						log.info("Could not remove job {} from JobLeaderService.", jobId, e);
+					}
+
+					closeJobManagerConnection(
+						jobId,
+						new FlinkException("TaskExecutor " + getAddress() +
+							" has no more allocated slots for job " + jobId + '.'));
+				}
 			}
 		} catch (SlotNotFoundException e) {
 			log.debug("Could not free slot for allocation id {}.", allocationId, e);
@@ -1295,6 +1314,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 						closeJobManagerConnection(
 							jobManagerConnection.getJobID(),
 							new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
+
+						jobLeaderService.reconnect(jobManagerConnection.getJobID());
 					}
 				}
 			});

http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 62101e7..ab62a86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -33,6 +33,8 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -266,7 +268,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	 * @throws SlotNotFoundException if there is not task slot for the given allocation id
 	 * @return Index of the freed slot if the slot could be freed; otherwise -1
 	 */
-	public int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
+	public TaskSlot freeSlot(AllocationID allocationId) throws SlotNotFoundException {
 		return freeSlot(allocationId, new Exception("The task slot of this task is being freed."));
 	}
 
@@ -278,9 +280,10 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	 * @param allocationId identifying the task slot to be freed
 	 * @param cause to fail the tasks with if slot is not empty
 	 * @throws SlotNotFoundException if there is not task slot for the given allocation id
-	 * @return Index of the freed slot if the slot could be freed; otherwise -1
+	 * @return The freed TaskSlot. If the TaskSlot cannot be freed then null.
 	 */
-	public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
+	@Nullable
+	public TaskSlot freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
 		checkInit();
 
 		TaskSlot taskSlot = getTaskSlot(allocationId);
@@ -314,7 +317,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 					slotsPerJob.remove(jobId);
 				}
 
-				return taskSlot.getIndex();
+				return taskSlot;
 			} else {
 				// we couldn't free the task slot because it still contains task, fail the tasks
 				// and set the slot state to releasing so that it gets eventually freed
@@ -326,7 +329,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 					taskIterator.next().failExternally(cause);
 				}
 
-				return -1;
+				return null;
 			}
 		} else {
 			throw new SlotNotFoundException(allocationId);

http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
index 19a5756..650a0f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
@@ -27,12 +27,15 @@ import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
@@ -60,14 +63,14 @@ public class RegisteredRpcConnectionTest extends TestLogger {
 			connection.start();
 
 			//wait for connection established
-			Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT);
+			final String actualConnectionId = connection.getConnectionFuture().get();
 
 			// validate correct invocation and result
 			assertTrue(connection.isConnected());
 			assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress());
 			assertEquals(leaderId, connection.getTargetLeaderId());
 			assertEquals(testGateway, connection.getTargetGateway());
-			assertEquals(connectionID, connection.getConnectionId());
+			assertEquals(connectionID, actualConnectionId);
 		}
 		finally {
 			testGateway.stop();
@@ -86,8 +89,9 @@ public class RegisteredRpcConnectionTest extends TestLogger {
 		try {
 			// gateway that upon calls Throw an exception
 			TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
+			final RuntimeException registrationException = new RuntimeException(connectionFailureMessage);
 			when(testGateway.registrationCall(any(UUID.class), anyLong())).thenThrow(
-				new RuntimeException(connectionFailureMessage));
+				registrationException);
 
 			rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
 
@@ -95,14 +99,18 @@ public class RegisteredRpcConnectionTest extends TestLogger {
 			connection.start();
 
 			//wait for connection failure
-			Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT);
+			try {
+				connection.getConnectionFuture().get();
+				fail("expected failure.");
+			} catch (ExecutionException ee) {
+				assertEquals(registrationException, ee.getCause());
+			}
 
 			// validate correct invocation and result
 			assertFalse(connection.isConnected());
 			assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress());
 			assertEquals(leaderId, connection.getTargetLeaderId());
 			assertNull(connection.getTargetGateway());
-			assertEquals(connectionFailureMessage, connection.getFailareMessage());
 		}
 		finally {
 			rpcService.stopService();
@@ -137,21 +145,53 @@ public class RegisteredRpcConnectionTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testReconnect() throws Exception {
+		final String connectionId1 = "Test RPC Connection ID 1";
+		final String connectionId2 = "Test RPC Connection ID 2";
+		final TestingRpcService rpcService = new TestingRpcService();
+		final String testRpcConnectionEndpointAddress = "<TestRpcConnectionEndpointAddress>";
+		final UUID leaderId = UUID.randomUUID();
+		final TestRegistrationGateway testGateway = new TestRegistrationGateway(
+			new RetryingRegistrationTest.TestRegistrationSuccess(connectionId1),
+			new RetryingRegistrationTest.TestRegistrationSuccess(connectionId2));
+
+		try {
+			rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
+
+			TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService);
+			connection.start();
+
+			final String actualConnectionId1 = connection.getConnectionFuture().get();
+
+			assertEquals(actualConnectionId1, connectionId1);
+
+			assertTrue(connection.tryReconnect());
+
+			final String actualConnectionId2 = connection.getConnectionFuture().get();
+
+			assertEquals(actualConnectionId2, connectionId2);
+		} finally {
+			rpcService.stopService();
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  test RegisteredRpcConnection
 	// ------------------------------------------------------------------------
 
 	private static class TestRpcConnection extends RegisteredRpcConnection<UUID, TestRegistrationGateway, TestRegistrationSuccess> {
 
-		private final RpcService rpcService;
+		private final Object lock = new Object();
 
-		private String connectionId;
+		private final RpcService rpcService;
 
-		private String failureMessage;
+		private CompletableFuture<String> connectionFuture;
 
 		public TestRpcConnection(String targetAddress, UUID targetLeaderId, Executor executor,  RpcService rpcService) {
 			super(LoggerFactory.getLogger(RegisteredRpcConnectionTest.class), targetAddress, targetLeaderId, executor);
 			this.rpcService = rpcService;
+			this.connectionFuture = new CompletableFuture<>();
 		}
 
 		@Override
@@ -161,20 +201,31 @@ public class RegisteredRpcConnectionTest extends TestLogger {
 
 		@Override
 		protected void onRegistrationSuccess(RetryingRegistrationTest.TestRegistrationSuccess success) {
-			connectionId = success.getCorrelationId();
+			synchronized (lock) {
+				connectionFuture.complete(success.getCorrelationId());
+			}
 		}
 
 		@Override
 		protected void onRegistrationFailure(Throwable failure) {
-			failureMessage = failure.getMessage();
+			synchronized (lock) {
+				connectionFuture.completeExceptionally(failure);
+			}
 		}
 
-		public String getConnectionId() {
-			return connectionId;
+		@Override
+		public boolean tryReconnect() {
+			synchronized (lock) {
+				connectionFuture.cancel(false);
+				connectionFuture = new CompletableFuture<>();
+			}
+			return super.tryReconnect();
 		}
 
-		public String getFailareMessage() {
-			return failureMessage;
+		public CompletableFuture<String> getConnectionFuture() {
+			synchronized (lock) {
+				return connectionFuture;
+			}
 		}
 	}
 }


[5/5] flink git commit: [hotfix] [tests] Fix PageRankITCase, AggregatorsITCase and DataSinkITCase to use fresh result path

Posted by tr...@apache.org.
[hotfix] [tests] Fix PageRankITCase, AggregatorsITCase and DataSinkITCase to use fresh result path


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d42759d0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d42759d0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d42759d0

Branch: refs/heads/master
Commit: d42759d0a17d17adc5af9c26b939d431acba5b08
Parents: a6ee040
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 1 19:20:35 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 17:14:15 2018 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/core/fs/FileSystem.java   | 2 +-
 .../org/apache/flink/test/example/java/PageRankITCase.java   | 6 +++++-
 .../flink/test/iterative/aggregators/AggregatorsITCase.java  | 8 +++++---
 .../java/org/apache/flink/test/operators/DataSinkITCase.java | 6 +++++-
 4 files changed, 16 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d42759d0/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index 07a1e76..8698595 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -752,7 +752,7 @@ public abstract class FileSystem {
 						return true;
 					} else {
 						// file may not be overwritten
-						throw new IOException("File or directory already exists. Existing files and directories " +
+						throw new IOException("File or directory " + outPath + " already exists. Existing files and directories " +
 								"are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " +
 								WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/d42759d0/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java
index ee7bf82..2fae57d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/java/PageRankITCase.java
@@ -33,6 +33,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.File;
+import java.util.UUID;
 
 /**
  * Test for {@link PageRank}.
@@ -54,7 +55,10 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 
 	@Before
 	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
+		final File folder = tempFolder.newFolder();
+		final File resultFile = new File(folder, UUID.randomUUID().toString());
+		resultPath = resultFile.toURI().toString();
+
 		File verticesFile = tempFolder.newFile();
 		FileUtils.writeFileUtf8(verticesFile, PageRankData.VERTICES);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d42759d0/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index 64ee98a..bd42ac2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -49,6 +49,7 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.util.Random;
+import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
 
@@ -78,9 +79,10 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 
 	@Before
 	public void before() throws Exception{
-		File tempFile = tempFolder.newFile();
-		testPath = tempFile.toString();
-		resultPath = tempFile.toURI().toString();
+		final File folder = tempFolder.newFolder();
+		final File resultFile = new File(folder, UUID.randomUUID().toString());
+		testPath = resultFile.toString();
+		resultPath = resultFile.toURI().toString();
 	}
 
 	@After

http://git-wip-us.apache.org/repos/asf/flink/blob/d42759d0/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java
index deb5170..b35e818 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/DataSinkITCase.java
@@ -35,7 +35,9 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.BufferedReader;
+import java.io.File;
 import java.util.Random;
+import java.util.UUID;
 
 import static org.junit.Assert.assertTrue;
 
@@ -57,7 +59,9 @@ public class DataSinkITCase extends MultipleProgramsTestBase {
 
 	@Before
 	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
+		final File folder = tempFolder.newFolder();
+		final File resultFile = new File(folder, UUID.randomUUID().toString());
+		resultPath = resultFile.toURI().toString();
 	}
 
 	@Test


[4/5] flink git commit: [hotfix] [tests] Fix JavaProgramTestBase to reset MiniClusterResource#TestEnvironment

Posted by tr...@apache.org.
[hotfix] [tests] Fix JavaProgramTestBase to reset MiniClusterResource#TestEnvironment


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6ee040c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6ee040c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6ee040c

Branch: refs/heads/master
Commit: a6ee040c5dbf8b93e94dcbf353e0db897ff8fb29
Parents: 244f03f
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Dec 1 19:20:04 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 17:14:15 2018 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/test/util/JavaProgramTestBase.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a6ee040c/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index de536d8..9262ae6 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -210,7 +210,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 			e.printStackTrace();
 			Assert.fail("Error while calling the test program: " + e.getMessage());
 		} finally {
-			CollectionTestEnvironment.unsetAsContext();
+			miniClusterResource.getTestEnvironment().setAsContext();
 		}
 
 		Assert.assertNotNull("The test program never triggered an execution.", this.latestExecutionResult);