Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/03/14 18:10:34 UTC

[GitHub] flink pull request #5701: 8703 c savepoint

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/5701

    8703 c savepoint

    Based on #5690 and #5699.
    
    ## What is the purpose of the change
    
    With this PR, accumulator updates are sent from the TaskManagers to the JobManagers via heartbeats.
    The SavepointMigrationTestBase was also ported to FLIP-6 and serves as preliminary verification until the other accumulator tests are ported.
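As a rough illustration of the heartbeat-based approach described above, the sketch models a TaskManager that answers each heartbeat with a payload carrying its current accumulator values, and a JobManager that keeps the latest values for its job. It is a simplified, standalone model, not Flink's implementation: the class names are hypothetical, job and resource IDs are plain strings, and only the method names `retrievePayload`/`reportPayload` are borrowed from the heartbeat listener visible in the diff.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical snapshot of one job's accumulators (name -> value).
final class SnapshotSketch {
    final String jobId;
    final Map<String, Integer> values;

    SnapshotSketch(String jobId, Map<String, Integer> values) {
        this.jobId = jobId;
        this.values = new HashMap<>(values); // copy so later updates don't leak into the report
    }
}

// Hypothetical TaskManager side (not thread-safe; single-threaded sketch).
final class TaskManagerSketch {
    private final Map<String, Map<String, Integer>> accumulatorsPerJob = new HashMap<>();

    void addToAccumulator(String jobId, String name, int delta) {
        accumulatorsPerJob.computeIfAbsent(jobId, id -> new HashMap<>()).merge(name, delta, Integer::sum);
    }

    // Invoked when answering a heartbeat; the payload is the current accumulator state.
    // The heartbeat target id is unused in this simplified sketch.
    CompletableFuture<List<SnapshotSketch>> retrievePayload(String heartbeatTargetId) {
        List<SnapshotSketch> report = new ArrayList<>();
        accumulatorsPerJob.forEach((jobId, values) -> report.add(new SnapshotSketch(jobId, values)));
        return CompletableFuture.completedFuture(report);
    }
}

// Hypothetical JobManager side: keeps the most recent values reported for its own job.
// Assumes a single TaskManager; with several, values would need to be kept per
// TaskManager and merged instead of overwritten.
final class JobManagerSketch {
    private final String jobId;
    private final Map<String, Integer> latestValues = new HashMap<>();

    JobManagerSketch(String jobId) {
        this.jobId = jobId;
    }

    // Invoked when a heartbeat response arrives from a TaskManager.
    void reportPayload(String taskManagerId, Collection<SnapshotSketch> payload) {
        for (SnapshotSketch snapshot : payload) {
            if (snapshot.jobId.equals(jobId)) {
                latestValues.putAll(snapshot.values); // newer reports overwrite older values
            }
        }
    }

    Map<String, Integer> getAccumulators() {
        return new HashMap<>(latestValues);
    }
}
```

Because the values ride along with heartbeats that are exchanged anyway, no separate accumulator-update message is needed in this model.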
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    * run SavepointMigrationTestBase
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 8703_c_savepoint

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5701.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5701
    
----
commit 14bcb66d858ee3e9488a51df7bfa1e36ec97f463
Author: zentol <ch...@...>
Date:   2018-03-14T13:21:27Z

    [FLINK-8942][runtime] Pass heartbeat target ResourceID

commit 761fd90f07335883429fc80fb4032b9ef28d32f5
Author: zentol <ch...@...>
Date:   2018-03-14T17:52:16Z

    [FLINK-8881][runtime] Send accumulator updates via heartbeats

commit 280874939c4d77e893da80e1e40acdcc869280bb
Author: zentol <ch...@...>
Date:   2018-03-07T10:05:42Z

    [FLINK-8935][tests] Implement MiniClusterClient#getAccumulators

commit d2fa754d9c5b68672ddc16233cdf390dabfd17c0
Author: zentol <ch...@...>
Date:   2018-03-06T12:26:59Z

    [FLINK-8935][tests] Implement MiniClusterClient#triggerSavepoint

commit ea04fd437d305a76c74edc6aa6c473a1fa917895
Author: zentol <ch...@...>
Date:   2018-02-26T13:54:07Z

    [FLINK-8703][tests] Port SavepointMigrationTestBase to MiniClusterResource

----


---

[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5701#discussion_r175755400
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
    @@ -194,49 +187,35 @@ protected final void restoreAndExecute(
     			String savepointPath,
     			Tuple2<String, Integer>... expectedAccumulators) throws Exception {
     
    -		// Retrieve the job manager
    -		Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
    +		ClusterClient<?> client = miniClusterResource.getClusterClient();
    +		client.setDetached(true);
     
     		// Submit the job
     		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
     
     		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
     
    -		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
    -
    -		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
    -		JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
    +		JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
     
     		boolean done = false;
     		while (DEADLINE.hasTimeLeft()) {
     
     			// try and get a job result, this will fail if the job already failed. Use this
     			// to get out of this loop
     			JobID jobId = jobSubmissionResult.getJobID();
    -			FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
     
     			try {
    +				CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
     
    -				Future<Object> future = clusterClient
    -						.getJobManagerGateway()
    -						.ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
    -
    -				Object result = Await.result(future, timeout);
    +				JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
     
    -				if (result instanceof JobManagerMessages.CurrentJobStatus) {
    -					if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
    -						Object jobResult = Await.result(
    -								jobListeningContext.getJobResultFuture(),
    -								Duration.apply(5, TimeUnit.SECONDS));
    -						fail("Job failed: " + jobResult);
    -					}
    -				}
    +				assertNotEquals(JobStatus.FAILED, jobStatus);
     			} catch (Exception e) {
     				fail("Could not connect to job: " + e);
     			}
     
     			Thread.sleep(100);
    --- End diff --
    
    True, you're right. Sorry, I didn't look closely enough at the test case. Forget my comments here :-)


---

[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5701


---

[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5701#discussion_r175729168
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -1515,8 +1516,22 @@ public void reportPayload(ResourceID resourceID, Void payload) {
     		}
     
     		@Override
    -		public CompletableFuture<Void> retrievePayload(ResourceID resourceID) {
    -			return CompletableFuture.completedFuture(null);
    +		public CompletableFuture<AccumulatorReport> retrievePayload(ResourceID resourceID) {
    --- End diff --
    
    Let's add `validateRunsInMainThread` as the first statement. That way we enforce that this method really runs in the main thread context.
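The idea behind `validateRunsInMainThread` can be sketched in isolation: an endpoint confines its mutable state to a single main thread and fails fast when a method touching that state is invoked from any other thread. The class below is hypothetical and only mirrors the concept; Flink's RPC endpoints provide their own implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical endpoint whose mutable state may only be touched from one "main thread".
final class MainThreadEndpointSketch {
    private final ExecutorService mainThreadExecutor;
    private volatile Thread mainThread;
    private int heartbeatsAnswered; // confined to the main thread

    MainThreadEndpointSketch() {
        mainThreadExecutor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "endpoint-main-thread");
            thread.setDaemon(true);
            mainThread = thread; // remember which thread counts as the main thread
            return thread;
        });
    }

    // Fails fast when called from any thread other than the main thread.
    void validateRunsInMainThread() {
        if (Thread.currentThread() != mainThread) {
            throw new IllegalStateException(
                "Expected main thread but was " + Thread.currentThread().getName());
        }
    }

    // Analogous to a heartbeat callback: validate first, then touch main-thread state.
    int retrievePayload() {
        validateRunsInMainThread();
        return ++heartbeatsAnswered;
    }

    // Correct way to call it from outside: schedule the call onto the main thread.
    CompletableFuture<Integer> retrievePayloadAsync() {
        return CompletableFuture.supplyAsync(this::retrievePayload, mainThreadExecutor);
    }

    void shutdown() {
        mainThreadExecutor.shutdown();
    }
}
```

Placing the check as the first statement means a misrouted call fails before any state is read or modified.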


---

[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5701#discussion_r175733627
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
    +
    +import java.io.Serializable;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +/**
    + * A report about the current values of all accumulators of the TaskExecutor for a given job.
    + */
    +public class AccumulatorReport implements Serializable, Iterable<AccumulatorSnapshot> {
    +	private final List<AccumulatorSnapshot> accumulatorSnapshots;
    --- End diff --
    
    sure


---

[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5701#discussion_r175747586
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java ---
    @@ -0,0 +1,41 @@
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
    +
    +import java.io.Serializable;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +/**
    + * A report about the current values of all accumulators of the TaskExecutor for a given job.
    + */
    +public class AccumulatorReport implements Serializable, Iterable<AccumulatorSnapshot> {
    --- End diff --
    
    I think `SlotReport` is wrongly implementing this interface.


---

[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5701#discussion_r175733781
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
    +
    +import java.io.Serializable;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +/**
    + * A report about the current values of all accumulators of the TaskExecutor for a given job.
    + */
    +public class AccumulatorReport implements Serializable, Iterable<AccumulatorSnapshot> {
    --- End diff --
    
    I can change it. I followed the design of `SlotReport`, which also implements `Iterable` even though it just holds a collection.


---

[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5701#discussion_r175731028
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
    @@ -87,17 +89,24 @@ protected static String getResourceFilename(String filename) {
     		return resource.getFile();
     	}
     
    -	@Before
    -	public void setup() throws Exception {
    +	private Configuration getConfigurationSafe() {
    --- End diff --
    
    I think we can get rid of this method by moving the `miniClusterResource` initialization into the constructor, which can declare that it throws an `Exception`.
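A minimal sketch of the suggested refactoring, assuming a JUnit-style test base whose resource field needs a configuration built by code that throws checked exceptions. Instead of a `getConfigurationSafe()` helper that wraps those exceptions, the field is assigned in a constructor declared `throws Exception`. `ClusterResourceSketch` and the other types are hypothetical stand-ins for `MiniClusterResource` and the test base; `state.savepoints.dir` is used only as an example key.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a cluster resource configured once per test class instance.
final class ClusterResourceSketch {
    final Map<String, String> configuration;

    ClusterResourceSketch(Map<String, String> configuration) {
        this.configuration = configuration;
    }
}

// Hypothetical test base: no getConfigurationSafe() wrapper; the constructor may throw.
abstract class MigrationTestBaseSketch {
    protected final ClusterResourceSketch clusterResource;

    protected MigrationTestBaseSketch() throws Exception {
        // Checked exceptions (here IOException) simply propagate to the caller.
        this.clusterResource = new ClusterResourceSketch(createConfiguration());
    }

    private static Map<String, String> createConfiguration() throws Exception {
        Path savepointDir = Files.createTempDirectory("savepoints"); // throws IOException
        savepointDir.toFile().deleteOnExit();
        Map<String, String> configuration = new HashMap<>();
        configuration.put("state.savepoints.dir", savepointDir.toUri().toString());
        return configuration;
    }
}

// A concrete test class only has to propagate the exception from its own constructor.
final class ConcreteMigrationTestSketch extends MigrationTestBaseSketch {
    ConcreteMigrationTestSketch() throws Exception {
        super();
    }
}
```

The field can then stay `final`, and no unchecked wrapping of the configuration error is needed.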


---

[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5701#discussion_r175731826
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
    @@ -194,49 +187,35 @@ protected final void restoreAndExecute(
     			String savepointPath,
     			Tuple2<String, Integer>... expectedAccumulators) throws Exception {
     
    -		// Retrieve the job manager
    -		Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
    +		ClusterClient<?> client = miniClusterResource.getClusterClient();
    +		client.setDetached(true);
    --- End diff --
    
    As a side note, out of scope for this issue: I think we should deprecate `ClusterClient#setDetached`. Detached mode should not be an attribute of the client but rather a property of how you submit a job.
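The suggestion amounts to making attached vs. detached execution a parameter of each submission instead of mutable client state. A hypothetical sketch: the enum, the client, and the simulated execution are made up for illustration and are not Flink's `ClusterClient` API.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical: attached/detached is chosen per submission, not stored on the client.
enum SubmissionModeSketch { ATTACHED, DETACHED }

final class ModeClientSketch {
    // Simulated asynchronous job execution; a real client would contact the cluster here.
    private CompletableFuture<String> execute(String jobName) {
        return CompletableFuture.supplyAsync(() -> jobName + ": FINISHED");
    }

    // Detached: return immediately without a result. Attached: block until the job finishes.
    Optional<String> submitJob(String jobName, SubmissionModeSketch mode) {
        CompletableFuture<String> result = execute(jobName);
        if (mode == SubmissionModeSketch.DETACHED) {
            return Optional.empty();
        }
        return Optional.of(result.join());
    }
}
```

With this shape, one client instance can serve attached and detached submissions without callers toggling shared state, as the test code in the diff does with `client.setDetached(true)`.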


---

[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5701#discussion_r175747950
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
    @@ -194,49 +187,35 @@ protected final void restoreAndExecute(
     			String savepointPath,
     			Tuple2<String, Integer>... expectedAccumulators) throws Exception {
     
    -		// Retrieve the job manager
    -		Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
    +		ClusterClient<?> client = miniClusterResource.getClusterClient();
    +		client.setDetached(true);
     
     		// Submit the job
     		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
     
     		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
     
    -		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
    -
    -		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
    -		JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
    +		JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
     
     		boolean done = false;
     		while (DEADLINE.hasTimeLeft()) {
     
     			// try and get a job result, this will fail if the job already failed. Use this
     			// to get out of this loop
     			JobID jobId = jobSubmissionResult.getJobID();
    -			FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
     
     			try {
    +				CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
     
    -				Future<Object> future = clusterClient
    -						.getJobManagerGateway()
    -						.ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
    -
    -				Object result = Await.result(future, timeout);
    +				JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
     
    -				if (result instanceof JobManagerMessages.CurrentJobStatus) {
    -					if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
    -						Object jobResult = Await.result(
    -								jobListeningContext.getJobResultFuture(),
    -								Duration.apply(5, TimeUnit.SECONDS));
    -						fail("Job failed: " + jobResult);
    -					}
    -				}
    +				assertNotEquals(JobStatus.FAILED, jobStatus);
     			} catch (Exception e) {
     				fail("Could not connect to job: " + e);
     			}
     
     			Thread.sleep(100);
    --- End diff --
    
    How so? Don't we call it once after the job has reached the `JobStatus.FAILED` test? Maybe it is actually not needed and can be removed.


---

[GitHub] flink issue #5701: [FLINK-8703][tests] Port SavepointMigrationTestBase to Mi...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5701
  
    @tillrohrmann done. I wouldn't squash the savepoint port commit with the accumulator changes though.


---

[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5701#discussion_r175727728
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java ---
    @@ -0,0 +1,41 @@
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
    +
    +import java.io.Serializable;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +/**
    + * A report about the current values of all accumulators of the TaskExecutor for a given job.
    + */
    +public class AccumulatorReport implements Serializable, Iterable<AccumulatorSnapshot> {
    +	private final List<AccumulatorSnapshot> accumulatorSnapshots;
    --- End diff --
    
    This could also be a `Collection`, right?


---

[GitHub] flink issue #5701: [FLINK-8703][tests] Port SavepointMigrationTestBase to Mi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5701
  
    What about the other commits, @zentol? Do you want me to commit them separately? I guess it would be easiest to commit everything together.


---

[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5701#discussion_r175731427
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
    @@ -194,49 +187,35 @@ protected final void restoreAndExecute(
     			String savepointPath,
     			Tuple2<String, Integer>... expectedAccumulators) throws Exception {
     
    -		// Retrieve the job manager
    -		Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
    +		ClusterClient<?> client = miniClusterResource.getClusterClient();
    +		client.setDetached(true);
     
     		// Submit the job
     		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
     
     		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
     
    -		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
    -
    -		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
    -		JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
    +		JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
     
     		boolean done = false;
     		while (DEADLINE.hasTimeLeft()) {
     
     			// try and get a job result, this will fail if the job already failed. Use this
     			// to get out of this loop
     			JobID jobId = jobSubmissionResult.getJobID();
    -			FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
     
     			try {
    +				CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
     
    -				Future<Object> future = clusterClient
    -						.getJobManagerGateway()
    -						.ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
    -
    -				Object result = Await.result(future, timeout);
    +				JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
     
    -				if (result instanceof JobManagerMessages.CurrentJobStatus) {
    -					if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
    -						Object jobResult = Await.result(
    -								jobListeningContext.getJobResultFuture(),
    -								Duration.apply(5, TimeUnit.SECONDS));
    -						fail("Job failed: " + jobResult);
    -					}
    -				}
    +				assertNotEquals(JobStatus.FAILED, jobStatus);
     			} catch (Exception e) {
     				fail("Could not connect to job: " + e);
     			}
     
     			Thread.sleep(100);
    --- End diff --
    
    What do we need this sleep for?


---

[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5701#discussion_r175735031
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
    @@ -194,49 +187,35 @@ protected final void restoreAndExecute(
     			String savepointPath,
     			Tuple2<String, Integer>... expectedAccumulators) throws Exception {
     
    -		// Retrieve the job manager
    -		Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
    +		ClusterClient<?> client = miniClusterResource.getClusterClient();
    +		client.setDetached(true);
     
     		// Submit the job
     		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
     
     		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
     
    -		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
    -
    -		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
    -		JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
    +		JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
     
     		boolean done = false;
     		while (DEADLINE.hasTimeLeft()) {
     
     			// try and get a job result, this will fail if the job already failed. Use this
     			// to get out of this loop
     			JobID jobId = jobSubmissionResult.getJobID();
    -			FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
     
     			try {
    +				CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
     
    -				Future<Object> future = clusterClient
    -						.getJobManagerGateway()
    -						.ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
    -
    -				Object result = Await.result(future, timeout);
    +				JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
     
    -				if (result instanceof JobManagerMessages.CurrentJobStatus) {
    -					if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
    -						Object jobResult = Await.result(
    -								jobListeningContext.getJobResultFuture(),
    -								Duration.apply(5, TimeUnit.SECONDS));
    -						fail("Job failed: " + jobResult);
    -					}
    -				}
    +				assertNotEquals(JobStatus.FAILED, jobStatus);
     			} catch (Exception e) {
     				fail("Could not connect to job: " + e);
     			}
     
     			Thread.sleep(100);
    --- End diff --
    
    Probably meant to prevent the test from spamming the cluster with accumulator requests.


---

[GitHub] flink issue #5701: [FLINK-8703][tests] Port SavepointMigrationTestBase to Mi...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5701
  
    Let me quickly address the `getConfigurationSafe` comment, and then let's merge them together.


---

[GitHub] flink issue #5701: [FLINK-8703][tests] Port SavepointMigrationTestBase to Mi...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5701
  
    Sure, @zentol. Won't squash them. Thanks for your work. Merging once Travis gives the green light.


---

[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5701#discussion_r175730158
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
    +
    +import java.io.Serializable;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +/**
    + * A report about the current values of all accumulators of the TaskExecutor for a given job.
    + */
    +public class AccumulatorReport implements Serializable, Iterable<AccumulatorSnapshot> {
    --- End diff --
    
    I think it would be better not to implement `Iterable`. `AccumulatorReport` is a value class which contains a `Collection` of `AccumulatorSnapshot`s, so it should be as simple as that. Letting this class implement the `Iterable` interface means that an `AccumulatorReport` can be used anywhere an `Iterable` is expected, and I think this should not be the case for this class. My concern is that we only let it implement this interface to cut a corner when iterating over the list of `AccumulatorSnapshot`s.
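A sketch of the value-class design argued for here, assuming the report only needs to carry snapshots from the TaskExecutor to the JobManager: it stores a `Collection` and exposes it through a getter rather than implementing `Iterable`. `SnapshotPlaceholder` stands in for `AccumulatorSnapshot`, and the getter name is hypothetical.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

// Hypothetical placeholder for AccumulatorSnapshot.
final class SnapshotPlaceholder implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;

    SnapshotPlaceholder(String name) {
        this.name = name;
    }
}

// Value class: holds a Collection and exposes it via a getter instead of implementing Iterable.
final class AccumulatorReportSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Collection<SnapshotPlaceholder> accumulatorSnapshots;

    AccumulatorReportSketch(Collection<SnapshotPlaceholder> accumulatorSnapshots) {
        // Defensive copy (ArrayList is Serializable) so the report cannot change after construction.
        this.accumulatorSnapshots = new ArrayList<>(accumulatorSnapshots);
    }

    Collection<SnapshotPlaceholder> getAccumulatorSnapshots() {
        return Collections.unmodifiableCollection(accumulatorSnapshots);
    }
}
```

Callers then iterate explicitly, e.g. `for (SnapshotPlaceholder s : report.getAccumulatorSnapshots())`, which keeps the report itself from being passed where an arbitrary `Iterable` is expected.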


---

[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5701#discussion_r175751892
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
    @@ -194,49 +187,35 @@ protected final void restoreAndExecute(
     			String savepointPath,
     			Tuple2<String, Integer>... expectedAccumulators) throws Exception {
     
    -		// Retrieve the job manager
    -		Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
    +		ClusterClient<?> client = miniClusterResource.getClusterClient();
    +		client.setDetached(true);
     
     		// Submit the job
     		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
     
     		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
     
    -		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
    -
    -		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
    -		JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
    +		JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
     
     		boolean done = false;
     		while (DEADLINE.hasTimeLeft()) {
     
     			// try and get a job result, this will fail if the job already failed. Use this
     			// to get out of this loop
     			JobID jobId = jobSubmissionResult.getJobID();
    -			FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
     
     			try {
    +				CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
     
    -				Future<Object> future = clusterClient
    -						.getJobManagerGateway()
    -						.ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
    -
    -				Object result = Await.result(future, timeout);
    +				JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
     
    -				if (result instanceof JobManagerMessages.CurrentJobStatus) {
    -					if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
    -						Object jobResult = Await.result(
    -								jobListeningContext.getJobResultFuture(),
    -								Duration.apply(5, TimeUnit.SECONDS));
    -						fail("Job failed: " + jobResult);
    -					}
    -				}
    +				assertNotEquals(JobStatus.FAILED, jobStatus);
     			} catch (Exception e) {
     				fail("Could not connect to job: " + e);
     			}
     
     			Thread.sleep(100);
    --- End diff --
    
    If the job reaches `JobStatus.FAILED`, the test fails: `assertNotEquals(JobStatus.FAILED, jobStatus);`
    
    We're polling the accumulators in a loop as long as the job has not failed and the deadline has not been reached.
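The loop discussed in this thread follows a common deadline-bounded polling pattern: query the job status with a per-request timeout, fail immediately on `FAILED`, stop once the expected condition holds, and sleep between iterations so the cluster is not flooded with requests. A standalone sketch with hypothetical types (`JobStatusSketch`, `PollingSketch`); the 5-second per-request timeout mirrors the diff, while the condition callback stands in for the accumulator check.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical subset of job states used by the sketch.
enum JobStatusSketch { RUNNING, FAILED, FINISHED }

final class PollingSketch {
    // Polls until `conditionMet` returns true; fails on FAILED or when the deadline passes.
    static void awaitCondition(
            Supplier<CompletableFuture<JobStatusSketch>> statusQuery,
            BooleanSupplier conditionMet,
            Duration timeout,
            long pollIntervalMillis) throws Exception {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            // Bound each request so a single hanging call cannot consume the whole deadline.
            JobStatusSketch status = statusQuery.get().get(5, TimeUnit.SECONDS);
            if (status == JobStatusSketch.FAILED) {
                throw new AssertionError("Job failed");
            }
            if (conditionMet.getAsBoolean()) {
                return; // e.g. the expected accumulator values have been observed
            }
            // Pause between polls so the test does not flood the cluster with requests.
            Thread.sleep(pollIntervalMillis);
        }
        throw new AssertionError("Deadline reached before the condition was met");
    }
}
```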


---

[GitHub] flink issue #5701: [FLINK-8703][tests] Port SavepointMigrationTestBase to Mi...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5701
  
    @tillrohrmann I've addressed your comments regarding the accumulator commit.


---