You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/11 08:45:00 UTC
[1/3] flink git commit: Revert "[FLINK-3232] [runtime] Add option to eagerly deploy channels"
Repository: flink
Updated Branches:
refs/heads/master 58204da13 -> 5d5637b01
Revert "[FLINK-3232] [runtime] Add option to eagerly deploy channels"
The reverted commit did not actually fix anything; it hid the problem by
brute force, sending many more ScheduleOrUpdateConsumers messages.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d2e8b29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d2e8b29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d2e8b29
Branch: refs/heads/master
Commit: 0d2e8b2964b58f5610772c6b5bf39a93b9b0fd95
Parents: 58204da
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Nov 9 16:07:22 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 10 22:59:44 2016 +0100
----------------------------------------------------------------------
.../InputChannelDeploymentDescriptor.java | 17 ++-
...PartialInputChannelDeploymentDescriptor.java | 2 +-
.../ResultPartitionDeploymentDescriptor.java | 25 +---
.../executiongraph/ExecutionJobVertex.java | 10 +-
.../executiongraph/IntermediateResult.java | 11 +-
.../runtime/io/network/NetworkEnvironment.java | 3 -
.../io/network/partition/ResultPartition.java | 21 +--
.../partition/ResultPartitionManager.java | 2 -
.../runtime/jobgraph/IntermediateDataSet.java | 31 ----
.../flink/runtime/jobgraph/JobVertex.java | 12 +-
.../apache/flink/runtime/taskmanager/Task.java | 1 -
...ResultPartitionDeploymentDescriptorTest.java | 4 +-
.../ExecutionGraphDeploymentTest.java | 2 +-
.../io/network/NetworkEnvironmentTest.java | 147 -------------------
.../consumer/LocalInputChannelTest.java | 3 -
.../runtime/jobgraph/JobTaskVertexTest.java | 36 +----
.../runtime/taskmanager/TaskManagerTest.java | 20 +--
.../api/graph/StreamingJobGraphGenerator.java | 9 +-
18 files changed, 35 insertions(+), 321 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 0912055..a72b92f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.ConnectionID;
@@ -88,7 +89,9 @@ public class InputChannelDeploymentDescriptor implements Serializable {
* Creates an input channel deployment descriptor for each partition.
*/
public static InputChannelDeploymentDescriptor[] fromEdges(
- ExecutionEdge[] edges, SimpleSlot consumerSlot) {
+ ExecutionEdge[] edges,
+ SimpleSlot consumerSlot,
+ boolean allowLazyDeployment) throws ExecutionGraphException {
final ResourceID consumerTaskManager = consumerSlot.getTaskManagerID();
final InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[edges.length];
@@ -105,9 +108,11 @@ public class InputChannelDeploymentDescriptor implements Serializable {
// The producing task needs to be RUNNING or already FINISHED
if (consumedPartition.isConsumable() && producerSlot != null &&
- (producerState == ExecutionState.RUNNING
- || producerState == ExecutionState.FINISHED)) {
-
+ (producerState == ExecutionState.RUNNING ||
+ producerState == ExecutionState.FINISHED ||
+ producerState == ExecutionState.SCHEDULED ||
+ producerState == ExecutionState.DEPLOYING)) {
+
final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
@@ -124,9 +129,11 @@ public class InputChannelDeploymentDescriptor implements Serializable {
partitionLocation = ResultPartitionLocation.createRemote(connectionId);
}
}
- else {
+ else if (allowLazyDeployment) {
// The producing task might not have registered the partition yet
partitionLocation = ResultPartitionLocation.createUnknown();
+ } else {
+ throw new ExecutionGraphException("Trying to eagerly schedule a task whose inputs are not ready.");
}
final ResultPartitionID consumedPartitionId = new ResultPartitionID(
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
index 0eac39d..c925f75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.deployment;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import static org.apache.flink.util.Preconditions.checkNotNull;
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index e72d468..2881dde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -48,20 +48,11 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
/** The number of subpartitions. */
private final int numberOfSubpartitions;
- /**
- * Flag indicating whether to eagerly deploy consumers.
- *
- * <p>If <code>true</code>, the consumers are deployed as soon as the
- * runtime result is registered at the result manager of the task manager.
- */
- private final boolean eagerlyDeployConsumers;
-
public ResultPartitionDeploymentDescriptor(
IntermediateDataSetID resultId,
IntermediateResultPartitionID partitionId,
ResultPartitionType partitionType,
- int numberOfSubpartitions,
- boolean eagerlyDeployConsumers) {
+ int numberOfSubpartitions) {
this.resultId = checkNotNull(resultId);
this.partitionId = checkNotNull(partitionId);
@@ -69,7 +60,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
checkArgument(numberOfSubpartitions >= 1);
this.numberOfSubpartitions = numberOfSubpartitions;
- this.eagerlyDeployConsumers = eagerlyDeployConsumers;
}
public IntermediateDataSetID getResultId() {
@@ -88,16 +78,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
return numberOfSubpartitions;
}
- /**
- * Returns whether consumers should be deployed eagerly (as soon as they
- * are registered at the result manager of the task manager).
- *
- * @return Whether consumers should be deployed eagerly
- */
- public boolean getEagerlyDeployConsumers() {
- return eagerlyDeployConsumers;
- }
-
@Override
public String toString() {
return String.format("ResultPartitionDeploymentDescriptor [result id: %s, "
@@ -129,7 +109,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
}
return new ResultPartitionDeploymentDescriptor(
- resultId, partitionId, partitionType, numberOfSubpartitions,
- partition.getIntermediateResult().getEagerlyDeployConsumers());
+ resultId, partitionId, partitionType, numberOfSubpartitions);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 47cfde1..a62ed86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
@@ -30,13 +32,11 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.api.common.Archiveable;
import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
-
import scala.Option;
import java.io.IOException;
@@ -161,8 +160,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
result.getId(),
this,
numTaskVertices,
- result.getResultType(),
- result.getEagerlyDeployConsumers());
+ result.getResultType());
}
// create all task vertices
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 9d57014..c2c19d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -46,14 +46,11 @@ public class IntermediateResult {
private final ResultPartitionType resultType;
- private final boolean eagerlyDeployConsumers;
-
public IntermediateResult(
IntermediateDataSetID id,
ExecutionJobVertex producer,
int numParallelProducers,
- ResultPartitionType resultType,
- boolean eagerlyDeployConsumers) {
+ ResultPartitionType resultType) {
this.id = checkNotNull(id);
this.producer = checkNotNull(producer);
@@ -71,8 +68,6 @@ public class IntermediateResult {
// The runtime type for this produced result
this.resultType = checkNotNull(resultType);
-
- this.eagerlyDeployConsumers = eagerlyDeployConsumers;
}
public void setPartition(int partitionNumber, IntermediateResultPartition partition) {
@@ -108,10 +103,6 @@ public class IntermediateResult {
return resultType;
}
- public boolean getEagerlyDeployConsumers() {
- return eagerlyDeployConsumers;
- }
-
public int registerConsumer() {
final int index = numConsumers;
numConsumers++;
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index b221ec7..d0032d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -158,8 +157,6 @@ public class NetworkEnvironment {
throw new IllegalStateException("Unequal number of writers and partitions.");
}
- ResultPartitionConsumableNotifier jobManagerNotifier;
-
synchronized (lock) {
if (isShutdown) {
throw new IllegalStateException("NetworkEnvironment is shut down");
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index f06cb43..034b27a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.partition;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
@@ -28,7 +29,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.slf4j.Logger;
@@ -89,14 +89,6 @@ public class ResultPartition implements BufferPoolOwner {
/** Type of this partition. Defines the concrete subpartition implementation to use. */
private final ResultPartitionType partitionType;
- /**
- * Flag indicating whether to eagerly deploy consumers.
- *
- * <p>If <code>true</code>, the consumers are deployed as soon as the
- * runtime result is registered at the result manager of the task manager.
- */
- private final boolean doEagerDeployment;
-
/** The subpartitions of this partition. At least one. */
private final ResultSubpartition[] subpartitions;
@@ -137,7 +129,6 @@ public class ResultPartition implements BufferPoolOwner {
JobID jobId,
ResultPartitionID partitionId,
ResultPartitionType partitionType,
- boolean doEagerDeployment,
int numberOfSubpartitions,
ResultPartitionManager partitionManager,
ResultPartitionConsumableNotifier partitionConsumableNotifier,
@@ -149,7 +140,6 @@ public class ResultPartition implements BufferPoolOwner {
this.jobId = checkNotNull(jobId);
this.partitionId = checkNotNull(partitionId);
this.partitionType = checkNotNull(partitionType);
- this.doEagerDeployment = doEagerDeployment;
this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
this.partitionManager = checkNotNull(partitionManager);
this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
@@ -366,15 +356,6 @@ public class ResultPartition implements BufferPoolOwner {
}
/**
- * Deploys consumers if eager deployment is activated
- */
- public void deployConsumers() {
- if (doEagerDeployment) {
- partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId, taskActions);
- }
- }
-
- /**
* Releases buffers held by this result partition.
*
* <p> This is a callback from the buffer pool, which is registered for result partitions, which
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index 6edae6f..9da3e14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -58,8 +58,6 @@ public class ResultPartitionManager implements ResultPartitionProvider {
throw new IllegalStateException("Result partition already registered.");
}
- partition.deployConsumers();
-
LOG.debug("Registered {}.", partition);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
index c30c78e..2d9faa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
@@ -45,14 +45,6 @@ public class IntermediateDataSet implements java.io.Serializable {
// The type of partition to use at runtime
private final ResultPartitionType resultType;
-
- /**
- * Flag indicating whether to eagerly deploy consumers.
- *
- * <p>If <code>true</code>, the consumers are deployed as soon as the
- * runtime result is registered at the result manager of the task manager.
- */
- private boolean eagerlyDeployConsumers;
// --------------------------------------------------------------------------------------------
@@ -87,29 +79,6 @@ public class IntermediateDataSet implements java.io.Serializable {
public ResultPartitionType getResultType() {
return resultType;
}
-
- /**
- * Sets the flag indicating whether to eagerly deploy consumers (default:
- * <code>false</code>).
- *
- * @param eagerlyDeployConsumers If <code>true</code>, the consumers are
- * deployed as soon as the runtime result is
- * registered at the result manager of the
- * task manager. Default is <code>false</code>.
- */
- public void setEagerlyDeployConsumers(boolean eagerlyDeployConsumers) {
- this.eagerlyDeployConsumers = eagerlyDeployConsumers;
- }
-
- /**
- * Returns whether consumers should be deployed eagerly (as soon as they
- * are registered at the result manager of the task manager).
- *
- * @return Whether consumers should be deployed eagerly
- */
- public boolean getEagerlyDeployConsumers() {
- return eagerlyDeployConsumers;
- }
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 8ddc9f5..2bda9d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -382,7 +382,7 @@ public class JobVertex implements java.io.Serializable {
}
public JobEdge connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern) {
- return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED, false);
+ return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED);
}
public JobEdge connectNewDataSetAsInput(
@@ -390,17 +390,7 @@ public class JobVertex implements java.io.Serializable {
DistributionPattern distPattern,
ResultPartitionType partitionType) {
- return connectNewDataSetAsInput(input, distPattern, partitionType, false);
- }
-
- public JobEdge connectNewDataSetAsInput(
- JobVertex input,
- DistributionPattern distPattern,
- ResultPartitionType partitionType,
- boolean eagerlyDeployConsumers) {
-
IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
- dataSet.setEagerlyDeployConsumers(eagerlyDeployConsumers);
JobEdge edge = new JobEdge(dataSet, this, distPattern);
this.inputs.add(edge);
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 827451e..6907606 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -341,7 +341,6 @@ public class Task implements Runnable, TaskActions {
jobId,
partitionId,
desc.getPartitionType(),
- desc.getEagerlyDeployConsumers(),
desc.getNumberOfSubpartitions(),
networkEnvironment.getResultPartitionManager(),
resultPartitionConsumableNotifier,
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index d2fcc7b..4b1e546 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -45,8 +45,7 @@ public class ResultPartitionDeploymentDescriptorTest {
resultId,
partitionId,
partitionType,
- numberOfSubpartitions,
- eagerlyDeployConsumers);
+ numberOfSubpartitions);
ResultPartitionDeploymentDescriptor copy =
CommonTestUtils.createCopySerializable(orig);
@@ -55,6 +54,5 @@ public class ResultPartitionDeploymentDescriptorTest {
assertEquals(partitionId, copy.getPartitionId());
assertEquals(partitionType, copy.getPartitionType());
assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions());
- assertEquals(eagerlyDeployConsumers, copy.getEagerlyDeployConsumers());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 63da1ab..d4acd8c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -308,7 +308,7 @@ public class ExecutionGraphDeploymentTest {
v1.setInvokableClass(BatchTask.class);
v2.setInvokableClass(BatchTask.class);
- v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING, false);
+ v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
// execution graph that executes actions synchronously
ExecutionGraph eg = new ExecutionGraph(
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
deleted file mode 100644
index 13da18e..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.taskmanager.ActorGatewayResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.Test;
-import scala.Some;
-import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.impl.Promise;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class NetworkEnvironmentTest {
- /**
- * Registers a task with an eager and non-eager partition at the network
- * environment and verifies that there is exactly on schedule or update
- * message to the job manager for the eager partition.
- */
- @Test
- @SuppressWarnings("unchecked")
- public void testEagerlyDeployConsumers() throws Exception {
- // Mock job manager => expected interactions will be verified
- final ActorGateway jobManager = mock(ActorGateway.class);
- when(jobManager.ask(anyObject(), any(FiniteDuration.class)))
- .thenReturn(new Promise.DefaultPromise<>().future());
-
- // Network environment setup
- NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
- 20,
- 1024,
- MemoryType.HEAP,
- IOManager.IOMode.SYNC,
- Some.<NettyConfig>empty(),
- 0,
- 0);
-
- NetworkEnvironment env = new NetworkEnvironment(
- new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize(), config.memoryType()),
- new LocalConnectionManager(),
- new ResultPartitionManager(),
- new TaskEventDispatcher(),
- new KvStateRegistry(),
- null,
- config.ioMode(),
- config.partitionRequestInitialBackoff(),
- config.partitinRequestMaxBackoff());
-
- env.start();
-
- ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new ActorGatewayResultPartitionConsumableNotifier(
- TestingUtils.defaultExecutionContext(),
- jobManager,
- new FiniteDuration(30L, TimeUnit.SECONDS));
-
- // Register mock task
- JobID jobId = new JobID();
- Task mockTask = mock(Task.class);
-
- ResultPartition[] partitions = new ResultPartition[2];
- partitions[0] = createPartition(mockTask, "p1", jobId, true, env, resultPartitionConsumableNotifier);
- partitions[1] = createPartition(mockTask, "p2", jobId, false, env, resultPartitionConsumableNotifier);
-
- ResultPartitionWriter[] writers = new ResultPartitionWriter[2];
- writers[0] = new ResultPartitionWriter(partitions[0]);
- writers[1] = new ResultPartitionWriter(partitions[1]);
-
- when(mockTask.getAllInputGates()).thenReturn(new SingleInputGate[0]);
- when(mockTask.getAllWriters()).thenReturn(writers);
- when(mockTask.getProducedPartitions()).thenReturn(partitions);
-
- env.registerTask(mockTask);
-
- // Verify
- ResultPartitionID eagerPartitionId = partitions[0].getPartitionId();
-
- verify(jobManager, times(1)).ask(
- eq(new ScheduleOrUpdateConsumers(jobId, eagerPartitionId)),
- any(FiniteDuration.class));
- }
-
- /**
- * Helper to create a mock result partition.
- */
- private static ResultPartition createPartition(
- Task owningTask,
- String name,
- JobID jobId,
- boolean eagerlyDeployConsumers,
- NetworkEnvironment env,
- ResultPartitionConsumableNotifier resultPartitionConsumableNotifier) {
-
- return new ResultPartition(
- name,
- owningTask,
- jobId,
- new ResultPartitionID(),
- ResultPartitionType.PIPELINED,
- eagerlyDeployConsumers,
- 1,
- env.getResultPartitionManager(),
- resultPartitionConsumableNotifier,
- mock(IOManager.class),
- env.getDefaultIOMode());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 19bb67e..2d3797d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import com.google.common.collect.Lists;
-
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.execution.CancelTaskException;
@@ -41,7 +40,6 @@ import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Test;
@@ -122,7 +120,6 @@ public class LocalInputChannelTest {
jobId,
partitionIds[i],
ResultPartitionType.PIPELINED,
- false,
parallelism,
partitionManager,
partitionConsumableNotifier,
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
index c3ba909..48f06b0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.jobgraph;
-import java.io.IOException;
-
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.InitializeOnMaster;
@@ -29,10 +27,11 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.junit.Test;
+import java.io.IOException;
+
import static org.junit.Assert.*;
@SuppressWarnings("serial")
@@ -130,36 +129,7 @@ public class JobTaskVertexTest {
fail(e.getMessage());
}
}
-
- /**
- * Verifies correct setting of eager deploy settings.
- */
- @Test
- public void testEagerlyDeployConsumers() throws Exception {
- JobVertex producer = new JobVertex("producer");
-
- {
- JobVertex consumer = new JobVertex("consumer");
- JobEdge edge = consumer.connectNewDataSetAsInput(
- producer, DistributionPattern.ALL_TO_ALL);
- assertFalse(edge.getSource().getEagerlyDeployConsumers());
- }
-
- {
- JobVertex consumer = new JobVertex("consumer");
- JobEdge edge = consumer.connectNewDataSetAsInput(
- producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
- assertFalse(edge.getSource().getEagerlyDeployConsumers());
- }
-
- {
- JobVertex consumer = new JobVertex("consumer");
- JobEdge edge = consumer.connectNewDataSetAsInput(
- producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED, true);
- assertTrue(edge.getSource().getEagerlyDeployConsumers());
- }
- }
-
+
// --------------------------------------------------------------------------------------------
private static final class TestingOutputFormat extends DiscardingOutputFormat<Object> implements InitializeOnMaster {
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index ad107b1..15947f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -18,11 +18,7 @@
package org.apache.flink.runtime.taskmanager;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Kill;
-import akka.actor.Props;
-import akka.actor.Status;
+import akka.actor.*;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.ExecutionConfig;
@@ -35,11 +31,7 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionLocation;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.*;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
@@ -630,7 +622,7 @@ public class TaskManagerTest extends TestLogger {
IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
- irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, false));
+ irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
InputGateDeploymentDescriptor ircdd =
new InputGateDeploymentDescriptor(
@@ -775,7 +767,7 @@ public class TaskManagerTest extends TestLogger {
IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
- irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, false));
+ irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
InputGateDeploymentDescriptor ircdd =
new InputGateDeploymentDescriptor(
@@ -1427,9 +1419,7 @@ public class TaskManagerTest extends TestLogger {
new IntermediateDataSetID(),
new IntermediateResultPartitionID(),
ResultPartitionType.PIPELINED,
- 1,
- false // don't deploy eagerly but with the first completed memory buffer
- );
+ 1);
final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
"TestTask", 1, 0, 1, 0, new Configuration(), new Configuration(),
http://git-wip-us.apache.org/repos/asf/flink/blob/0d2e8b29/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 2065a16..48be2e9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -392,20 +392,17 @@ public class StreamingJobGraphGenerator {
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.POINTWISE,
- ResultPartitionType.PIPELINED,
- true);
+ ResultPartitionType.PIPELINED);
} else if (partitioner instanceof RescalePartitioner){
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.POINTWISE,
- ResultPartitionType.PIPELINED,
- true);
+ ResultPartitionType.PIPELINED);
} else {
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.ALL_TO_ALL,
- ResultPartitionType.PIPELINED,
- true);
+ ResultPartitionType.PIPELINED);
}
// set strategy name so that web interface can show it.
jobEdge.setShipStrategyName(partitioner.toString());
[2/3] flink git commit: [FLINK-5040] [jobmanager] Set correct input
channel types with eager scheduling
Posted by uc...@apache.org.
[FLINK-5040] [jobmanager] Set correct input channel types with eager scheduling
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2742d5c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2742d5c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2742d5c1
Branch: refs/heads/master
Commit: 2742d5c1761ca02d871333e91a8ecbc6d0a52f6c
Parents: 0d2e8b2
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Nov 9 18:25:06 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 10 23:06:55 2016 +0100
----------------------------------------------------------------------
.../ResultPartitionDeploymentDescriptor.java | 17 +-
.../runtime/executiongraph/ExecutionVertex.java | 21 +-
.../runtime/io/network/PartitionState.java | 18 +-
.../io/network/partition/ResultPartition.java | 8 +-
.../flink/runtime/jobgraph/ScheduleMode.java | 10 +-
.../apache/flink/runtime/taskmanager/Task.java | 10 +-
.../flink/runtime/jobmanager/JobManager.scala | 2 +-
.../InputChannelDeploymentDescriptorTest.java | 206 +++++++++++++++++++
...ResultPartitionDeploymentDescriptorTest.java | 6 +-
.../ExecutionVertexDeploymentTest.java | 106 ++++++----
.../network/partition/ResultPartitionTest.java | 92 +++++++++
.../consumer/LocalInputChannelTest.java | 3 +-
.../runtime/jobgraph/ScheduleModeTest.java | 36 ++++
.../runtime/taskmanager/TaskManagerTest.java | 19 +-
.../flink/runtime/taskmanager/TaskTest.java | 5 +-
15 files changed, 491 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 2881dde..2ecde80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -47,12 +47,16 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
/** The number of subpartitions. */
private final int numberOfSubpartitions;
+
+ /** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
+ private final boolean lazyScheduling;
public ResultPartitionDeploymentDescriptor(
IntermediateDataSetID resultId,
IntermediateResultPartitionID partitionId,
ResultPartitionType partitionType,
- int numberOfSubpartitions) {
+ int numberOfSubpartitions,
+ boolean lazyScheduling) {
this.resultId = checkNotNull(resultId);
this.partitionId = checkNotNull(partitionId);
@@ -60,6 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
checkArgument(numberOfSubpartitions >= 1);
this.numberOfSubpartitions = numberOfSubpartitions;
+ this.lazyScheduling = lazyScheduling;
}
public IntermediateDataSetID getResultId() {
@@ -78,6 +83,10 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
return numberOfSubpartitions;
}
+ public boolean allowLazyScheduling() {
+ return lazyScheduling;
+ }
+
@Override
public String toString() {
return String.format("ResultPartitionDeploymentDescriptor [result id: %s, "
@@ -87,7 +96,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
// ------------------------------------------------------------------------
- public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartition partition) {
+ public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartition partition, boolean lazyScheduling) {
final IntermediateDataSetID resultId = partition.getIntermediateResult().getId();
final IntermediateResultPartitionID partitionId = partition.getPartitionId();
@@ -102,13 +111,13 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
if (!partition.getConsumers().isEmpty() && !partition.getConsumers().get(0).isEmpty()) {
if (partition.getConsumers().size() > 1) {
- new IllegalStateException("Currently, only a single consumer group per partition is supported.");
+ throw new IllegalStateException("Currently, only a single consumer group per partition is supported.");
}
numberOfSubpartitions = partition.getConsumers().get(0).size();
}
return new ResultPartitionDeploymentDescriptor(
- resultId, partitionId, partitionType, numberOfSubpartitions);
+ resultId, partitionId, partitionType, numberOfSubpartitions, lazyScheduling);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e7f000c..01e8660 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -568,21 +568,24 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
ExecutionAttemptID executionId,
SimpleSlot targetSlot,
TaskStateHandles taskStateHandles,
- int attemptNumber) {
-
+ int attemptNumber) throws ExecutionGraphException {
+
// Produced intermediate results
- List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>(resultPartitions.size());
+ List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<>(resultPartitions.size());
+
+ // Consumed intermediate results
+ List<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<>(inputEdges.length);
+
+ boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment();
for (IntermediateResultPartition partition : resultPartitions.values()) {
- producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition));
+ producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, lazyScheduling));
}
-
- // Consumed intermediate results
- List<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<InputGateDeploymentDescriptor>();
-
+
+
for (ExecutionEdge[] edges : inputEdges) {
InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor
- .fromEdges(edges, targetSlot);
+ .fromEdges(edges, targetSlot, lazyScheduling);
// If the produced partition has multiple consumers registered, we
// need to request the one matching our sub task index.
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
index 083412b..59357fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
@@ -23,18 +23,25 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+
/**
* Contains information about the state of a result partition.
*/
-public class PartitionState {
+public class PartitionState implements Serializable {
+
+ private static final long serialVersionUID = -4693651272083825031L;
+
private final IntermediateDataSetID intermediateDataSetID;
private final IntermediateResultPartitionID intermediateResultPartitionID;
private final ExecutionState executionState;
public PartitionState(
- IntermediateDataSetID intermediateDataSetID,
- IntermediateResultPartitionID intermediateResultPartitionID,
- ExecutionState executionState) {
+ IntermediateDataSetID intermediateDataSetID,
+ IntermediateResultPartitionID intermediateResultPartitionID,
+ @Nullable ExecutionState executionState) {
+
this.intermediateDataSetID = Preconditions.checkNotNull(intermediateDataSetID);
this.intermediateResultPartitionID = Preconditions.checkNotNull(intermediateResultPartitionID);
this.executionState = executionState;
@@ -48,6 +55,9 @@ public class PartitionState {
return intermediateResultPartitionID;
}
+ /**
+ * Returns the execution state of the partition producer or <code>null</code> if it is not available.
+ */
public ExecutionState getExecutionState() {
return executionState;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 034b27a..834318c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -96,6 +96,8 @@ public class ResultPartition implements BufferPoolOwner {
private final ResultPartitionConsumableNotifier partitionConsumableNotifier;
+ private final boolean sendScheduleOrUpdateConsumersMessage;
+
// - Runtime state --------------------------------------------------------
private final AtomicBoolean isReleased = new AtomicBoolean();
@@ -133,7 +135,8 @@ public class ResultPartition implements BufferPoolOwner {
ResultPartitionManager partitionManager,
ResultPartitionConsumableNotifier partitionConsumableNotifier,
IOManager ioManager,
- IOMode defaultIoMode) {
+ IOMode defaultIoMode,
+ boolean sendScheduleOrUpdateConsumersMessage) {
this.owningTaskName = checkNotNull(owningTaskName);
this.taskActions = checkNotNull(taskActions);
@@ -143,6 +146,7 @@ public class ResultPartition implements BufferPoolOwner {
this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
this.partitionManager = checkNotNull(partitionManager);
this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
+ this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
// Create the subpartitions.
switch (partitionType) {
@@ -437,7 +441,7 @@ public class ResultPartition implements BufferPoolOwner {
* Notifies pipelined consumers of this result partition once.
*/
private void notifyPipelinedConsumers() {
- if (partitionType.isPipelined() && !hasNotifiedPipelinedConsumers) {
+ if (sendScheduleOrUpdateConsumersMessage && !hasNotifiedPipelinedConsumers && partitionType.isPipelined()) {
partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId, taskActions);
hasNotifiedPipelinedConsumers = true;
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
index 9405067..6a98e46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
@@ -27,5 +27,13 @@ public enum ScheduleMode {
LAZY_FROM_SOURCES,
/** Schedules all tasks immediately. */
- EAGER
+ EAGER;
+
+ /**
+ * Returns whether we are allowed to deploy consumers lazily.
+ */
+ public boolean allowLazyDeployment() {
+ return this == LAZY_FROM_SOURCES;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 6907606..4f3dd54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -345,7 +345,8 @@ public class Task implements Runnable, TaskActions {
networkEnvironment.getResultPartitionManager(),
resultPartitionConsumableNotifier,
ioManager,
- networkEnvironment.getDefaultIOMode());
+ networkEnvironment.getDefaultIOMode(),
+ desc.allowLazyScheduling());
writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);
@@ -568,6 +569,7 @@ public class Task implements Runnable, TaskActions {
// ----------------------------------------------------------------
LOG.info("Registering task at network: " + this);
+
network.registerTask(this);
// next, kick off the background copying of files for the distributed cache
@@ -1135,7 +1137,11 @@ public class Task implements Runnable, TaskActions {
final SingleInputGate inputGate = inputGatesById.get(resultId);
if (inputGate != null) {
- if (partitionState == ExecutionState.RUNNING) {
+ if (partitionState == ExecutionState.RUNNING ||
+ partitionState == ExecutionState.FINISHED ||
+ partitionState == ExecutionState.SCHEDULED ||
+ partitionState == ExecutionState.DEPLOYING) {
+
// Retrigger the partition request
inputGate.retriggerPartitionRequest(partitionId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9af5355..b2e1002 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -948,7 +948,7 @@ class JobManager(
if (execution != null) execution.getState else null
case None =>
// Nothing to do. This is not an error, because the request is received when a sending
- // task fails during a remote partition request.
+ // task fails or is not yet available during a remote partition request.
log.debug(s"Cannot find execution graph for job $jobId.")
null
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
new file mode 100644
index 0000000..e9e8901
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.junit.Test;
+
+import java.net.InetAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class InputChannelDeploymentDescriptorTest {
+
+ /**
+ * Tests the deployment descriptors for local, remote, and unknown partition
+ * locations (with lazy deployment allowed and all execution states for the
+ * producers).
+ */
+ @Test
+ public void testMixedLocalRemoteUnknownDeployment() throws Exception {
+ boolean allowLazyDeployment = true;
+
+ ResourceID consumerResourceId = ResourceID.generate();
+ ExecutionVertex consumer = mock(ExecutionVertex.class);
+ SimpleSlot consumerSlot = mockSlot(consumerResourceId);
+
+ // Local and remote channel are only allowed for certain execution
+ // states.
+ for (ExecutionState state : ExecutionState.values()) {
+ // Local partition
+ ExecutionVertex localProducer = mockExecutionVertex(state, consumerResourceId);
+ IntermediateResultPartition localPartition = mockPartition(localProducer);
+ ResultPartitionID localPartitionId = new ResultPartitionID(localPartition.getPartitionId(), localProducer.getCurrentExecutionAttempt().getAttemptId());
+ ExecutionEdge localEdge = new ExecutionEdge(localPartition, consumer, 0);
+
+ // Remote partition
+ ExecutionVertex remoteProducer = mockExecutionVertex(state, ResourceID.generate()); // new resource ID
+ IntermediateResultPartition remotePartition = mockPartition(remoteProducer);
+ ResultPartitionID remotePartitionId = new ResultPartitionID(remotePartition.getPartitionId(), remoteProducer.getCurrentExecutionAttempt().getAttemptId());
+ ConnectionID remoteConnectionId = new ConnectionID(remoteProducer.getCurrentAssignedResource().getTaskManagerLocation(), 0);
+ ExecutionEdge remoteEdge = new ExecutionEdge(remotePartition, consumer, 1);
+
+ // Unknown partition
+ ExecutionVertex unknownProducer = mockExecutionVertex(state, null); // no assigned resource
+ IntermediateResultPartition unknownPartition = mockPartition(unknownProducer);
+ ResultPartitionID unknownPartitionId = new ResultPartitionID(unknownPartition.getPartitionId(), unknownProducer.getCurrentExecutionAttempt().getAttemptId());
+ ExecutionEdge unknownEdge = new ExecutionEdge(unknownPartition, consumer, 2);
+
+ InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges(
+ new ExecutionEdge[]{localEdge, remoteEdge, unknownEdge},
+ consumerSlot,
+ allowLazyDeployment);
+
+ assertEquals(3, desc.length);
+
+ // These states are allowed
+ if (state == ExecutionState.RUNNING || state == ExecutionState.FINISHED ||
+ state == ExecutionState.SCHEDULED || state == ExecutionState.DEPLOYING) {
+
+ // Create local or remote channels
+ assertEquals(localPartitionId, desc[0].getConsumedPartitionId());
+ assertTrue(desc[0].getConsumedPartitionLocation().isLocal());
+ assertNull(desc[0].getConsumedPartitionLocation().getConnectionId());
+
+ assertEquals(remotePartitionId, desc[1].getConsumedPartitionId());
+ assertTrue(desc[1].getConsumedPartitionLocation().isRemote());
+ assertEquals(remoteConnectionId, desc[1].getConsumedPartitionLocation().getConnectionId());
+ } else {
+ // Unknown (lazy deployment allowed)
+ assertEquals(localPartitionId, desc[0].getConsumedPartitionId());
+ assertTrue(desc[0].getConsumedPartitionLocation().isUnknown());
+ assertNull(desc[0].getConsumedPartitionLocation().getConnectionId());
+
+ assertEquals(remotePartitionId, desc[1].getConsumedPartitionId());
+ assertTrue(desc[1].getConsumedPartitionLocation().isUnknown());
+ assertNull(desc[1].getConsumedPartitionLocation().getConnectionId());
+ }
+
+ assertEquals(unknownPartitionId, desc[2].getConsumedPartitionId());
+ assertTrue(desc[2].getConsumedPartitionLocation().isUnknown());
+ assertNull(desc[2].getConsumedPartitionLocation().getConnectionId());
+ }
+ }
+
+ @Test
+ public void testUnknownChannelWithoutLazyDeploymentThrows() throws Exception {
+ ResourceID consumerResourceId = ResourceID.generate();
+ ExecutionVertex consumer = mock(ExecutionVertex.class);
+ SimpleSlot consumerSlot = mockSlot(consumerResourceId);
+
+
+ // Unknown partition
+ ExecutionVertex unknownProducer = mockExecutionVertex(ExecutionState.CREATED, null); // no assigned resource
+ IntermediateResultPartition unknownPartition = mockPartition(unknownProducer);
+ ResultPartitionID unknownPartitionId = new ResultPartitionID(unknownPartition.getPartitionId(), unknownProducer.getCurrentExecutionAttempt().getAttemptId());
+ ExecutionEdge unknownEdge = new ExecutionEdge(unknownPartition, consumer, 2);
+
+ // This should work if lazy deployment is allowed
+ boolean allowLazyDeployment = true;
+
+ InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges(
+ new ExecutionEdge[]{unknownEdge},
+ consumerSlot,
+ allowLazyDeployment);
+
+ assertEquals(1, desc.length);
+
+ assertEquals(unknownPartitionId, desc[0].getConsumedPartitionId());
+ assertTrue(desc[0].getConsumedPartitionLocation().isUnknown());
+ assertNull(desc[0].getConsumedPartitionLocation().getConnectionId());
+
+
+ try {
+ // Fail if lazy deployment is *not* allowed
+ allowLazyDeployment = false;
+
+ InputChannelDeploymentDescriptor.fromEdges(
+ new ExecutionEdge[]{unknownEdge},
+ consumerSlot,
+ allowLazyDeployment);
+
+ fail("Did not throw expected ExecutionGraphException");
+ } catch (ExecutionGraphException ignored) {
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static SimpleSlot mockSlot(ResourceID resourceId) {
+ SimpleSlot slot = mock(SimpleSlot.class);
+ when(slot.getTaskManagerLocation()).thenReturn(new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 5000));
+ when(slot.getTaskManagerID()).thenReturn(resourceId);
+
+ return slot;
+ }
+
+ private static ExecutionVertex mockExecutionVertex(ExecutionState state, ResourceID resourceId) {
+ ExecutionVertex vertex = mock(ExecutionVertex.class);
+
+ Execution exec = mock(Execution.class);
+ when(exec.getState()).thenReturn(state);
+ when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
+
+ if (resourceId != null) {
+ SimpleSlot slot = mockSlot(resourceId);
+ when(exec.getAssignedResource()).thenReturn(slot);
+ when(vertex.getCurrentAssignedResource()).thenReturn(slot);
+ } else {
+ when(exec.getAssignedResource()).thenReturn(null); // no resource
+ when(vertex.getCurrentAssignedResource()).thenReturn(null);
+ }
+
+ when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+
+ return vertex;
+ }
+
+ private static IntermediateResultPartition mockPartition(ExecutionVertex producer) {
+ IntermediateResultPartition partition = mock(IntermediateResultPartition.class);
+ when(partition.isConsumable()).thenReturn(true);
+
+ IntermediateResult result = mock(IntermediateResult.class);
+ when(result.getConnectionIndex()).thenReturn(0);
+
+ when(partition.getIntermediateResult()).thenReturn(result);
+ when(partition.getPartitionId()).thenReturn(new IntermediateResultPartitionID());
+
+ when(partition.getProducer()).thenReturn(producer);
+
+ return partition;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 4b1e546..4223b49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class ResultPartitionDeploymentDescriptorTest {
@@ -38,14 +39,14 @@ public class ResultPartitionDeploymentDescriptorTest {
IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
ResultPartitionType partitionType = ResultPartitionType.PIPELINED;
int numberOfSubpartitions = 24;
- boolean eagerlyDeployConsumers = true;
ResultPartitionDeploymentDescriptor orig =
new ResultPartitionDeploymentDescriptor(
resultId,
partitionId,
partitionType,
- numberOfSubpartitions);
+ numberOfSubpartitions,
+ true);
ResultPartitionDeploymentDescriptor copy =
CommonTestUtils.createCopySerializable(orig);
@@ -54,5 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest {
assertEquals(partitionId, copy.getPartitionId());
assertEquals(partitionType, copy.getPartitionType());
assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions());
+ assertTrue(copy.allowLazyScheduling());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 54aeff9..8bc39a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -18,20 +18,37 @@
package org.apache.flink.runtime.executiongraph;
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-
-import static org.junit.Assert.*;
-
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-
import org.junit.Test;
+import java.util.Collection;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ERROR_MESSAGE;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleFailingActorGateway;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class ExecutionVertexDeploymentTest {
@Test
@@ -48,7 +65,7 @@ public class ExecutionVertexDeploymentTest {
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
+ AkkaUtils.getDefaultTimeout());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
vertex.deployToSlot(slot);
@@ -58,8 +75,7 @@ public class ExecutionVertexDeploymentTest {
try {
vertex.deployToSlot(slot);
fail("Scheduled from wrong state");
- }
- catch (IllegalStateException e) {
+ } catch (IllegalStateException e) {
// as expected
}
@@ -67,8 +83,7 @@ public class ExecutionVertexDeploymentTest {
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@@ -82,12 +97,12 @@ public class ExecutionVertexDeploymentTest {
final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
final Instance instance = getInstance(
- new ActorTaskManagerGateway(
- new SimpleActorGateway(TestingUtils.directExecutionContext())));
+ new ActorTaskManagerGateway(
+ new SimpleActorGateway(TestingUtils.directExecutionContext())));
final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
+ AkkaUtils.getDefaultTimeout());
assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
@@ -99,8 +114,7 @@ public class ExecutionVertexDeploymentTest {
try {
vertex.deployToSlot(slot);
fail("Scheduled from wrong state");
- }
- catch (IllegalStateException e) {
+ } catch (IllegalStateException e) {
// as expected
}
@@ -109,8 +123,7 @@ public class ExecutionVertexDeploymentTest {
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0);
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@@ -123,7 +136,7 @@ public class ExecutionVertexDeploymentTest {
final ExecutionJobVertex ejv = getExecutionVertex(jid);
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
+ AkkaUtils.getDefaultTimeout());
final Instance instance = getInstance(
new ActorTaskManagerGateway(
@@ -138,8 +151,7 @@ public class ExecutionVertexDeploymentTest {
try {
vertex.deployToSlot(slot);
fail("Scheduled from wrong state");
- }
- catch (IllegalStateException e) {
+ } catch (IllegalStateException e) {
// as expected
}
@@ -149,16 +161,14 @@ public class ExecutionVertexDeploymentTest {
try {
vertex.deployToSlot(slot);
fail("Scheduled from wrong state");
- }
- catch (IllegalStateException e) {
+ } catch (IllegalStateException e) {
// as expected
}
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0);
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@@ -171,7 +181,7 @@ public class ExecutionVertexDeploymentTest {
final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext());
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
+ AkkaUtils.getDefaultTimeout());
final Instance instance = getInstance(
new ActorTaskManagerGateway(
@@ -189,8 +199,7 @@ public class ExecutionVertexDeploymentTest {
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@@ -202,7 +211,7 @@ public class ExecutionVertexDeploymentTest {
final JobVertexID jid = new JobVertexID();
final ExecutionJobVertex ejv = getExecutionVertex(jid);
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
+ AkkaUtils.getDefaultTimeout());
final Instance instance = getInstance(
new ActorTaskManagerGateway(
@@ -229,8 +238,7 @@ public class ExecutionVertexDeploymentTest {
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@@ -247,7 +255,7 @@ public class ExecutionVertexDeploymentTest {
final ExecutionJobVertex ejv = getExecutionVertex(jid, ec);
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
+ AkkaUtils.getDefaultTimeout());
final Instance instance = getInstance(
new ActorTaskManagerGateway(
@@ -270,8 +278,7 @@ public class ExecutionVertexDeploymentTest {
assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@@ -288,7 +295,7 @@ public class ExecutionVertexDeploymentTest {
final ExecutionJobVertex ejv = getExecutionVertex(jid, context);
final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
- AkkaUtils.getDefaultTimeout());
+ AkkaUtils.getDefaultTimeout());
final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId();
@@ -334,10 +341,37 @@ public class ExecutionVertexDeploymentTest {
assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
assertTrue(queue.isEmpty());
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
+
+ /**
+ * Tests that the lazy scheduling flag is correctly forwarded to the produced partition descriptors.
+ */
+ @Test
+ public void testTddProducedPartitionsLazyScheduling() throws Exception {
+ TestingUtils.QueuedActionExecutionContext context = TestingUtils.queuedActionExecutionContext();
+ ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), context);
+ IntermediateResult result = new IntermediateResult(new IntermediateDataSetID(), jobVertex, 4, ResultPartitionType.PIPELINED);
+ ExecutionVertex vertex = new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1));
+
+ Slot root = mock(Slot.class);
+ when(root.getSlotNumber()).thenReturn(1);
+ SimpleSlot slot = mock(SimpleSlot.class);
+ when(slot.getRoot()).thenReturn(root);
+
+ for (ScheduleMode mode : ScheduleMode.values()) {
+ vertex.getExecutionGraph().setScheduleMode(mode);
+
+ TaskDeploymentDescriptor tdd = vertex.createDeploymentDescriptor(new ExecutionAttemptID(), slot, null, 1);
+
+ Collection<ResultPartitionDeploymentDescriptor> producedPartitions = tdd.getProducedPartitions();
+
+ assertEquals(1, producedPartitions.size());
+ ResultPartitionDeploymentDescriptor desc = producedPartitions.iterator().next();
+ assertEquals(mode.allowLazyDeployment(), desc.sendScheduleOrUpdateConsumersMessage());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
new file mode 100644
index 0000000..f6fddfa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class ResultPartitionTest {
+
+ /**
+ * Tests the schedule or update consumers message sending behaviour depending on the relevant flags.
+ */
+ @Test
+ public void testSendScheduleOrUpdateConsumersMessage() throws Exception {
+ {
+ // Pipelined, send message => notify
+ ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+ ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, true);
+ partition.add(TestBufferFactory.createBuffer(), 0);
+ verify(notifier, times(1)).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
+ }
+
+ {
+ // Pipelined, don't send message => don't notify
+ ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+ ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, false);
+ partition.add(TestBufferFactory.createBuffer(), 0);
+ verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
+ }
+
+ {
+ // Blocking, send message => don't notify
+ ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+ ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, true);
+ partition.add(TestBufferFactory.createBuffer(), 0);
+ verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
+ }
+
+ {
+ // Blocking, don't send message => don't notify
+ ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+ ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, false);
+ partition.add(TestBufferFactory.createBuffer(), 0);
+ verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static ResultPartition createPartition(
+ ResultPartitionConsumableNotifier notifier,
+ ResultPartitionType type,
+ boolean sendScheduleOrUpdateConsumersMessage) {
+ return new ResultPartition(
+ "TestTask",
+ mock(TaskActions.class),
+ new JobID(),
+ new ResultPartitionID(),
+ type,
+ 1,
+ mock(ResultPartitionManager.class),
+ notifier,
+ mock(IOManager.class),
+ IOManager.IOMode.SYNC,
+ sendScheduleOrUpdateConsumersMessage);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 2d3797d..4ca1d1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -124,7 +124,8 @@ public class LocalInputChannelTest {
partitionManager,
partitionConsumableNotifier,
ioManager,
- ASYNC);
+ ASYNC,
+ true);
// Create a buffer pool for this partition
partition.registerBufferPool(
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java
new file mode 100644
index 0000000..144ef12
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ScheduleModeTest {
+
+ /**
+ * Test that schedule modes set the lazy deployment flag correctly.
+ */
+ @Test
+ public void testAllowLazyDeployment() throws Exception {
+ assertTrue(ScheduleMode.LAZY_FROM_SOURCES.allowLazyDeployment());
+ assertFalse(ScheduleMode.EAGER.allowLazyDeployment());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 15947f9..22f0c60 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -18,7 +18,11 @@
package org.apache.flink.runtime.taskmanager;
-import akka.actor.*;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.Props;
+import akka.actor.Status;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.ExecutionConfig;
@@ -31,7 +35,11 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.deployment.*;
+import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionLocation;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
@@ -622,7 +630,7 @@ public class TaskManagerTest extends TestLogger {
IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
- irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
+ irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true));
InputGateDeploymentDescriptor ircdd =
new InputGateDeploymentDescriptor(
@@ -767,7 +775,7 @@ public class TaskManagerTest extends TestLogger {
IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
- irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
+ irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true));
InputGateDeploymentDescriptor ircdd =
new InputGateDeploymentDescriptor(
@@ -1419,7 +1427,8 @@ public class TaskManagerTest extends TestLogger {
new IntermediateDataSetID(),
new IntermediateResultPartitionID(),
ResultPartitionType.PIPELINED,
- 1);
+ 1,
+ true);
final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
"TestTask", 1, 0, 1, 0, new Configuration(), new Configuration(),
http://git-wip-us.apache.org/repos/asf/flink/blob/2742d5c1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 1eebe12..5d26050 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -553,6 +553,9 @@ public class TaskTest extends TestLogger {
}
expected.put(ExecutionState.RUNNING, ExecutionState.RUNNING);
+ expected.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING);
+ expected.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
+ expected.put(ExecutionState.FINISHED, ExecutionState.RUNNING);
expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING);
@@ -568,7 +571,7 @@ public class TaskTest extends TestLogger {
assertEquals(expected.get(state), newTaskState);
}
- verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
+ verify(inputGate, times(4)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
}
/**
[3/3] flink git commit: [FLINK-5040] [taskmanager] Adjust partition
request backoffs
Posted by uc...@apache.org.
[FLINK-5040] [taskmanager] Adjust partition request backoffs
The backoffs were hard-coded before, which made it
impossible to react to any potential problems with them.
This closes #2784.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d5637b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d5637b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d5637b0
Branch: refs/heads/master
Commit: 5d5637b01031746b2dfadf6d7fcd59155f7de653
Parents: 2742d5c
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Nov 10 11:15:47 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Nov 11 09:41:39 2016 +0100
----------------------------------------------------------------------
.../flink/configuration/TaskManagerOptions.java | 24 ++++--
.../ResultPartitionDeploymentDescriptor.java | 8 +-
.../partition/consumer/SingleInputGate.java | 10 ++-
.../apache/flink/runtime/taskmanager/Task.java | 2 +-
.../NetworkEnvironmentConfiguration.scala | 14 ++--
.../flink/runtime/taskmanager/TaskManager.scala | 9 ++-
...ResultPartitionDeploymentDescriptorTest.java | 2 +-
.../partition/consumer/SingleInputGateTest.java | 84 ++++++++++++++++++++
...askManagerComponentsStartupShutdownTest.java | 4 +-
.../runtime/taskmanager/TaskManagerTest.java | 5 ++
10 files changed, 140 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index e5d36aa..6f6238b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -35,6 +35,20 @@ public class TaskManagerOptions {
// @TODO Migrate 'taskmanager.*' config options from ConfigConstants
// ------------------------------------------------------------------------
+ // Network Options
+ // ------------------------------------------------------------------------
+
+ /** Minimum backoff for partition requests of input channels. */
+ public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
+ key("taskmanager.net.request-backoff.initial")
+ .defaultValue(100);
+
+ /** Maximum backoff for partition requests of input channels. */
+ public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
+ key("taskmanager.net.request-backoff.max")
+ .defaultValue(10000);
+
+ // ------------------------------------------------------------------------
// Task Options
// ------------------------------------------------------------------------
@@ -44,8 +58,8 @@ public class TaskManagerOptions {
*/
public static final ConfigOption<Long> TASK_CANCELLATION_INTERVAL =
key("task.cancellation.interval")
- .defaultValue(30000L)
- .withDeprecatedKeys("task.cancellation-interval");
+ .defaultValue(30000L)
+ .withDeprecatedKeys("task.cancellation-interval");
/**
* Timeout in milliseconds after which a task cancellation times out and
@@ -54,19 +68,19 @@ public class TaskManagerOptions {
*/
public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT =
key("task.cancellation.timeout")
- .defaultValue(180000L);
+ .defaultValue(180000L);
/**
* The maximum number of bytes that a checkpoint alignment may buffer.
* If the checkpoint alignment buffers more than the configured amount of
* data, the checkpoint is aborted (skipped).
- *
+ *
* <p>The default value of {@code -1} indicates that there is no limit.
*/
public static final ConfigOption<Long> TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT =
key("task.checkpoint.alignment.max-size")
.defaultValue(-1L);
-
+
// ------------------------------------------------------------------------
/** Not intended to be instantiated */
http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 2ecde80..14c7d2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -49,7 +49,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
private final int numberOfSubpartitions;
/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
- private final boolean lazyScheduling;
+ private final boolean sendScheduleOrUpdateConsumersMessage;
public ResultPartitionDeploymentDescriptor(
IntermediateDataSetID resultId,
@@ -64,7 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
checkArgument(numberOfSubpartitions >= 1);
this.numberOfSubpartitions = numberOfSubpartitions;
- this.lazyScheduling = lazyScheduling;
+ this.sendScheduleOrUpdateConsumersMessage = lazyScheduling;
}
public IntermediateDataSetID getResultId() {
@@ -83,8 +83,8 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
return numberOfSubpartitions;
}
- public boolean allowLazyScheduling() {
- return lazyScheduling;
+ public boolean sendScheduleOrUpdateConsumersMessage() {
+ return sendScheduleOrUpdateConsumersMessage;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index af5fd89..8f57542 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import com.google.common.collect.Maps;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -520,6 +521,13 @@ public class SingleInputGate implements InputGate {
// ------------------------------------------------------------------------
+ @VisibleForTesting
+ Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
+ return inputChannels;
+ }
+
+ // ------------------------------------------------------------------------
+
/**
* Creates an input gate and all of its input channels.
*/
@@ -565,7 +573,7 @@ public class SingleInputGate implements InputGate {
partitionLocation.getConnectionId(),
networkEnvironment.getConnectionManager(),
networkEnvironment.getPartitionRequestInitialBackoff(),
- networkEnvironment.getPartitionRequestInitialBackoff(),
+ networkEnvironment.getPartitionRequestMaxBackoff(),
metrics
);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 4f3dd54..b960e68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -346,7 +346,7 @@ public class Task implements Runnable, TaskActions {
resultPartitionConsumableNotifier,
ioManager,
networkEnvironment.getDefaultIOMode(),
- desc.allowLazyScheduling());
+ desc.sendScheduleOrUpdateConsumersMessage());
writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);
http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 14589a1..6a59665 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -23,10 +23,10 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
import org.apache.flink.runtime.io.network.netty.NettyConfig
case class NetworkEnvironmentConfiguration(
- numNetworkBuffers: Int,
- networkBufferSize: Int,
- memoryType: MemoryType,
- ioMode: IOMode,
- nettyConfig: Option[NettyConfig] = None,
- partitionRequestInitialBackoff: Int = 500,
- partitinRequestMaxBackoff: Int = 3000)
+ numNetworkBuffers: Int,
+ networkBufferSize: Int,
+ memoryType: MemoryType,
+ ioMode: IOMode,
+ partitionRequestInitialBackoff : Int,
+ partitionRequestMaxBackoff : Int,
+ nettyConfig: Option[NettyConfig] = None)
http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 4bb2da4..dd5d218 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1982,7 +1982,7 @@ object TaskManager {
kvStateServer,
netConfig.ioMode,
netConfig.partitionRequestInitialBackoff,
- netConfig.partitinRequestMaxBackoff)
+ netConfig.partitionRequestMaxBackoff)
network.start()
@@ -2258,11 +2258,18 @@ object TaskManager {
val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
+ val initialRequestBackoff = configuration.getInteger(
+ TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL)
+ val maxRequestBackoff = configuration.getInteger(
+ TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX)
+
val networkConfig = NetworkEnvironmentConfiguration(
numNetworkBuffers,
pageSize,
memType,
ioMode,
+ initialRequestBackoff,
+ maxRequestBackoff,
nettyConfig)
// ----> timeouts, library caching, profiling
http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 4223b49..3ed8236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -55,6 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest {
assertEquals(partitionId, copy.getPartitionId());
assertEquals(partitionType, copy.getPartitionType());
assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions());
- assertTrue(copy.allowLazyScheduling());
+ assertTrue(copy.sendScheduleOrUpdateConsumersMessage());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 8f9ea9e..0b7b10d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -21,11 +21,14 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -42,9 +45,12 @@ import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Test;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -272,6 +278,84 @@ public class SingleInputGateTest {
}
/**
+ * Tests request back off configuration is correctly forwarded to the channels.
+ */
+ @Test
+ public void testRequestBackoffConfiguration() throws Exception {
+ ResultPartitionID[] partitionIds = new ResultPartitionID[] {
+ new ResultPartitionID(),
+ new ResultPartitionID(),
+ new ResultPartitionID()
+ };
+
+ InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{
+ // Local
+ new InputChannelDeploymentDescriptor(
+ partitionIds[0],
+ ResultPartitionLocation.createLocal()),
+ // Remote
+ new InputChannelDeploymentDescriptor(
+ partitionIds[1],
+ ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress("localhost", 5000), 0))),
+ // Unknown
+ new InputChannelDeploymentDescriptor(
+ partitionIds[2],
+ ResultPartitionLocation.createUnknown())};
+
+ InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, channelDescs);
+
+ int initialBackoff = 137;
+ int maxBackoff = 1001;
+
+ NetworkEnvironment netEnv = mock(NetworkEnvironment.class);
+ when(netEnv.getResultPartitionManager()).thenReturn(new ResultPartitionManager());
+ when(netEnv.getTaskEventDispatcher()).thenReturn(new TaskEventDispatcher());
+ when(netEnv.getPartitionRequestInitialBackoff()).thenReturn(initialBackoff);
+ when(netEnv.getPartitionRequestMaxBackoff()).thenReturn(maxBackoff);
+ when(netEnv.getConnectionManager()).thenReturn(new LocalConnectionManager());
+
+ SingleInputGate gate = SingleInputGate.create(
+ "TestTask",
+ new JobID(),
+ new ExecutionAttemptID(),
+ gateDesc,
+ netEnv,
+ mock(TaskActions.class),
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+
+ Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels();
+
+ assertEquals(3, channelMap.size());
+ InputChannel localChannel = channelMap.get(partitionIds[0].getPartitionId());
+ assertEquals(LocalInputChannel.class, localChannel.getClass());
+
+ InputChannel remoteChannel = channelMap.get(partitionIds[1].getPartitionId());
+ assertEquals(RemoteInputChannel.class, remoteChannel.getClass());
+
+ InputChannel unknownChannel = channelMap.get(partitionIds[2].getPartitionId());
+ assertEquals(UnknownInputChannel.class, unknownChannel.getClass());
+
+ InputChannel[] channels = new InputChannel[]{localChannel, remoteChannel, unknownChannel};
+ for (InputChannel ch : channels) {
+ assertEquals(0, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(initialBackoff, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(initialBackoff * 2, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(initialBackoff * 2 * 2, ch.getCurrentBackoff());
+
+ assertTrue(ch.increaseBackoff());
+ assertEquals(maxBackoff, ch.getCurrentBackoff());
+
+ assertFalse(ch.increaseBackoff());
+ }
+ }
+
+ /**
* Returns whether the stack trace represents a Thread in a blocking queue
* poll call.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 0bcd1ce..f9434e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -104,7 +104,7 @@ public class TaskManagerComponentsStartupShutdownTest {
config);
final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
- 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, Option.<NettyConfig>empty(), 0, 0);
+ 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, Option.<NettyConfig>empty());
ResourceID taskManagerId = ResourceID.generate();
@@ -121,7 +121,7 @@ public class TaskManagerComponentsStartupShutdownTest {
null,
netConf.ioMode(),
netConf.partitionRequestInitialBackoff(),
- netConf.partitinRequestMaxBackoff());
+ netConf.partitionRequestMaxBackoff());
network.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 22f0c60..fd9ff05 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.blob.BlobKey;
@@ -903,6 +904,8 @@ public class TaskManagerTest extends TestLogger {
final int dataPort = NetUtils.getAvailablePort();
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+ config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+ config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
taskManager = TestingUtils.createTaskManager(
system,
@@ -998,6 +1001,8 @@ public class TaskManagerTest extends TestLogger {
jobManager = new AkkaActorGateway(jm, leaderSessionID);
final Configuration config = new Configuration();
+ config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+ config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
taskManager = TestingUtils.createTaskManager(
system,