Posted to commits@flink.apache.org by ch...@apache.org on 2017/10/15 16:43:32 UTC
[1/5] flink git commit: [FLINK-6805] [cassandra] Shade indirect netty4 dependency
Repository: flink
Updated Branches:
refs/heads/master 47fe61893 -> a76de2860
[FLINK-6805] [cassandra] Shade indirect netty4 dependency
Relocate the *indirect* netty4 dependencies (version 4.0.33.Final) into a
Flink-specific shaded package using the Maven Shade plugin.
This closes #4545.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81d7c4e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81d7c4e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81d7c4e8
Branch: refs/heads/master
Commit: 81d7c4e8b111c6bf52000f68b64c402783a6ae74
Parents: 89394ec
Author: Michael Fong <mc...@gmail.com>
Authored: Tue Aug 15 14:22:23 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Sun Oct 15 16:40:53 2017 +0200
----------------------------------------------------------------------
flink-connectors/flink-connector-cassandra/pom.xml | 10 ++++++++++
1 file changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/81d7c4e8/flink-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index f006f3a..c97b43f 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -72,6 +72,7 @@ under the License.
<include>com.datastax.cassandra:cassandra-driver-core</include>
<include>com.datastax.cassandra:cassandra-driver-mapping</include>
<include>com.google.guava:guava</include>
+ <include>io.netty:netty-*</include>
</includes>
</artifactSet>
<relocations>
@@ -83,6 +84,15 @@ under the License.
<exclude>com.google.inject.**</exclude>
</excludes>
</relocation>
+ <!--
+ For the details of relocation pattern, refer the discussion at
+ https://github.com/apache/flink/pull/4545
+ FLINK-6805
+ -->
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>org.apache.flink.cassandra.shaded.io.netty</shadedPattern>
+ </relocation>
</relocations>
</configuration>
</execution>
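
As an illustration of what the `io.netty` relocation above implies for class names, here is a minimal Java sketch. It is not how the Maven Shade plugin works internally (the plugin rewrites class files and resources at packaging time), and its matching is a simplification. The class `RelocationSketch` and the method `relocate` are hypothetical names used only to show the prefix mapping from `<pattern>` to `<shadedPattern>`.

```java
// Hypothetical illustration only: mimics the name mapping implied by the
// <relocation> entry, not the Shade plugin's actual bytecode rewriting.
final class RelocationSketch {

    static final String PATTERN = "io.netty";
    static final String SHADED_PATTERN = "org.apache.flink.cassandra.shaded.io.netty";

    /** Returns the relocated name if the class lies under PATTERN, else the name unchanged. */
    static String relocate(String className) {
        if (className.equals(PATTERN) || className.startsWith(PATTERN + ".")) {
            return SHADED_PATTERN + className.substring(PATTERN.length());
        }
        return className;
    }

    public static void main(String[] args) {
        // A netty class is moved under the Flink-specific prefix.
        System.out.println(relocate("io.netty.channel.Channel"));
        // A class outside the pattern keeps its original name.
        System.out.println(relocate("com.google.common.base.Preconditions"));
    }
}
```

Because the relocated classes live under a different package, they should no longer collide with another netty version that user code or Flink itself puts on the classpath.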
[3/5] flink git commit: [FLINK-7774][network] fix not clearing deserializers on closing an input
Posted by ch...@apache.org.
[FLINK-7774][network] fix not clearing deserializers on closing an input
This closes #4783.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b6f0558
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b6f0558
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b6f0558
Branch: refs/heads/master
Commit: 4b6f05585bb548d1538ada43ed33149acbc9e6d4
Parents: 47fe618
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Sep 4 17:21:52 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun Oct 15 16:40:53 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/io/network/api/reader/AbstractRecordReader.java | 1 +
.../org/apache/flink/streaming/runtime/io/StreamInputProcessor.java | 1 +
.../apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java | 1 +
3 files changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4b6f0558/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index c5aeef7..29f2b6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -120,6 +120,7 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
if (buffer != null && !buffer.isRecycled()) {
buffer.recycle();
}
+ deserializer.clear();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4b6f0558/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 263077d..609f8b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -264,6 +264,7 @@ public class StreamInputProcessor<IN> {
if (buffer != null && !buffer.isRecycled()) {
buffer.recycle();
}
+ deserializer.clear();
}
// cleanup the barrier handler resources
http://git-wip-us.apache.org/repos/asf/flink/blob/4b6f0558/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index a25540d..7874147 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -329,6 +329,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
if (buffer != null && !buffer.isRecycled()) {
buffer.recycle();
}
+ deserializer.clear();
}
// cleanup the barrier handler resources
[4/5] flink git commit: [FLINK-7791] [Client] Move LIST logic into ClusterClient
Posted by ch...@apache.org.
[FLINK-7791] [Client] Move LIST logic into ClusterClient
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5729617
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5729617
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5729617
Branch: refs/heads/master
Commit: e572961749dd45e3f7444664d9ba967fa438ab55
Parents: 81d7c4e
Author: zentol <ch...@apache.org>
Authored: Tue Oct 10 16:52:10 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun Oct 15 16:43:51 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 119 +++----
.../flink/client/program/ClusterClient.java | 31 ++
.../flink/client/CliFrontendCancelTest.java | 164 ++++++++++
.../flink/client/CliFrontendListCancelTest.java | 310 -------------------
.../flink/client/CliFrontendListTest.java | 72 +++++
.../flink/client/program/ClusterClientTest.java | 43 +++
6 files changed, 361 insertions(+), 378 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e5729617/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index c065453..07c6d65 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -60,7 +60,6 @@ import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -83,6 +82,7 @@ import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
@@ -420,89 +420,72 @@ public class CliFrontend {
}
try {
- ActorGateway jobManagerGateway = getJobManagerGateway(options);
-
- LOG.info("Connecting to JobManager to retrieve list of jobs");
- Future<Object> response = jobManagerGateway.ask(
- JobManagerMessages.getRequestRunningJobsStatus(),
- clientTimeout);
+ CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
+ ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
- Object result;
+ Collection<JobStatusMessage> jobDetails;
try {
- result = Await.result(response, clientTimeout);
- }
- catch (Exception e) {
- throw new Exception("Could not retrieve running jobs from the JobManager.", e);
+ CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = client.listJobs();
+
+ try {
+ logAndSysout("Waiting for response...");
+ jobDetails = jobDetailsFuture.get();
+ }
+ catch (ExecutionException ee) {
+ Throwable cause = ExceptionUtils.stripExecutionException(ee);
+ throw new Exception("Failed to retrieve job list.", cause);
+ }
+ } finally {
+ client.shutdown();
}
- if (result instanceof RunningJobsStatus) {
- LOG.info("Successfully retrieved list of jobs");
+ LOG.info("Successfully retrieved list of jobs");
- List<JobStatusMessage> jobs = ((RunningJobsStatus) result).getStatusMessages();
+ SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
+ Comparator<JobStatusMessage> startTimeComparator = (o1, o2) -> (int) (o1.getStartTime() - o2.getStartTime());
- ArrayList<JobStatusMessage> runningJobs = null;
- ArrayList<JobStatusMessage> scheduledJobs = null;
- if (running) {
- runningJobs = new ArrayList<JobStatusMessage>();
- }
- if (scheduled) {
- scheduledJobs = new ArrayList<JobStatusMessage>();
+ final List<JobStatusMessage> runningJobs = new ArrayList<>();
+ final List<JobStatusMessage> scheduledJobs = new ArrayList<>();
+ jobDetails.forEach(details -> {
+ if (details.getJobState() == JobStatus.CREATED) {
+ scheduledJobs.add(details);
+ } else {
+ runningJobs.add(details);
}
+ });
- for (JobStatusMessage rj : jobs) {
- if (running && (rj.getJobState().equals(JobStatus.RUNNING)
- || rj.getJobState().equals(JobStatus.RESTARTING))) {
- runningJobs.add(rj);
- }
- if (scheduled && rj.getJobState().equals(JobStatus.CREATED)) {
- scheduledJobs.add(rj);
- }
+ if (running) {
+ if (runningJobs.size() == 0) {
+ System.out.println("No running jobs.");
}
+ else {
+ runningJobs.sort(startTimeComparator);
- SimpleDateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
- Comparator<JobStatusMessage> njec = new Comparator<JobStatusMessage>(){
- @Override
- public int compare(JobStatusMessage o1, JobStatusMessage o2) {
- return (int) (o1.getStartTime() - o2.getStartTime());
- }
- };
-
- if (running) {
- if (runningJobs.size() == 0) {
- System.out.println("No running jobs.");
- }
- else {
- Collections.sort(runningJobs, njec);
-
- System.out.println("------------------ Running/Restarting Jobs -------------------");
- for (JobStatusMessage rj : runningJobs) {
- System.out.println(df.format(new Date(rj.getStartTime()))
- + " : " + rj.getJobId() + " : " + rj.getJobName() + " (" + rj.getJobState() + ")");
- }
- System.out.println("--------------------------------------------------------------");
+ System.out.println("------------------ Running/Restarting Jobs -------------------");
+ for (JobStatusMessage runningJob : runningJobs) {
+ System.out.println(dateFormat.format(new Date(runningJob.getStartTime()))
+ + " : " + runningJob.getJobId() + " : " + runningJob.getJobName() + " (" + runningJob.getJobState() + ")");
}
+ System.out.println("--------------------------------------------------------------");
}
- if (scheduled) {
- if (scheduledJobs.size() == 0) {
- System.out.println("No scheduled jobs.");
- }
- else {
- Collections.sort(scheduledJobs, njec);
+ }
+ if (scheduled) {
+ if (scheduledJobs.size() == 0) {
+ System.out.println("No scheduled jobs.");
+ }
+ else {
+ scheduledJobs.sort(startTimeComparator);
- System.out.println("----------------------- Scheduled Jobs -----------------------");
- for (JobStatusMessage rj : scheduledJobs) {
- System.out.println(df.format(new Date(rj.getStartTime()))
- + " : " + rj.getJobId() + " : " + rj.getJobName());
- }
- System.out.println("--------------------------------------------------------------");
+ System.out.println("----------------------- Scheduled Jobs -----------------------");
+ for (JobStatusMessage scheduledJob : scheduledJobs) {
+ System.out.println(dateFormat.format(new Date(scheduledJob.getStartTime()))
+ + " : " + scheduledJob.getJobId() + " : " + scheduledJob.getJobName());
}
+ System.out.println("--------------------------------------------------------------");
}
- return 0;
- }
- else {
- throw new Exception("ReqeustRunningJobs requires a response of type " +
- "RunningJobs. Instead the response is of type " + result.getClass() + ".");
}
+
+ return 0;
}
catch (Throwable t) {
return handleError(t);
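
A side note on `startTimeComparator`, which is not part of this commit: the lambda keeps the subtract-and-cast form of the anonymous comparator it replaces. Narrowing a `long` difference to `int` can give a wrong result when the difference does not fit into 32 bits. The sketch below uses a hypothetical `Job` class as a stand-in for `JobStatusMessage` and compares that form with `Comparator.comparingLong`; it is an illustration under these assumptions, not a proposed patch.

```java
import java.util.Comparator;

final class StartTimeComparatorSketch {

    /** Hypothetical stand-in for JobStatusMessage; only the start time matters here. */
    static final class Job {
        private final long startTime;

        Job(long startTime) {
            this.startTime = startTime;
        }

        long getStartTime() {
            return startTime;
        }
    }

    // Same shape as the comparator in the diff: subtract, then narrow to int.
    static final Comparator<Job> BY_DIFFERENCE =
        (o1, o2) -> (int) (o1.getStartTime() - o2.getStartTime());

    // Alternative that compares the long values directly, without narrowing.
    static final Comparator<Job> BY_COMPARING_LONG =
        Comparator.comparingLong(Job::getStartTime);

    public static void main(String[] args) {
        Job earlier = new Job(0L);
        Job later = new Job(1L << 32); // 2^32 ms, roughly 49.7 days later

        // The difference -(2^32) has all-zero low 32 bits, so the cast yields 0
        // and the two jobs are treated as having equal start times.
        System.out.println(BY_DIFFERENCE.compare(earlier, later));

        // comparingLong orders the earlier job first.
        System.out.println(BY_COMPARING_LONG.compare(earlier, later) < 0);
    }
}
```

For start times that lie close together the two comparators agree; they only diverge once the gap between two jobs exceeds the `int` range in milliseconds.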
http://git-wip-us.apache.org/repos/asf/flink/blob/e5729617/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index eb89f09..06fb42a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobListeningContext;
+import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -54,6 +55,8 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.FlinkException;
@@ -70,6 +73,8 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -684,6 +689,32 @@ public abstract class ClusterClient {
}
/**
+ * Lists the currently running and finished jobs on the cluster.
+ *
+ * @return future collection of running and finished jobs
+ * @throws Exception if no connection to the cluster could be established
+ */
+ public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception {
+ final ActorGateway jobManager = getJobManagerGateway();
+
+ Future<Object> response = jobManager.ask(new RequestJobDetails(true, false), timeout);
+ CompletableFuture<Object> responseFuture = FutureUtils.toJava(response);
+
+ return responseFuture.thenApply((responseMessage) -> {
+ if (responseMessage instanceof MultipleJobsDetails) {
+ MultipleJobsDetails details = (MultipleJobsDetails) responseMessage;
+ Collection<JobStatusMessage> flattenedDetails = new ArrayList<>(details.getRunning().size() + details.getFinished().size());
+ details.getRunning().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
+ details.getFinished().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
+ return flattenedDetails;
+ } else {
+ throw new CompletionException(
+ new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass()));
+ }
+ });
+ }
+
+ /**
* Requests and returns the accumulators for the given job identifier. Accumulators can be
* requested while a is running or after it has finished. The default class loader is used
* to deserialize the incoming accumulator results.
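
The interplay between `listJobs()` and its caller in `CliFrontend` can be sketched with JDK classes only. The following is a minimal, hypothetical example (the class `ListJobsSketch` and its methods are not Flink APIs, and a plain `List` stands in for `MultipleJobsDetails`): an unexpected response type is reported by throwing `CompletionException` inside `thenApply`, and the blocking caller observes it as an `ExecutionException` whose cause is the original `IllegalStateException`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class ListJobsSketch {

    /** Hypothetical: turns an untyped response into a typed collection, similar in shape to listJobs(). */
    static CompletableFuture<Collection<String>> toJobNames(CompletableFuture<Object> response) {
        return response.thenApply(message -> {
            if (message instanceof List) {
                Collection<String> names = new ArrayList<>();
                ((List<?>) message).forEach(item -> names.add(String.valueOf(item)));
                return names;
            } else {
                // Completes the returned future exceptionally.
                throw new CompletionException(
                    new IllegalStateException("Unknown response of type " + message.getClass()));
            }
        });
    }

    /** Hypothetical caller: blocks on the future and rethrows with the underlying cause. */
    static Collection<String> await(CompletableFuture<Collection<String>> future) throws Exception {
        try {
            return future.get();
        } catch (ExecutionException ee) {
            throw new Exception("Failed to retrieve job list.", ee.getCause());
        }
    }

    public static void main(String[] args) throws Exception {
        // Expected response type: the names are returned.
        System.out.println(await(toJobNames(CompletableFuture.completedFuture(Arrays.asList("a", "b")))));

        // Unexpected response type: the caller sees the IllegalStateException as the cause.
        try {
            await(toJobNames(CompletableFuture.completedFuture(42)));
        } catch (Exception e) {
            System.out.println(e.getCause());
        }
    }
}
```

The diff uses Flink's `ExceptionUtils.stripExecutionException` where this sketch calls `ee.getCause()`; the sketch makes no claim about what that utility does beyond what the diff shows.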
http://git-wip-us.apache.org/repos/asf/flink/blob/e5729617/flink-clients/src/test/java/org/apache/flink/client/CliFrontendCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendCancelTest.java
new file mode 100644
index 0000000..f2508dc
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendCancelTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.CancelOptions;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.Flip6DefaultCLI;
+import org.apache.flink.client.util.MockedCliFrontend;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+
+/**
+ * Tests for the CANCEL command.
+ */
+public class CliFrontendCancelTest {
+
+ @BeforeClass
+ public static void init() {
+ CliFrontendTestUtils.pipeSystemOutToNull();
+ }
+
+ @Test
+ public void testCancel() {
+ try {
+ // test unrecognized option
+ {
+ String[] parameters = {"-v", "-l"};
+ CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+ int retCode = testFrontend.cancel(parameters);
+ assertTrue(retCode != 0);
+ }
+
+ // test missing job id
+ {
+ String[] parameters = {};
+ CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+ int retCode = testFrontend.cancel(parameters);
+ assertTrue(retCode != 0);
+ }
+
+ // test cancel properly
+ {
+ JobID jid = new JobID();
+
+ String[] parameters = { jid.toString() };
+ CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
+
+ int retCode = testFrontend.cancel(parameters);
+ assertTrue(retCode == 0);
+
+ Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class));
+ }
+
+ // test cancel properly
+ {
+ JobID jid = new JobID();
+
+ String[] parameters = { jid.toString() };
+ CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(true);
+
+ int retCode = testFrontend.cancel(parameters);
+ assertTrue(retCode != 0);
+
+ Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class));
+ }
+
+ // test flip6 switch
+ {
+ String[] parameters =
+ {"-flip6", String.valueOf(new JobID())};
+ CancelOptions options = CliFrontendParser.parseCancelCommand(parameters);
+ assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt()));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Program caused an exception: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Tests cancelling with the savepoint option.
+ */
+ @Test
+ public void testCancelWithSavepoint() throws Exception {
+ {
+ // Cancel with savepoint (no target directory)
+ JobID jid = new JobID();
+
+ String[] parameters = { "-s", jid.toString() };
+ CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
+ assertEquals(0, testFrontend.cancel(parameters));
+
+ Mockito.verify(testFrontend.client, times(1))
+ .cancelWithSavepoint(any(JobID.class), isNull(String.class));
+ }
+
+ {
+ // Cancel with savepoint (with target directory)
+ JobID jid = new JobID();
+
+ String[] parameters = { "-s", "targetDirectory", jid.toString() };
+ CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
+ assertEquals(0, testFrontend.cancel(parameters));
+
+ Mockito.verify(testFrontend.client, times(1))
+ .cancelWithSavepoint(any(JobID.class), notNull(String.class));
+ }
+
+ {
+ // Cancel with savepoint (with target directory), but no job ID
+ String[] parameters = { "-s", "targetDirectory" };
+ CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+ assertNotEquals(0, testFrontend.cancel(parameters));
+ }
+
+ {
+ // Cancel with savepoint (no target directory) and no job ID
+ String[] parameters = { "-s" };
+ CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+ assertNotEquals(0, testFrontend.cancel(parameters));
+ }
+ }
+
+ private static final class CancelTestCliFrontend extends MockedCliFrontend {
+
+ CancelTestCliFrontend(boolean reject) throws Exception {
+ if (reject) {
+ doThrow(new IllegalArgumentException("Test exception")).when(client).cancel(any(JobID.class));
+ doThrow(new IllegalArgumentException("Test exception")).when(client).cancelWithSavepoint(any(JobID.class), anyString());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5729617/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
deleted file mode 100644
index b01d162..0000000
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.cli.CancelOptions;
-import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.client.cli.CommandLineOptions;
-import org.apache.flink.client.cli.Flip6DefaultCLI;
-import org.apache.flink.client.util.MockedCliFrontend;
-import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.UUID;
-
-import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.isNull;
-import static org.mockito.Matchers.notNull;
-import static org.mockito.Mockito.times;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-
-/**
- * Tests for the CANCEL and LIST commands.
- */
-public class CliFrontendListCancelTest {
-
- private static ActorSystem actorSystem;
-
- @BeforeClass
- public static void setup(){
- pipeSystemOutToNull();
- actorSystem = ActorSystem.create("TestingActorSystem");
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(actorSystem);
- actorSystem = null;
- }
-
- @BeforeClass
- public static void init() {
- CliFrontendTestUtils.pipeSystemOutToNull();
- }
-
- @Test
- public void testCancel() {
- try {
- // test unrecognized option
- {
- String[] parameters = {"-v", "-l"};
- CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
- int retCode = testFrontend.cancel(parameters);
- assertTrue(retCode != 0);
- }
-
- // test missing job id
- {
- String[] parameters = {};
- CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
- int retCode = testFrontend.cancel(parameters);
- assertTrue(retCode != 0);
- }
-
- // test cancel properly
- {
- JobID jid = new JobID();
-
- String[] parameters = { jid.toString() };
- CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
-
- int retCode = testFrontend.cancel(parameters);
- assertTrue(retCode == 0);
-
- Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class));
- }
-
- // test cancel properly
- {
- JobID jid = new JobID();
-
- String[] parameters = { jid.toString() };
- CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(true);
-
- int retCode = testFrontend.cancel(parameters);
- assertTrue(retCode != 0);
-
- Mockito.verify(testFrontend.client, times(1)).cancel(any(JobID.class));
- }
-
- // test flip6 switch
- {
- String[] parameters =
- {"-flip6", String.valueOf(new JobID())};
- CancelOptions options = CliFrontendParser.parseCancelCommand(parameters);
- assertTrue(options.getCommandLine().hasOption(Flip6DefaultCLI.FLIP_6.getOpt()));
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Program caused an exception: " + e.getMessage());
- }
- }
-
- /**
- * Tests cancelling with the savepoint option.
- */
- @Test
- public void testCancelWithSavepoint() throws Exception {
- {
- // Cancel with savepoint (no target directory)
- JobID jid = new JobID();
-
- String[] parameters = { "-s", jid.toString() };
- CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
- assertEquals(0, testFrontend.cancel(parameters));
-
- Mockito.verify(testFrontend.client, times(1))
- .cancelWithSavepoint(any(JobID.class), isNull(String.class));
- }
-
- {
- // Cancel with savepoint (with target directory)
- JobID jid = new JobID();
-
- String[] parameters = { "-s", "targetDirectory", jid.toString() };
- CancelTestCliFrontend testFrontend = new CancelTestCliFrontend(false);
- assertEquals(0, testFrontend.cancel(parameters));
-
- Mockito.verify(testFrontend.client, times(1))
- .cancelWithSavepoint(any(JobID.class), notNull(String.class));
- }
-
- {
- // Cancel with savepoint (with target directory), but no job ID
- String[] parameters = { "-s", "targetDirectory" };
- CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
- assertNotEquals(0, testFrontend.cancel(parameters));
- }
-
- {
- // Cancel with savepoint (no target directory) and no job ID
- String[] parameters = { "-s" };
- CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
- assertNotEquals(0, testFrontend.cancel(parameters));
- }
- }
-
- @Test
- public void testList() {
- try {
- // test unrecognized option
- {
- String[] parameters = {"-v", "-k"};
- CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
- int retCode = testFrontend.list(parameters);
- assertTrue(retCode != 0);
- }
-
- // test list properly
- {
- final UUID leaderSessionID = UUID.randomUUID();
- final ActorRef jm = actorSystem.actorOf(
- Props.create(
- CliJobManager.class,
- null,
- leaderSessionID
- )
- );
- final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
- String[] parameters = {"-r", "-s"};
- InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
- int retCode = testFrontend.list(parameters);
- assertTrue(retCode == 0);
- }
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Program caused an exception: " + e.getMessage());
- }
- }
-
- private static final class CancelTestCliFrontend extends MockedCliFrontend {
-
- CancelTestCliFrontend(boolean reject) throws Exception {
- if (reject) {
- doThrow(new IllegalArgumentException("Test exception")).when(client).cancel(any(JobID.class));
- doThrow(new IllegalArgumentException("Test exception")).when(client).cancelWithSavepoint(any(JobID.class), anyString());
- }
- }
- }
-
- private static final class InfoListTestCliFrontend extends CliFrontend {
-
- private ActorGateway jobManagerGateway;
-
- InfoListTestCliFrontend(ActorGateway jobManagerGateway) throws Exception {
- super(CliFrontendTestUtils.getConfigDir());
- this.jobManagerGateway = jobManagerGateway;
- }
-
- @Override
- public ActorGateway getJobManagerGateway(CommandLineOptions options) {
- return jobManagerGateway;
- }
- }
-
- private static final class CliJobManager extends FlinkUntypedActor {
- private final JobID jobID;
- private final UUID leaderSessionID;
- private final String targetDirectory;
-
- public CliJobManager(final JobID jobID, final UUID leaderSessionID){
- this(jobID, leaderSessionID, null);
- }
-
- public CliJobManager(final JobID jobID, final UUID leaderSessionID, String targetDirectory){
- this.jobID = jobID;
- this.leaderSessionID = leaderSessionID;
- this.targetDirectory = targetDirectory;
- }
-
- @Override
- public void handleMessage(Object message) {
- if (message instanceof JobManagerMessages.RequestTotalNumberOfSlots$) {
- getSender().tell(decorateMessage(1), getSelf());
- }
- else if (message instanceof JobManagerMessages.CancelJob) {
- JobManagerMessages.CancelJob cancelJob = (JobManagerMessages.CancelJob) message;
-
- if (jobID != null && jobID.equals(cancelJob.jobID())) {
- getSender().tell(
- decorateMessage(new Status.Success(new JobManagerMessages.CancellationSuccess(jobID, null))),
- getSelf());
- }
- else {
- getSender().tell(
- decorateMessage(new Status.Failure(new Exception("Wrong or no JobID"))),
- getSelf());
- }
- }
- else if (message instanceof JobManagerMessages.CancelJobWithSavepoint) {
- JobManagerMessages.CancelJobWithSavepoint cancelJob = (JobManagerMessages.CancelJobWithSavepoint) message;
-
- if (jobID != null && jobID.equals(cancelJob.jobID())) {
- if (targetDirectory == null && cancelJob.savepointDirectory() == null ||
- targetDirectory != null && targetDirectory.equals(cancelJob.savepointDirectory())) {
- getSender().tell(
- decorateMessage(new JobManagerMessages.CancellationSuccess(jobID, targetDirectory)),
- getSelf());
- } else {
- getSender().tell(
- decorateMessage(new JobManagerMessages.CancellationFailure(jobID, new Exception("Wrong target directory"))),
- getSelf());
- }
- }
- else {
- getSender().tell(
- decorateMessage(new JobManagerMessages.CancellationFailure(jobID, new Exception("Wrong or no JobID"))),
- getSelf());
- }
- }
- else if (message instanceof JobManagerMessages.RequestRunningJobsStatus$) {
- getSender().tell(
- decorateMessage(new JobManagerMessages.RunningJobsStatus()),
- getSelf());
- }
- }
-
- @Override
- protected UUID getLeaderSessionID() {
- return leaderSessionID;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5729617/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListTest.java
new file mode 100644
index 0000000..b559af1
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.client.util.MockedCliFrontend;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the LIST command.
+ */
+public class CliFrontendListTest extends TestLogger {
+
+ @BeforeClass
+ public static void init() {
+ CliFrontendTestUtils.pipeSystemOutToNull();
+ }
+
+ @Test
+ public void testList() throws Exception {
+ // test unrecognized option
+ {
+ String[] parameters = {"-v", "-k"};
+ CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+ int retCode = testFrontend.list(parameters);
+ assertTrue(retCode != 0);
+ }
+
+ // test list properly
+ {
+ String[] parameters = {"-r", "-s"};
+ ListTestCliFrontend testFrontend = new ListTestCliFrontend();
+ int retCode = testFrontend.list(parameters);
+ assertTrue(retCode == 0);
+ Mockito.verify(testFrontend.client, times(1))
+ .listJobs();
+ }
+ }
+
+ private static final class ListTestCliFrontend extends MockedCliFrontend {
+
+ ListTestCliFrontend() throws Exception {
+ when(client.listJobs()).thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5729617/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
index 5f6d9fe..ec73d76 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClusterClientTest.java
@@ -21,15 +21,23 @@ package org.apache.flink.client.program;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.DummyActorGateway;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import scala.concurrent.Future;
@@ -134,6 +142,27 @@ public class ClusterClientTest extends TestLogger {
}
}
+ @Test
+ public void testClusterClientList() throws Exception {
+ Configuration config = new Configuration();
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+ TestListActorGateway gateway = new TestListActorGateway();
+ ClusterClient clusterClient = new TestClusterClient(config, gateway);
+ try {
+ CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = clusterClient.listJobs();
+ Collection<JobStatusMessage> jobDetails = jobDetailsFuture.get();
+ Assert.assertTrue(gateway.messageArrived);
+ Assert.assertEquals(2, jobDetails.size());
+ Iterator<JobStatusMessage> jobDetailsIterator = jobDetails.iterator();
+ JobStatusMessage job1 = jobDetailsIterator.next();
+ JobStatusMessage job2 = jobDetailsIterator.next();
+ Assert.assertNotEquals("The job statuses should not be equal.", job1.getJobState(), job2.getJobState());
+ } finally {
+ clusterClient.shutdown();
+ }
+ }
+
private static class TestStopActorGateway extends DummyActorGateway {
private final JobID expectedJobID;
@@ -218,6 +247,20 @@ public class ClusterClientTest extends TestLogger {
}
}
+ private static class TestListActorGateway extends TestActorGateway<RequestJobDetails, MultipleJobsDetails> {
+
+ TestListActorGateway() {
+ super(RequestJobDetails.class);
+ }
+
+ @Override
+ public MultipleJobsDetails process(RequestJobDetails message) {
+ JobDetails running = new JobDetails(new JobID(), "job1", 0, 0, 0, JobStatus.RUNNING, 0, new int[9], 0);
+ JobDetails finished = new JobDetails(new JobID(), "job2", 0, 0, 0, JobStatus.FINISHED, 0, new int[9], 0);
+ return new MultipleJobsDetails(Collections.singleton(running), Collections.singleton(finished));
+ }
+ }
+
private static class TestClusterClient extends StandaloneClusterClient {
private final ActorGateway jobmanagerGateway;
[5/5] flink git commit: [FLINK-7791] [REST][client] Integrate LIST
command into RestClusterClient
Posted by ch...@apache.org.
[FLINK-7791] [REST][client] Integrate LIST command into RestClusterClient
This closes #4802.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a76de286
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a76de286
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a76de286
Branch: refs/heads/master
Commit: a76de2860d2ee78e673c97fba3394f099389ad75
Parents: e572961
Author: zentol <ch...@apache.org>
Authored: Wed Oct 11 13:58:59 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun Oct 15 16:43:58 2017 +0200
----------------------------------------------------------------------
.../client/program/rest/RestClusterClient.java | 22 +++++++
.../program/rest/RestClusterClientTest.java | 61 ++++++++++++++++++++
2 files changed, 83 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a76de286/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 8ae18af..3916514 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -27,12 +27,15 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
@@ -49,6 +52,8 @@ import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -192,6 +197,23 @@ public class RestClusterClient extends ClusterClient {
.thenApply(response -> response.location);
}
+ @Override
+ public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception {
+ CurrentJobsOverviewHandlerHeaders headers = CurrentJobsOverviewHandlerHeaders.getInstance();
+ CompletableFuture<MultipleJobsDetails> jobDetailsFuture = restClient.sendRequest(
+ restClusterClientConfiguration.getRestServerAddress(),
+ restClusterClientConfiguration.getRestServerPort(),
+ headers
+ );
+ return jobDetailsFuture
+ .thenApply(details -> {
+ Collection<JobStatusMessage> flattenedDetails = new ArrayList<>();
+ details.getRunning().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
+ details.getFinished().forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime())));
+ return flattenedDetails;
+ });
+ }
+
// ======================================
// Legacy stuff we actually implement
// ======================================
http://git-wip-us.apache.org/repos/asf/flink/blob/a76de286/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 00c37a6..cd6fc0c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -23,9 +23,13 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -34,6 +38,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -64,6 +69,8 @@ import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -74,6 +81,9 @@ import static org.mockito.Mockito.when;
/**
* Tests for the {@link RestClusterClient}.
+ *
+ * <p>These tests verify that the client uses the appropriate headers for each
+ * request, properly constructs the request bodies/parameters and processes the responses correctly.
*/
public class RestClusterClientTest extends TestLogger {
@@ -257,6 +267,57 @@ public class RestClusterClientTest extends TestLogger {
}
}
+ @Test
+ public void testListJobs() throws Exception {
+
+ Configuration config = new Configuration();
+ config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+ RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+ TestListJobsHandler listJobsHandler = new TestListJobsHandler();
+
+ RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+ @Override
+ protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
+
+ Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>();
+ handlers.add(Tuple2.of(listJobsHandler.getMessageHeaders(), listJobsHandler));
+ return handlers;
+ }
+ };
+
+ RestClusterClient rcc = new RestClusterClient(config);
+ try {
+ rse.start();
+
+ {
+ CompletableFuture<Collection<JobStatusMessage>> jobDetailsFuture = rcc.listJobs();
+ Collection<JobStatusMessage> jobDetails = jobDetailsFuture.get();
+ Iterator<JobStatusMessage> jobDetailsIterator = jobDetails.iterator();
+ JobStatusMessage job1 = jobDetailsIterator.next();
+ JobStatusMessage job2 = jobDetailsIterator.next();
+ Assert.assertNotEquals("The job statuses should not be equal.", job1.getJobState(), job2.getJobState());
+ }
+ } finally {
+ rcc.shutdown();
+ rse.shutdown(Time.seconds(5));
+ }}
+
+ private static class TestListJobsHandler extends TestHandler<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
+
+ private TestListJobsHandler() {
+ super(CurrentJobsOverviewHandlerHeaders.getInstance());
+ }
+
+ @Override
+ protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
+ JobDetails running = new JobDetails(new JobID(), "job1", 0, 0, 0, JobStatus.RUNNING, 0, new int[9], 0);
+ JobDetails finished = new JobDetails(new JobID(), "job2", 0, 0, 0, JobStatus.FINISHED, 0, new int[9], 0);
+ return CompletableFuture.completedFuture(new MultipleJobsDetails(Collections.singleton(running), Collections.singleton(finished)));
+ }
+ }
+
private abstract static class TestHandler<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends AbstractRestHandler<DispatcherGateway, R, P, M> {
private TestHandler(MessageHeaders<R, P, M> headers) {
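The listJobs() override added to RestClusterClient in this commit takes the two groups carried by MultipleJobsDetails (running and finished) and flattens them into a single collection of JobStatusMessage. A minimal, self-contained sketch of that flattening step follows. The types Details, Overview and Summary are hypothetical stand-ins invented for illustration; they are not the Flink classes, and the REST call is replaced by an already completed future.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

// Sketch only: Details, Overview and Summary are hypothetical stand-ins
// for JobDetails, MultipleJobsDetails and JobStatusMessage.
class ListJobsSketch {

    /** Simplified per-job record (stand-in for JobDetails). */
    static final class Details {
        final String name;
        final String status;

        Details(String name, String status) {
            this.name = name;
            this.status = status;
        }
    }

    /** Response holding two separate groups (stand-in for MultipleJobsDetails). */
    static final class Overview {
        final Collection<Details> running;
        final Collection<Details> finished;

        Overview(Collection<Details> running, Collection<Details> finished) {
            this.running = running;
            this.finished = finished;
        }
    }

    /** Flat per-job result handed to the caller (stand-in for JobStatusMessage). */
    static final class Summary {
        final String name;
        final String status;

        Summary(String name, String status) {
            this.name = name;
            this.status = status;
        }
    }

    /** Maps the asynchronous overview to one flat collection, running jobs first. */
    static CompletableFuture<Collection<Summary>> listJobs(CompletableFuture<Overview> overviewFuture) {
        return overviewFuture.thenApply(overview -> {
            Collection<Summary> flattened = new ArrayList<>();
            overview.running.forEach(d -> flattened.add(new Summary(d.name, d.status)));
            overview.finished.forEach(d -> flattened.add(new Summary(d.name, d.status)));
            return flattened;
        });
    }

    public static void main(String[] args) {
        Overview overview = new Overview(
            Collections.singleton(new Details("job1", "RUNNING")),
            Collections.singleton(new Details("job2", "FINISHED")));

        // The future is already complete here, so join() returns immediately.
        Collection<Summary> jobs = listJobs(CompletableFuture.completedFuture(overview)).join();
        for (Summary job : jobs) {
            System.out.println(job.name + " " + job.status);
        }
    }
}
```

Because the mapping runs inside thenApply, the caller still receives a future and nothing blocks until it chooses to wait on the result, which is also how the tests in this commit consume listJobs().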
[2/5] flink git commit: [FLINK-6703][docs] Document how to take a
savepoint on YARN
Posted by ch...@apache.org.
[FLINK-6703][docs] Document how to take a savepoint on YARN
This closes #4721.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89394ec8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89394ec8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89394ec8
Branch: refs/heads/master
Commit: 89394ec8094205cb3fd3a47a95dacd1b18c31088
Parents: 4b6f055
Author: Bowen Li <bo...@gmail.com>
Authored: Sun Sep 24 21:56:58 2017 -0700
Committer: zentol <ch...@apache.org>
Committed: Sun Oct 15 16:40:53 2017 +0200
----------------------------------------------------------------------
docs/ops/cli.md | 23 ++++++++++++++++++-----
docs/ops/state/savepoints.md | 18 +++++++++++++++---
2 files changed, 33 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/89394ec8/docs/ops/cli.md
----------------------------------------------------------------------
diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index 11c8caf..4f80b08 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -138,17 +138,30 @@ This allows the job to finish processing all inflight data.
[Savepoints]({{site.baseurl}}/ops/state/savepoints.html) are controlled via the command line client:
-#### Trigger a savepoint
+#### Trigger a Savepoint
{% highlight bash %}
-./bin/flink savepoint <jobID> [savepointDirectory]
+./bin/flink savepoint <jobId> [savepointDirectory]
{% endhighlight %}
-Returns the path of the created savepoint. You need this path to restore and dispose savepoints.
+This will trigger a savepoint for the job with ID `jobId` and return the path of the created savepoint. You need this path to restore and dispose of savepoints.
-You can optionally specify a `savepointDirectory` when triggering the savepoint. If you don't specify one here, you need to configure a default savepoint directory for the Flink installation (see [Savepoints]({{site.baseurl}}/ops/state/savepoints.html#configuration)).
-##### Cancel with a savepoint
+Furthermore, you can optionally specify a target file system directory to store the savepoint in. The directory needs to be accessible by the JobManager.
+
+If you don't specify a target directory, you need to have configured a default directory (see [Savepoints]({{site.baseurl}}/ops/state/savepoints.html#configuration)). Otherwise, triggering the savepoint will fail.
+
+#### Trigger a Savepoint with YARN
+
+{% highlight bash %}
+./bin/flink savepoint <jobId> [savepointDirectory] -yid <yarnAppId>
+{% endhighlight %}
+
+This will trigger a savepoint for the job with ID `jobId` running in the YARN application with ID `yarnAppId`, and return the path of the created savepoint.
+
+Everything else is the same as described in the above **Trigger a Savepoint** section.
+
+#### Cancel with a savepoint
You can atomically trigger a savepoint and cancel a job.
http://git-wip-us.apache.org/repos/asf/flink/blob/89394ec8/docs/ops/state/savepoints.md
----------------------------------------------------------------------
diff --git a/docs/ops/state/savepoints.md b/docs/ops/state/savepoints.md
index d6d4c53..057469a 100644
--- a/docs/ops/state/savepoints.md
+++ b/docs/ops/state/savepoints.md
@@ -103,12 +103,24 @@ Note that if you use the `MemoryStateBackend`, metadata *and* savepoint state wi
#### Trigger a Savepoint
```sh
-$ bin/flink savepoint :jobId [:targetDirectory]
+$ bin/flink savepoint :jobId [:savepointDirectory]
```
-This will trigger a savepoint for the job with ID `:jobid`. Furthermore, you can specify a target file system directory to store the savepoint in. The directory needs to be accessible by the JobManager.
+This will trigger a savepoint for the job with ID `:jobId` and return the path of the created savepoint. You need this path to restore and dispose of savepoints.
-If you don't specify a target directory, you need to have [configured a default directory](#configuration). Otherwise, triggering the savepoint will fail.
+Furthermore, you can optionally specify a target file system directory to store the savepoint in. The directory needs to be accessible by the JobManager.
+
+If you don't specify a target directory, you need to have [configured a default directory](#configuration). Otherwise, triggering the savepoint will fail.
+
+#### Trigger a Savepoint with YARN
+
+```sh
+$ bin/flink savepoint :jobId [:savepointDirectory] -yid :yarnAppId
+```
+
+This will trigger a savepoint for the job with ID `:jobId` running in the YARN application with ID `:yarnAppId`, and return the path of the created savepoint.
+
+Everything else is the same as described in the above **Trigger a Savepoint** section.
#### Cancel Job with Savepoint