You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/04 18:37:27 UTC
[1/3] flink git commit: Add autoparallelism to jobs
Repository: flink
Updated Branches:
refs/heads/release-0.8 941712941 -> a6f9f9939
Add autoparallelism to jobs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ff3f4c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ff3f4c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ff3f4c4
Branch: refs/heads/release-0.8
Commit: 4ff3f4c43594610090c2c17220bd522df6ec1c7c
Parents: 9417129
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 16 21:40:06 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 4 14:21:54 2015 +0100
----------------------------------------------------------------------
.../client/program/AutoParallelismITCase.java | 118 +++++++++++++++++++
.../flink/api/common/ExecutionConfig.java | 6 +
.../flink/runtime/jobmanager/JobManager.java | 7 ++
3 files changed, 131 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4ff3f4c4/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
new file mode 100644
index 0000000..c1fa888
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This test verifies that the auto parallelism is properly forwarded to the runtime.
+ */
+public class AutoParallelismITCase {
+
+ private static final int NUM_TM = 2;
+ private static final int SLOTS_PER_TM = 7;
+ private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
+
+ @Test
+ public void testProgramWithAutoParallelism() {
+
+ NepheleMiniCluster cluster = new NepheleMiniCluster();
+ cluster.setNumTaskManager(NUM_TM);
+ cluster.setTaskManagerNumSlots(SLOTS_PER_TM);
+
+ try {
+ cluster.start();
+
+ ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRpcPort());
+ env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+
+ DataSet<Integer> result = env
+ .createInput(new ParallelismDependentInputFormat())
+ .mapPartition(new ParallelismDependentMapPartition());
+
+ List<Integer> resultCollection = new ArrayList<Integer>();
+ result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
+
+ env.execute();
+
+ assertEquals(PARALLELISM, resultCollection.size());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ try {
+ cluster.stop();
+ }
+ catch (Throwable t) {
+ // ignore exceptions on shutdown
+ }
+ }
+ }
+
+ private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
+ private transient boolean emitted;
+
+ @Override
+ public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+ assertEquals(PARALLELISM, numSplits);
+ return super.createInputSplits(numSplits);
+ }
+
+ @Override
+ public boolean reachedEnd() {
+ return emitted;
+ }
+
+ @Override
+ public Integer nextRecord(Integer reuse) {
+ if (emitted) {
+ return null;
+ }
+ emitted = true;
+ return 1;
+ }
+ }
+
+ private static class ParallelismDependentMapPartition extends RichMapPartitionFunction<Integer, Integer> {
+
+ @Override
+ public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
+ out.collect(getRuntimeContext().getIndexOfThisSubtask());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4ff3f4c4/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 03d5e3a..8216b25 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -31,6 +31,12 @@ public class ExecutionConfig implements Serializable {
// Key for storing it in the Job Configuration
public static final String CONFIG_KEY = "runtime.config";
+ /**
+ * The constant to use for the degree of parallelism, if the system should use the number
+ * of currently available slots.
+ */
+ public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;
+
private boolean useClosureCleaner = true;
private int degreeOfParallelism = -1;
private int numberOfExecutionRetries = -1;
http://git-wip-us.apache.org/repos/asf/flink/blob/4ff3f4c4/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 4f61d94..223c6c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -40,6 +40,7 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -374,6 +375,8 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
LOG.debug(String.format("Running master initialization of job %s (%s)", job.getJobID(), job.getName()));
}
+ final int numSlots = scheduler.getTotalNumberOfSlots();
+
for (AbstractJobVertex vertex : job.getVertices()) {
// check that the vertex has an executable class
String executableClass = vertex.getInvokableClassName();
@@ -383,6 +386,10 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
// master side initialization
vertex.initializeOnMaster(userCodeLoader);
+
+ if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+ vertex.setParallelism(numSlots);
+ }
}
// first topologically sort the job vertices to form the basis of creating the execution graph
[2/3] flink git commit: [tests] Add test for restart recovery
Posted by se...@apache.org.
[tests] Add test for restart recovery
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/785f2041
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/785f2041
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/785f2041
Branch: refs/heads/release-0.8
Commit: 785f2041fdf8f8409e7cfcc66451b79d7bf57673
Parents: 4ff3f4c
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 16 21:40:06 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 4 17:49:59 2015 +0100
----------------------------------------------------------------------
.../client/minicluster/NepheleMiniCluster.java | 40 ++-
.../client/program/AutoParallelismITCase.java | 118 --------
.../flink/api/java/ExecutionEnvironment.java | 3 +
.../runtime/executiongraph/ExecutionGraph.java | 2 +-
.../test/recovery/AutoParallelismITCase.java | 122 ++++++++
.../test/recovery/SimpleRecoveryITCase.java | 287 +++++++++++++++++++
6 files changed, 448 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index a40b733..79099c7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -82,6 +82,9 @@ public class NepheleMiniCluster {
private boolean defaultAlwaysCreateDirectory = false;
+ private long heartbeatInterval = ConfigConstants.DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL;
+
+ private long heartbeatTimeout = ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT;
private JobManager jobManager;
@@ -163,11 +166,33 @@ public class NepheleMiniCluster {
public void setNumTaskManager(int numTaskManager) { this.numTaskManager = numTaskManager; }
- public int getNumTaskManager() { return numTaskManager; }
+ public int getNumTaskManager() {
+ return numTaskManager;
+ }
- public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
+ public void setTaskManagerNumSlots(int taskManagerNumSlots) {
+ this.taskManagerNumSlots = taskManagerNumSlots;
+ }
- public int getTaskManagerNumSlots() { return taskManagerNumSlots; }
+ public int getTaskManagerNumSlots() {
+ return taskManagerNumSlots;
+ }
+
+ public void setHeartbeatInterval(long heartbeatInterval) {
+ this.heartbeatInterval = heartbeatInterval;
+ }
+
+ public long getHeartbeatInterval() {
+ return heartbeatInterval;
+ }
+
+ public void setHeartbeatTimeout(long heartbeatTimeout) {
+ this.heartbeatTimeout = heartbeatTimeout;
+ }
+
+ public long getHeartbeatTimeout() {
+ return heartbeatTimeout;
+ }
// ------------------------------------------------------------------------
// Life cycle and Job Submission
@@ -206,7 +231,8 @@ public class NepheleMiniCluster {
} else {
Configuration conf = getMiniclusterDefaultConfig(jobManagerRpcPort, taskManagerRpcPort,
taskManagerDataPort, memorySize, hdfsConfigFile, lazyMemoryAllocation, defaultOverwriteFiles,
- defaultAlwaysCreateDirectory, taskManagerNumSlots, numTaskManager);
+ defaultAlwaysCreateDirectory, taskManagerNumSlots, numTaskManager,
+ heartbeatInterval, heartbeatTimeout);
GlobalConfiguration.includeConfiguration(conf);
}
@@ -297,7 +323,8 @@ public class NepheleMiniCluster {
public static Configuration getMiniclusterDefaultConfig(int jobManagerRpcPort, int taskManagerRpcPort,
int taskManagerDataPort, long memorySize, String hdfsConfigFile, boolean lazyMemory,
boolean defaultOverwriteFiles, boolean defaultAlwaysCreateDirectory,
- int taskManagerNumSlots, int numTaskManager)
+ int taskManagerNumSlots, int numTaskManager,
+ long heartbeatInterval, long heartbeatTimeout)
{
final Configuration config = new Configuration();
@@ -350,6 +377,9 @@ public class NepheleMiniCluster {
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManager);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
+
+ config.setLong(ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY, heartbeatInterval);
+ config.setLong(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, heartbeatTimeout);
return config;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
deleted file mode 100644
index c1fa888..0000000
--- a/flink-clients/src/test/java/org/apache/flink/client/program/AutoParallelismITCase.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.program;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.minicluster.NepheleMiniCluster;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * This test verifies that the auto parallelism is properly forwarded to the runtime.
- */
-public class AutoParallelismITCase {
-
- private static final int NUM_TM = 2;
- private static final int SLOTS_PER_TM = 7;
- private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
-
- @Test
- public void testProgramWithAutoParallelism() {
-
- NepheleMiniCluster cluster = new NepheleMiniCluster();
- cluster.setNumTaskManager(NUM_TM);
- cluster.setTaskManagerNumSlots(SLOTS_PER_TM);
-
- try {
- cluster.start();
-
- ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRpcPort());
- env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
-
- DataSet<Integer> result = env
- .createInput(new ParallelismDependentInputFormat())
- .mapPartition(new ParallelismDependentMapPartition());
-
- List<Integer> resultCollection = new ArrayList<Integer>();
- result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
-
- env.execute();
-
- assertEquals(PARALLELISM, resultCollection.size());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- try {
- cluster.stop();
- }
- catch (Throwable t) {
- // ignore exceptions on shutdown
- }
- }
- }
-
- private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
-
- private transient boolean emitted;
-
- @Override
- public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
- assertEquals(PARALLELISM, numSplits);
- return super.createInputSplits(numSplits);
- }
-
- @Override
- public boolean reachedEnd() {
- return emitted;
- }
-
- @Override
- public Integer nextRecord(Integer reuse) {
- if (emitted) {
- return null;
- }
- emitted = true;
- return 1;
- }
- }
-
- private static class ParallelismDependentMapPartition extends RichMapPartitionFunction<Integer, Integer> {
-
- @Override
- public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
- out.collect(getRuntimeContext().getIndexOfThisSubtask());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 61a74b9..2026ace 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -813,6 +813,9 @@ public abstract class ExecutionEnvironment {
if (getDegreeOfParallelism() > 0) {
plan.setDefaultParallelism(getDegreeOfParallelism());
}
+ if (getNumberOfExecutionRetries() >= 0) {
+ plan.setNumberOfExecutionRetries(getNumberOfExecutionRetries());
+ }
try {
registerCachedFilesWithPlan(plan);
http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1f9cf26..dfd2e39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -735,7 +735,7 @@ public class ExecutionGraph {
throw new IllegalStateException("Can only restart job from state restarting.");
}
if (scheduler == null) {
- throw new IllegalStateException("The execution graph has not been schedudled before - scheduler is null.");
+ throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
}
this.currentExecutions.clear();
http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java
new file mode 100644
index 0000000..bbfe0a9
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AutoParallelismITCase.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This test verifies that the auto parallelism is properly forwarded to the runtime.
+ */
+public class AutoParallelismITCase {
+
+ private static final int NUM_TM = 2;
+ private static final int SLOTS_PER_TM = 7;
+ private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM;
+
+ @Test
+ public void testProgramWithAutoParallelism() {
+
+ NepheleMiniCluster cluster = new NepheleMiniCluster();
+ cluster.setNumTaskManager(NUM_TM);
+ cluster.setTaskManagerNumSlots(SLOTS_PER_TM);
+
+ try {
+ cluster.start();
+
+ ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRpcPort());
+ env.setDegreeOfParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);
+
+ DataSet<Integer> result = env
+ .createInput(new ParallelismDependentInputFormat())
+ .rebalance()
+ .mapPartition(new ParallelismDependentMapPartition());
+
+ List<Integer> resultCollection = new ArrayList<Integer>();
+ result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
+
+ env.execute();
+
+ assertEquals(PARALLELISM, resultCollection.size());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ try {
+ cluster.stop();
+ }
+ catch (Throwable t) {
+ // ignore exceptions on shutdown
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Utility classes
+ // --------------------------------------------------------------------------------------------
+
+ private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
+
+ private transient boolean emitted;
+
+ @Override
+ public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
+ assertEquals(PARALLELISM, numSplits);
+ return super.createInputSplits(numSplits);
+ }
+
+ @Override
+ public boolean reachedEnd() {
+ return emitted;
+ }
+
+ @Override
+ public Integer nextRecord(Integer reuse) {
+ if (emitted) {
+ return null;
+ }
+ emitted = true;
+ return 1;
+ }
+ }
+
+ private static class ParallelismDependentMapPartition extends RichMapPartitionFunction<Integer, Integer> {
+
+ @Override
+ public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
+ out.collect(getRuntimeContext().getIndexOfThisSubtask());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/785f2041/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
new file mode 100644
index 0000000..bec9c3f
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class SimpleRecoveryITCase {
+
+ private static NepheleMiniCluster cluster;
+
+ @BeforeClass
+ public static void setupCluster() {
+ try {
+ cluster = new NepheleMiniCluster();
+ cluster.setNumTaskManager(2);
+ cluster.setTaskManagerNumSlots(2);
+
+ // these two parameters determine how fast the restart happens
+ cluster.setHeartbeatInterval(500);
+ cluster.setHeartbeatTimeout(2000);
+
+ cluster.start();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Could not start test minicluster: " + e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void tearDownCluster() {
+ try {
+ cluster.stop();
+ }
+ catch (Exception e) {
+ System.err.println("Error stopping cluster on shutdown");
+ e.printStackTrace();
+ fail("Cluster shutdown caused an exception: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testFailedRunThenSuccessfulRun() {
+
+ try {
+ List<Long> resultCollection = new ArrayList<Long>();
+
+ // attempt 1
+ {
+ ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getJobManagerRpcPort());
+
+ env.setDegreeOfParallelism(4);
+ env.setNumberOfExecutionRetries(0);
+
+ env.generateSequence(1, 10)
+ .rebalance()
+ .map(new FailingMapper1<Long>())
+ .reduce(new ReduceFunction<Long>() {
+ @Override
+ public Long reduce(Long value1, Long value2) {
+ return value1 + value2;
+ }
+ })
+ .output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+ try {
+ JobExecutionResult res = env.execute();
+ String msg = res == null ? "null result" : "result in " + res.getNetRuntime();
+ fail("The program should have failed, but returned " + msg);
+ }
+ catch (ProgramInvocationException e) {
+ // expected
+ }
+ }
+
+ // attempt 2
+ {
+ ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getJobManagerRpcPort());
+
+ env.setDegreeOfParallelism(4);
+ env.setNumberOfExecutionRetries(0);
+
+ env.generateSequence(1, 10)
+ .rebalance()
+ .map(new FailingMapper1<Long>())
+ .reduce(new ReduceFunction<Long>() {
+ @Override
+ public Long reduce(Long value1, Long value2) {
+ return value1 + value2;
+ }
+ })
+ .output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+ try {
+ JobExecutionResult result = env.execute();
+ assertTrue(result.getNetRuntime() >= 0);
+ assertNotNull(result.getAllAccumulatorResults());
+ assertTrue(result.getAllAccumulatorResults().isEmpty());
+ }
+ catch (JobExecutionException e) {
+ fail("The program should have succeeded on the second run");
+ }
+
+ long sum = 0;
+ for (long l : resultCollection) {
+ sum += l;
+ }
+ assertEquals(55, sum);
+ }
+
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRestart() {
+ try {
+ List<Long> resultCollection = new ArrayList<Long>();
+
+ ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getJobManagerRpcPort());
+
+ env.setDegreeOfParallelism(4);
+ env.setNumberOfExecutionRetries(1);
+
+ env.generateSequence(1, 10)
+ .rebalance()
+ .map(new FailingMapper2<Long>())
+ .reduce(new ReduceFunction<Long>() {
+ @Override
+ public Long reduce(Long value1, Long value2) {
+ return value1 + value2;
+ }
+ })
+ .output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+ try {
+ JobExecutionResult result = env.execute();
+ assertTrue(result.getNetRuntime() >= 0);
+ assertNotNull(result.getAllAccumulatorResults());
+ assertTrue(result.getAllAccumulatorResults().isEmpty());
+ }
+ catch (JobExecutionException e) {
+ fail("The program should have succeeded on the second run");
+ }
+
+ long sum = 0;
+ for (long l : resultCollection) {
+ sum += l;
+ }
+ assertEquals(55, sum);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRestartMultipleTimes() {
+ try {
+ List<Long> resultCollection = new ArrayList<Long>();
+
+ ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+ "localhost", cluster.getJobManagerRpcPort());
+
+ env.setDegreeOfParallelism(4);
+ env.setNumberOfExecutionRetries(3);
+
+ env.generateSequence(1, 10)
+ .rebalance()
+ .map(new FailingMapper3<Long>())
+ .reduce(new ReduceFunction<Long>() {
+ @Override
+ public Long reduce(Long value1, Long value2) {
+ return value1 + value2;
+ }
+ })
+ .output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+ try {
+ JobExecutionResult result = env.execute();
+ assertTrue(result.getNetRuntime() >= 0);
+ assertNotNull(result.getAllAccumulatorResults());
+ assertTrue(result.getAllAccumulatorResults().isEmpty());
+ }
+ catch (JobExecutionException e) {
+ fail("The program should have succeeded on the second run");
+ }
+
+ long sum = 0;
+ for (long l : resultCollection) {
+ sum += l;
+ }
+ assertEquals(55, sum);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // ------------------------------------------------------------------------------------
+
+ private static class FailingMapper1<T> extends RichMapFunction<T, T> {
+
+ private static int failuresBeforeSuccess = 1;
+
+ @Override
+ public T map(T value) throws Exception {
+ if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+ failuresBeforeSuccess--;
+ throw new Exception("Test Failure");
+ }
+
+ return value;
+ }
+ }
+
+ private static class FailingMapper2<T> extends RichMapFunction<T, T> {
+
+ private static int failuresBeforeSuccess = 1;
+
+ @Override
+ public T map(T value) throws Exception {
+ if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+ failuresBeforeSuccess--;
+ throw new Exception("Test Failure");
+ }
+
+ return value;
+ }
+ }
+
+ private static class FailingMapper3<T> extends RichMapFunction<T, T> {
+
+ private static int failuresBeforeSuccess = 3;
+
+ @Override
+ public T map(T value) throws Exception {
+ if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+ failuresBeforeSuccess--;
+ throw new Exception("Test Failure");
+ }
+
+ return value;
+ }
+ }
+}
[3/3] flink git commit: [jobmanager] Move auto-parallelism of vertices
before master initialization
Posted by se...@apache.org.
[jobmanager] Move auto-parallelism of vertices before master initialization
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6f9f993
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6f9f993
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6f9f993
Branch: refs/heads/release-0.8
Commit: a6f9f9939ca03026baeefb3bd0876b90068b7682
Parents: 785f204
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 4 18:08:22 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 4 18:08:22 2015 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/runtime/jobmanager/JobManager.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a6f9f993/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 223c6c6..ac39047 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -384,12 +384,13 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
throw new JobException(String.format("The vertex %s (%s) has no invokable class.", vertex.getID(), vertex.getName()));
}
- // master side initialization
- vertex.initializeOnMaster(userCodeLoader);
-
+ // set the parallelism in case of auto parallelism
if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
vertex.setParallelism(numSlots);
}
+
+ // master side initialization
+ vertex.initializeOnMaster(userCodeLoader);
}
// first topologically sort the job vertices to form the basis of creating the execution graph