You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/24 16:09:38 UTC
[5/6] flink git commit: [FLINK-6046] Either store serialized value or permanent blob key in ExecutionGraph and ExecutionJobVertex
[FLINK-6046] Either store serialized value or permanent blob key in ExecutionGraph and ExecutionJobVertex
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ff07e63
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ff07e63
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ff07e63
Branch: refs/heads/master
Commit: 5ff07e63d1e9a98959e5edf66872222b847d23d5
Parents: 315badc
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Oct 20 23:39:14 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 24 18:08:31 2017 +0200
----------------------------------------------------------------------
.../flink/configuration/BlobServerOptions.java | 1 +
.../flink/configuration/JobManagerOptions.java | 7 -
.../apache/flink/runtime/blob/BlobServer.java | 39 ++++-
.../runtime/executiongraph/ExecutionGraph.java | 151 +++++++------------
.../executiongraph/ExecutionGraphBuilder.java | 41 ++---
.../executiongraph/ExecutionJobVertex.java | 101 ++++---------
.../runtime/executiongraph/ExecutionVertex.java | 51 ++++---
.../ExecutionGraphDeploymentTest.java | 49 +++---
...ecutionGraphDeploymentWithBlobCacheTest.java | 4 +-
...cutionGraphDeploymentWithBlobServerTest.java | 21 +--
.../executiongraph/FailoverRegionTest.java | 16 +-
.../executiongraph/GlobalModVersionTest.java | 4 +-
.../IndividualRestartsConcurrencyTest.java | 4 +-
.../PipelinedRegionFailoverConcurrencyTest.java | 8 +-
.../RestartPipelinedRegionStrategyTest.java | 32 ++--
.../partitioner/RescalePartitionerTest.java | 9 +-
16 files changed, 253 insertions(+), 285 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
index 20b7303..8680096 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.configuration;
import org.apache.flink.annotation.PublicEvolving;
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index a39927d..ef3306e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -96,13 +96,6 @@ public class JobManagerOptions {
key("jobmanager.archive.fs.dir")
.noDefaultValue();
- /**
- * The maximum size of the <tt>TaskDeploymentDescriptor</tt>'s serialized task and job
- * information to still transmit them via RPC. Larger blobs may be offloaded to the BLOB server.
- */
- public static final ConfigOption<Integer> TDD_OFFLOAD_MINSIZE = key("jobmanager.tdd.offload.minsize")
- .defaultValue(1_024); // 1KiB by default
-
// ---------------------------------------------------------------------------------------------
private JobManagerOptions() {
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 7249c8b..0f6b350 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -24,9 +24,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -884,8 +886,8 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
*
* @return configuration
*/
- public final Configuration getConfiguration() {
- return blobServiceConfiguration;
+ public final int getMinOffloadingSize() {
+ return blobServiceConfiguration.getInteger(BlobServerOptions.OFFLOAD_MINSIZE);
}
/**
@@ -941,4 +943,37 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
return new ArrayList<>(activeConnections);
}
}
+
+ /**
+ * Serializes the given value and offloads it to the BlobServer if its size exceeds the minimum
+ * offloading size of the BlobServer.
+ *
+ * @param value to serialize
+ * @param jobId to which the value belongs.
+ * @param blobServer
+ * @param <T>
+ * @return
+ * @throws IOException
+ */
+ public static <T> Either<SerializedValue<T>, PermanentBlobKey> tryOffload(
+ T value,
+ JobID jobId,
+ @Nullable BlobServer blobServer) throws IOException {
+
+ final SerializedValue<T> serializedValue = new SerializedValue<>(value);
+
+ if (blobServer == null || serializedValue.getByteArray().length < blobServer.getMinOffloadingSize()) {
+ return Either.Left(new SerializedValue<>(value));
+ } else {
+ try {
+ final PermanentBlobKey permanentBlobKey = blobServer.putPermanent(jobId, serializedValue.getByteArray());
+
+ return Either.Right(permanentBlobKey);
+ } catch (IOException e) {
+ LOG.warn("Failed to offload value " + value + " for job " + jobId + " to BLOB store.", e);
+
+ return Either.Left(serializedValue);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index fe7770b..74a68ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
@@ -58,17 +57,17 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;
@@ -78,6 +77,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
@@ -174,17 +174,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** Job specific information like the job id, job name, job configuration, etc. */
private final JobInformation jobInformation;
- /** Serialized version of the job specific information. This is done to avoid multiple
- * serializations of the same data when creating a TaskDeploymentDescriptor.
- */
- private final SerializedValue<JobInformation> serializedJobInformation;
-
- /**
- * The key of the offloaded job information BLOB containing {@link #serializedJobInformation} or
- * <tt>null</tt> if not offloaded.
- */
- @Nullable
- private final PermanentBlobKey jobInformationBlobKey;
+ private final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey;
/** The executor which is used to execute futures. */
private final ScheduledExecutorService futureExecutor;
@@ -245,10 +235,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** The total number of vertices currently in the execution graph */
private int numVerticesTotal;
- /** Blob server reference for offloading large RPC messages. */
- @Nullable
- private final BlobServer blobServer;
-
// ------ Configuration of the Execution -------
/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
@@ -290,6 +276,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// ------ Fields that are only relevant for archived execution graphs ------------
private String jsonPlan;
+ @Nullable
+ private BlobServer blobServer;
+
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
@@ -307,10 +296,21 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
SerializedValue<ExecutionConfig> serializedConfig,
Time timeout,
RestartStrategy restartStrategy,
- SlotProvider slotProvider) {
+ SlotProvider slotProvider) throws IOException {
- this(futureExecutor, ioExecutor, jobId, jobName, jobConfig, serializedConfig, timeout,
- restartStrategy, slotProvider, null);
+ this(
+ new JobInformation(
+ jobId,
+ jobName,
+ serializedConfig,
+ jobConfig,
+ Collections.emptyList(),
+ Collections.emptyList()),
+ futureExecutor,
+ ioExecutor,
+ timeout,
+ restartStrategy,
+ slotProvider);
}
/**
@@ -318,33 +318,41 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
*/
@VisibleForTesting
ExecutionGraph(
+ JobInformation jobInformation,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
- JobID jobId,
- String jobName,
- Configuration jobConfig,
- SerializedValue<ExecutionConfig> serializedConfig,
Time timeout,
RestartStrategy restartStrategy,
- SlotProvider slotProvider,
- @Nullable BlobServer blobServer) {
+ SlotProvider slotProvider) throws IOException {
this(
- new JobInformation(
- jobId,
- jobName,
- serializedConfig,
- jobConfig,
- Collections.emptyList(),
- Collections.emptyList()),
+ jobInformation,
futureExecutor,
ioExecutor,
timeout,
restartStrategy,
new RestartAllStrategy.Factory(),
+ slotProvider);
+ }
+
+ @VisibleForTesting
+ ExecutionGraph(
+ JobInformation jobInformation,
+ ScheduledExecutorService futureExecutor,
+ Executor ioExecutor,
+ Time timeout,
+ RestartStrategy restartStrategy,
+ FailoverStrategy.Factory failoverStrategy,
+ SlotProvider slotProvider) throws IOException {
+ this(
+ jobInformation,
+ futureExecutor,
+ ioExecutor,
+ timeout,
+ restartStrategy,
+ failoverStrategy,
slotProvider,
ExecutionGraph.class.getClassLoader(),
- blobServer
- );
+ null);
}
public ExecutionGraph(
@@ -356,21 +364,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
FailoverStrategy.Factory failoverStrategyFactory,
SlotProvider slotProvider,
ClassLoader userClassLoader,
- @Nullable BlobServer blobServer) {
+ @Nullable BlobServer blobServer) throws IOException {
checkNotNull(futureExecutor);
this.jobInformation = Preconditions.checkNotNull(jobInformation);
- // serialize the job information to do the serialisation work only once
- try {
- this.serializedJobInformation = new SerializedValue<>(jobInformation);
- }
- catch (IOException e) {
- // this cannot happen because 'JobInformation' is perfectly serializable
- // rethrow unchecked, because this indicates a bug, not a recoverable situation
- throw new FlinkRuntimeException("Bug: Cannot serialize JobInformation", e);
- }
+ this.jobInformationOrBlobKey = BlobServer.tryOffload(jobInformation, jobInformation.getJobId(), blobServer);
this.futureExecutor = Preconditions.checkNotNull(futureExecutor);
this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
@@ -405,50 +405,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
this.blobServer = blobServer;
- this.jobInformationBlobKey = tryOffLoadJobInformation();
- }
-
- /**
- * Tries to store {@link #serializedJobInformation} and in the graph's {@link
- * ExecutionGraph#blobServer} (if not <tt>null</tt>) so that RPC messages do not need to include
- * it.
- *
- * @return the BLOB key of the uploaded job information or <tt>null</tt> if the upload failed
- */
- @Nullable
- private PermanentBlobKey tryOffLoadJobInformation() {
- if (blobServer == null) {
- return null;
- }
-
- // If the serialized job information inside serializedJobInformation is larger than this,
- // we try to offload it to the BLOB server.
- final int rpcOffloadMinSize =
- blobServer.getConfiguration().getInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE);
-
- if (serializedJobInformation.getByteArray().length > rpcOffloadMinSize) {
- LOG.info("Storing job {} information at the BLOB server", getJobID());
-
- // TODO: do not overwrite existing job info and thus speed up recovery?
- try {
- return blobServer.putPermanent(getJobID(), serializedJobInformation.getByteArray());
- } catch (IOException e) {
- LOG.warn("Failed to offload job " + getJobID() + " information data to BLOB store", e);
- }
- }
-
- return null;
- }
-
- /**
- * Returns the key of the offloaded job information BLOB containing {@link
- * #serializedJobInformation}.
- *
- * @return the BLOB key or <tt>null</tt> if not offloaded
- */
- @Nullable
- public PermanentBlobKey getJobInformationBlobKey() {
- return jobInformationBlobKey;
}
// --------------------------------------------------------------------------------------------
@@ -633,8 +589,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return slotProvider;
}
- public SerializedValue<JobInformation> getSerializedJobInformation() {
- return serializedJobInformation;
+ public Either<SerializedValue<JobInformation>, PermanentBlobKey> getJobInformationOrBlobKey() {
+ return jobInformationOrBlobKey;
}
@Override
@@ -749,6 +705,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return this.stateTimestamps[status.ordinal()];
}
+ @Nullable
public final BlobServer getBlobServer() {
return blobServer;
}
@@ -843,8 +800,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
// create the execution job vertex and attach it to the graph
- ExecutionJobVertex ejv =
- new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, globalModVersion, createTimestamp);
+ ExecutionJobVertex ejv = new ExecutionJobVertex(
+ this,
+ jobVertex,
+ 1,
+ rpcCallTimeout,
+ globalModVersion,
+ createTimestamp);
+
ejv.connectToPredecessors(this.intermediateResults);
ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 42fbfc1..8d48432 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -103,24 +103,31 @@ public class ExecutionGraphBuilder {
final FailoverStrategy.Factory failoverStrategy =
FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);
+ final JobInformation jobInformation = new JobInformation(
+ jobId,
+ jobName,
+ jobGraph.getSerializedExecutionConfig(),
+ jobGraph.getJobConfiguration(),
+ jobGraph.getUserJarBlobKeys(),
+ jobGraph.getClasspaths());
+
// create a new execution graph, if none exists so far
- final ExecutionGraph executionGraph = (prior != null) ? prior :
- new ExecutionGraph(
- new JobInformation(
- jobId,
- jobName,
- jobGraph.getSerializedExecutionConfig(),
- jobGraph.getJobConfiguration(),
- jobGraph.getUserJarBlobKeys(),
- jobGraph.getClasspaths()),
- futureExecutor,
- ioExecutor,
- timeout,
- restartStrategy,
- failoverStrategy,
- slotProvider,
- classLoader,
- blobServer);
+ final ExecutionGraph executionGraph;
+ try {
+ executionGraph = (prior != null) ? prior :
+ new ExecutionGraph(
+ jobInformation,
+ futureExecutor,
+ ioExecutor,
+ timeout,
+ restartStrategy,
+ failoverStrategy,
+ slotProvider,
+ classLoader,
+ blobServer);
+ } catch (IOException e) {
+ throw new JobException("Could not create the ExecutionGraph.", e);
+ }
// set the basic properties
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 33a4359..9adaf45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -46,8 +46,10 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.types.Either;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
+
import org.slf4j.Logger;
import javax.annotation.Nullable;
@@ -131,6 +133,8 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
@Nullable
private PermanentBlobKey taskInformationBlobKey = null;
+ private Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = null;
+
private InputSplitAssigner splitAssigner;
/**
@@ -147,12 +151,12 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
}
public ExecutionJobVertex(
- ExecutionGraph graph,
- JobVertex jobVertex,
- int defaultParallelism,
- Time timeout,
- long initialGlobalModVersion,
- long createTimestamp) throws JobException {
+ ExecutionGraph graph,
+ JobVertex jobVertex,
+ int defaultParallelism,
+ Time timeout,
+ long initialGlobalModVersion,
+ long createTimestamp) throws JobException {
if (graph == null || jobVertex == null) {
throw new NullPointerException();
@@ -359,80 +363,29 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return inputs;
}
- public SerializedValue<TaskInformation> getSerializedTaskInformation() throws IOException {
-
+ public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
// only one thread should offload the task information, so let's also let only one thread
// serialize the task information!
synchronized (stateMonitor) {
- if (null == serializedTaskInformation) {
-
- int parallelism = getParallelism();
- int maxParallelism = getMaxParallelism();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Creating task information for " + generateDebugString());
- }
-
- serializedTaskInformation = new SerializedValue<>(
- new TaskInformation(
- jobVertex.getID(),
- jobVertex.getName(),
- parallelism,
- maxParallelism,
- jobVertex.getInvokableClassName(),
- jobVertex.getConfiguration()));
-
- taskInformationBlobKey = tryOffLoadTaskInformation();
- }
- }
-
- return serializedTaskInformation;
- }
-
- /**
- * Returns the key of the offloaded task information BLOB containing {@link
- * #serializedTaskInformation}.
- * <p>
- * This may be true after the first call to {@link #getSerializedTaskInformation()}.
- *
- * @return the BLOB key or <tt>null</tt> if not offloaded
- */
- @Nullable
- public PermanentBlobKey getTaskInformationBlobKey() {
- return taskInformationBlobKey;
- }
-
- /**
- * Tries to store {@link #serializedTaskInformation} and in the graph's {@link
- * ExecutionGraph#blobServer} (if not <tt>null</tt>) so that RPC messages do not need to include
- * it.
- *
- * @return the BLOB key of the uploaded task information or <tt>null</tt> if the upload failed
- */
- @Nullable
- private PermanentBlobKey tryOffLoadTaskInformation() {
- BlobServer blobServer = graph.getBlobServer();
- if (blobServer == null) {
- return null;
- }
-
- // If the serialized task information inside #serializedTaskInformation is larger than this,
- // we try to offload it to the BLOB server.
- final int rpcOffloadMinSize =
- blobServer.getConfiguration().getInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE);
-
- if (serializedTaskInformation.getByteArray().length > rpcOffloadMinSize) {
- LOG.info("Storing task {} information at the BLOB server", getJobVertexId());
-
- // TODO: do not overwrite existing task info and thus speed up recovery?
- try {
- return blobServer.putPermanent(getJobId(), serializedTaskInformation.getByteArray());
- } catch (IOException e) {
- LOG.warn("Failed to offload task " + getJobVertexId() + " information data to BLOB store", e);
+ if (taskInformationOrBlobKey == null) {
+ final BlobServer blobServer = graph.getBlobServer();
+
+ final TaskInformation taskInformation = new TaskInformation(
+ jobVertex.getID(),
+ jobVertex.getName(),
+ parallelism,
+ maxParallelism,
+ jobVertex.getInvokableClassName(),
+ jobVertex.getConfiguration());
+
+ taskInformationOrBlobKey = BlobServer.tryOffload(
+ taskInformation,
+ getJobId(),
+ blobServer);
}
}
- return null;
+ return taskInformationOrBlobKey;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 17ad3c8..6b9d481 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -46,8 +46,10 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EvictingBoundedList;
+import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
@@ -770,33 +772,32 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, partitionType, queueToRequest, partitions));
}
- TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation;
- {
- PermanentBlobKey jobInfoBlobKey = getExecutionGraph().getJobInformationBlobKey();
- if (jobInfoBlobKey != null) {
- serializedJobInformation =
- new TaskDeploymentDescriptor.Offloaded<>(jobInfoBlobKey);
- } else {
- serializedJobInformation = new TaskDeploymentDescriptor.NonOffloaded<>(
- getExecutionGraph().getSerializedJobInformation());
- }
+ final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey = getExecutionGraph().getJobInformationOrBlobKey();
+
+ final TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> serializedJobInformation;
+
+ if (jobInformationOrBlobKey.isLeft()) {
+ serializedJobInformation = new TaskDeploymentDescriptor.NonOffloaded<>(jobInformationOrBlobKey.left());
+ } else {
+ serializedJobInformation = new TaskDeploymentDescriptor.Offloaded<>(jobInformationOrBlobKey.right());
}
- TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> serializedTaskInformation;
- {
- PermanentBlobKey taskInfoBlobKey = jobVertex.getTaskInformationBlobKey();
- if (taskInfoBlobKey != null) {
- serializedTaskInformation = new TaskDeploymentDescriptor.Offloaded<>(taskInfoBlobKey);
- } else {
- try {
- serializedTaskInformation = new TaskDeploymentDescriptor.NonOffloaded<>(
- jobVertex.getSerializedTaskInformation());
- } catch (IOException e) {
- throw new ExecutionGraphException(
- "Could not create a serialized JobVertexInformation for " +
- jobVertex.getJobVertexId(), e);
- }
- }
+ final Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey;
+
+ try {
+ taskInformationOrBlobKey = jobVertex.getTaskInformationOrBlobKey();
+ } catch (IOException e) {
+ throw new ExecutionGraphException(
+ "Could not create a serialized JobVertexInformation for " +
+ jobVertex.getJobVertexId(), e);
+ }
+
+ final TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> serializedTaskInformation;
+
+ if (taskInformationOrBlobKey.isLeft()) {
+ serializedTaskInformation = new TaskDeploymentDescriptor.NonOffloaded<>(taskInformationOrBlobKey.left());
+ } else {
+ serializedTaskInformation = new TaskDeploymentDescriptor.Offloaded<>(taskInformationOrBlobKey.right());
}
return new TaskDeploymentDescriptor(
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 3af4171..b9ca508 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.executiongraph;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
@@ -36,6 +35,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
@@ -54,7 +54,7 @@ import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
import org.junit.Test;
import org.slf4j.LoggerFactory;
@@ -68,7 +68,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
-import static junit.framework.TestCase.assertNull;
+import static junit.framework.TestCase.assertTrue;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -78,7 +78,7 @@ import static org.junit.Assert.fail;
/**
* Tests for {@link ExecutionGraph} deployment.
*/
-public class ExecutionGraphDeploymentTest {
+public class ExecutionGraphDeploymentTest extends TestLogger {
/**
* BLOB server instance to use for the job graph (may be <tt>null</tt>).
@@ -98,7 +98,7 @@ public class ExecutionGraphDeploymentTest {
* @param eg the execution graph that was created
*/
protected void checkJobOffloaded(ExecutionGraph eg) throws Exception {
- assertNull(eg.getJobInformationBlobKey());
+ assertTrue(eg.getJobInformationOrBlobKey().isLeft());
}
/**
@@ -109,7 +109,7 @@ public class ExecutionGraphDeploymentTest {
* @param jobVertexId job vertex ID
*/
protected void checkTaskOffloaded(ExecutionGraph eg, JobVertexID jobVertexId) throws Exception {
- assertNull(eg.getJobVertex(jobVertexId).getTaskInformationBlobKey());
+ assertTrue(eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey().isLeft());
}
@Test
@@ -141,17 +141,21 @@ public class ExecutionGraphDeploymentTest {
v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+ final JobInformation expectedJobInformation = new DummyJobInformation(
+ jobId,
+ "some job");
+
ExecutionGraph eg = new ExecutionGraph(
+ expectedJobInformation,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
- jobId,
- "some job",
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
+ new RestartAllStrategy.Factory(),
new Scheduler(TestingUtils.defaultExecutionContext()),
+ ExecutionGraph.class.getClassLoader(),
blobServer);
+
checkJobOffloaded(eg);
List<JobVertex> ordered = Arrays.asList(v1, v2, v3, v4);
@@ -387,12 +391,12 @@ public class ExecutionGraphDeploymentTest {
}
}
- @Test
/**
* Tests that a blocking batch job fails if there are not enough resources left to schedule the
* succeeding tasks. This test case is related to [FLINK-4296] where finished producing tasks
* swallow the fail exception when scheduling a consumer task.
*/
+ @Test
public void testNoResourceAvailableFailure() throws Exception {
final JobID jobId = new JobID();
JobVertex v1 = new JobVertex("source");
@@ -418,18 +422,22 @@ public class ExecutionGraphDeploymentTest {
TestingUtils.directExecutionContext()))));
}
+ final JobInformation jobInformation = new DummyJobInformation(
+ jobId,
+ "failing test job");
+
// execution graph that executes actions synchronously
ExecutionGraph eg = new ExecutionGraph(
+ jobInformation,
new DirectScheduledExecutorService(),
TestingUtils.defaultExecutor(),
- jobId,
- "failing test job",
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
+ new RestartAllStrategy.Factory(),
scheduler,
+ ExecutionGraph.class.getClassLoader(),
blobServer);
+
checkJobOffloaded(eg);
eg.setQueuedSchedulingAllowed(false);
@@ -495,17 +503,20 @@ public class ExecutionGraphDeploymentTest {
TestingUtils.directExecutionContext()))));
}
+ final JobInformation jobInformation = new DummyJobInformation(
+ jobId,
+ "some job");
+
// execution graph that executes actions synchronously
ExecutionGraph eg = new ExecutionGraph(
+ jobInformation,
new DirectScheduledExecutorService(),
TestingUtils.defaultExecutor(),
- jobId,
- "some job",
- new Configuration(),
- new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
new NoRestartStrategy(),
+ new RestartAllStrategy.Factory(),
scheduler,
+ ExecutionGraph.class.getClassLoader(),
blobServer);
checkJobOffloaded(eg);
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
index f1c071e..0fcf8c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -41,7 +41,7 @@ public class ExecutionGraphDeploymentWithBlobCacheTest extends ExecutionGraphDep
public void setupBlobServer() throws IOException {
Configuration config = new Configuration();
// always offload the serialized job and task information
- config.setInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE, 0);
+ config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
blobServer = new BlobServer(config, new VoidBlobStore());
blobServer.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
index 030e18d..59d8bc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
@@ -19,12 +19,14 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedValue;
import org.junit.After;
import org.junit.Before;
@@ -36,7 +38,6 @@ import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
@@ -53,7 +54,7 @@ public class ExecutionGraphDeploymentWithBlobServerTest extends ExecutionGraphDe
public void setupBlobServer() throws IOException {
Configuration config = new Configuration();
// always offload the serialized job and task information
- config.setInteger(JobManagerOptions.TDD_OFFLOAD_MINSIZE, 0);
+ config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
blobServer = Mockito.spy(new BlobServer(config, new VoidBlobStore()));
seenHashes.clear();
@@ -81,19 +82,21 @@ public class ExecutionGraphDeploymentWithBlobServerTest extends ExecutionGraphDe
@Override
protected void checkJobOffloaded(ExecutionGraph eg) throws Exception {
- PermanentBlobKey jobInformationBlobKey = eg.getJobInformationBlobKey();
- assertNotNull(jobInformationBlobKey);
+ Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey = eg.getJobInformationOrBlobKey();
+
+ assertTrue(jobInformationOrBlobKey.isRight());
// must not throw:
- blobServer.getFile(eg.getJobID(), jobInformationBlobKey);
+ blobServer.getFile(eg.getJobID(), jobInformationOrBlobKey.right());
}
@Override
protected void checkTaskOffloaded(ExecutionGraph eg, JobVertexID jobVertexId) throws Exception {
- PermanentBlobKey taskInformationBlobKey = eg.getJobVertex(jobVertexId).getTaskInformationBlobKey();
- assertNotNull(taskInformationBlobKey);
+ Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey();
+
+ assertTrue(taskInformationOrBlobKey.isRight());
// must not throw:
- blobServer.getFile(eg.getJobID(), taskInformationBlobKey);
+ blobServer.getFile(eg.getJobID(), taskInformationOrBlobKey.right());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index 1f20e12..4e89d43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -133,9 +133,7 @@ public class FailoverRegionTest extends TestLogger {
AkkaUtils.getDefaultTimeout(),
new InfiniteDelayRestartStrategy(10),
new FailoverPipelinedRegionWithDirectExecutor(),
- slotProvider,
- ExecutionGraph.class.getClassLoader(),
- null);
+ slotProvider);
eg.attachJobGraph(ordered);
@@ -257,9 +255,7 @@ public class FailoverRegionTest extends TestLogger {
AkkaUtils.getDefaultTimeout(),
new InfiniteDelayRestartStrategy(10),
new RestartPipelinedRegionStrategy.Factory(),
- scheduler,
- ExecutionGraph.class.getClassLoader(),
- null);
+ scheduler);
try {
eg.attachJobGraph(ordered);
}
@@ -331,9 +327,7 @@ public class FailoverRegionTest extends TestLogger {
AkkaUtils.getDefaultTimeout(),
new InfiniteDelayRestartStrategy(10),
new FailoverPipelinedRegionWithDirectExecutor(),
- scheduler,
- ExecutionGraph.class.getClassLoader(),
- null);
+ scheduler);
try {
eg.attachJobGraph(ordered);
}
@@ -441,9 +435,7 @@ public class FailoverRegionTest extends TestLogger {
AkkaUtils.getDefaultTimeout(),
restartStrategy,
new FailoverPipelinedRegionWithDirectExecutor(),
- scheduler,
- ExecutionGraph.class.getClassLoader(),
- null);
+ scheduler);
try {
eg.attachJobGraph(ordered);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
index b1d6692..d8f0309 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
@@ -168,9 +168,7 @@ public class GlobalModVersionTest {
Time.seconds(10),
new InfiniteDelayRestartStrategy(),
new CustomStrategy(failoverStrategy),
- slotProvider,
- getClass().getClassLoader(),
- null);
+ slotProvider);
JobVertex jv = new JobVertex("test vertex");
jv.setInvokableClass(NoOpInvokable.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index 9d924c9..33456f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -291,9 +291,7 @@ public class IndividualRestartsConcurrencyTest {
Time.seconds(10),
restartStrategy,
failoverStrategy,
- slotProvider,
- getClass().getClassLoader(),
- null);
+ slotProvider);
JobVertex jv = new JobVertex("test vertex");
jv.setInvokableClass(NoOpInvokable.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
index 93c163b..c78e193 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -303,11 +303,13 @@ public class PipelinedRegionFailoverConcurrencyTest {
SlotProvider slotProvider,
int parallelism) throws Exception {
+ final JobInformation jobInformation = new DummyJobInformation(
+ jid,
+ "test job");
+
// build a simple execution graph with on job vertex, parallelism 2
final ExecutionGraph graph = new ExecutionGraph(
- new DummyJobInformation(
- jid,
- "test job"),
+ jobInformation,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
Time.seconds(10),
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
index edb39e9..8198df5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
@@ -89,10 +89,12 @@ public class RestartPipelinedRegionStrategyTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
+ final JobInformation jobInformation = new DummyJobInformation(
+ jobId,
+ jobName);
+
ExecutionGraph eg = new ExecutionGraph(
- new DummyJobInformation(
- jobId,
- jobName),
+ jobInformation,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
AkkaUtils.getDefaultTimeout(),
@@ -171,10 +173,12 @@ public class RestartPipelinedRegionStrategyTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
+ final JobInformation jobInformation = new DummyJobInformation(
+ jobId,
+ jobName);
+
ExecutionGraph eg = new ExecutionGraph(
- new DummyJobInformation(
- jobId,
- jobName),
+ jobInformation,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
AkkaUtils.getDefaultTimeout(),
@@ -258,10 +262,12 @@ public class RestartPipelinedRegionStrategyTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
+ final JobInformation jobInformation = new DummyJobInformation(
+ jobId,
+ jobName);
+
ExecutionGraph eg = new ExecutionGraph(
- new DummyJobInformation(
- jobId,
- jobName),
+ jobInformation,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
AkkaUtils.getDefaultTimeout(),
@@ -336,10 +342,12 @@ public class RestartPipelinedRegionStrategyTest {
List<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4));
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutor());
+ final JobInformation jobInformation = new DummyJobInformation(
+ jobId,
+ jobName);
+
ExecutionGraph eg = new ExecutionGraph(
- new DummyJobInformation(
- jobId,
- jobName),
+ jobInformation,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
AkkaUtils.getDefaultTimeout(),
http://git-wip-us.apache.org/repos/asf/flink/blob/5ff07e63/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index e72ddf7..b2e63be 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -140,10 +141,12 @@ public class RescalePartitionerTest extends TestLogger {
assertEquals(4, mapVertex.getParallelism());
assertEquals(2, sinkVertex.getParallelism());
+ final JobInformation jobInformation = new DummyJobInformation(
+ jobId,
+ jobName);
+
ExecutionGraph eg = new ExecutionGraph(
- new DummyJobInformation(
- jobId,
- jobName),
+ jobInformation,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
AkkaUtils.getDefaultTimeout(),