You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/24 16:09:39 UTC
[6/6] flink git commit: [FLINK-6046] Introduce BlobWriter interface
to abstract BlobServer from ExecutionGraph
[FLINK-6046] Introduce BlobWriter interface to abstract BlobServer from ExecutionGraph
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ebe3fb5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ebe3fb5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ebe3fb5
Branch: refs/heads/master
Commit: 5ebe3fb55f2ce9299660989d40aa6d96d45a6d8c
Parents: 5ff07e6
Author: Till Rohrmann <tr...@apache.org>
Authored: Sat Oct 21 12:31:21 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Oct 24 18:08:31 2017 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/blob/BlobServer.java | 70 +----------
.../apache/flink/runtime/blob/BlobWriter.java | 115 +++++++++++++++++++
.../flink/runtime/blob/VoidBlobWriter.java | 55 +++++++++
.../runtime/executiongraph/ExecutionGraph.java | 27 +++--
.../executiongraph/ExecutionGraphBuilder.java | 6 +-
.../executiongraph/ExecutionJobVertex.java | 8 +-
.../CheckpointSettingsSerializableTest.java | 29 ++---
...ExecutionGraphCheckpointCoordinatorTest.java | 3 +-
.../ExecutionGraphDeploymentTest.java | 17 +--
...ecutionGraphDeploymentWithBlobCacheTest.java | 1 +
...cutionGraphDeploymentWithBlobServerTest.java | 4 +
.../ExecutionGraphSchedulingTest.java | 3 +-
.../executiongraph/ExecutionGraphTestUtils.java | 29 ++---
.../ExecutionVertexLocalityTest.java | 3 +-
.../PipelinedRegionFailoverConcurrencyTest.java | 3 +-
.../RestartPipelinedRegionStrategyTest.java | 9 +-
.../PipelinedFailoverRegionBuildingTest.java | 29 ++---
.../partitioner/RescalePartitionerTest.java | 3 +-
18 files changed, 268 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 0f6b350..01fb808 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -24,11 +24,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.net.SSLUtils;
-import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +66,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* spawning threads to handle these requests. Furthermore, it takes care of creating the directory structure to store
* the BLOBs or temporarily cache them.
*/
-public class BlobServer extends Thread implements BlobService, PermanentBlobService, TransientBlobService {
+public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {
/** The log object used for debugging. */
private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);
@@ -565,41 +563,13 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
return (TransientBlobKey) putInputStream(jobId, inputStream, TRANSIENT_BLOB);
}
- /**
- * Uploads the data of the given byte array for the given job to the BLOB server and makes it
- * a permanent BLOB.
- *
- * @param jobId
- * the ID of the job the BLOB belongs to
- * @param value
- * the buffer to upload
- *
- * @return the computed BLOB key identifying the BLOB on the server
- *
- * @throws IOException
- * thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
- * store
- */
+ @Override
public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {
checkNotNull(jobId);
return (PermanentBlobKey) putBuffer(jobId, value, PERMANENT_BLOB);
}
- /**
- * Uploads the data from the given input stream for the given job to the BLOB server and makes it
- * a permanent BLOB.
- *
- * @param jobId
- * ID of the job this blob belongs to
- * @param inputStream
- * the input stream to read the data from
- *
- * @return the computed BLOB key identifying the BLOB on the server
- *
- * @throws IOException
- * thrown if an I/O error occurs while reading the data from the input stream, writing it to a
- * local file, or uploading it to the HA store
- */
+ @Override
public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
checkNotNull(jobId);
return (PermanentBlobKey) putInputStream(jobId, inputStream, PERMANENT_BLOB);
@@ -886,6 +856,7 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
*
* @return configuration
*/
+ @Override
public final int getMinOffloadingSize() {
return blobServiceConfiguration.getInteger(BlobServerOptions.OFFLOAD_MINSIZE);
}
@@ -943,37 +914,4 @@ public class BlobServer extends Thread implements BlobService, PermanentBlobServ
return new ArrayList<>(activeConnections);
}
}
-
- /**
- * Serializes the given value and offloads it to the BlobServer if its size exceeds the minimum
- * offloading size of the BlobServer.
- *
- * @param value to serialize
- * @param jobId to which the value belongs.
- * @param blobServer
- * @param <T>
- * @return
- * @throws IOException
- */
- public static <T> Either<SerializedValue<T>, PermanentBlobKey> tryOffload(
- T value,
- JobID jobId,
- @Nullable BlobServer blobServer) throws IOException {
-
- final SerializedValue<T> serializedValue = new SerializedValue<>(value);
-
- if (blobServer == null || serializedValue.getByteArray().length < blobServer.getMinOffloadingSize()) {
- return Either.Left(new SerializedValue<>(value));
- } else {
- try {
- final PermanentBlobKey permanentBlobKey = blobServer.putPermanent(jobId, serializedValue.getByteArray());
-
- return Either.Right(permanentBlobKey);
- } catch (IOException e) {
- LOG.warn("Failed to offload value " + value + " for job " + jobId + " to BLOB store.", e);
-
- return Either.Left(serializedValue);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
new file mode 100644
index 0000000..cc3bd1d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * BlobWriter is used to upload data to the BLOB store.
+ */
+public interface BlobWriter {
+
+ Logger LOG = LoggerFactory.getLogger(BlobWriter.class);
+
+ /**
+ * Uploads the data of the given byte array for the given job to the BLOB server and makes it
+ * a permanent BLOB.
+ *
+ * @param jobId
+ * the ID of the job the BLOB belongs to
+ * @param value
+ * the buffer to upload
+ *
+ * @return the computed BLOB key identifying the BLOB on the server
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
+ * store
+ */
+ PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException;
+
+ /**
+ * Uploads the data from the given input stream for the given job to the BLOB server and makes it
+ * a permanent BLOB.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to
+ * @param inputStream
+ * the input stream to read the data from
+ *
+ * @return the computed BLOB key identifying the BLOB on the server
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while reading the data from the input stream, writing it to a
+ * local file, or uploading it to the HA store
+ */
+ PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException;
+
+ /**
+ * Returns the min size before data will be offloaded to the BLOB store.
+ *
+ * @return minimum offloading size
+ */
+ int getMinOffloadingSize();
+
+ /**
+ * Serializes the given value and offloads it via the given BlobWriter if its serialized size is at
+ * least the minimum offloading size of that BlobWriter; smaller values are returned inline.
+ *
+ * @param value to serialize
+ * @param jobId to which the value belongs.
+ * @param blobWriter to use to offload the serialized value
+ * @param <T> type of the value to serialize
+ * @return Either the serialized value (also when offloading fails) or the stored blob key
+ * @throws IOException if the data cannot be serialized
+ */
+ static <T> Either<SerializedValue<T>, PermanentBlobKey> serializeAndTryOffload(
+ T value,
+ JobID jobId,
+ BlobWriter blobWriter) throws IOException {
+ Preconditions.checkNotNull(value);
+ Preconditions.checkNotNull(jobId);
+ Preconditions.checkNotNull(blobWriter);
+
+ final SerializedValue<T> serializedValue = new SerializedValue<>(value);
+ // Serialize exactly once: the same instance serves the size check, the inline result and the upload.
+ if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
+ return Either.Left(serializedValue);
+ } else {
+ try {
+ final PermanentBlobKey permanentBlobKey = blobWriter.putPermanent(jobId, serializedValue.getByteArray());
+
+ return Either.Right(permanentBlobKey);
+ } catch (IOException e) {
+ LOG.warn("Failed to offload value {} for job {} to BLOB store.", value, jobId, e);
+ // Offloading is best effort: fall back to returning the serialized value inline.
+ return Either.Left(serializedValue);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobWriter.java
new file mode 100644
index 0000000..7830437
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * BlobWriter which does not support writing BLOBs to a store: both putPermanent variants always throw an
+ * IOException and getMinOffloadingSize() returns Integer.MAX_VALUE. This class is mainly used for
+ * testing purposes where we don't want to store data in the BLOB store.
+ */
+@VisibleForTesting
+public class VoidBlobWriter implements BlobWriter {
+
+ private static final VoidBlobWriter INSTANCE = new VoidBlobWriter();
+
+ @Override
+ public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {
+ throw new IOException("The VoidBlobWriter cannot write data to the BLOB store.");
+ }
+
+ @Override
+ public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
+ throw new IOException("The VoidBlobWriter cannot write data to the BLOB store.");
+ }
+
+ @Override
+ public int getMinOffloadingSize() {
+ return Integer.MAX_VALUE;
+ }
+
+ public static VoidBlobWriter getInstance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 74a68ab..f002c5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -31,8 +31,9 @@ import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -76,8 +77,6 @@ import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
@@ -174,6 +173,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** Job specific information like the job id, job name, job configuration, etc. */
private final JobInformation jobInformation;
+ /** Serialized job information or a blob key pointing to the offloaded job information. */
private final Either<SerializedValue<JobInformation>, PermanentBlobKey> jobInformationOrBlobKey;
/** The executor which is used to execute futures. */
@@ -232,6 +232,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
/** Registered KvState instances reported by the TaskManagers. */
private final KvStateLocationRegistry kvStateLocationRegistry;
+ /** Blob writer used to offload RPC messages. */
+ private final BlobWriter blobWriter;
+
/** The total number of vertices currently in the execution graph */
private int numVerticesTotal;
@@ -276,9 +279,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// ------ Fields that are only relevant for archived execution graphs ------------
private String jsonPlan;
- @Nullable
- private BlobServer blobServer;
-
// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
@@ -352,7 +352,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
failoverStrategy,
slotProvider,
ExecutionGraph.class.getClassLoader(),
- null);
+ VoidBlobWriter.getInstance());
}
public ExecutionGraph(
@@ -364,13 +364,15 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
FailoverStrategy.Factory failoverStrategyFactory,
SlotProvider slotProvider,
ClassLoader userClassLoader,
- @Nullable BlobServer blobServer) throws IOException {
+ BlobWriter blobWriter) throws IOException {
checkNotNull(futureExecutor);
this.jobInformation = Preconditions.checkNotNull(jobInformation);
- this.jobInformationOrBlobKey = BlobServer.tryOffload(jobInformation, jobInformation.getJobId(), blobServer);
+ this.blobWriter = Preconditions.checkNotNull(blobWriter);
+
+ this.jobInformationOrBlobKey = BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter);
this.futureExecutor = Preconditions.checkNotNull(futureExecutor);
this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
@@ -403,8 +405,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
// is ready by the time the failover strategy sees it
this.failoverStrategy = checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");
LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
-
- this.blobServer = blobServer;
}
// --------------------------------------------------------------------------------------------
@@ -705,9 +705,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return this.stateTimestamps[status.ordinal()];
}
- @Nullable
- public final BlobServer getBlobServer() {
- return blobServer;
+ public final BlobWriter getBlobWriter() {
+ return blobWriter;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 8d48432..2a4315d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -27,7 +27,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
@@ -91,7 +91,7 @@ public class ExecutionGraphBuilder {
RestartStrategy restartStrategy,
MetricGroup metrics,
int parallelismForAutoMax,
- BlobServer blobServer,
+ BlobWriter blobWriter,
Logger log)
throws JobExecutionException, JobException {
@@ -124,7 +124,7 @@ public class ExecutionGraphBuilder {
failoverStrategy,
slotProvider,
classLoader,
- blobServer);
+ blobWriter);
} catch (IOException e) {
throw new JobException("Could not create the ExecutionGraph.", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 9adaf45..90224b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -32,7 +32,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SimpleSlot;
@@ -368,7 +368,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
// serialize the task information!
synchronized (stateMonitor) {
if (taskInformationOrBlobKey == null) {
- final BlobServer blobServer = graph.getBlobServer();
+ final BlobWriter blobWriter = graph.getBlobWriter();
final TaskInformation taskInformation = new TaskInformation(
jobVertex.getID(),
@@ -378,10 +378,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
jobVertex.getInvokableClassName(),
jobVertex.getConfiguration());
- taskInformationOrBlobKey = BlobServer.tryOffload(
+ taskInformationOrBlobKey = BlobWriter.serializeAndTryOffload(
taskInformation,
getJobId(),
- blobServer);
+ blobWriter);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index e500036..7e85167 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
@@ -95,20 +96,20 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
final JobGraph copy = CommonTestUtils.createCopySerializable(jobGraph);
final ExecutionGraph eg = ExecutionGraphBuilder.buildGraph(
- null,
- copy,
- new Configuration(),
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- mock(SlotProvider.class),
- classLoader,
- new StandaloneCheckpointRecoveryFactory(),
- Time.seconds(10),
- new NoRestartStrategy(),
- new UnregisteredMetricsGroup(),
- 10,
- null,
- log);
+ null,
+ copy,
+ new Configuration(),
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ mock(SlotProvider.class),
+ classLoader,
+ new StandaloneCheckpointRecoveryFactory(),
+ Time.seconds(10),
+ new NoRestartStrategy(),
+ new UnregisteredMetricsGroup(),
+ 10,
+ VoidBlobWriter.getInstance(),
+ log);
assertEquals(1, eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());
assertTrue(jobGraph.getCheckpointingSettings().getDefaultStateBackend().deserializeValue(classLoader) instanceof CustomStateBackend);
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 1489f1a..5893d1d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.executiongraph.DummyJobInformation;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
@@ -86,7 +87,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
new RestartAllStrategy.Factory(),
new Scheduler(TestingUtils.defaultExecutionContext()),
ClassLoader.getSystemClassLoader(),
- null);
+ VoidBlobWriter.getInstance());
executionGraph.enableCheckpointing(
100,
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index b9ca508..5c80405 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -28,8 +28,9 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -81,9 +82,9 @@ import static org.junit.Assert.fail;
public class ExecutionGraphDeploymentTest extends TestLogger {
/**
- * BLOB server instance to use for the job graph (may be <tt>null</tt>).
+ * BLOB writer instance to use for the job graph.
*/
- protected BlobServer blobServer = null;
+ protected BlobWriter blobWriter = VoidBlobWriter.getInstance();
/**
* Permanent BLOB cache instance to use for the actor gateway that handles the {@link
@@ -154,7 +155,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
new RestartAllStrategy.Factory(),
new Scheduler(TestingUtils.defaultExecutionContext()),
ExecutionGraph.class.getClassLoader(),
- blobServer);
+ blobWriter);
checkJobOffloaded(eg);
@@ -168,7 +169,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
ExecutionGraphTestUtils.SimpleActorGatewayWithTDD instanceGateway =
new ExecutionGraphTestUtils.SimpleActorGatewayWithTDD(
TestingUtils.directExecutionContext(),
- blobCache == null ? blobServer : blobCache);
+ blobCache);
final Instance instance = getInstance(new ActorTaskManagerGateway(instanceGateway));
@@ -436,7 +437,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
new RestartAllStrategy.Factory(),
scheduler,
ExecutionGraph.class.getClassLoader(),
- blobServer);
+ blobWriter);
checkJobOffloaded(eg);
@@ -517,7 +518,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
new RestartAllStrategy.Factory(),
scheduler,
ExecutionGraph.class.getClassLoader(),
- blobServer);
+ blobWriter);
checkJobOffloaded(eg);
eg.setQueuedSchedulingAllowed(false);
@@ -599,7 +600,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
new NoRestartStrategy(),
new UnregisteredMetricsGroup(),
1,
- blobServer,
+ blobWriter,
LoggerFactory.getLogger(getClass()));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
index 0fcf8c5..25d218e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobCacheTest.java
@@ -44,6 +44,7 @@ public class ExecutionGraphDeploymentWithBlobCacheTest extends ExecutionGraphDep
config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
blobServer = new BlobServer(config, new VoidBlobStore());
blobServer.start();
+ blobWriter = blobServer;
InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
blobCache = new PermanentBlobCache(serverAddress, config, new VoidBlobStore());
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
index 59d8bc2..232a259 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentWithBlobServerTest.java
@@ -50,12 +50,16 @@ public class ExecutionGraphDeploymentWithBlobServerTest extends ExecutionGraphDe
private Set<byte[]> seenHashes = Collections.newSetFromMap(new ConcurrentHashMap<byte[], Boolean>());
+ protected BlobServer blobServer = null;
+
@Before
public void setupBlobServer() throws IOException {
Configuration config = new Configuration();
// always offload the serialized job and task information
config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
blobServer = Mockito.spy(new BlobServer(config, new VoidBlobStore()));
+ blobWriter = blobServer;
+ blobCache = blobServer;
seenHashes.clear();
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index c8cab9f..b90c306 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -555,7 +556,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
new NoRestartStrategy(),
new UnregisteredMetricsGroup(),
1,
- null,
+ VoidBlobWriter.getInstance(),
log);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 1f9fa82..017e85f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -330,20 +331,20 @@ public class ExecutionGraphTestUtils {
checkNotNull(vertices);
return ExecutionGraphBuilder.buildGraph(
- null,
- new JobGraph(jid, "test job", vertices),
- new Configuration(),
- executor,
- executor,
- slotProvider,
- ExecutionGraphTestUtils.class.getClassLoader(),
- new StandaloneCheckpointRecoveryFactory(),
- Time.seconds(10),
- restartStrategy,
- new UnregisteredMetricsGroup(),
- 1,
- null,
- TEST_LOGGER);
+ null,
+ new JobGraph(jid, "test job", vertices),
+ new Configuration(),
+ executor,
+ executor,
+ slotProvider,
+ ExecutionGraphTestUtils.class.getClassLoader(),
+ new StandaloneCheckpointRecoveryFactory(),
+ Time.seconds(10),
+ restartStrategy,
+ new UnregisteredMetricsGroup(),
+ 1,
+ VoidBlobWriter.getInstance(),
+ TEST_LOGGER);
}
public static JobVertex createNoOpVertex(int parallelism) {
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 2ba4194..5f12646 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -220,7 +221,7 @@ public class ExecutionVertexLocalityTest extends TestLogger {
new FixedDelayRestartStrategy(10, 0L),
new UnregisteredMetricsGroup(),
1,
- null,
+ VoidBlobWriter.getInstance(),
log);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
index c78e193..ac34c62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
@@ -317,7 +318,7 @@ public class PipelinedRegionFailoverConcurrencyTest {
failoverStrategy,
slotProvider,
getClass().getClassLoader(),
- null);
+ VoidBlobWriter.getInstance());
JobVertex jv = new JobVertex("test vertex");
jv.setInvokableClass(NoOpInvokable.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
index 8198df5..059b8a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RestartPipelinedRegionStrategyTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
@@ -102,7 +103,7 @@ public class RestartPipelinedRegionStrategyTest {
new RestartPipelinedRegionStrategy.Factory(),
scheduler,
ExecutionGraph.class.getClassLoader(),
- null);
+ VoidBlobWriter.getInstance());
try {
eg.attachJobGraph(ordered);
}
@@ -186,7 +187,7 @@ public class RestartPipelinedRegionStrategyTest {
new RestartPipelinedRegionStrategy.Factory(),
scheduler,
ExecutionGraph.class.getClassLoader(),
- null);
+ VoidBlobWriter.getInstance());
try {
eg.attachJobGraph(ordered);
}
@@ -275,7 +276,7 @@ public class RestartPipelinedRegionStrategyTest {
new RestartPipelinedRegionStrategy.Factory(),
scheduler,
ExecutionGraph.class.getClassLoader(),
- null);
+ VoidBlobWriter.getInstance());
try {
eg.attachJobGraph(ordered);
}
@@ -355,7 +356,7 @@ public class RestartPipelinedRegionStrategyTest {
new RestartPipelinedRegionStrategy.Factory(),
scheduler,
ExecutionGraph.class.getClassLoader(),
- null);
+ VoidBlobWriter.getInstance());
try {
eg.attachJobGraph(ordered);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
index 5e96dfd..4709bce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -627,19 +628,19 @@ public class PipelinedFailoverRegionBuildingTest extends TestLogger {
FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME);
return ExecutionGraphBuilder.buildGraph(
- null,
- jobGraph,
- jobManagerConfig,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- mock(SlotProvider.class),
- PipelinedFailoverRegionBuildingTest.class.getClassLoader(),
- new StandaloneCheckpointRecoveryFactory(),
- Time.seconds(10),
- new NoRestartStrategy(),
- new UnregisteredMetricsGroup(),
- 1000,
- null,
- log);
+ null,
+ jobGraph,
+ jobManagerConfig,
+ TestingUtils.defaultExecutor(),
+ TestingUtils.defaultExecutor(),
+ mock(SlotProvider.class),
+ PipelinedFailoverRegionBuildingTest.class.getClassLoader(),
+ new StandaloneCheckpointRecoveryFactory(),
+ Time.seconds(10),
+ new NoRestartStrategy(),
+ new UnregisteredMetricsGroup(),
+ 1000,
+ VoidBlobWriter.getInstance(),
+ log);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5ebe3fb5/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index b2e63be..368dfee 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.executiongraph.DummyJobInformation;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -154,7 +155,7 @@ public class RescalePartitionerTest extends TestLogger {
new RestartAllStrategy.Factory(),
new Scheduler(TestingUtils.defaultExecutionContext()),
ExecutionGraph.class.getClassLoader(),
- null);
+ VoidBlobWriter.getInstance());
try {
eg.attachJobGraph(jobVertices);
}