You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/10/09 17:11:52 UTC

[1/3] flink git commit: [hotfix] [REST] Add utility HandlerRequest constructor

Repository: flink
Updated Branches:
  refs/heads/master 2c734508d -> ad380463d


[hotfix] [REST] Add utility HandlerRequest constructor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62995330
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62995330
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62995330

Branch: refs/heads/master
Commit: 629953306ba0fe879bfd035911ca26b5275f3a25
Parents: 2c73450
Author: zentol <ch...@apache.org>
Authored: Wed Sep 20 14:58:42 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Oct 9 19:11:43 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/rest/handler/HandlerRequest.java    | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62995330/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
index c0de3db..aacf0a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -42,6 +42,10 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters>
 	private final Map<Class<? extends MessagePathParameter<?>>, MessagePathParameter<?>> pathParameters = new HashMap<>(2);
 	private final Map<Class<? extends MessageQueryParameter<?>>, MessageQueryParameter<?>> queryParameters = new HashMap<>(2);
 
+	public HandlerRequest(R requestBody, M messageParameters) throws HandlerRequestException {
+		this(requestBody, messageParameters, Collections.emptyMap(), Collections.emptyMap());
+	}
+
 	public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters) throws HandlerRequestException {
 		this.requestBody = Preconditions.checkNotNull(requestBody);
 		Preconditions.checkNotNull(messageParameters);


[2/3] flink git commit: [FLINK-7072] [REST] Define protocol for job submit/cancel/stop

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
new file mode 100644
index 0000000..257b241
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * <p>We currently require the job-jars to be uploaded through the blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+	private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph";
+
+	/**
+	 * The serialized job graph.
+	 */
+	@JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+	public final byte[] serializedJobGraph;
+
+	public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
+		this(serializeJobGraph(jobGraph));
+	}
+
+	@JsonCreator
+	public JobSubmitRequestBody(
+		@JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] serializedJobGraph) {
+
+		// check that job graph can be read completely by the HttpObjectAggregator on the server
+		// we subtract 1024 bytes to account for http headers and such.
+		if (serializedJobGraph.length > RestServerEndpoint.MAX_REQUEST_SIZE_BYTES - 1024) {
+			throw new IllegalArgumentException("Serialized job graph exceeded max request size.");
+		}
+		this.serializedJobGraph = Preconditions.checkNotNull(serializedJobGraph);
+	}
+
+	@Override
+	public int hashCode() {
+		return 71 * Arrays.hashCode(this.serializedJobGraph);
+	}
+
+	@Override
+	public boolean equals(Object object) {
+		if (object instanceof JobSubmitRequestBody) {
+			JobSubmitRequestBody other = (JobSubmitRequestBody) object;
+			return Arrays.equals(this.serializedJobGraph, other.serializedJobGraph);
+		}
+		return false;
+	}
+
+	private static byte[] serializeJobGraph(JobGraph jobGraph) throws IOException {
+		try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64 * 1024)) {
+			ObjectOutputStream out = new ObjectOutputStream(baos);
+
+			out.writeObject(jobGraph);
+
+			return baos.toByteArray();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitResponseBody.java
new file mode 100644
index 0000000..fefd435
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitResponseBody.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Response to the submission of a job, containing a URL from which the status of the job can be retrieved.
+ */
+public final class JobSubmitResponseBody implements ResponseBody {
+
+	public static final String FIELD_NAME_JOB_URL = "jobUrl";
+
+	/**
+	 * The URL under which the job status can be monitored.
+	 */
+	@JsonProperty(FIELD_NAME_JOB_URL)
+	public final String jobUrl;
+
+	@JsonCreator
+	public JobSubmitResponseBody(
+		@JsonProperty(FIELD_NAME_JOB_URL) String jobUrl) {
+
+		this.jobUrl = jobUrl;
+	}
+
+	@Override
+	public int hashCode() {
+		return 73 * jobUrl.hashCode();
+	}
+
+	@Override
+	public boolean equals(Object object) {
+		if (object instanceof JobSubmitResponseBody) {
+			JobSubmitResponseBody other = (JobSubmitResponseBody) object;
+			return Objects.equals(this.jobUrl, other.jobUrl);
+		}
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
new file mode 100644
index 0000000..0ea18db
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link BlobServerPortHandler}.
+ */
+public class BlobServerPortHandlerTest extends TestLogger {
+	private static final int PORT = 64;
+
+	@Test
+	public void testPortRetrieval() throws Exception {
+		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+		when(mockGateway.getBlobServerPort(any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(PORT));
+		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+		BlobServerPortHandler handler = new BlobServerPortHandler(
+			CompletableFuture.completedFuture("http://localhost:1234"),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
+		BlobServerPortResponseBody portResponse = handler
+			.handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway)
+			.get();
+
+		Assert.assertEquals(PORT, portResponse.port);
+	}
+
+	@Test
+	public void testPortRetrievalFailureHandling() throws Exception {
+		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+		when(mockGateway.getBlobServerPort(any(Time.class)))
+			.thenReturn(FutureUtils.completedExceptionally(new TestException()));
+		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+		BlobServerPortHandler handler = new BlobServerPortHandler(
+			CompletableFuture.completedFuture("http://localhost:1234"),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
+		try {
+			handler
+				.handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway)
+				.get();
+			Assert.fail();
+		} catch (ExecutionException ee) {
+			RestHandlerException rhe = (RestHandlerException) ee.getCause();
+
+			Assert.assertEquals(TestException.class, rhe.getCause().getClass());
+			Assert.assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, rhe.getHttpResponseStatus());
+		}
+	}
+
+	private static class TestException extends Exception {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
new file mode 100644
index 0000000..1196d40
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link JobSubmitHandler}.
+ */
+public class JobSubmitHandlerTest extends TestLogger {
+
+	@Test
+	public void testSerializationFailureHandling() throws Exception {
+		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+		when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+		JobSubmitHandler handler = new JobSubmitHandler(
+			CompletableFuture.completedFuture("http://localhost:1234"),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
+		JobSubmitRequestBody request = new JobSubmitRequestBody(new byte[0]);
+
+		try {
+			handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway);
+			Assert.fail();
+		} catch (RestHandlerException rhe) {
+			Assert.assertEquals(HttpResponseStatus.BAD_REQUEST, rhe.getHttpResponseStatus());
+		}
+	}
+
+	@Test
+	public void testSuccessfulJobSubmission() throws Exception {
+		DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+		when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+		GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+		JobSubmitHandler handler = new JobSubmitHandler(
+			CompletableFuture.completedFuture("http://localhost:1234"),
+			mockGatewayRetriever,
+			RpcUtils.INF_TIMEOUT);
+
+		JobGraph job = new JobGraph("testjob");
+		JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+
+		handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway)
+			.get();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
new file mode 100644
index 0000000..add4e3b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.legacy.messages.RestResponseMarshallingTestBase;
+
+/**
+ * Tests for {@link BlobServerPortResponseBody}.
+ */
+public class BlobServerPortResponseTest extends RestResponseMarshallingTestBase<BlobServerPortResponseBody> {
+
+	@Override
+	protected Class<BlobServerPortResponseBody> getTestResponseClass() {
+		return BlobServerPortResponseBody.class;
+	}
+
+	@Override
+	protected BlobServerPortResponseBody getTestResponseInstance() throws Exception {
+		return new BlobServerPortResponseBody(64);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
new file mode 100644
index 0000000..e69913c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.legacy.messages.RestRequestMarshallingTestBase;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+
+import java.io.IOException;
+
+/**
+ * Tests for the {@link JobSubmitRequestBody}.
+ */
+public class JobSubmitRequestBodyTest extends RestRequestMarshallingTestBase<JobSubmitRequestBody> {
+
+	@Override
+	protected Class<JobSubmitRequestBody> getTestRequestClass() {
+		return JobSubmitRequestBody.class;
+	}
+
+	@Override
+	protected JobSubmitRequestBody getTestRequestInstance() throws IOException {
+		return new JobSubmitRequestBody(new JobGraph("job"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java
new file mode 100644
index 0000000..9dc832a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.legacy.messages.RestResponseMarshallingTestBase;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+/**
+ * Tests for {@link JobSubmitResponseBody}.
+ */
+public class JobSubmitResponseBodyTest extends RestResponseMarshallingTestBase<JobSubmitResponseBody> {
+
+	@Override
+	protected Class<JobSubmitResponseBody> getTestResponseClass() {
+		return JobSubmitResponseBody.class;
+	}
+
+	@Override
+	protected JobSubmitResponseBody getTestResponseInstance() throws Exception {
+		return new JobSubmitResponseBody("/url");
+	}
+}


[3/3] flink git commit: [FLINK-7072] [REST] Define protocol for job submit/cancel/stop

Posted by tr...@apache.org.
[FLINK-7072] [REST] Define protocol for job submit/cancel/stop

[FLINK-7072] [REST] Extend Dispatcher

[FLINK-7072] [REST] Add handlers for job submit/cancel/stop

[FLINK-7072] [REST] CLI integration

use ExecutorThreadFactory + rebase (blobKey fix)

add "Flink" prefix to RestCC threads

shutdown client for cancel/shutdown

Rework CliFrontEnd Stop/Cancel tests

These tests verified that the CLI was sending the correct messages and
parameters to the JM actor. This is now handled by the ClusterClient, so
the tests were adjusted to verify that the correct methods on the
ClusterClient are being called.

Additional tests were added to the ClusterClientTest class to verify
that the correct messages and parameters are being sent.

This closes #4742.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad380463
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad380463
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad380463

Branch: refs/heads/master
Commit: ad380463d3d44cdd98302bf072bc5deba8696b5b
Parents: 6299533
Author: zentol <ch...@apache.org>
Authored: Wed Sep 20 14:55:46 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Oct 9 19:11:44 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  79 +++----
 .../flink/client/cli/Flip6DefaultCLI.java       |  98 ++++++++
 .../Flip6StandaloneClusterDescriptor.java       |  63 ++++++
 .../flink/client/program/ClusterClient.java     |  77 ++++---
 .../client/program/rest/RestClusterClient.java  | 221 +++++++++++++++++++
 .../rest/RestClusterClientConfiguration.java    |  78 +++++++
 .../flink/client/CliFrontendListCancelTest.java | 123 ++++++-----
 .../apache/flink/client/CliFrontendRunTest.java |   9 +
 .../flink/client/CliFrontendStopTest.java       | 118 ++++------
 .../flink/client/program/ClusterClientTest.java | 143 ++++++++++++
 .../program/rest/RestClusterClientTest.java     | 192 ++++++++++++++++
 .../flink/runtime/dispatcher/Dispatcher.java    |   5 +
 .../runtime/dispatcher/DispatcherGateway.java   |   8 +
 .../dispatcher/DispatcherRestEndpoint.java      |   8 +
 .../flink/runtime/rest/RestServerEndpoint.java  |   4 +-
 .../rest/handler/job/BlobServerPortHandler.java |  61 +++++
 .../rest/handler/job/JobSubmitHandler.java      |  66 ++++++
 .../rest/messages/BlobServerPortHeaders.java    |  69 ++++++
 .../messages/BlobServerPortResponseBody.java    |  57 +++++
 .../JobTerminationMessageParameters.java        |   4 +-
 .../rest/messages/job/JobSubmitHeaders.java     |  71 ++++++
 .../rest/messages/job/JobSubmitRequestBody.java |  88 ++++++++
 .../messages/job/JobSubmitResponseBody.java     |  61 +++++
 .../handler/job/BlobServerPortHandlerTest.java  |  97 ++++++++
 .../rest/handler/job/JobSubmitHandlerTest.java  |  87 ++++++++
 .../messages/BlobServerPortResponseTest.java    |  37 ++++
 .../rest/messages/JobSubmitRequestBodyTest.java |  41 ++++
 .../messages/JobSubmitResponseBodyTest.java     |  38 ++++
 28 files changed, 1790 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 9d1f52e..9be8295 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.client.cli.Flip6DefaultCLI;
 import org.apache.flink.client.cli.InfoOptions;
 import org.apache.flink.client.cli.ListOptions;
 import org.apache.flink.client.cli.ProgramOptions;
@@ -59,13 +60,7 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -146,6 +141,8 @@ public class CliFrontend {
 		} catch (Exception e) {
 			LOG.warn("Could not load CLI class {}.", flinkYarnCLI, e);
 		}
+
+		customCommandLines.add(new Flip6DefaultCLI());
 		customCommandLines.add(new DefaultCLI());
 	}
 
@@ -555,17 +552,18 @@ public class CliFrontend {
 		}
 
 		try {
-			ActorGateway jobManager = getJobManagerGateway(options);
-			Future<Object> response = jobManager.ask(new StopJob(jobId), clientTimeout);
-
-			final Object rc = Await.result(response, clientTimeout);
+			CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
+			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
+			try {
+				logAndSysout("Stopping job " + jobId + '.');
+				client.stop(jobId);
+				logAndSysout("Stopped job " + jobId + '.');
 
-			if (rc instanceof StoppingFailure) {
-				throw new Exception("Stopping the job with ID " + jobId + " failed.",
-						((StoppingFailure) rc).cause());
+				return 0;
+			} finally {
+				client.shutdown();
 			}
 
-			return 0;
 		}
 		catch (Throwable t) {
 			return handleError(t);
@@ -636,40 +634,27 @@ public class CliFrontend {
 		}
 
 		try {
-			ActorGateway jobManager = getJobManagerGateway(options);
-
-			Object cancelMsg;
-			if (withSavepoint) {
-				if (targetDirectory == null) {
-					logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory.");
-				} else {
-					logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + ".");
-				}
-				cancelMsg = new CancelJobWithSavepoint(jobId, targetDirectory);
-			} else {
-				logAndSysout("Cancelling job " + jobId + ".");
-				cancelMsg = new CancelJob(jobId);
-			}
-
-			Future<Object> response = jobManager.ask(cancelMsg, clientTimeout);
-			final Object rc = Await.result(response, clientTimeout);
-
-			if (rc instanceof CancellationSuccess) {
+			CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
+			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
+			try {
 				if (withSavepoint) {
-					CancellationSuccess success = (CancellationSuccess) rc;
-					String savepointPath = success.savepointPath();
-					logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + ".");
+					if (targetDirectory == null) {
+						logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory.");
+					} else {
+						logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.');
+					}
+					String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory);
+					logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.');
 				} else {
-					logAndSysout("Cancelled job " + jobId + ".");
+					logAndSysout("Cancelling job " + jobId + '.');
+					client.cancel(jobId);
+					logAndSysout("Cancelled job " + jobId + '.');
 				}
-			} else if (rc instanceof CancellationFailure) {
-				throw new Exception("Canceling the job with ID " + jobId + " failed.",
-						((CancellationFailure) rc).cause());
-			} else {
-				throw new IllegalStateException("Unexpected response: " + rc);
-			}
 
-			return 0;
+				return 0;
+			} finally {
+				client.shutdown();
+			}
 		}
 		catch (Throwable t) {
 			return handleError(t);
@@ -978,7 +963,11 @@ public class CliFrontend {
 		// Avoid resolving the JobManager Gateway here to prevent blocking until we invoke the user's program.
 		final InetSocketAddress jobManagerAddress = client.getJobManagerAddress();
 		logAndSysout("Using address " + jobManagerAddress.getHostString() + ":" + jobManagerAddress.getPort() + " to connect to JobManager.");
-		logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
+		try {
+			logAndSysout("JobManager web interface address " + client.getWebInterfaceURL());
+		} catch (UnsupportedOperationException uoe) {
+			logAndSysout("JobManager web interface not active.");
+		}
 		return client;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
new file mode 100644
index 0000000..5fb9dfc
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.Flip6StandaloneClusterDescriptor;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+
+import static org.apache.flink.client.CliFrontend.setJobManagerAddressInConfig;
+
+/**
+ * The default CLI which is used for interaction with FLIP-6 standalone clusters.
+ */
+public class Flip6DefaultCLI implements CustomCommandLine<RestClusterClient> {
+
+	public static final Option FLIP_6 = new Option("flip6", "Switches the client to Flip-6 mode.");
+
+	static {
+		FLIP_6.setRequired(false);
+	}
+
+	@Override
+	public boolean isActive(CommandLine commandLine, Configuration configuration) {
+		return commandLine.hasOption(FLIP_6.getOpt());
+	}
+
+	@Override
+	public String getId() {
+		return "flip6";
+	}
+
+	@Override
+	public void addRunOptions(Options baseOptions) {
+	}
+
+	@Override
+	public void addGeneralOptions(Options baseOptions) {
+		baseOptions.addOption(FLIP_6);
+	}
+
+	@Override
+	public RestClusterClient retrieveCluster(CommandLine commandLine, Configuration config, String configurationDirectory) {
+		if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
+			String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
+			InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(addressWithPort);
+			setJobManagerAddressInConfig(config, jobManagerAddress);
+		}
+
+		if (commandLine.hasOption(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt())) {
+			String zkNamespace = commandLine.getOptionValue(CliFrontendParser.ZOOKEEPER_NAMESPACE_OPTION.getOpt());
+			config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
+		}
+
+		Flip6StandaloneClusterDescriptor descriptor = new Flip6StandaloneClusterDescriptor(config);
+		return descriptor.retrieve(null);
+	}
+
+	@Override
+	public RestClusterClient createCluster(
+			String applicationName,
+			CommandLine commandLine,
+			Configuration config,
+			String configurationDirectory,
+			List<URL> userJarFiles) throws UnsupportedOperationException {
+
+		Flip6StandaloneClusterDescriptor descriptor = new Flip6StandaloneClusterDescriptor(config);
+		ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config);
+
+		return descriptor.deploySessionCluster(clusterSpecification);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
new file mode 100644
index 0000000..9d88f59
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A deployment descriptor for an existing FLIP-6 standalone cluster.
+ */
+public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor<RestClusterClient> {
+
+	private final Configuration config;
+
+	public Flip6StandaloneClusterDescriptor(Configuration config) {
+		this.config = Preconditions.checkNotNull(config);
+	}
+
+	@Override
+	public String getClusterDescription() {
+		String host = config.getString(JobManagerOptions.ADDRESS, "");
+		int port = config.getInteger(JobManagerOptions.PORT, -1);
+		return "FLIP-6 Standalone cluster at " + host + ":" + port;
+	}
+
+	@Override
+	public RestClusterClient retrieve(String applicationID) {
+		try {
+			return new RestClusterClient(config);
+		} catch (Exception e) {
+			throw new RuntimeException("Couldn't retrieve FLIP-6 standalone cluster", e);
+		}
+	}
+
+	@Override
+	public RestClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException {
+		throw new UnsupportedOperationException("Can't deploy a FLIP-6 standalone cluster.");
+	}
+
+	@Override
+	public RestClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
+		throw new UnsupportedOperationException("Can't deploy a standalone FLIP-6 per-job cluster.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index c8a236e..78455c1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -63,6 +63,8 @@ import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
@@ -82,7 +84,7 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public abstract class ClusterClient {
 
-	private final Logger log = LoggerFactory.getLogger(getClass());
+	protected final Logger log = LoggerFactory.getLogger(getClass());
 
 	/** The optimizer used in the optimization of batch programs. */
 	final Optimizer compiler;
@@ -575,25 +577,46 @@ public abstract class ClusterClient {
 	 * @throws Exception In case an error occurred.
 	 */
 	public void cancel(JobID jobId) throws Exception {
-		final ActorGateway jobManagerGateway = getJobManagerGateway();
+		final ActorGateway jobManager = getJobManagerGateway();
 
-		final Future<Object> response;
-		try {
-			response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
-		} catch (final Exception e) {
-			throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
+		Object cancelMsg = new JobManagerMessages.CancelJob(jobId);
+
+		Future<Object> response = jobManager.ask(cancelMsg, timeout);
+		final Object rc = Await.result(response, timeout);
+
+		if (rc instanceof JobManagerMessages.CancellationSuccess) {
+			// no further action required
+		} else if (rc instanceof JobManagerMessages.CancellationFailure) {
+			throw new Exception("Canceling the job with ID " + jobId + " failed.",
+				((JobManagerMessages.CancellationFailure) rc).cause());
+		} else {
+			throw new IllegalStateException("Unexpected response: " + rc);
 		}
+	}
 
-		final Object result = Await.result(response, timeout);
+	/**
+	 * Cancels a job identified by the job id and triggers a savepoint.
+	 * @param jobId the job id
+	 * @param savepointDirectory directory the savepoint should be written to
+	 * @return path where the savepoint is located
+	 * @throws Exception In case an error occurred.
+	 */
+	public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
+		final ActorGateway jobManager = getJobManagerGateway();
+
+		Object cancelMsg = new JobManagerMessages.CancelJobWithSavepoint(jobId, savepointDirectory);
 
-		if (result instanceof JobManagerMessages.CancellationSuccess) {
-			logAndSysout("Job cancellation with ID " + jobId + " succeeded.");
-		} else if (result instanceof JobManagerMessages.CancellationFailure) {
-			final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
-			logAndSysout("Job cancellation with ID " + jobId + " failed because of " + t.getMessage());
-			throw new Exception("Failed to cancel the job with id " + jobId, t);
+		Future<Object> response = jobManager.ask(cancelMsg, timeout);
+		final Object rc = Await.result(response, timeout);
+
+		if (rc instanceof JobManagerMessages.CancellationSuccess) {
+			JobManagerMessages.CancellationSuccess success = (JobManagerMessages.CancellationSuccess) rc;
+			return success.savepointPath();
+		} else if (rc instanceof JobManagerMessages.CancellationFailure) {
+			throw new Exception("Cancel & savepoint for the job with ID " + jobId + " failed.",
+				((JobManagerMessages.CancellationFailure) rc).cause());
 		} else {
-			throw new Exception("Unknown message received while cancelling: " + result.getClass().getName());
+			throw new IllegalStateException("Unexpected response: " + rc);
 		}
 	}
 
@@ -610,25 +633,19 @@ public abstract class ClusterClient {
 	 *             failed. That might be due to an I/O problem, ie, the job-manager is unreachable.
 	 */
 	public void stop(final JobID jobId) throws Exception {
-		final ActorGateway jobManagerGateway = getJobManagerGateway();
+		final ActorGateway jobManager = getJobManagerGateway();
 
-		final Future<Object> response;
-		try {
-			response = jobManagerGateway.ask(new JobManagerMessages.StopJob(jobId), timeout);
-		} catch (final Exception e) {
-			throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
-		}
+		Future<Object> response = jobManager.ask(new JobManagerMessages.StopJob(jobId), timeout);
 
-		final Object result = Await.result(response, timeout);
+		final Object rc = Await.result(response, timeout);
 
-		if (result instanceof JobManagerMessages.StoppingSuccess) {
-			log.info("Job stopping with ID " + jobId + " succeeded.");
-		} else if (result instanceof JobManagerMessages.StoppingFailure) {
-			final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause();
-			log.info("Job stopping with ID " + jobId + " failed.", t);
-			throw new Exception("Failed to stop the job because of \n" + t.getMessage());
+		if (rc instanceof JobManagerMessages.StoppingSuccess) {
+			// no further action required
+		} else if (rc instanceof JobManagerMessages.StoppingFailure) {
+			throw new Exception("Stopping the job with ID " + jobId + " failed.",
+				((JobManagerMessages.StoppingFailure) rc).cause());
 		} else {
-			throw new Exception("Unknown message received while stopping: " + result.getClass().getName());
+			throw new IllegalStateException("Unexpected response: " + rc);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
new file mode 100644
index 0000000..a37ee63
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+	private final RestClusterClientConfiguration restClusterClientConfiguration;
+	private final RestClient restClient;
+	private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
+
+	public RestClusterClient(Configuration config) throws Exception {
+		this(config, RestClusterClientConfiguration.fromConfiguration(config));
+	}
+
+	public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception {
+		super(config);
+		this.restClusterClientConfiguration = configuration;
+		this.restClient = new RestClient(configuration.getRestClientConfiguration(), executorService);
+	}
+
+	@Override
+	public void shutdown() {
+		try {
+			// we only call this for legacy reasons to shut down components that are started in the ClusterClient constructor
+			super.shutdown();
+		} catch (Exception e) {
+			log.error("An error occurred during the client shutdown.", e);
+		}
+		this.restClient.shutdown(Time.seconds(5));
+		org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService);
+	}
+
+	@Override
+	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+		log.info("Submitting job.");
+		try {
+			// temporary hack for FLIP-6 since slot-sharing isn't implemented yet
+			jobGraph.setAllowQueuedScheduling(true);
+			submitJob(jobGraph);
+		} catch (JobSubmissionException e) {
+			throw new ProgramInvocationException(e);
+		}
+		// don't return just a JobSubmissionResult here, the signature is lying
+		// The CliFrontend expects this to be a JobExecutionResult
+
+		// TODO: do not exit this method until job is finished
+		return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap());
+	}
+
+	private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
+		log.info("Requesting blob server port.");
+		int blobServerPort;
+		try {
+			CompletableFuture<BlobServerPortResponseBody> portFuture = restClient.sendRequest(
+				restClusterClientConfiguration.getRestServerAddress(),
+				restClusterClientConfiguration.getRestServerPort(),
+				BlobServerPortHeaders.getInstance());
+			blobServerPort = portFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS).port;
+		} catch (Exception e) {
+			throw new JobSubmissionException(jobGraph.getJobID(), "Failed to retrieve blob server port.", e);
+		}
+
+		log.info("Uploading jar files.");
+		try {
+			InetSocketAddress address = new InetSocketAddress(restClusterClientConfiguration.getBlobServerAddress(), blobServerPort);
+			List<PermanentBlobKey> keys = BlobClient.uploadJarFiles(address, this.flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
+			for (PermanentBlobKey key : keys) {
+				jobGraph.addBlob(key);
+			}
+		} catch (Exception e) {
+			throw new JobSubmissionException(jobGraph.getJobID(), "Failed to upload user jars to blob server.", e);
+		}
+
+		log.info("Submitting job graph.");
+		try {
+			CompletableFuture<JobSubmitResponseBody> responseFuture = restClient.sendRequest(
+				restClusterClientConfiguration.getRestServerAddress(),
+				restClusterClientConfiguration.getRestServerPort(),
+				JobSubmitHeaders.getInstance(),
+				new JobSubmitRequestBody(jobGraph));
+			responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+		} catch (Exception e) {
+			throw new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", e);
+		}
+	}
+
+	@Override
+	public void stop(JobID jobID) throws Exception {
+		JobTerminationMessageParameters params = new JobTerminationMessageParameters();
+		params.jobPathParameter.resolve(jobID);
+		params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.STOP));
+		CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest(
+			restClusterClientConfiguration.getRestServerAddress(),
+			restClusterClientConfiguration.getRestServerPort(),
+			JobTerminationHeaders.getInstance(),
+			params
+		);
+		responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+	}
+
+	@Override
+	public void cancel(JobID jobID) throws Exception {
+		JobTerminationMessageParameters params = new JobTerminationMessageParameters();
+		params.jobPathParameter.resolve(jobID);
+		params.terminationModeQueryParameter.resolve(Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL));
+		CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest(
+			restClusterClientConfiguration.getRestServerAddress(),
+			restClusterClientConfiguration.getRestServerPort(),
+			JobTerminationHeaders.getInstance(),
+			params
+		);
+		responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+	}
+
+	@Override
+	public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory) throws Exception {
+		throw new UnsupportedOperationException();
+	}
+
+	// ======================================
+	// Legacy stuff we actually implement
+	// ======================================
+
+	@Override
+	public String getClusterIdentifier() {
+		return "Flip-6 Standalone cluster with dispatcher at " + restClusterClientConfiguration.getRestServerAddress() + '.';
+	}
+
+	@Override
+	public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
+		return false;
+	}
+
+	// ======================================
+	// Legacy stuff we ignore
+	// ======================================
+
+	@Override
+	public void waitForClusterToBeReady() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public String getWebInterfaceURL() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public GetClusterStatusResponse getClusterStatus() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	protected List<String> getNewMessages() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	protected void finalizeCluster() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int getMaxSlots() {
+		return 0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
new file mode 100644
index 0000000..788eba9
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A configuration object for {@link RestClusterClient}s.
+ */
+public final class RestClusterClientConfiguration {
+
+	private final String blobServerAddress;
+
+	private final RestClientConfiguration restClientConfiguration;
+
+	private final String restServerAddress;
+
+	private final int restServerPort;
+
+	private RestClusterClientConfiguration(
+			String blobServerAddress,
+			RestClientConfiguration endpointConfiguration,
+			String restServerAddress,
+			int restServerPort) {
+		this.blobServerAddress = Preconditions.checkNotNull(blobServerAddress);
+		this.restClientConfiguration = Preconditions.checkNotNull(endpointConfiguration);
+		this.restServerAddress = Preconditions.checkNotNull(restServerAddress);
+		this.restServerPort = restServerPort;
+	}
+
+	public String getBlobServerAddress() {
+		return blobServerAddress;
+	}
+
+	public String getRestServerAddress() {
+		return restServerAddress;
+	}
+
+	public int getRestServerPort() {
+		return restServerPort;
+	}
+
+	public RestClientConfiguration getRestClientConfiguration() {
+		return restClientConfiguration;
+	}
+
+	public static RestClusterClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
+		String blobServerAddress = config.getString(JobManagerOptions.ADDRESS);
+
+		String serverAddress = config.getString(RestOptions.REST_ADDRESS);
+		int serverPort = config.getInteger(RestOptions.REST_PORT);
+
+		RestClientConfiguration restClientConfiguration = RestClientConfiguration.fromConfiguration(config);
+
+		return new RestClusterClientConfiguration(blobServerAddress, restClientConfiguration, serverAddress, serverPort);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 725d95a..e52dde1 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -19,7 +19,13 @@
 package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.CancelOptions;
+import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.Flip6DefaultCLI;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -30,9 +36,11 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.testkit.JavaTestKit;
+import org.apache.commons.cli.CommandLine;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.UUID;
 
@@ -41,6 +49,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
 
 /**
  * Tests for the CANCEL and LIST commands.
@@ -88,47 +104,35 @@ public class CliFrontendListCancelTest {
 			// test cancel properly
 			{
 				JobID jid = new JobID();
-				String jidString = jid.toString();
 
-				final UUID leaderSessionID = UUID.randomUUID();
-
-				final ActorRef jm = actorSystem.actorOf(Props.create(
-								CliJobManager.class,
-								jid,
-								leaderSessionID
-						)
-				);
-
-				final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
-
-				String[] parameters = { jidString };
-				InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
+				String[] parameters = { jid.toString() };
+				CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
 
 				int retCode = testFrontend.cancel(parameters);
 				assertTrue(retCode == 0);
+
+				Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class));
 			}
 
 			// test cancel properly
 			{
-				JobID jid1 = new JobID();
-				JobID jid2 = new JobID();
-
-				final UUID leaderSessionID = UUID.randomUUID();
+				JobID jid = new JobID();
 
-				final ActorRef jm = actorSystem.actorOf(
-						Props.create(
-								CliJobManager.class,
-								jid1,
-								leaderSessionID
-						)
-				);
+				String[] parameters = { jid.toString() };
+				CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(true);
 
-				final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+				int retCode = testFrontend.cancel(parameters);
+				assertTrue(retCode != 0);
 
-				String[] parameters = { jid2.toString() };
-				InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
+				Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class));
+			}
 
-				assertTrue(testFrontend.cancel(parameters) != 0);
+			// test flip6 switch
+			{
+				String[] parameters =
+					{"-flip6", String.valueOf(new JobID())};
+				CancelOptions options = CliFrontendParser.parseCancelCommand(parameters);
+				assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt()));
 			}
 		}
 		catch (Exception e) {
@@ -145,56 +149,38 @@ public class CliFrontendListCancelTest {
 		{
 			// Cancel with savepoint (no target directory)
 			JobID jid = new JobID();
-			UUID leaderSessionID = UUID.randomUUID();
-
-			Props props = Props.create(CliJobManager.class, jid, leaderSessionID);
-			ActorRef jm = actorSystem.actorOf(props);
-			ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
 
 			String[] parameters = { "-s", jid.toString() };
-			InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
+			CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
 			assertEquals(0, testFrontend.cancel(parameters));
+
+			Mockito.verify(testFrontend.client, times(1))
+				.cancelWithSavepoint(any(JobID.class), isNull(String.class));
 		}
 
 		{
 			// Cancel with savepoint (with target directory)
 			JobID jid = new JobID();
-			UUID leaderSessionID = UUID.randomUUID();
-
-			Props props = Props.create(CliJobManager.class, jid, leaderSessionID, "targetDirectory");
-			ActorRef jm = actorSystem.actorOf(props);
-			ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
 
 			String[] parameters = { "-s", "targetDirectory", jid.toString() };
-			InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
+			CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
 			assertEquals(0, testFrontend.cancel(parameters));
+
+			Mockito.verify(testFrontend.client, times(1))
+				.cancelWithSavepoint(any(JobID.class), notNull(String.class));
 		}
 
 		{
 			// Cancel with savepoint (with target directory), but no job ID
-			JobID jid = new JobID();
-			UUID leaderSessionID = UUID.randomUUID();
-
-			Props props = Props.create(CliJobManager.class, jid, leaderSessionID, "targetDirectory");
-			ActorRef jm = actorSystem.actorOf(props);
-			ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
-
 			String[] parameters = { "-s", "targetDirectory" };
-			InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
+			CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 			assertNotEquals(0, testFrontend.cancel(parameters));
 		}
 
 		{
 			// Cancel with savepoint (no target directory) and no job ID
-			JobID jid = new JobID();
-			UUID leaderSessionID = UUID.randomUUID();
-
-			Props props = Props.create(CliJobManager.class, jid, leaderSessionID);
-			ActorRef jm = actorSystem.actorOf(props);
-			ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
-
 			String[] parameters = { "-s" };
-			InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
+			CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 			assertNotEquals(0, testFrontend.cancel(parameters));
 		}
 	}
@@ -234,11 +220,32 @@ public class CliFrontendListCancelTest {
 		}
 	}
 
+	private static final class CancelTestCliFrontend extends CliFrontend {
+		private final ClusterClient client;
+
+		CancelTestCliFrontend(boolean reject) throws Exception {
+			super(CliFrontendTestUtils.getConfigDir());
+			this.client = mock(ClusterClient.class);
+			if (reject) {
+				doThrow(new IllegalArgumentException("Test exception")).when(client).cancel(any(JobID.class));
+				doThrow(new IllegalArgumentException("Test exception")).when(client).cancelWithSavepoint(any(JobID.class), anyString());
+			}
+		}
+
+		@Override
+		public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
+			CustomCommandLine ccl = mock(CustomCommandLine.class);
+			when(ccl.retrieveCluster(any(CommandLine.class), any(Configuration.class), anyString()))
+				.thenReturn(client);
+			return ccl;
+		}
+	}
+
 	private static final class InfoListTestCliFrontend extends CliFrontend {
 
 		private ActorGateway jobManagerGateway;
 
-		public InfoListTestCliFrontend(ActorGateway jobManagerGateway) throws Exception {
+		InfoListTestCliFrontend(ActorGateway jobManagerGateway) throws Exception {
 			super(CliFrontendTestUtils.getConfigDir());
 			this.jobManagerGateway = jobManagerGateway;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index e453d37..0edc444 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.client;
 
 import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.Flip6DefaultCLI;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
@@ -128,6 +129,14 @@ public class CliFrontendRunTest {
 				assertEquals("--arg2", options.getProgramArgs()[3]);
 				assertEquals("value2", options.getProgramArgs()[4]);
 			}
+
+			// test flip6 switch
+			{
+				String[] parameters =
+					{"-flip6", getTestJarPath()};
+				RunOptions options = CliFrontendParser.parseRunCommand(parameters);
+				assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt()));
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
index fef4880..ab81713 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
@@ -19,44 +19,37 @@
 package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.cli.CommandLineOptions;
-import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.cli.Flip6DefaultCLI;
+import org.apache.flink.client.cli.StopOptions;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
+import org.apache.commons.cli.CommandLine;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
-import java.util.UUID;
+import org.mockito.Mockito;
 
 import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
 
 /**
  * Tests for the STOP command.
  */
 public class CliFrontendStopTest extends TestLogger {
 
-	private static ActorSystem actorSystem;
-
 	@BeforeClass
 	public static void setup() {
 		pipeSystemOutToNull();
-		actorSystem = ActorSystem.create("TestingActorSystem");
-	}
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(actorSystem);
-		actorSystem = null;
 	}
 
 	@Test
@@ -82,82 +75,53 @@ public class CliFrontendStopTest extends TestLogger {
 			JobID jid = new JobID();
 			String jidString = jid.toString();
 
-			final UUID leaderSessionID = UUID.randomUUID();
-			final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid, leaderSessionID));
-
-			final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
-
 			String[] parameters = { jidString };
-			StopTestCliFrontend testFrontend = new StopTestCliFrontend(gateway);
+			StopTestCliFrontend testFrontend = new StopTestCliFrontend(false);
 
 			int retCode = testFrontend.stop(parameters);
-			assertTrue(retCode == 0);
+			assertEquals(0, retCode);
+
+			Mockito.verify(testFrontend.client, times(1)).stop(any(JobID.class));
 		}
 
 		// test unknown job Id
 		{
-			JobID jid1 = new JobID();
-			JobID jid2 = new JobID();
-
-			final UUID leaderSessionID = UUID.randomUUID();
-			final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid1, leaderSessionID));
-
-			final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+			JobID jid = new JobID();
 
-			String[] parameters = { jid2.toString() };
-			StopTestCliFrontend testFrontend = new StopTestCliFrontend(gateway);
+			String[] parameters = { jid.toString() };
+			StopTestCliFrontend testFrontend = new StopTestCliFrontend(true);
 
 			assertTrue(testFrontend.stop(parameters) != 0);
-		}
-	}
 
-	private static final class StopTestCliFrontend extends CliFrontend {
-
-		private ActorGateway jobManagerGateway;
-
-		public StopTestCliFrontend(ActorGateway jobManagerGateway) throws Exception {
-			super(CliFrontendTestUtils.getConfigDir());
-			this.jobManagerGateway = jobManagerGateway;
+			Mockito.verify(testFrontend.client, times(1)).stop(any(JobID.class));
 		}
 
-		@Override
-		public ActorGateway getJobManagerGateway(CommandLineOptions options) {
-			return jobManagerGateway;
+		// test flip6 switch
+		{
+			String[] parameters =
+				{"-flip6", String.valueOf(new JobID())};
+			StopOptions options = CliFrontendParser.parseStopCommand(parameters);
+			assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt()));
 		}
 	}
 
-	private static final class CliJobManager extends FlinkUntypedActor {
-		private final JobID jobID;
-		private final UUID leaderSessionID;
-
-		public CliJobManager(final JobID jobID, final UUID leaderSessionID) {
-			this.jobID = jobID;
-			this.leaderSessionID = leaderSessionID;
-		}
+	private static final class StopTestCliFrontend extends CliFrontend {
+		private final ClusterClient client;
 
-		@Override
-		public void handleMessage(Object message) {
-			if (message instanceof JobManagerMessages.RequestTotalNumberOfSlots$) {
-				getSender().tell(decorateMessage(1), getSelf());
-			} else if (message instanceof JobManagerMessages.StopJob) {
-				JobManagerMessages.StopJob stopJob = (JobManagerMessages.StopJob) message;
-
-				if (jobID != null && jobID.equals(stopJob.jobID())) {
-					getSender().tell(decorateMessage(new Status.Success(new Object())), getSelf());
-				} else {
-					getSender()
-							.tell(decorateMessage(new Status.Failure(new Exception(
-									"Wrong or no JobID"))), getSelf());
-				}
-			} else if (message instanceof JobManagerMessages.RequestRunningJobsStatus$) {
-				getSender().tell(decorateMessage(new JobManagerMessages.RunningJobsStatus()),
-						getSelf());
+		StopTestCliFrontend(boolean reject) throws Exception {
+			super(CliFrontendTestUtils.getConfigDir());
+			this.client = mock(ClusterClient.class);
+			if (reject) {
+				doThrow(new IllegalArgumentException("Test exception")).when(client).stop(any(JobID.class));
 			}
 		}
 
 		@Override
-		protected UUID getLeaderSessionID() {
-			return leaderSessionID;
+		public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
+			CustomCommandLine ccl = mock(CustomCommandLine.class);
+			when(ccl.retrieveCluster(any(CommandLine.class), any(Configuration.class), anyString()))
+				.thenReturn(client);
+			return ccl;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index 97a881c..98c7d26 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -18,12 +18,22 @@
 
 package org.apache.flink.client.program;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assert;
 import org.junit.Test;
 
+import scala.concurrent.Future;
+import scala.concurrent.Future$;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -51,4 +61,137 @@ public class ClusterClientTest extends TestLogger {
 		verify(highAvailabilityServices, never()).closeAndCleanupAllData();
 		verify(highAvailabilityServices).close();
 	}
+
+	@Test
+	public void testClusterClientStop() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+		JobID jobID = new JobID();
+		TestStopActorGateway gateway = new TestStopActorGateway(jobID);
+		ClusterClient clusterClient = new TestClusterClient(config, gateway);
+		try {
+			clusterClient.stop(jobID);
+			Assert.assertTrue(gateway.messageArrived);
+		} finally {
+			clusterClient.shutdown();
+		}
+	}
+
+	@Test
+	public void testClusterClientCancel() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+		JobID jobID = new JobID();
+		TestCancelActorGateway gateway = new TestCancelActorGateway(jobID);
+		ClusterClient clusterClient = new TestClusterClient(config, gateway);
+		try {
+			clusterClient.cancel(jobID);
+			Assert.assertTrue(gateway.messageArrived);
+		} finally {
+			clusterClient.shutdown();
+		}
+	}
+
+	@Test
+	public void testClusterClientCancelWithSavepoint() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+		JobID jobID = new JobID();
+		String savepointPath = "/test/path";
+		TestCancelWithSavepointActorGateway gateway = new TestCancelWithSavepointActorGateway(jobID, savepointPath);
+		ClusterClient clusterClient = new TestClusterClient(config, gateway);
+		try {
+			clusterClient.cancelWithSavepoint(jobID, savepointPath);
+			Assert.assertTrue(gateway.messageArrived);
+		} finally {
+			clusterClient.shutdown();
+		}
+	}
+
+	private static class TestStopActorGateway extends DummyActorGateway {
+
+		private final JobID expectedJobID;
+		private volatile boolean messageArrived = false;
+
+		TestStopActorGateway(JobID expectedJobID) {
+			this.expectedJobID = expectedJobID;
+		}
+
+		@Override
+		public Future<Object> ask(Object message, FiniteDuration timeout) {
+			messageArrived = true;
+			if (message instanceof JobManagerMessages.StopJob) {
+				JobManagerMessages.StopJob stopJob = (JobManagerMessages.StopJob) message;
+				Assert.assertEquals(expectedJobID, stopJob.jobID());
+				return Future$.MODULE$.successful(new JobManagerMessages.StoppingSuccess(stopJob.jobID()));
+			}
+			Assert.fail("Expected StopJob message, got: " + message.getClass());
+			return null;
+		}
+	}
+
+	private static class TestCancelActorGateway extends DummyActorGateway {
+
+		private final JobID expectedJobID;
+		private volatile boolean messageArrived = false;
+
+		TestCancelActorGateway(JobID expectedJobID) {
+			this.expectedJobID = expectedJobID;
+		}
+
+		@Override
+		public Future<Object> ask(Object message, FiniteDuration timeout) {
+			messageArrived = true;
+			if (message instanceof JobManagerMessages.CancelJob) {
+				JobManagerMessages.CancelJob cancelJob = (JobManagerMessages.CancelJob) message;
+				Assert.assertEquals(expectedJobID, cancelJob.jobID());
+				return Future$.MODULE$.successful(new JobManagerMessages.CancellationSuccess(cancelJob.jobID(), null));
+			}
+			Assert.fail("Expected CancelJob message, got: " + message.getClass());
+			return null;
+		}
+	}
+
+	private static class TestCancelWithSavepointActorGateway extends DummyActorGateway {
+
+		private final JobID expectedJobID;
+		private final String expectedTargetDirectory;
+		private volatile boolean messageArrived = false;
+
+		TestCancelWithSavepointActorGateway(JobID expectedJobID, String expectedTargetDirectory) {
+			this.expectedJobID = expectedJobID;
+			this.expectedTargetDirectory = expectedTargetDirectory;
+		}
+
+		@Override
+		public Future<Object> ask(Object message, FiniteDuration timeout) {
+			messageArrived = true;
+			if (message instanceof JobManagerMessages.CancelJobWithSavepoint) {
+				JobManagerMessages.CancelJobWithSavepoint cancelJob = (JobManagerMessages.CancelJobWithSavepoint) message;
+				Assert.assertEquals(expectedJobID, cancelJob.jobID());
+				Assert.assertEquals(expectedTargetDirectory, cancelJob.savepointDirectory());
+				return Future$.MODULE$.successful(new JobManagerMessages.CancellationSuccess(cancelJob.jobID(), null));
+			}
+			Assert.fail("Expected CancelJobWithSavepoint message, got: " + message.getClass());
+			return null;
+		}
+	}
+
+	private static class TestClusterClient extends StandaloneClusterClient {
+
+		private final ActorGateway jobmanagerGateway;
+
+		public TestClusterClient(Configuration config, ActorGateway jobmanagerGateway) throws Exception {
+			super(config);
+			this.jobmanagerGateway = jobmanagerGateway;
+		}
+
+		@Override
+		public ActorGateway getJobManagerGateway() {
+			return jobmanagerGateway;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
new file mode 100644
index 0000000..617dd38
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest extends TestLogger {
+
+	private static final String restAddress = "http://localhost:1234";
+	private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class);
+	private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+	static {
+		when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+		when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+	}
+
+	@Test
+	public void testJobSubmitCancelStop() throws Exception {
+
+		Configuration config = new Configuration();
+		config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+		RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+		TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
+		TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
+		TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
+
+		RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+			@Override
+			protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+
+				Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>();
+				handlers.add(Tuple2.of(portHandler.getMessageHeaders(), portHandler));
+				handlers.add(Tuple2.of(submitHandler.getMessageHeaders(), submitHandler));
+				handlers.add(Tuple2.of(terminationHandler.getMessageHeaders(), terminationHandler));
+				return handlers;
+			}
+		};
+
+		RestClusterClient rcc = new RestClusterClient(config);
+		try {
+			rse.start();
+
+			JobGraph job = new JobGraph("testjob");
+			JobID id = job.getJobID();
+
+			Assert.assertFalse(portHandler.portRetrieved);
+			Assert.assertFalse(submitHandler.jobSubmitted);
+			rcc.submitJob(job, ClassLoader.getSystemClassLoader());
+			Assert.assertTrue(portHandler.portRetrieved);
+			Assert.assertTrue(submitHandler.jobSubmitted);
+
+			Assert.assertFalse(terminationHandler.jobCanceled);
+			rcc.cancel(id);
+			Assert.assertTrue(terminationHandler.jobCanceled);
+
+			Assert.assertFalse(terminationHandler.jobStopped);
+			rcc.stop(id);
+			Assert.assertTrue(terminationHandler.jobStopped);
+
+		} finally {
+			rcc.shutdown();
+			rse.shutdown(Time.seconds(5));
+		}
+	}
+
+	private static class TestBlobServerPortHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
+		private volatile boolean portRetrieved = false;
+
+		private TestBlobServerPortHandler() {
+			super(
+				CompletableFuture.completedFuture(restAddress),
+				mockGatewayRetriever,
+				RpcUtils.INF_TIMEOUT,
+				BlobServerPortHeaders.getInstance());
+		}
+
+		@Override
+		protected CompletableFuture<BlobServerPortResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+			portRetrieved = true;
+			return CompletableFuture.completedFuture(new BlobServerPortResponseBody(12000));
+		}
+	}
+
+	private static class TestJobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
+		private volatile boolean jobSubmitted = false;
+
+		private TestJobSubmitHandler() {
+			super(
+				CompletableFuture.completedFuture(restAddress),
+				mockGatewayRetriever,
+				RpcUtils.INF_TIMEOUT,
+				JobSubmitHeaders.getInstance());
+		}
+
+		@Override
+		protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+			jobSubmitted = true;
+			return CompletableFuture.completedFuture(new JobSubmitResponseBody("/url"));
+		}
+	}
+
+	private static class TestJobTerminationHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, EmptyResponseBody, JobTerminationMessageParameters> {
+		private volatile boolean jobCanceled = false;
+		private volatile boolean jobStopped = false;
+
+		private TestJobTerminationHandler() {
+			super(
+				CompletableFuture.completedFuture(restAddress),
+				mockGatewayRetriever,
+				RpcUtils.INF_TIMEOUT,
+				JobTerminationHeaders.getInstance());
+		}
+
+		@Override
+		protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobTerminationMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+			switch (request.getQueryParameter(TerminationModeQueryParameter.class).get(0)) {
+				case CANCEL:
+					jobCanceled = true;
+					break;
+				case STOP:
+					jobStopped = true;
+					break;
+			}
+			return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 4d89dc8..bce2bed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -312,6 +312,11 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		}
 	}
 
+	@Override
+	public CompletableFuture<Integer> getBlobServerPort(Time timeout) {
+		return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
+	}
+
 	/**
 	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
 	 * the data will also be removed from HA.

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index fe7b91e..12cbbfb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -71,4 +71,12 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
 	 * @return A future acknowledge if the stopping succeeded
 	 */
 	CompletableFuture<Acknowledge> stopJob(JobID jobId, @RpcTimeout Time timeout);
+
+	/**
+	 * Returns the port of the blob server.
+	 *
+	 * @param timeout of the operation
+	 * @return A future integer of the blob server port
+	 */
+	CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 8471078..d64e649 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -27,7 +27,9 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
@@ -192,6 +194,12 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
 		handlers.add(Tuple2.of(CheckpointStatisticsHeaders.getInstance(), checkpointStatisticsHandler));
 
+		BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout);
+		handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
+
+		JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(restAddressFuture, leaderRetriever, timeout);
+		handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
+
 		// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
 		optWebContent.ifPresent(
 			webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index d09aad9..18766c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -54,6 +54,8 @@ import java.util.concurrent.TimeUnit;
  * An abstract class for netty-based REST server endpoints.
  */
 public abstract class RestServerEndpoint {
+
+	public static final int MAX_REQUEST_SIZE_BYTES = 1024 * 1024 * 10;
 	protected final Logger log = LoggerFactory.getLogger(getClass());
 
 	private final Object lock = new Object();
@@ -120,7 +122,7 @@ public abstract class RestServerEndpoint {
 
 					ch.pipeline()
 						.addLast(new HttpServerCodec())
-						.addLast(new HttpObjectAggregator(1024 * 1024 * 10))
+						.addLast(new HttpObjectAggregator(MAX_REQUEST_SIZE_BYTES))
 						.addLast(handler.name(), handler)
 						.addLast(new PipelineErrorHandler(log));
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
new file mode 100644
index 0000000..cdf562f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * REST handler that asks the dispatcher gateway for the port of the blob server.
+ * A failed lookup is surfaced as a {@link RestHandlerException} with status 500.
+ */
+public final class BlobServerPortHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
+
+	public BlobServerPortHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<DispatcherGateway> leaderRetriever, Time timeout) {
+		super(localRestAddress, leaderRetriever, timeout, BlobServerPortHeaders.getInstance());
+	}
+
+	@Override
+	protected CompletableFuture<BlobServerPortResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+		return gateway.getBlobServerPort(timeout)
+			.thenApply(port -> new BlobServerPortResponseBody(port))
+			.exceptionally(failure -> {
+				// Drop the CompletionException wrapper (if any) so the REST error carries the underlying cause.
+				final Throwable cause = ExceptionUtils.stripCompletionException(failure);
+				throw new CompletionException(
+					new RestHandlerException("Failed to retrieve blob server port.", HttpResponseStatus.INTERNAL_SERVER_ERROR, cause));
+			});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
new file mode 100644
index 0000000..f810b5a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * REST handler that reads a Java-serialized {@link JobGraph} from the request body and submits it through the gateway.
+ */
+public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
+
+	public JobSubmitHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<DispatcherGateway> leaderRetriever, Time timeout) {
+		super(localRestAddress, leaderRetriever, timeout, JobSubmitHeaders.getInstance());
+	}
+
+	@Override
+	protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+		JobGraph jobGraph;
+		// NOTE(review): this is native Java deserialization of client-supplied bytes; restrict who may reach this endpoint.
+		// try-with-resources closes the stream on every path, including a failed readObject().
+		try (ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph))) {
+			jobGraph = (JobGraph) objectIn.readObject();
+		} catch (Exception e) {
+			// Anything that goes wrong while decoding the body is reported to the client as a bad request.
+			throw new RestHandlerException("Failed to deserialize JobGraph.", HttpResponseStatus.BAD_REQUEST, e);
+		}
+
+		// Once the submission is acknowledged, answer with "/jobs/" followed by the job's ID.
+		return gateway.submitJob(jobGraph, timeout)
+			.thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
new file mode 100644
index 0000000..8edec16
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the REST call that queries the blob server port: a GET on "/blobserver/port"
+ * with an {@link EmptyRequestBody}, answered with status 200 OK and a {@link BlobServerPortResponseBody}.
+ */
+public class BlobServerPortHeaders implements MessageHeaders<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> {
+
+	private static final BlobServerPortHeaders INSTANCE = new BlobServerPortHeaders();
+	private static final String URL = "/blobserver/port";
+
+	private BlobServerPortHeaders() {}
+
+	public static BlobServerPortHeaders getInstance() {
+		return INSTANCE;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<BlobServerPortResponseBody> getResponseClass() {
+		return BlobServerPortResponseBody.class;
+	}
+
+	@Override
+	public EmptyMessageParameters getUnresolvedMessageParameters() {
+		return EmptyMessageParameters.getInstance();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
new file mode 100644
index 0000000..846475f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Response body for the blob server port query.
+ * The Jackson annotations below map it to a single JSON property named "port".
+ */
+public final class BlobServerPortResponseBody implements ResponseBody {
+
+	static final String FIELD_NAME_PORT = "port";
+
+	/**
+	 * Port that the blob server runs on.
+	 */
+	@JsonProperty(FIELD_NAME_PORT)
+	public final int port;
+
+	@JsonCreator
+	public BlobServerPortResponseBody(@JsonProperty(FIELD_NAME_PORT) int port) {
+		this.port = port;
+	}
+
+	@Override
+	public boolean equals(Object object) {
+		// Only another BlobServerPortResponseBody with the same port is considered equal.
+		if (!(object instanceof BlobServerPortResponseBody)) {
+			return false;
+		}
+		return port == ((BlobServerPortResponseBody) object).port;
+	}
+
+	@Override
+	public int hashCode() {
+		// Derived from the port alone, matching the field compared in equals().
+		return 67 * port;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
index fd87316..a59dc83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
@@ -28,8 +28,8 @@ import java.util.Collections;
  */
 public class JobTerminationMessageParameters extends MessageParameters {
 
-	private final JobIDPathParameter jobPathParameter = new JobIDPathParameter();
-	private final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter();
+	public final JobIDPathParameter jobPathParameter = new JobIDPathParameter();
+	public final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter();
 
 	@Override
 	public Collection<MessagePathParameter<?>> getPathParameters() {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad380463/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
new file mode 100644
index 0000000..6235214
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for submitting a job to a Flink cluster: a POST on "/jobs" carrying a
+ * {@link JobSubmitRequestBody}, answered with status 202 Accepted and a {@link JobSubmitResponseBody}.
+ */
+public class JobSubmitHeaders implements MessageHeaders<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
+
+	private static final JobSubmitHeaders INSTANCE = new JobSubmitHeaders();
+	private static final String URL = "/jobs";
+
+	private JobSubmitHeaders() {}
+
+	public static JobSubmitHeaders getInstance() {
+		return INSTANCE;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.POST;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.ACCEPTED;
+	}
+
+	@Override
+	public Class<JobSubmitRequestBody> getRequestClass() {
+		return JobSubmitRequestBody.class;
+	}
+
+	@Override
+	public Class<JobSubmitResponseBody> getResponseClass() {
+		return JobSubmitResponseBody.class;
+	}
+
+	@Override
+	public EmptyMessageParameters getUnresolvedMessageParameters() {
+		return EmptyMessageParameters.getInstance();
+	}
+}