You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/03 19:09:06 UTC
[1/5] flink git commit: [FLINK-8785][rest] Handle JobSubmissionExceptions
Repository: flink
Updated Branches:
refs/heads/master c6ad421e2 -> a25cd3fed
[FLINK-8785][rest] Handle JobSubmissionExceptions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81d13557
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81d13557
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81d13557
Branch: refs/heads/master
Commit: 81d135578c026842e6d8bc95391da60886612166
Parents: c6ad421
Author: zentol <ch...@apache.org>
Authored: Thu Jun 28 10:57:01 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 3 21:08:44 2018 +0200
----------------------------------------------------------------------
.../rest/handler/job/JobSubmitHandler.java | 6 +++-
.../rest/handler/job/JobSubmitHandlerTest.java | 30 ++++++++++++++++++++
2 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/81d13557/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
index ac0a17b..af04629 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
@@ -38,6 +38,7 @@ import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
/**
* This handler can be used to submit jobs to a Flink cluster.
@@ -66,6 +67,9 @@ public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGatewa
}
return gateway.submitJob(jobGraph, timeout)
- .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+ .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
+ .exceptionally(exception -> {
+ throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
+ });
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/81d13557/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
index 212af5f..9c97ad4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -28,6 +29,7 @@ import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
@@ -87,4 +89,32 @@ public class JobSubmitHandlerTest extends TestLogger {
handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
.get();
}
+
+ @Test
+ public void testFailedJobSubmission() throws Exception {
+ final String errorMessage = "test";
+ DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+ when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(FutureUtils.completedExceptionally(new Exception(errorMessage)));
+
+ JobSubmitHandler handler = new JobSubmitHandler(
+ CompletableFuture.completedFuture("http://localhost:1234"),
+ () -> CompletableFuture.completedFuture(mockGateway),
+ RpcUtils.INF_TIMEOUT,
+ Collections.emptyMap());
+
+ JobGraph job = new JobGraph("testjob");
+ JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+
+ try {
+ handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
+ .get();
+ } catch (Exception e) {
+ Throwable t = ExceptionUtils.stripExecutionException(e);
+ if (t instanceof RestHandlerException){
+ Assert.assertTrue(t.getMessage().equals("Job submission failed."));
+ } else {
+ throw e;
+ }
+ }
+ }
}
[5/5] flink git commit: [hotfix][tests] Use utility functions for setting config
Posted by ch...@apache.org.
[hotfix][tests] Use utility functions for setting config
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/544bfb94
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/544bfb94
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/544bfb94
Branch: refs/heads/master
Commit: 544bfb945d88734ed7d428a7831e18c924003767
Parents: 885640f
Author: zentol <ch...@apache.org>
Authored: Tue Jul 3 12:11:44 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 3 21:08:45 2018 +0200
----------------------------------------------------------------------
.../test_high_parallelism_iterations.sh | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/544bfb94/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh b/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
index aa937ae..5a06266 100755
--- a/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
+++ b/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
@@ -27,18 +27,17 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
backup_config
+set_conf "taskmanager.heap.mb" "52" # 52Mb x 100 TMs = 5Gb total heap
-echo "taskmanager.heap.mb: 52" >> $FLINK_DIR/conf/flink-conf.yaml # 52Mb x 100 TMs = 5Gb total heap
+set_conf "taskmanager.memory.size" "8" # 8Mb
+set_conf "taskmanager.network.memory.min" "8388608" # 8Mb
+set_conf "taskmanager.network.memory.max" "8388608" # 8Mb
+set_conf "taskmanager.memory.segment-size" "8192" # 8Kb
-echo "taskmanager.memory.size: 8" >> $FLINK_DIR/conf/flink-conf.yaml # 8Mb
-echo "taskmanager.network.memory.min: 8388608" >> $FLINK_DIR/conf/flink-conf.yaml # 8Mb
-echo "taskmanager.network.memory.max: 8388608" >> $FLINK_DIR/conf/flink-conf.yaml # 8Mb
-echo "taskmanager.memory.segment-size: 8192" >> $FLINK_DIR/conf/flink-conf.yaml # 8Kb
+set_conf "taskmanager.network.netty.server.numThreads" "1"
+set_conf "taskmanager.network.netty.client.numThreads" "1"
-echo "taskmanager.network.netty.server.numThreads: 1" >> $FLINK_DIR/conf/flink-conf.yaml
-echo "taskmanager.network.netty.client.numThreads: 1" >> $FLINK_DIR/conf/flink-conf.yaml
-
-echo "taskmanager.numberOfTaskSlots: 1" >> $FLINK_DIR/conf/flink-conf.yaml
+set_conf "taskmanager.numberOfTaskSlots" "1"
print_mem_use
start_cluster
[4/5] flink git commit: [FLINK-9280][rest] Rework JobSubmitHandler to accept jar/artifact files
Posted by ch...@apache.org.
[FLINK-9280][rest] Rework JobSubmitHandler to accept jar/artifact files
This closes #6203.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a25cd3fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a25cd3fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a25cd3fe
Branch: refs/heads/master
Commit: a25cd3feddd19e75456db32a704ee5509e85dd47
Parents: 544bfb9
Author: zentol <ch...@apache.org>
Authored: Mon Jun 11 11:45:12 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 3 21:08:45 2018 +0200
----------------------------------------------------------------------
docs/_includes/generated/rest_dispatcher.html | 249 +++++++++----------
.../client/program/rest/RestClusterClient.java | 122 ++++-----
.../program/rest/RestClusterClientTest.java | 23 --
.../webmonitor/handlers/JarRunHandler.java | 2 +-
.../webmonitor/handlers/JarUploadHandler.java | 5 +-
.../handlers/JarUploadHandlerTest.java | 2 +-
.../flink/runtime/client/ClientUtils.java | 57 +++--
.../apache/flink/runtime/client/JobClient.java | 2 +-
.../client/JobSubmissionClientActor.java | 2 +-
.../dispatcher/DispatcherRestEndpoint.java | 11 +-
.../flink/runtime/minicluster/MiniCluster.java | 2 +-
.../flink/runtime/rest/AbstractHandler.java | 140 +++++------
.../rest/handler/AbstractRestHandler.java | 6 +-
.../flink/runtime/rest/handler/FileUploads.java | 9 +-
.../runtime/rest/handler/HandlerRequest.java | 8 +-
.../rest/handler/job/BlobServerPortHandler.java | 66 -----
.../rest/handler/job/JobSubmitHandler.java | 136 +++++++++-
.../AbstractTaskManagerFileHandler.java | 4 +-
.../rest/messages/BlobServerPortHeaders.java | 74 ------
.../messages/BlobServerPortResponseBody.java | 57 -----
.../rest/messages/job/JobSubmitHeaders.java | 11 +-
.../rest/messages/job/JobSubmitRequestBody.java | 115 ++++++---
.../flink/runtime/rest/util/RestConstants.java | 6 +-
.../flink/runtime/client/ClientUtilsTest.java | 4 +-
.../flink/runtime/rest/AbstractHandlerTest.java | 19 +-
.../runtime/rest/MultipartUploadResource.java | 7 +-
.../runtime/rest/RestServerEndpointITCase.java | 3 +-
.../runtime/rest/handler/FileUploadsTest.java | 6 +-
.../handler/job/BlobServerPortHandlerTest.java | 101 --------
.../rest/handler/job/JobSubmitHandlerTest.java | 188 ++++++++++++--
.../messages/BlobServerPortResponseTest.java | 35 ---
.../rest/messages/JobSubmitRequestBodyTest.java | 9 +-
.../webmonitor/TestingDispatcherGateway.java | 203 +++++++++++++++
.../webmonitor/TestingRestfulGateway.java | 30 +--
34 files changed, 951 insertions(+), 763 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/docs/_includes/generated/rest_dispatcher.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_dispatcher.html
index c74da9c..6ed59be 100644
--- a/docs/_includes/generated/rest_dispatcher.html
+++ b/docs/_includes/generated/rest_dispatcher.html
@@ -1,50 +1,6 @@
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/blobserver/port</strong></td>
- </tr>
- <tr>
- <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
- <td class="text-left">Response code: <code>200 OK</code></td>
- </tr>
- <tr>
- <td colspan="2">Returns the port of blob server which can be used to upload jars.</td>
- </tr>
- <tr>
- <td colspan="2">
- <button data-toggle="collapse" data-target="#607508253">Request</button>
- <div id="607508253" class="collapse">
- <pre>
- <code>
-{} </code>
- </pre>
- </div>
- </td>
- </tr>
- <tr>
- <td colspan="2">
- <button data-toggle="collapse" data-target="#1913718109">Response</button>
- <div id="1913718109" class="collapse">
- <pre>
- <code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:BlobServerPortResponseBody",
- "properties" : {
- "port" : {
- "type" : "integer"
- }
- }
-} </code>
- </pre>
- </div>
- </td>
- </tr>
- </tbody>
-</table>
-<table class="table table-bordered">
- <tbody>
- <tr>
<td class="text-left" colspan="2"><strong>/cluster</strong></td>
</tr>
<tr>
@@ -226,19 +182,11 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
</tr>
<tr>
<td colspan="2">
- <button data-toggle="collapse" data-target="#919877128">Request</button>
- <div id="919877128" class="collapse">
+ <button data-toggle="collapse" data-target="#-1290030289">Request</button>
+ <div id="-1290030289" class="collapse">
<pre>
<code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:FileUpload",
- "properties" : {
- "path" : {
- "type" : "string"
- }
- }
-} </code>
+{} </code>
</pre>
</div>
</td>
@@ -607,7 +555,7 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
<td class="text-left">Response code: <code>202 Accepted</code></td>
</tr>
<tr>
- <td colspan="2">Submits a job. This call is primarily intended to be used by the Flink client.</td>
+ <td colspan="2">Submits a job. This call is primarily intended to be used by the Flink client. This call expects a multipart/form-data request that consists of file uploads for the serialized JobGraph, jars and distributed cache artifacts and an attribute named "request" for the JSON payload.</td>
</tr>
<tr>
<td colspan="2">
@@ -619,10 +567,28 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody",
"properties" : {
- "serializedJobGraph" : {
+ "jobGraphFileName" : {
+ "type" : "string"
+ },
+ "jobJarFileNames" : {
"type" : "array",
"items" : {
- "type" : "integer"
+ "type" : "string"
+ }
+ },
+ "jobArtifactFileNames" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody:DistributedCacheFile",
+ "properties" : {
+ "entryName" : {
+ "type" : "string"
+ },
+ "fileName" : {
+ "type" : "string"
+ }
+ }
}
}
}
@@ -2461,79 +2427,6 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
<table class="table table-bordered">
<tbody>
<tr>
- <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/accumulators</strong></td>
- </tr>
- <tr>
- <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
- <td class="text-left">Response code: <code>200 OK</code></td>
- </tr>
- <tr>
- <td colspan="2">Returns user-defined accumulators of a task, aggregated across all subtasks.</td>
- </tr>
- <tr>
- <td colspan="2">Path parameters</td>
- </tr>
- <tr>
- <td colspan="2">
- <ul>
-<li><code>jobid</code> - description</li>
-<li><code>vertexid</code> - description</li>
- </ul>
- </td>
- </tr>
- <tr>
- <td colspan="2">
- <button data-toggle="collapse" data-target="#485581006">Request</button>
- <div id="485581006" class="collapse">
- <pre>
- <code>
-{} </code>
- </pre>
- </div>
- </td>
- </tr>
- <tr>
- <td colspan="2">
- <button data-toggle="collapse" data-target="#-1070353054">Response</button>
- <div id="-1070353054" class="collapse">
- <pre>
- <code>
-{
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexAccumulatorsInfo",
- "properties" : {
- "id" : {
- "type" : "string"
- },
- "user-accumulators" : {
- "type" : "array",
- "items" : {
- "type" : "object",
- "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator",
- "properties" : {
- "name" : {
- "type" : "string"
- },
- "type" : {
- "type" : "string"
- },
- "value" : {
- "type" : "string"
- }
- }
- }
- }
- }
-} </code>
- </pre>
- </div>
- </td>
- </tr>
- </tbody>
-</table>
-<table class="table table-bordered">
- <tbody>
- <tr>
<td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/backpressure</strong></td>
</tr>
<tr>
@@ -2675,6 +2568,100 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa
<table class="table table-bordered">
<tbody>
<tr>
+ <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/accumulators</strong></td>
+ </tr>
+ <tr>
+ <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+ <td class="text-left">Response code: <code>200 OK</code></td>
+ </tr>
+ <tr>
+ <td colspan="2">Returns all user-defined accumulators for all subtasks of a task.</td>
+ </tr>
+ <tr>
+ <td colspan="2">Path parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#-886388859">Request</button>
+ <div id="-886388859" class="collapse">
+ <pre>
+ <code>
+{} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <button data-toggle="collapse" data-target="#112317594">Response</button>
+ <div id="112317594" class="collapse">
+ <pre>
+ <code>
+{
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtasksAllAccumulatorsInfo",
+ "properties" : {
+ "id" : {
+ "type" : "any"
+ },
+ "parallelism" : {
+ "type" : "integer"
+ },
+ "subtasks" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtasksAllAccumulatorsInfo:SubtaskAccumulatorsInfo",
+ "properties" : {
+ "subtask" : {
+ "type" : "integer"
+ },
+ "attempt" : {
+ "type" : "integer"
+ },
+ "host" : {
+ "type" : "string"
+ },
+ "user-accumulators" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator",
+ "properties" : {
+ "name" : {
+ "type" : "string"
+ },
+ "type" : {
+ "type" : "string"
+ },
+ "value" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+} </code>
+ </pre>
+ </div>
+ </td>
+ </tr>
+ </tbody>
+</table>
+<table class="table table-bordered">
+ <tbody>
+ <tr>
<td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/metrics</strong></td>
</tr>
<tr>
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 85699d7..935a07f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -22,15 +22,16 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.NewClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.client.ClientUtils;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
@@ -49,8 +51,6 @@ import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeader
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusMessageParameters;
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders;
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerMessageParameters;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -87,10 +87,10 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerReq
import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.ScalaUtils;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
@@ -102,20 +102,20 @@ import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import akka.actor.AddressFromURIString;
-
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.io.ObjectOutputStream;
import java.net.MalformedURLException;
import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -315,36 +315,61 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
- log.info("Requesting blob server port.");
- CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance());
+ CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
+ try {
+ final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
+ try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+ objectOut.writeObject(jobGraph);
+ }
+ return jobGraphFile;
+ } catch (IOException e) {
+ throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e));
+ }
+ }, executorService);
- CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
- getDispatcherAddress(),
- (BlobServerPortResponseBody response, String dispatcherAddress) -> {
- final int blobServerPort = response.port;
- final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
+ CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
+ List<String> jarFileNames = new ArrayList<>(8);
+ List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
+ Collection<FileUpload> filesToUpload = new ArrayList<>(8);
- try {
- ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
- } catch (Exception e) {
- throw new CompletionException(e);
- }
+ filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
- return jobGraph;
- });
+ for (Path jar : jobGraph.getUserJars()) {
+ jarFileNames.add(jar.getName());
+ filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+ }
- CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose(
- (JobGraph jobGraphToSubmit) -> {
- log.info("Submitting job graph.");
+ for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
+ artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName()));
+ filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
+ }
- try {
- return sendRequest(
- JobSubmitHeaders.getInstance(),
- new JobSubmitRequestBody(jobGraph));
- } catch (IOException ioe) {
- throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe));
- }
- });
+ final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
+ jobGraphFile.getFileName().toString(),
+ jarFileNames,
+ artifactFileNames);
+
+ return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
+ });
+
+ final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(
+ requestAndFileUploads -> sendRetriableRequest(
+ JobSubmitHeaders.getInstance(),
+ EmptyMessageParameters.getInstance(),
+ requestAndFileUploads.f0,
+ requestAndFileUploads.f1,
+ isConnectionProblemOrServiceUnavailable())
+ );
+
+ submissionFuture
+ .thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)
+ .thenAccept(jobGraphFile -> {
+ try {
+ Files.delete(jobGraphFile);
+ } catch (IOException e) {
+ log.warn("Could not delete temporary file {}.", jobGraphFile, e);
+ }
+ });
return submissionFuture
.thenApply(
@@ -676,9 +701,14 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
sendRetriableRequest(M messageHeaders, U messageParameters, R request, Predicate<Throwable> retryPredicate) {
+ return sendRetriableRequest(messageHeaders, messageParameters, request, Collections.emptyList(), retryPredicate);
+ }
+
+ private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P>
+ sendRetriableRequest(M messageHeaders, U messageParameters, R request, Collection<FileUpload> filesToUpload, Predicate<Throwable> retryPredicate) {
return retry(() -> getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> {
try {
- return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request);
+ return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request, filesToUpload);
} catch (IOException e) {
throw new CompletionException(e);
}
@@ -736,26 +766,4 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
}
}, executorService);
}
-
- private CompletableFuture<String> getDispatcherAddress() {
- return FutureUtils.orTimeout(
- dispatcherLeaderRetriever.getLeaderFuture(),
- restClusterClientConfiguration.getAwaitLeaderTimeout(),
- TimeUnit.MILLISECONDS)
- .thenApplyAsync(leaderAddressSessionId -> {
- final String address = leaderAddressSessionId.f0;
- final Optional<String> host = ScalaUtils.<String>toJava(AddressFromURIString.parse(address).host());
-
- return host.orElseGet(() -> {
- // if the dispatcher address does not contain a host part, then assume it's running
- // on the same machine as the client
- log.info("The dispatcher seems to run without remoting enabled. This indicates that we are " +
- "in a test. This can only work if the RestClusterClient runs on the same machine. " +
- "Assuming, therefore, 'localhost' as the host.");
-
- return "localhost";
- });
- }, executorService);
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index f025d67..75f16c0 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -53,8 +53,6 @@ import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -226,7 +224,6 @@ public class RestClusterClientTest extends TestLogger {
@Test
public void testJobSubmitCancelStop() throws Exception {
- TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
TestJobExecutionResultHandler testJobExecutionResultHandler =
@@ -237,15 +234,12 @@ public class RestClusterClientTest extends TestLogger {
.build()));
try (TestRestServerEndpoint ignored = createRestServerEndpoint(
- portHandler,
submitHandler,
terminationHandler,
testJobExecutionResultHandler)) {
- Assert.assertFalse(portHandler.portRetrieved);
Assert.assertFalse(submitHandler.jobSubmitted);
restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
- Assert.assertTrue(portHandler.portRetrieved);
Assert.assertTrue(submitHandler.jobSubmitted);
Assert.assertFalse(terminationHandler.jobCanceled);
@@ -264,11 +258,9 @@ public class RestClusterClientTest extends TestLogger {
@Test
public void testDetachedJobSubmission() throws Exception {
- final TestBlobServerPortHandler testBlobServerPortHandler = new TestBlobServerPortHandler();
final TestJobSubmitHandler testJobSubmitHandler = new TestJobSubmitHandler();
try (TestRestServerEndpoint ignored = createRestServerEndpoint(
- testBlobServerPortHandler,
testJobSubmitHandler)) {
restClusterClient.setDetached(true);
@@ -282,20 +274,6 @@ public class RestClusterClientTest extends TestLogger {
}
- private class TestBlobServerPortHandler extends TestHandler<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
- private volatile boolean portRetrieved = false;
-
- private TestBlobServerPortHandler() {
- super(BlobServerPortHeaders.getInstance());
- }
-
- @Override
- protected CompletableFuture<BlobServerPortResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
- portRetrieved = true;
- return CompletableFuture.completedFuture(new BlobServerPortResponseBody(12000));
- }
- }
-
private class TestJobSubmitHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
private volatile boolean jobSubmitted = false;
@@ -390,7 +368,6 @@ public class RestClusterClientTest extends TestLogger {
try (TestRestServerEndpoint ignored = createRestServerEndpoint(
testJobExecutionResultHandler,
- new TestBlobServerPortHandler(),
new TestJobSubmitHandler())) {
JobExecutionResult jobExecutionResult;
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 10387c8..1e620d4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -114,7 +114,7 @@ public class JarRunHandler extends
CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
try {
- ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, configuration));
+ ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(address, configuration));
} catch (FlinkException e) {
throw new CompletionException(e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index a1ef82b..83db224 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -32,6 +32,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
import javax.annotation.Nonnull;
+import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -70,11 +71,11 @@ public class JarUploadHandler extends
protected CompletableFuture<JarUploadResponseBody> handleRequest(
@Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
@Nonnull final RestfulGateway gateway) throws RestHandlerException {
- Collection<Path> uploadedFiles = request.getUploadedFiles();
+ Collection<File> uploadedFiles = request.getUploadedFiles();
if (uploadedFiles.size() != 1) {
throw new RestHandlerException("Exactly 1 file must be sent, received " + uploadedFiles.size() + '.', HttpResponseStatus.BAD_REQUEST);
}
- final Path fileUpload = uploadedFiles.iterator().next();
+ final Path fileUpload = uploadedFiles.iterator().next().toPath();
return CompletableFuture.supplyAsync(() -> {
if (!fileUpload.getFileName().toString().endsWith(".jar")) {
throw new CompletionException(new RestHandlerException(
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
index 812d4c6..c9e25ed 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
@@ -131,6 +131,6 @@ public class JarUploadHandlerTest extends TestLogger {
EmptyMessageParameters.getInstance(),
Collections.emptyMap(),
Collections.emptyMap(),
- Collections.singleton(uploadedFile));
+ Collections.singleton(uploadedFile.toFile()));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
index fc6a621..06baaaf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.client;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobClient;
@@ -32,7 +31,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -43,20 +41,41 @@ public enum ClientUtils {
;
/**
- * Uploads all files required for the execution of the given {@link JobGraph} using the {@link BlobClient} from
- * the given {@link Supplier}.
+ * Extracts all files required for the execution from the given {@link JobGraph} and uploads them using the {@link BlobClient}
+ * from the given {@link Supplier}.
*
* @param jobGraph jobgraph requiring files
* @param clientSupplier supplier of blob client to upload files with
- * @throws IOException if the upload fails
+ * @throws FlinkException if the upload fails
*/
- public static void uploadJobGraphFiles(JobGraph jobGraph, SupplierWithException<BlobClient, IOException> clientSupplier) throws FlinkException {
+ public static void extractAndUploadJobGraphFiles(JobGraph jobGraph, SupplierWithException<BlobClient, IOException> clientSupplier) throws FlinkException {
List<Path> userJars = jobGraph.getUserJars();
- Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
+ Collection<Tuple2<String, Path>> userArtifacts = jobGraph.getUserArtifacts().entrySet().stream()
+ .map(entry -> Tuple2.of(entry.getKey(), new Path(entry.getValue().filePath)))
+ .collect(Collectors.toList());
+
+ uploadJobGraphFiles(jobGraph, userJars, userArtifacts, clientSupplier);
+ }
+
+ /**
+ * Uploads the given jars and artifacts required for the execution of the given {@link JobGraph} using the {@link BlobClient} from
+ * the given {@link Supplier}.
+ *
+ * @param jobGraph jobgraph requiring files
+ * @param userJars jars to upload
+ * @param userArtifacts artifacts to upload
+ * @param clientSupplier supplier of blob client to upload files with
+ * @throws FlinkException if the upload fails
+ */
+ public static void uploadJobGraphFiles(
+ JobGraph jobGraph,
+ Collection<Path> userJars,
+ Collection<Tuple2<String, org.apache.flink.core.fs.Path>> userArtifacts,
+ SupplierWithException<BlobClient, IOException> clientSupplier) throws FlinkException {
if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
try (BlobClient client = clientSupplier.get()) {
- uploadAndSetUserJars(jobGraph, client);
- uploadAndSetUserArtifacts(jobGraph, client);
+ uploadAndSetUserJars(jobGraph, userJars, client);
+ uploadAndSetUserArtifacts(jobGraph, userArtifacts, client);
} catch (IOException ioe) {
throw new FlinkException("Could not upload job files.", ioe);
}
@@ -64,15 +83,15 @@ public enum ClientUtils {
}
/**
- * Uploads the user jars from the given {@link JobGraph} using the given {@link BlobClient},
- * and sets the appropriate blobkeys.
+ * Uploads the given user jars using the given {@link BlobClient}, and sets the appropriate blobkeys on the given {@link JobGraph}.
*
- * @param jobGraph jobgraph requiring user jars
+ * @param jobGraph jobgraph requiring user jars
+ * @param userJars jars to upload
* @param blobClient client to upload jars with
* @throws IOException if the upload fails
*/
- private static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient blobClient) throws IOException {
- Collection<PermanentBlobKey> blobKeys = uploadUserJars(jobGraph.getJobID(), jobGraph.getUserJars(), blobClient);
+ private static void uploadAndSetUserJars(JobGraph jobGraph, Collection<Path> userJars, BlobClient blobClient) throws IOException {
+ Collection<PermanentBlobKey> blobKeys = uploadUserJars(jobGraph.getJobID(), userJars, blobClient);
setUserJarBlobKeys(blobKeys, jobGraph);
}
@@ -90,18 +109,14 @@ public enum ClientUtils {
}
/**
- * Uploads the user artifacts from the given {@link JobGraph} using the given {@link BlobClient},
- * and sets the appropriate blobkeys.
+ * Uploads the given user artifacts using the given {@link BlobClient}, and sets the appropriate blobkeys on the given {@link JobGraph}.
*
* @param jobGraph jobgraph requiring user artifacts
+ * @param artifactPaths artifacts to upload
* @param blobClient client to upload artifacts with
* @throws IOException if the upload fails
*/
- private static void uploadAndSetUserArtifacts(JobGraph jobGraph, BlobClient blobClient) throws IOException {
- Collection<Tuple2<String, Path>> artifactPaths = jobGraph.getUserArtifacts().entrySet().stream()
- .map(entry -> Tuple2.of(entry.getKey(), new Path(entry.getValue().filePath)))
- .collect(Collectors.toList());
-
+ private static void uploadAndSetUserArtifacts(JobGraph jobGraph, Collection<Tuple2<String, Path>> artifactPaths, BlobClient blobClient) throws IOException {
Collection<Tuple2<String, PermanentBlobKey>> blobKeys = uploadUserArtifacts(jobGraph.getJobID(), artifactPaths, blobClient);
setUserArtifactBlobKeys(jobGraph, blobKeys);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 27da3b8..635def2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -424,7 +424,7 @@ public class JobClient {
}
try {
- ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, config));
+ ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, config));
} catch (FlinkException e) {
throw new JobSubmissionException(jobGraph.getJobID(),
"Could not upload job files.", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index 2783b09..89978fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -155,7 +155,7 @@ public class JobSubmissionClientActor extends JobClientActor {
final CompletableFuture<Void> jarUploadFuture = blobServerAddressFuture.thenAcceptAsync(
(InetSocketAddress blobServerAddress) -> {
try {
- ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, clientConfig));
+ ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, clientConfig));
} catch (FlinkException e) {
throw new CompletionException(e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 8072cf4..4279330 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
-import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
@@ -88,17 +87,12 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
final Time timeout = restConfiguration.getTimeout();
- BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(
- restAddressFuture,
- leaderRetriever,
- timeout,
- responseHeaders);
-
JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
restAddressFuture,
leaderRetriever,
timeout,
- responseHeaders);
+ responseHeaders,
+ executor);
if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
try {
@@ -125,7 +119,6 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
log.info("Web-based job submission is not enabled.");
}
- handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
return handlers;
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 4fab2b8..97ab5a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -676,7 +676,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
private CompletableFuture<Void> uploadAndSetJobFiles(final CompletableFuture<InetSocketAddress> blobServerAddressFuture, final JobGraph job) {
return blobServerAddressFuture.thenAccept(blobServerAddress -> {
try {
- ClientUtils.uploadJobGraphFiles(job, () -> new BlobClient(blobServerAddress, miniClusterConfiguration.getConfiguration()));
+ ClientUtils.extractAndUploadJobGraphFiles(job, () -> new BlobClient(blobServerAddress, miniClusterConfiguration.getConfiguration()));
} catch (FlinkException e) {
throw new CompletionException(e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index 1e88425..0d8605a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -49,7 +49,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -82,98 +84,84 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
}
@Override
- protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, T gateway) throws Exception {
+ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, T gateway) {
HttpRequest httpRequest = routedRequest.getRequest();
if (log.isTraceEnabled()) {
log.trace("Received request " + httpRequest.uri() + '.');
}
+ FileUploads uploadedFiles = null;
try {
if (!(httpRequest instanceof FullHttpRequest)) {
// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
// FullHttpRequests.
log.error("Implementation error: Received a request that wasn't a FullHttpRequest.");
- HandlerUtils.sendErrorResponse(
- ctx,
- httpRequest,
- new ErrorResponseBody("Bad request received."),
- HttpResponseStatus.BAD_REQUEST,
- responseHeaders);
- return;
+ throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST);
}
final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
- try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) {
+ uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx);
- if (!untypedResponseMessageHeaders.acceptsFileUploads() && !uploadedFiles.getUploadedFiles().isEmpty()) {
- HandlerUtils.sendErrorResponse(
- ctx,
- httpRequest,
- new ErrorResponseBody("File uploads not allowed."),
- HttpResponseStatus.BAD_REQUEST,
- responseHeaders);
- return;
- }
+ if (!untypedResponseMessageHeaders.acceptsFileUploads() && !uploadedFiles.getUploadedFiles().isEmpty()) {
+ throw new RestHandlerException("File uploads not allowed.", HttpResponseStatus.BAD_REQUEST);
+ }
- R request;
- if (msgContent.capacity() == 0) {
- try {
- request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
- } catch (JsonParseException | JsonMappingException je) {
- log.error("Request did not conform to expected format.", je);
- HandlerUtils.sendErrorResponse(
- ctx,
- httpRequest,
- new ErrorResponseBody("Bad request received."),
- HttpResponseStatus.BAD_REQUEST,
- responseHeaders);
- return;
- }
- } else {
- try {
- ByteBufInputStream in = new ByteBufInputStream(msgContent);
- request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
- } catch (JsonParseException | JsonMappingException je) {
- log.error("Failed to read request.", je);
- HandlerUtils.sendErrorResponse(
- ctx,
- httpRequest,
- new ErrorResponseBody(String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName())),
- HttpResponseStatus.BAD_REQUEST,
- responseHeaders);
- return;
- }
+ R request;
+ if (msgContent.capacity() == 0) {
+ try {
+ request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
+ } catch (JsonParseException | JsonMappingException je) {
+ log.error("Request did not conform to expected format.", je);
+ throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST, je);
}
-
- final HandlerRequest<R, M> handlerRequest;
-
+ } else {
try {
- handlerRequest = new HandlerRequest<R, M>(
- request,
- untypedResponseMessageHeaders.getUnresolvedMessageParameters(),
- routedRequest.getRouteResult().pathParams(),
- routedRequest.getRouteResult().queryParams(),
- uploadedFiles.getUploadedFiles());
- } catch (HandlerRequestException hre) {
- log.error("Could not create the handler request.", hre);
-
- HandlerUtils.sendErrorResponse(
- ctx,
- httpRequest,
- new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", hre.getMessage())),
+ ByteBufInputStream in = new ByteBufInputStream(msgContent);
+ request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
+ } catch (JsonParseException | JsonMappingException je) {
+ log.error("Failed to read request.", je);
+ throw new RestHandlerException(
+ String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName()),
HttpResponseStatus.BAD_REQUEST,
- responseHeaders);
- return;
+ je);
}
+ }
- respondToRequest(
- ctx,
- httpRequest,
- handlerRequest,
- gateway);
+ final HandlerRequest<R, M> handlerRequest;
+
+ try {
+ handlerRequest = new HandlerRequest<R, M>(
+ request,
+ untypedResponseMessageHeaders.getUnresolvedMessageParameters(),
+ routedRequest.getRouteResult().pathParams(),
+ routedRequest.getRouteResult().queryParams(),
+ uploadedFiles.getUploadedFiles());
+ } catch (HandlerRequestException hre) {
+ log.error("Could not create the handler request.", hre);
+ throw new RestHandlerException(
+ String.format("Bad request, could not parse parameters: %s", hre.getMessage()),
+ HttpResponseStatus.BAD_REQUEST,
+ hre);
}
+ CompletableFuture<Void> requestProcessingFuture = respondToRequest(
+ ctx,
+ httpRequest,
+ handlerRequest,
+ gateway);
+
+ final FileUploads finalUploadedFiles = uploadedFiles;
+ requestProcessingFuture
+ .whenComplete((Void ignored, Throwable throwable) -> cleanupFileUploads(finalUploadedFiles));
+ } catch (RestHandlerException rhe) {
+ HandlerUtils.sendErrorResponse(
+ ctx,
+ httpRequest,
+ new ErrorResponseBody(rhe.getMessage()),
+ rhe.getHttpResponseStatus(),
+ responseHeaders);
+ cleanupFileUploads(uploadedFiles);
} catch (Throwable e) {
log.error("Request processing failed.", e);
HandlerUtils.sendErrorResponse(
@@ -182,6 +170,17 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
new ErrorResponseBody("Internal server error."),
HttpResponseStatus.INTERNAL_SERVER_ERROR,
responseHeaders);
+ cleanupFileUploads(uploadedFiles);
+ }
+ }
+
+ private void cleanupFileUploads(@Nullable FileUploads uploadedFiles) {
+ if (uploadedFiles != null) {
+ try {
+ uploadedFiles.close();
+ } catch (IOException e) {
+ log.warn("Could not cleanup uploaded files.", e);
+ }
}
}
@@ -192,9 +191,10 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques
* @param httpRequest original http request
* @param handlerRequest typed handler request
* @param gateway leader gateway
+ * @return Future which is completed once the request has been processed
* @throws RestHandlerException if an exception occurred while responding
*/
- protected abstract void respondToRequest(
+ protected abstract CompletableFuture<Void> respondToRequest(
ChannelHandlerContext ctx,
HttpRequest httpRequest,
HandlerRequest<R, M> handlerRequest,
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 448711b..e4cec08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -70,7 +70,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
}
@Override
- protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
+ protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
CompletableFuture<P> response;
try {
@@ -79,7 +79,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
response = FutureUtils.completedExceptionally(e);
}
- response.whenComplete((P resp, Throwable throwable) -> {
+ return response.whenComplete((P resp, Throwable throwable) -> {
if (throwable != null) {
Throwable error = ExceptionUtils.stripCompletionException(throwable);
@@ -105,7 +105,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re
messageHeaders.getResponseStatusCode(),
responseHeaders);
}
- });
+ }).thenApply(ignored -> null);
}
private void processRestHandlerException(ChannelHandlerContext ctx, HttpRequest httpRequest, RestHandlerException rhe) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
index 31ac47bb..b233cb5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
@@ -24,6 +24,7 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
@@ -58,7 +59,7 @@ public final class FileUploads implements AutoCloseable {
this.uploadDirectory = uploadDirectory;
}
- public Collection<Path> getUploadedFiles() throws IOException {
+ public Collection<File> getUploadedFiles() throws IOException {
if (uploadDirectory == null) {
return Collections.emptyList();
}
@@ -78,9 +79,9 @@ public final class FileUploads implements AutoCloseable {
private static final class FileAdderVisitor extends SimpleFileVisitor<Path> {
- private final Collection<Path> files = new ArrayList<>(4);
+ private final Collection<File> files = new ArrayList<>(4);
- Collection<Path> getContainedFiles() {
+ Collection<File> getContainedFiles() {
return files;
}
@@ -90,7 +91,7 @@ public final class FileUploads implements AutoCloseable {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
FileVisitResult result = super.visitFile(file, attrs);
- files.add(file);
+ files.add(file.toFile());
return result;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
index 7e93556..990dae5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnull;
-import java.nio.file.Path;
+import java.io.File;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -43,7 +43,7 @@ import java.util.StringJoiner;
public class HandlerRequest<R extends RequestBody, M extends MessageParameters> {
private final R requestBody;
- private final Collection<Path> uploadedFiles;
+ private final Collection<File> uploadedFiles;
private final Map<Class<? extends MessagePathParameter<?>>, MessagePathParameter<?>> pathParameters = new HashMap<>(2);
private final Map<Class<? extends MessageQueryParameter<?>>, MessageQueryParameter<?>> queryParameters = new HashMap<>(2);
@@ -55,7 +55,7 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
this(requestBody, messageParameters, receivedPathParameters, receivedQueryParameters, Collections.emptyList());
}
- public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters, Collection<Path> uploadedFiles) throws HandlerRequestException {
+ public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters, Collection<File> uploadedFiles) throws HandlerRequestException {
this.requestBody = Preconditions.checkNotNull(requestBody);
this.uploadedFiles = Collections.unmodifiableCollection(Preconditions.checkNotNull(uploadedFiles));
Preconditions.checkNotNull(messageParameters);
@@ -141,7 +141,7 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
}
@Nonnull
- public Collection<Path> getUploadedFiles() {
+ public Collection<File> getUploadedFiles() {
return uploadedFiles;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
deleted file mode 100644
index 4b5fa89..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.job;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import javax.annotation.Nonnull;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-
-/**
- * This handler can be used to retrieve the port that the blob server runs on.
- */
-public final class BlobServerPortHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
-
- public BlobServerPortHandler(
- CompletableFuture<String> localRestAddress,
- GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
- Time timeout,
- Map<String, String> headers) {
- super(localRestAddress, leaderRetriever, timeout, headers, BlobServerPortHeaders.getInstance());
- }
-
- @Override
- protected CompletableFuture<BlobServerPortResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
- return gateway
- .getBlobServerPort(timeout)
- .thenApply(BlobServerPortResponseBody::new)
- .exceptionally(error -> {
- throw new CompletionException(new RestHandlerException(
- "Failed to retrieve blob server port.",
- HttpResponseStatus.INTERNAL_SERVER_ERROR,
- ExceptionUtils.stripCompletionException(error)));
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
index af04629..052b056 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
@@ -19,8 +19,14 @@
package org.apache.flink.runtime.rest.handler.job;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -29,47 +35,153 @@ import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import javax.annotation.Nonnull;
-import java.io.ByteArrayInputStream;
+import java.io.File;
import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
/**
* This handler can be used to submit jobs to a Flink cluster.
*/
public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
+ private static final String FILE_TYPE_JOB_GRAPH = "JobGraph";
+ private static final String FILE_TYPE_JAR = "Jar";
+ private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+ private final Executor executor;
+
public JobSubmitHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
Time timeout,
- Map<String, String> headers) {
+ Map<String, String> headers,
+ Executor executor) {
super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
+ this.executor = executor;
}
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
- JobGraph jobGraph;
- try {
- ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
- jobGraph = (JobGraph) objectIn.readObject();
- } catch (Exception e) {
+ final Collection<File> uploadedFiles = request.getUploadedFiles();
+ final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+ File::getName,
+ Path::fromLocalFile,
+ (first, duplicate) -> first)); // keep-first merge: without it toMap throws IllegalStateException on duplicate names and the size check below is unreachable
+
+ if (uploadedFiles.size() != nameToFile.size()) {
throw new RestHandlerException(
- "Failed to deserialize JobGraph.",
- HttpResponseStatus.BAD_REQUEST,
- e);
+ String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
+ uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
+ nameToFile.size(),
+ uploadedFiles.size()),
+ HttpResponseStatus.BAD_REQUEST
+ );
}
- return gateway.submitJob(jobGraph, timeout)
- .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
+ final JobSubmitRequestBody requestBody = request.getRequestBody();
+
+ CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
+
+ Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
+
+ Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
+
+ CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts);
+
+ CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
+
+ return jobSubmissionFuture.thenCombine(jobGraphFuture,
+ (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
.exceptionally(exception -> {
throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
});
}
+
+ private CompletableFuture<JobGraph> loadJobGraph(JobSubmitRequestBody requestBody, Map<String, Path> nameToFile) throws MissingFileException {
+ final Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_JOB_GRAPH, nameToFile);
+
+ return CompletableFuture.supplyAsync(() -> {
+ JobGraph jobGraph;
+ try (ObjectInputStream objectIn = new ObjectInputStream(jobGraphFile.getFileSystem().open(jobGraphFile))) {
+ jobGraph = (JobGraph) objectIn.readObject();
+ } catch (Exception e) {
+ throw new CompletionException(new RestHandlerException(
+ "Failed to deserialize JobGraph.",
+ HttpResponseStatus.BAD_REQUEST,
+ e));
+ }
+ return jobGraph;
+ }, executor);
+ }
+
+ private static Collection<Path> getJarFilesToUpload(Collection<String> jarFileNames, Map<String, Path> nameToFileMap) throws MissingFileException {
+ Collection<Path> jarFiles = new ArrayList<>(jarFileNames.size());
+ for (String jarFileName : jarFileNames) {
+ Path jarFile = getPathAndAssertUpload(jarFileName, FILE_TYPE_JAR, nameToFileMap);
+ jarFiles.add(new Path(jarFile.toString()));
+ }
+ return jarFiles;
+ }
+
+ private static Collection<Tuple2<String, Path>> getArtifactFilesToUpload(
+ Collection<JobSubmitRequestBody.DistributedCacheFile> artifactEntries,
+ Map<String, Path> nameToFileMap) throws MissingFileException {
+ Collection<Tuple2<String, Path>> artifacts = new ArrayList<>(artifactEntries.size());
+ for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : artifactEntries) {
+ Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, FILE_TYPE_ARTIFACT, nameToFileMap);
+ artifacts.add(Tuple2.of(artifactFileName.entryName, new Path(artifactFile.toString())));
+ }
+ return artifacts;
+ }
+
+ private CompletableFuture<JobGraph> uploadJobGraphFiles(
+ DispatcherGateway gateway,
+ CompletableFuture<JobGraph> jobGraphFuture,
+ Collection<Path> jarFiles,
+ Collection<Tuple2<String, Path>> artifacts) {
+ CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+ return jobGraphFuture.thenCombine(blobServerPortFuture, (JobGraph jobGraph, Integer blobServerPort) -> {
+ final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
+ try {
+ ClientUtils.uploadJobGraphFiles(jobGraph, jarFiles, artifacts, () -> new BlobClient(address, new Configuration()));
+ } catch (FlinkException e) {
+ throw new CompletionException(new RestHandlerException(
+ "Could not upload job files.",
+ HttpResponseStatus.INTERNAL_SERVER_ERROR,
+ e));
+ }
+ return jobGraph;
+ });
+ }
+
+ private static Path getPathAndAssertUpload(String fileName, String type, Map<String, Path> uploadedFiles) throws MissingFileException {
+ final Path file = uploadedFiles.get(fileName);
+ if (file == null) {
+ throw new MissingFileException(type, fileName);
+ }
+ return file;
+ }
+
+ private static final class MissingFileException extends RestHandlerException {
+
+ private static final long serialVersionUID = -7954810495610194965L;
+
+ MissingFileException(String type, String fileName) {
+ super(type + " file " + fileName + " could not be found on the server.", HttpResponseStatus.BAD_REQUEST);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 83fab69..edefa15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -117,7 +117,7 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
}
@Override
- protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
+ protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
final ResourceID taskManagerId = handlerRequest.getPathParameter(TaskManagerIdPathParameter.class);
final CompletableFuture<TransientBlobKey> blobKeyFuture;
@@ -152,7 +152,7 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag
},
ctx.executor());
- resultFuture.whenComplete(
+ return resultFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
log.error("Failed to transfer file from TaskExecutor {}.", taskManagerId, throwable);
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
deleted file mode 100644
index a845de3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * These headers define the protocol for querying the port of the blob server.
- */
-public class BlobServerPortHeaders implements MessageHeaders<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
-
- private static final String URL = "/blobserver/port";
- private static final BlobServerPortHeaders INSTANCE = new BlobServerPortHeaders();
-
- private BlobServerPortHeaders() {
- }
-
- @Override
- public Class<EmptyRequestBody> getRequestClass() {
- return EmptyRequestBody.class;
- }
-
- @Override
- public HttpMethodWrapper getHttpMethod() {
- return HttpMethodWrapper.GET;
- }
-
- @Override
- public String getTargetRestEndpointURL() {
- return URL;
- }
-
- @Override
- public Class<BlobServerPortResponseBody> getResponseClass() {
- return BlobServerPortResponseBody.class;
- }
-
- @Override
- public HttpResponseStatus getResponseStatusCode() {
- return HttpResponseStatus.OK;
- }
-
- @Override
- public EmptyMessageParameters getUnresolvedMessageParameters() {
- return EmptyMessageParameters.getInstance();
- }
-
- public static BlobServerPortHeaders getInstance() {
- return INSTANCE;
- }
-
- @Override
- public String getDescription() {
- return "Returns the port of blob server which can be used to upload jars.";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
deleted file mode 100644
index 895ecf3..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * Response containing the blob server port.
- */
-public final class BlobServerPortResponseBody implements ResponseBody {
-
- static final String FIELD_NAME_PORT = "port";
-
- /**
- * The port of the blob server.
- */
- @JsonProperty(FIELD_NAME_PORT)
- public final int port;
-
- @JsonCreator
- public BlobServerPortResponseBody(
- @JsonProperty(FIELD_NAME_PORT) int port) {
-
- this.port = port;
- }
-
- @Override
- public int hashCode() {
- return 67 * port;
- }
-
- @Override
- public boolean equals(Object object) {
- if (object instanceof BlobServerPortResponseBody) {
- BlobServerPortResponseBody other = (BlobServerPortResponseBody) object;
- return this.port == other.port;
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
index 88f53f2..42f64b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.rest.messages.job;
+import org.apache.flink.runtime.rest.FileUploadHandler;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -71,6 +72,14 @@ public class JobSubmitHeaders implements MessageHeaders<JobSubmitRequestBody, Jo
@Override
public String getDescription() {
- return "Submits a job. This call is primarily intended to be used by the Flink client.";
+ return "Submits a job. This call is primarily intended to be used by the Flink client. This call expects a " +
+ "multipart/form-data request that consists of file uploads for the serialized JobGraph, jars and " +
+ "distributed cache artifacts and an attribute named \"" + FileUploadHandler.HTTP_ATTRIBUTE_REQUEST + "\" for " +
+ "the JSON payload.";
+ }
+
+ @Override
+ public boolean acceptsFileUploads() {
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
index 3f550f0..0ca82b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
@@ -18,64 +18,119 @@
package org.apache.flink.runtime.rest.messages.job;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
/**
* Request for submitting a job.
*
- * <p>We currently require the job-jars to be uploaded through the blob-server.
+ * <p>This request only contains the names of files that must be present on the server, and defines how these files are
+ * interpreted.
*/
public final class JobSubmitRequestBody implements RequestBody {
- private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph";
+ private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName";
+ private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames";
+ private static final String FIELD_NAME_JOB_ARTIFACTS = "jobArtifactFileNames";
- /**
- * The serialized job graph.
- */
- @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
- public final byte[] serializedJobGraph;
+ @JsonProperty(FIELD_NAME_JOB_GRAPH)
+ public final String jobGraphFileName;
- public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
- this(serializeJobGraph(jobGraph));
- }
+ @JsonProperty(FIELD_NAME_JOB_JARS)
+ public final Collection<String> jarFileNames;
+
+ @JsonProperty(FIELD_NAME_JOB_ARTIFACTS)
+ public final Collection<DistributedCacheFile> artifactFileNames;
@JsonCreator
public JobSubmitRequestBody(
- @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] serializedJobGraph) {
- this.serializedJobGraph = Preconditions.checkNotNull(serializedJobGraph);
+ @JsonProperty(FIELD_NAME_JOB_GRAPH) String jobGraphFileName,
+ @JsonProperty(FIELD_NAME_JOB_JARS) Collection<String> jarFileNames,
+ @JsonProperty(FIELD_NAME_JOB_ARTIFACTS) Collection<DistributedCacheFile> artifactFileNames) {
+ this.jobGraphFileName = jobGraphFileName;
+ this.jarFileNames = jarFileNames == null ? java.util.Collections.<String>emptyList() : jarFileNames; // absent JSON property arrives as null; normalise so consumers can call size()/iterate
+ this.artifactFileNames = artifactFileNames == null ? java.util.Collections.<DistributedCacheFile>emptyList() : artifactFileNames; // same null-to-empty normalisation as for jars
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobSubmitRequestBody that = (JobSubmitRequestBody) o;
+ return Objects.equals(jobGraphFileName, that.jobGraphFileName) &&
+ Objects.equals(jarFileNames, that.jarFileNames) &&
+ Objects.equals(artifactFileNames, that.artifactFileNames);
}
@Override
public int hashCode() {
- return 71 * Arrays.hashCode(this.serializedJobGraph);
+ return Objects.hash(jobGraphFileName, jarFileNames, artifactFileNames);
}
@Override
- public boolean equals(Object object) {
- if (object instanceof JobSubmitRequestBody) {
- JobSubmitRequestBody other = (JobSubmitRequestBody) object;
- return Arrays.equals(this.serializedJobGraph, other.serializedJobGraph);
- }
- return false;
+ public String toString() {
+ return "JobSubmitRequestBody{" +
+ "jobGraphFileName='" + jobGraphFileName + '\'' +
+ ", jarFileNames=" + jarFileNames +
+ ", artifactFileNames=" + artifactFileNames +
+ '}';
}
- private static byte[] serializeJobGraph(JobGraph jobGraph) throws IOException {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64 * 1024)) {
- ObjectOutputStream out = new ObjectOutputStream(baos);
+ /**
+ * Descriptor for a distributed cache file.
+ */
+ public static class DistributedCacheFile {
+ private static final String FIELD_NAME_ENTRY_NAME = "entryName";
+ private static final String FIELD_NAME_FILE_NAME = "fileName";
+
+ @JsonProperty(FIELD_NAME_ENTRY_NAME)
+ public final String entryName;
+
+ @JsonProperty(FIELD_NAME_FILE_NAME)
+ public final String fileName;
+
+ @JsonCreator
+ public DistributedCacheFile(
+ @JsonProperty(FIELD_NAME_ENTRY_NAME) String entryName,
+ @JsonProperty(FIELD_NAME_FILE_NAME) String fileName) {
+ this.entryName = entryName;
+ this.fileName = fileName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DistributedCacheFile that = (DistributedCacheFile) o;
+ return Objects.equals(entryName, that.entryName) &&
+ Objects.equals(fileName, that.fileName);
+ }
- out.writeObject(jobGraph);
+ @Override
+ public int hashCode() {
+
+ return Objects.hash(entryName, fileName);
+ }
- return baos.toByteArray();
+ @Override
+ public String toString() {
+ return "DistributedCacheFile{" +
+ "entryName='" + entryName + '\'' +
+ ", fileName='" + fileName + '\'' +
+ '}';
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
index c5135bf..a538e17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
@@ -23,6 +23,10 @@ import org.apache.flink.configuration.ConfigConstants;
/**
* This class contains constants to be used by rest components.
*/
-public class RestConstants {
+public enum RestConstants {
+ ;
+
public static final String REST_CONTENT_TYPE = "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name();
+ public static final String CONTENT_TYPE_JAR = "application/java-archive";
+ public static final String CONTENT_TYPE_BINARY = "application/octet-stream";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
index dc14cb1..f151f28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
@@ -85,7 +85,7 @@ public class ClientUtilsTest extends TestLogger {
assertEquals(jars.size(), jobGraph.getUserJars().size());
assertEquals(0, jobGraph.getUserJarBlobKeys().size());
- ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
+ ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
assertEquals(jars.size(), jobGraph.getUserJars().size());
assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
@@ -124,7 +124,7 @@ public class ClientUtilsTest extends TestLogger {
assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
assertEquals(0, jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != null).count());
- ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
+ ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
assertEquals(localArtifacts.size(), jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != null).count());
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
index 91fba68..607c1c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
@@ -81,7 +81,8 @@ public class AbstractHandlerTest extends TestLogger {
final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
CompletableFuture.completedFuture(mockRestfulGateway);
- TestHandler handler = new TestHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
+ CompletableFuture<Void> requestProcessingCompleteFuture = new CompletableFuture<>();
+ TestHandler handler = new TestHandler(requestProcessingCompleteFuture, CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
RouteResult<?> routeResult = new RouteResult<>("", "", Collections.emptyMap(), Collections.emptyMap(), "");
HttpRequest request = new DefaultFullHttpRequest(
@@ -101,6 +102,9 @@ public class AbstractHandlerTest extends TestLogger {
handler.respondAsLeader(context, routerRequest, mockRestfulGateway);
+ // the (asynchronous) request processing is not yet complete so the files should still exist
+ Assert.assertTrue(Files.exists(file));
+ requestProcessingCompleteFuture.complete(null);
Assert.assertFalse(Files.exists(file));
}
@@ -156,14 +160,16 @@ public class AbstractHandlerTest extends TestLogger {
}
private static class TestHandler extends AbstractHandler<RestfulGateway, EmptyRequestBody, EmptyMessageParameters> {
+ private final CompletableFuture<Void> completionFuture;
- protected TestHandler(@Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever) {
+ protected TestHandler(CompletableFuture<Void> completionFuture, @Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever) {
super(localAddressFuture, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), TestHeaders.INSTANCE);
+ this.completionFuture = completionFuture;
}
@Override
- protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
-
+ protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, RestfulGateway gateway) throws RestHandlerException {
+ return completionFuture;
}
private enum TestHeaders implements UntypedResponseMessageHeaders<EmptyRequestBody, EmptyMessageParameters> {
@@ -188,6 +194,11 @@ public class AbstractHandlerTest extends TestLogger {
public String getTargetRestEndpointURL() {
return "/test";
}
+
+ @Override
+ public boolean acceptsFileUploads() {
+ return true;
+ }
}
}
}
[3/5] flink git commit: [FLINK-9280][rest] Rework JobSubmitHandler to
accept jar/artifact files
Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index 1311b80..0153d5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -66,6 +66,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertArrayEquals;
@@ -203,7 +204,7 @@ public class MultipartUploadResource extends ExternalResource {
@Override
protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
- MultipartFileHandler.verifyFileUpload(expectedFiles, request.getUploadedFiles());
+ MultipartFileHandler.verifyFileUpload(expectedFiles, request.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList()));
this.lastReceivedRequest = request.getRequestBody();
return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
}
@@ -268,7 +269,7 @@ public class MultipartUploadResource extends ExternalResource {
@Override
protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<TestRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
- Collection<Path> uploadedFiles = request.getUploadedFiles();
+ Collection<Path> uploadedFiles = request.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList());
if (!uploadedFiles.isEmpty()) {
throw new RestHandlerException("This handler should not have received file uploads.", HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
@@ -313,7 +314,7 @@ public class MultipartUploadResource extends ExternalResource {
@Override
protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
- verifyFileUpload(expectedFiles, request.getUploadedFiles());
+ verifyFileUpload(expectedFiles, request.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList()));
return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index b9413ba..93dbb5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -85,6 +85,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
import static org.hamcrest.Matchers.containsString;
@@ -661,7 +662,7 @@ public class RestServerEndpointITCase extends TestLogger {
@Override
protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull final RestfulGateway gateway) throws RestHandlerException {
- Collection<Path> uploadedFiles = request.getUploadedFiles();
+ Collection<Path> uploadedFiles = request.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList());
if (uploadedFiles.size() != 1) {
throw new RestHandlerException("Expected 1 file, received " + uploadedFiles.size() + '.', HttpResponseStatus.BAD_REQUEST);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
index fb7faa3..2b14641 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
@@ -25,11 +25,13 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
+import java.util.stream.Collectors;
/**
* Tests for {@link FileUploads}.
@@ -64,7 +66,7 @@ public class FileUploadsTest extends TestLogger {
Files.createFile(tmp.resolve(subFile));
try (FileUploads fileUploads = new FileUploads(tmp.resolve(rootDir))) {
- Collection<Path> detectedFiles = fileUploads.getUploadedFiles();
+ Collection<Path> detectedFiles = fileUploads.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList());
Assert.assertEquals(2, detectedFiles.size());
Assert.assertTrue(detectedFiles.contains(tmp.resolve(rootFile)));
@@ -80,7 +82,7 @@ public class FileUploadsTest extends TestLogger {
Files.createDirectory(tmp.resolve(rootDir));
try (FileUploads fileUploads = new FileUploads(tmp.resolve(rootDir))) {
- Collection<Path> detectedFiles = fileUploads.getUploadedFiles();
+ Collection<File> detectedFiles = fileUploads.getUploadedFiles();
Assert.assertEquals(0, detectedFiles.size());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
deleted file mode 100644
index 15c2eb4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.job;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the {@link BlobServerPortHandler}.
- */
-public class BlobServerPortHandlerTest extends TestLogger {
- private static final int PORT = 64;
-
- @Test
- public void testPortRetrieval() throws Exception {
- DispatcherGateway mockGateway = mock(DispatcherGateway.class);
- when(mockGateway.getBlobServerPort(any(Time.class)))
- .thenReturn(CompletableFuture.completedFuture(PORT));
- GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
-
- BlobServerPortHandler handler = new BlobServerPortHandler(
- CompletableFuture.completedFuture("http://localhost:1234"),
- mockGatewayRetriever,
- RpcUtils.INF_TIMEOUT,
- Collections.emptyMap());
-
- BlobServerPortResponseBody portResponse = handler
- .handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway)
- .get();
-
- Assert.assertEquals(PORT, portResponse.port);
- }
-
- @Test
- public void testPortRetrievalFailureHandling() throws Exception {
- DispatcherGateway mockGateway = mock(DispatcherGateway.class);
- when(mockGateway.getBlobServerPort(any(Time.class)))
- .thenReturn(FutureUtils.completedExceptionally(new TestException()));
- GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
-
- BlobServerPortHandler handler = new BlobServerPortHandler(
- CompletableFuture.completedFuture("http://localhost:1234"),
- mockGatewayRetriever,
- RpcUtils.INF_TIMEOUT,
- Collections.emptyMap());
-
- try {
- handler
- .handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway)
- .get();
- Assert.fail();
- } catch (ExecutionException ee) {
- RestHandlerException rhe = (RestHandlerException) ee.getCause();
-
- Assert.assertEquals(TestException.class, rhe.getCause().getClass());
- Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, rhe.getHttpResponseStatus());
- }
- }
-
- private static class TestException extends Exception {
- private static final long serialVersionUID = -7064446788277853899L;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
index 9c97ad4..0003829 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
@@ -18,7 +18,11 @@
package org.apache.flink.runtime.rest.handler.job;
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -28,40 +32,69 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
/**
* Tests for the {@link JobSubmitHandler}.
*/
public class JobSubmitHandlerTest extends TestLogger {
+ @ClassRule
+ public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+ private static BlobServer blobServer;
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ Configuration config = new Configuration();
+ config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+ TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+ blobServer = new BlobServer(config, new VoidBlobStore());
+ blobServer.start();
+ }
+
+ @AfterClass
+ public static void teardown() throws IOException {
+ if (blobServer != null) {
+ blobServer.close();
+ }
+ }
+
@Test
public void testSerializationFailureHandling() throws Exception {
- DispatcherGateway mockGateway = mock(DispatcherGateway.class);
- when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+ DispatcherGateway mockGateway = new TestingDispatcherGateway.Builder()
+ .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+ .build();
JobSubmitHandler handler = new JobSubmitHandler(
CompletableFuture.completedFuture("http://localhost:1234"),
- mockGatewayRetriever,
+ () -> CompletableFuture.completedFuture(mockGateway),
RpcUtils.INF_TIMEOUT,
- Collections.emptyMap());
+ Collections.emptyMap(),
+ TestingUtils.defaultExecutor());
- JobSubmitRequestBody request = new JobSubmitRequestBody(new byte[0]);
+ JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.toString(), Collections.emptyList(), Collections.emptyList());
try {
handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway);
@@ -73,40 +106,143 @@ public class JobSubmitHandlerTest extends TestLogger {
@Test
public void testSuccessfulJobSubmission() throws Exception {
- DispatcherGateway mockGateway = mock(DispatcherGateway.class);
- when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+ try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+ objectOut.writeObject(new JobGraph("testjob"));
+ }
+
+ TestingDispatcherGateway.Builder builder = new TestingDispatcherGateway.Builder();
+ builder
+ .setBlobServerPort(blobServer.getPort())
+ .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+ .setHostname("localhost");
+ DispatcherGateway mockGateway = builder.build();
JobSubmitHandler handler = new JobSubmitHandler(
CompletableFuture.completedFuture("http://localhost:1234"),
- mockGatewayRetriever,
+ () -> CompletableFuture.completedFuture(mockGateway),
RpcUtils.INF_TIMEOUT,
- Collections.emptyMap());
+ Collections.emptyMap(),
+ TestingUtils.defaultExecutor());
- JobGraph job = new JobGraph("testjob");
- JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+ JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList());
- handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
+ handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Collections.singleton(jobGraphFile.toFile())), mockGateway)
.get();
}
@Test
+ public void testRejectionOnCountMismatch() throws Exception {
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+ try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+ objectOut.writeObject(new JobGraph("testjob"));
+ }
+ final Path countExceedingFile = TEMPORARY_FOLDER.newFile().toPath(); // extra upload that the request body below does not reference
+
+ TestingDispatcherGateway.Builder builder = new TestingDispatcherGateway.Builder();
+ builder
+ .setBlobServerPort(blobServer.getPort())
+ .setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
+ .setHostname("localhost");
+ DispatcherGateway mockGateway = builder.build();
+
+ JobSubmitHandler handler = new JobSubmitHandler(
+ CompletableFuture.completedFuture("http://localhost:1234"),
+ () -> CompletableFuture.completedFuture(mockGateway),
+ RpcUtils.INF_TIMEOUT,
+ Collections.emptyMap(),
+ TestingUtils.defaultExecutor());
+
+ JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList());
+
+ try {
+ handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), Arrays.asList(jobGraphFile.toFile(), countExceedingFile.toFile())), mockGateway)
+ .get(); // NOTE(review): nothing fails the test after this call, so it also passes when no exception is thrown
+ } catch (Exception e) {
+ ExceptionUtils.findThrowable(e, candidate -> candidate instanceof RestHandlerException && candidate.getMessage().contains("count")); // NOTE(review): the result is discarded, so nothing is asserted about the exception
+ }
+ }
+
+ @Test
+ public void testFileHandling() throws Exception {
+ final String dcEntryName = "entry";
+
+ CompletableFuture<JobGraph> submittedJobGraphFuture = new CompletableFuture<>();
+ DispatcherGateway dispatcherGateway = new TestingDispatcherGateway.Builder()
+ .setBlobServerPort(blobServer.getPort())
+ .setSubmitFunction(submittedJobGraph -> {
+ submittedJobGraphFuture.complete(submittedJobGraph);
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .build();
+
+ JobSubmitHandler handler = new JobSubmitHandler(
+ CompletableFuture.completedFuture("http://localhost:1234"),
+ () -> CompletableFuture.completedFuture(dispatcherGateway),
+ RpcUtils.INF_TIMEOUT,
+ Collections.emptyMap(),
+ TestingUtils.defaultExecutor());
+
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+ final Path jarFile = TEMPORARY_FOLDER.newFile().toPath();
+ final Path artifactFile = TEMPORARY_FOLDER.newFile().toPath();
+
+ final JobGraph jobGraph = new JobGraph();
+ // the entry that should be updated
+ jobGraph.addUserArtifact(dcEntryName, new DistributedCache.DistributedCacheEntry("random", false));
+ try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+ objectOut.writeObject(jobGraph);
+ }
+
+ JobSubmitRequestBody request = new JobSubmitRequestBody(
+ jobGraphFile.getFileName().toString(),
+ Collections.singletonList(jarFile.getFileName().toString()),
+ Collections.singleton(new JobSubmitRequestBody.DistributedCacheFile(dcEntryName, artifactFile.getFileName().toString())));
+
+ handler.handleRequest(new HandlerRequest<>(
+ request,
+ EmptyMessageParameters.getInstance(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Arrays.asList(jobGraphFile.toFile(), jarFile.toFile(), artifactFile.toFile())), dispatcherGateway)
+ .get();
+
+ Assert.assertTrue("No JobGraph was submitted.", submittedJobGraphFuture.isDone());
+ final JobGraph submittedJobGraph = submittedJobGraphFuture.get();
+ Assert.assertEquals(1, submittedJobGraph.getUserJarBlobKeys().size());
+ Assert.assertEquals(1, submittedJobGraph.getUserArtifacts().size());
+ Assert.assertNotNull(submittedJobGraph.getUserArtifacts().get(dcEntryName).blobKey);
+ }
+
+ @Test
public void testFailedJobSubmission() throws Exception {
final String errorMessage = "test";
- DispatcherGateway mockGateway = mock(DispatcherGateway.class);
- when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(FutureUtils.completedExceptionally(new Exception(errorMessage)));
+ DispatcherGateway mockGateway = new TestingDispatcherGateway.Builder()
+ .setSubmitFunction(jobgraph -> FutureUtils.completedExceptionally(new Exception(errorMessage)))
+ .build();
JobSubmitHandler handler = new JobSubmitHandler(
CompletableFuture.completedFuture("http://localhost:1234"),
() -> CompletableFuture.completedFuture(mockGateway),
RpcUtils.INF_TIMEOUT,
- Collections.emptyMap());
+ Collections.emptyMap(),
+ TestingUtils.defaultExecutor());
- JobGraph job = new JobGraph("testjob");
- JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+ final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
+
+ JobGraph jobGraph = new JobGraph("testjob");
+ try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+ objectOut.writeObject(jobGraph);
+ }
+ JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList());
try {
- handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
+ handler.handleRequest(new HandlerRequest<>(
+ request,
+ EmptyMessageParameters.getInstance(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.singletonList(jobGraphFile.toFile())), mockGateway)
.get();
} catch (Exception e) {
Throwable t = ExceptionUtils.stripExecutionException(e);
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
deleted file mode 100644
index 7ad72fc..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-/**
- * Tests for {@link BlobServerPortResponseBody}.
- */
-public class BlobServerPortResponseTest extends RestResponseMarshallingTestBase<BlobServerPortResponseBody> {
-
- @Override
- protected Class<BlobServerPortResponseBody> getTestResponseClass() {
- return BlobServerPortResponseBody.class;
- }
-
- @Override
- protected BlobServerPortResponseBody getTestResponseInstance() throws Exception {
- return new BlobServerPortResponseBody(64);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
index 7627d98..947994b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
@@ -18,10 +18,10 @@
package org.apache.flink.runtime.rest.messages;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import java.io.IOException;
+import java.util.Arrays;
/**
* Tests for the {@link JobSubmitRequestBody}.
@@ -35,6 +35,11 @@ public class JobSubmitRequestBodyTest extends RestRequestMarshallingTestBase<Job
@Override
protected JobSubmitRequestBody getTestRequestInstance() throws IOException {
- return new JobSubmitRequestBody(new JobGraph("job"));
+ return new JobSubmitRequestBody(
+ "jobgraph",
+ Arrays.asList("jar1", "jar2"),
+ Arrays.asList(
+ new JobSubmitRequestBody.DistributedCacheFile("entry1", "artifact1"),
+ new JobSubmitRequestBody.DistributedCacheFile("entry2", "artifact2")));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
new file mode 100644
index 0000000..5d19a01
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingDispatcherGateway.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation of the {@link DispatcherGateway}.
+ */
+public final class TestingDispatcherGateway extends TestingRestfulGateway implements DispatcherGateway {
+
+ static final Function<JobGraph, CompletableFuture<Acknowledge>> DEFAULT_SUBMIT_FUNCTION = jobGraph -> CompletableFuture.completedFuture(Acknowledge.get());
+ static final Supplier<CompletableFuture<Collection<JobID>>> DEFAULT_LIST_FUNCTION = () -> CompletableFuture.completedFuture(Collections.emptyList());
+ static final int DEFAULT_BLOB_SERVER_PORT = 1234;
+ static final DispatcherId DEFAULT_FENCING_TOKEN = DispatcherId.generate();
+ static final Function<JobID, CompletableFuture<ArchivedExecutionGraph>> DEFAULT_REQUEST_ARCHIVED_JOB_FUNCTION = jobID -> CompletableFuture.completedFuture(null);
+
+ private Function<JobGraph, CompletableFuture<Acknowledge>> submitFunction;
+ private Supplier<CompletableFuture<Collection<JobID>>> listFunction;
+ private int blobServerPort;
+ private DispatcherId fencingToken;
+ private Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestArchivedJobFunction;
+
+ public TestingDispatcherGateway() {
+ super();
+ submitFunction = DEFAULT_SUBMIT_FUNCTION;
+ listFunction = DEFAULT_LIST_FUNCTION;
+ blobServerPort = DEFAULT_BLOB_SERVER_PORT;
+ fencingToken = DEFAULT_FENCING_TOKEN;
+ requestArchivedJobFunction = DEFAULT_REQUEST_ARCHIVED_JOB_FUNCTION;
+ }
+
+ public TestingDispatcherGateway(
+ String address,
+ String hostname,
+ String restAddress,
+ Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction,
+ Function<JobID, CompletableFuture<Acknowledge>> stopJobFunction,
+ Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction,
+ Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction,
+ Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction,
+ Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier,
+ Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier,
+ Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier,
+ Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServicePathsSupplier,
+ BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction,
+ BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction,
+ Function<JobGraph, CompletableFuture<Acknowledge>> submitFunction,
+ Supplier<CompletableFuture<Collection<JobID>>> listFunction,
+ int blobServerPort,
+ DispatcherId fencingToken,
+ Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestArchivedJobFunction) {
+ super(
+ address,
+ hostname,
+ restAddress,
+ cancelJobFunction,
+ stopJobFunction,
+ requestJobFunction,
+ requestJobResultFunction,
+ requestJobStatusFunction,
+ requestMultipleJobDetailsSupplier,
+ requestClusterOverviewSupplier,
+ requestMetricQueryServicePathsSupplier,
+ requestTaskManagerMetricQueryServicePathsSupplier,
+ requestOperatorBackPressureStatsFunction,
+ triggerSavepointFunction);
+ this.submitFunction = submitFunction;
+ this.listFunction = listFunction;
+ this.blobServerPort = blobServerPort;
+ this.fencingToken = fencingToken;
+ this.requestArchivedJobFunction = requestArchivedJobFunction;
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
+ return submitFunction.apply(jobGraph);
+ }
+
+ @Override
+ public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
+ return listFunction.get();
+ }
+
+ @Override
+ public CompletableFuture<Integer> getBlobServerPort(Time timeout) {
+ return CompletableFuture.completedFuture(blobServerPort);
+ }
+
+ @Override
+ public DispatcherId getFencingToken() {
+ return fencingToken != null ? fencingToken : DEFAULT_FENCING_TOKEN; // honor the configured token; fall back to the default when none was supplied
+ }
+
+ public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout) {
+ return requestArchivedJobFunction.apply(jobId);
+ }
+
+ /**
+ * Builder for the {@link TestingDispatcherGateway}; options that are never set fall back to the gateway's {@code DEFAULT_*} constants.
+ */
+ public static final class Builder extends TestingRestfulGateway.Builder {
+
+ private Function<JobGraph, CompletableFuture<Acknowledge>> submitFunction = DEFAULT_SUBMIT_FUNCTION;
+ private Supplier<CompletableFuture<Collection<JobID>>> listFunction = DEFAULT_LIST_FUNCTION;
+ private int blobServerPort = DEFAULT_BLOB_SERVER_PORT;
+ private DispatcherId fencingToken = DEFAULT_FENCING_TOKEN;
+ private Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestArchivedJobFunction = DEFAULT_REQUEST_ARCHIVED_JOB_FUNCTION;
+
+ public Builder setSubmitFunction(Function<JobGraph, CompletableFuture<Acknowledge>> submitFunction) {
+ this.submitFunction = submitFunction;
+ return this;
+ }
+
+ public Builder setListFunction(Supplier<CompletableFuture<Collection<JobID>>> listFunction) {
+ this.listFunction = listFunction;
+ return this;
+ }
+
+ public Builder setRequestArchivedJobFunction(Function<JobID, CompletableFuture<ArchivedExecutionGraph>> requestJobFunction) {
+ this.requestArchivedJobFunction = requestJobFunction;
+ return this;
+ }
+
+ @Override
+ public TestingRestfulGateway.Builder setRequestJobFunction(Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction) {
+ // signature clash
+ throw new UnsupportedOperationException("Use setRequestArchivedJobFunction() instead.");
+ }
+
+ public Builder setBlobServerPort(int blobServerPort) {
+ this.blobServerPort = blobServerPort;
+ return this;
+ }
+
+ public Builder setFencingToken(DispatcherId fencingToken) {
+ this.fencingToken = fencingToken;
+ return this;
+ }
+
+ public TestingDispatcherGateway build() {
+ return new TestingDispatcherGateway(
+ address,
+ hostname,
+ restAddress,
+ cancelJobFunction,
+ stopJobFunction,
+ requestJobFunction,
+ requestJobResultFunction,
+ requestJobStatusFunction,
+ requestMultipleJobDetailsSupplier,
+ requestClusterOverviewSupplier,
+ requestMetricQueryServicePathsSupplier,
+ requestTaskManagerMetricQueryServicePathsSupplier,
+ requestOperatorBackPressureStatsFunction,
+ triggerSavepointFunction,
+ submitFunction,
+ listFunction,
+ blobServerPort,
+ fencingToken,
+ requestArchivedJobFunction);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
index b92ba51..09af236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java
@@ -211,21 +211,21 @@ public class TestingRestfulGateway implements RestfulGateway {
/**
* Builder for the {@link TestingRestfulGateway}.
*/
- public static final class Builder {
- private String address = LOCALHOST;
- private String hostname = LOCALHOST;
- private String restAddress = LOCALHOST;
- private Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction;
- private Function<JobID, CompletableFuture<Acknowledge>> stopJobFunction;
- private Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction;
- private Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction;
- private Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
- private Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier;
- private Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier;
- private Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier;
- private Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServicePathsSupplier;
- private BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction;
- private BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction;
+ public static class Builder {
+ protected String address = LOCALHOST;
+ protected String hostname = LOCALHOST;
+ protected String restAddress = LOCALHOST;
+ protected Function<JobID, CompletableFuture<Acknowledge>> cancelJobFunction;
+ protected Function<JobID, CompletableFuture<Acknowledge>> stopJobFunction;
+ protected Function<JobID, CompletableFuture<? extends AccessExecutionGraph>> requestJobFunction;
+ protected Function<JobID, CompletableFuture<JobResult>> requestJobResultFunction;
+ protected Function<JobID, CompletableFuture<JobStatus>> requestJobStatusFunction;
+ protected Supplier<CompletableFuture<MultipleJobsDetails>> requestMultipleJobDetailsSupplier;
+ protected Supplier<CompletableFuture<ClusterOverview>> requestClusterOverviewSupplier;
+ protected Supplier<CompletableFuture<Collection<String>>> requestMetricQueryServicePathsSupplier;
+ protected Supplier<CompletableFuture<Collection<Tuple2<ResourceID, String>>>> requestTaskManagerMetricQueryServicePathsSupplier;
+ protected BiFunction<JobID, JobVertexID, CompletableFuture<OperatorBackPressureStatsResponse>> requestOperatorBackPressureStatsFunction;
+ protected BiFunction<JobID, String, CompletableFuture<String>> triggerSavepointFunction;
public Builder() {
cancelJobFunction = DEFAULT_CANCEL_JOB_FUNCTION;
[2/5] flink git commit: [FLINK-9301][tests] Enable
high-parallelism-iterations test
Posted by ch...@apache.org.
[FLINK-9301][tests] Enable high-parallelism-iterations test
This closes #6024.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/885640f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/885640f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/885640f7
Branch: refs/heads/master
Commit: 885640f781aa66359d929eb387f27a6024d75025
Parents: 81d1355
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Wed Jun 27 11:16:51 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 3 21:08:45 2018 +0200
----------------------------------------------------------------------
flink-end-to-end-tests/run-nightly-tests.sh | 2 ++
flink-end-to-end-tests/test-scripts/common.sh | 29 ++++++++++++++++++++
.../test_high_parallelism_iterations.sh | 24 ++++------------
3 files changed, 37 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/885640f7/flink-end-to-end-tests/run-nightly-tests.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index a493f49..15c73d5 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -43,6 +43,8 @@ echo "Flink distribution directory: $FLINK_DIR"
# run_test "<description>" "$END_TO_END_DIR/test-scripts/<script_name>"
+run_test "ConnectedComponents iterations with high parallelism end-to-end test" "$END_TO_END_DIR/test-scripts/test_high_parallelism_iterations.sh 25"
+
run_test "Queryable state (rocksdb) end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state.sh rocksdb"
run_test "Queryable state (rocksdb) with TM restart end-to-end test" "$END_TO_END_DIR/test-scripts/test_queryable_state_restart_tm.sh"
http://git-wip-us.apache.org/repos/asf/flink/blob/885640f7/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 4ed83b0..e5ad458 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -24,6 +24,14 @@ if [[ -z $FLINK_DIR ]]; then
exit 1
fi
+case "$(uname -s)" in # classify the host OS once so helpers can branch on OS_TYPE
+ Linux*) OS_TYPE=linux;;
+ Darwin*) OS_TYPE=mac;;
+ CYGWIN*) OS_TYPE=cygwin;;
+ MINGW*) OS_TYPE=mingw;;
+ *) OS_TYPE="UNKNOWN:$(uname -s)";; # unameOut was never defined, so the kernel name is queried directly
+esac
+
export EXIT_CODE=0
echo "Flink dist directory: $FLINK_DIR"
@@ -39,6 +47,27 @@ cd $TEST_ROOT
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"
+function print_mem_use_osx { # prints active/inactive/wired memory and total RAM in MB, based on vm_stat and sysctl
+ declare -a mem_types=("active" "inactive" "wired down")
+ used=""
+ for mem_type in "${mem_types[@]}"
+ do
+ used_type=$(vm_stat | grep "Pages ${mem_type}:" | awk '{print $NF}' | rev | cut -c 2- | rev) # last field is the page count with a trailing '.', which rev/cut/rev strips
+ let used_type="(${used_type}*$(sysctl -n hw.pagesize))/1024/1024" # pages -> MB using the real page size (not always 4096, e.g. 16384 on Apple Silicon)
+ used="$used $mem_type=${used_type}MB"
+ done
+ let mem=$(sysctl -n hw.memsize)/1024/1024
+ echo "Memory Usage: ${used} total=${mem}MB"
+}
+
+function print_mem_use {
+ if [[ "$OS_TYPE" == "mac" ]]; then
+ print_mem_use_osx
+ else
+ free -m | awk 'NR==2{printf "Memory Usage: used=%sMB total=%sMB %.2f%%\n", $3,$2,$3*100/$2 }'
+ fi
+}
+
function backup_config() {
# back up the masters and flink-conf.yaml
cp $FLINK_DIR/conf/masters $FLINK_DIR/conf/masters.bak
http://git-wip-us.apache.org/repos/asf/flink/blob/885640f7/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh b/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
index dbdacef..aa937ae 100755
--- a/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
+++ b/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
@@ -19,15 +19,14 @@
source "$(dirname "$0")"/common.sh
-PARALLELISM="${PARALLELISM:-100}"
+PARALLELISM="${1:-25}"
TEST=flink-high-parallelism-iterations-test
TEST_PROGRAM_NAME=HighParallelismIterationsTestProgram
TEST_PROGRAM_JAR=${END_TO_END_DIR}/$TEST/target/$TEST_PROGRAM_NAME.jar
-echo "Run Not So MiniCluster Iterations Graph Connected Components Program"
+backup_config
-cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
echo "taskmanager.heap.mb: 52" >> $FLINK_DIR/conf/flink-conf.yaml # 52MB heap per TM; total heap = 52MB x $PARALLELISM TMs (~1.3GB at the default of 25)
@@ -41,27 +40,16 @@ echo "taskmanager.network.netty.client.numThreads: 1" >> $FLINK_DIR/conf/flink-c
echo "taskmanager.numberOfTaskSlots: 1" >> $FLINK_DIR/conf/flink-conf.yaml
+print_mem_use
start_cluster
+print_mem_use
let TMNUM=$PARALLELISM-1
echo "Start $TMNUM more task managers"
for i in `seq 1 $TMNUM`; do
$FLINK_DIR/bin/taskmanager.sh start
+ print_mem_use
done
-function test_cleanup {
- # don't call ourselves again for another signal interruption
- trap "exit -1" INT
- # don't call ourselves again for normal exit
- trap "" EXIT
-
- stop_cluster
- $FLINK_DIR/bin/taskmanager.sh stop-all
-
- # revert our modifications to the Flink distribution
- mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-}
-trap test_cleanup INT
-trap test_cleanup EXIT
-
$FLINK_DIR/bin/flink run -p $PARALLELISM $TEST_PROGRAM_JAR
+print_mem_use