You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2019/11/29 13:53:35 UTC
[flink] 07/09: [FLINK-14762][client] Implement
JobClient#getAccumulators
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d8aa9b902eadc74bd3ce3d6f6ef5f0a73bc7331f
Author: tison <wa...@gmail.com>
AuthorDate: Fri Nov 29 10:47:31 2019 +0800
[FLINK-14762][client] Implement JobClient#getAccumulators
---
.../deployment/ClusterClientJobClientAdapter.java | 6 +++++
.../apache/flink/client/program/ClusterClient.java | 5 ++--
.../flink/client/program/MiniClusterClient.java | 18 +++++---------
.../client/program/rest/RestClusterClient.java | 24 ++++++++-----------
.../flink/client/program/TestingClusterClient.java | 3 +--
.../client/program/rest/RestClusterClientTest.java | 19 ++++++---------
.../api/common/accumulators/AccumulatorHelper.java | 28 ++++++++++++++++++++--
.../org/apache/flink/core/execution/JobClient.java | 7 ++++++
.../apache/flink/api/java/TestingJobClient.java | 6 +++++
.../streaming/environment/TestingJobClient.java | 6 +++++
.../test/accumulators/AccumulatorLiveITCase.java | 2 +-
.../utils/SavepointMigrationTestBase.java | 19 ++++++---------
12 files changed, 85 insertions(+), 58 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
index f31b9b7..91a9c91 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
@@ -31,6 +31,7 @@ import org.apache.flink.util.function.FunctionUtils;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -74,6 +75,11 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
}
@Override
+ public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
+ return clusterClient.getAccumulators(jobID, classLoader);
+ }
+
+ @Override
public CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLoader userClassloader) {
checkNotNull(userClassloader);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 8208465..eb19109 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.OptionalFailure;
import javax.annotation.Nullable;
@@ -114,7 +113,7 @@ public interface ClusterClient<T> extends AutoCloseable {
* @param jobID The job identifier of a job.
* @return A Map containing the accumulator's name and its value.
*/
- default CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID) {
+ default CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID) {
return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
}
@@ -125,7 +124,7 @@ public interface ClusterClient<T> extends AutoCloseable {
* @param loader The class loader for deserializing the accumulator results.
* @return A Map containing the accumulator's name and its value.
*/
- CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID, ClassLoader loader);
+ CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
/**
* Cancels a job identified by the job id.
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 1f32c0a..7f47552 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.program;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -29,8 +30,6 @@ import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.OptionalFailure;
-import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +38,6 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -106,20 +104,16 @@ public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniCl
}
@Override
- public CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID, ClassLoader loader) {
+ public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) {
return miniCluster
.getExecutionGraph(jobID)
.thenApply(AccessExecutionGraph::getAccumulatorsSerialized)
.thenApply(accumulators -> {
- Map<String, OptionalFailure<Object>> result = new HashMap<>(accumulators.size());
- for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> acc : accumulators.entrySet()) {
- try {
- result.put(acc.getKey(), acc.getValue().deserializeValue(loader));
- } catch (Exception e) {
- throw new CompletionException("Cannot deserialize accumulators.", e);
- }
+ try {
+ return AccumulatorHelper.deserializeAndUnwrapAccumulators(accumulators, loader);
+ } catch (Exception e) {
+ throw new CompletionException("Cannot deserialize and unwrap accumulators properly.", e);
}
- return result;
});
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index bacbe15..607c320 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -88,7 +88,6 @@ import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
@@ -405,7 +404,7 @@ public class RestClusterClient<T> implements ClusterClient<T> {
}
@Override
- public CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID, ClassLoader loader) {
+ public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) {
final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance();
final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters();
accMsgParams.jobPathParameter.resolve(jobID);
@@ -415,18 +414,15 @@ public class RestClusterClient<T> implements ClusterClient<T> {
accumulatorsHeaders,
accMsgParams);
- return responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> {
- try {
- return AccumulatorHelper.deserializeAccumulators(
- accumulatorsInfo.getSerializedUserAccumulators(),
- loader);
- } catch (Exception e) {
- throw new CompletionException(
- new FlinkException(
- String.format("Deserialization of accumulators for job %s failed.", jobID),
- e));
- }
- });
+ return responseFuture
+ .thenApply(JobAccumulatorsInfo::getSerializedUserAccumulators)
+ .thenApply(accumulators -> {
+ try {
+ return AccumulatorHelper.deserializeAndUnwrapAccumulators(accumulators, loader);
+ } catch (Exception e) {
+ throw new CompletionException("Cannot deserialize and unwrap accumulators properly.", e);
+ }
+ });
}
private CompletableFuture<SavepointInfo> pollSavepointAsync(
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
index a711c55..f3be219 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.function.TriFunction;
import javax.annotation.Nonnull;
@@ -109,7 +108,7 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
}
@Override
- public CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID, ClassLoader loader) {
+ public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) {
throw new UnsupportedOperationException();
}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 16599c5..d3a381c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -131,6 +131,7 @@ import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
@@ -512,21 +513,15 @@ public class RestClusterClientTest extends TestLogger {
TestAccumulatorHandler accumulatorHandler = new TestAccumulatorHandler();
try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(accumulatorHandler)){
- RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
- try {
+ try (RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
JobID id = new JobID();
+ Map<String, Object> accumulators = restClusterClient.getAccumulators(id).get();
+ assertNotNull(accumulators);
+ assertEquals(1, accumulators.size());
- {
- Map<String, OptionalFailure<Object>> accumulators = restClusterClient.getAccumulators(id).get();
- assertNotNull(accumulators);
- assertEquals(1, accumulators.size());
-
- assertEquals(true, accumulators.containsKey("testKey"));
- assertEquals("testValue", accumulators.get("testKey").get().toString());
- }
- } finally {
- restClusterClient.close();
+ assertTrue(accumulators.containsKey("testKey"));
+ assertEquals("testValue", accumulators.get("testKey").toString());
}
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index 9bc1299..b6d3971 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -178,8 +178,6 @@ public class AccumulatorHelper {
* @param serializedAccumulators The serialized accumulator results.
* @param loader The class loader to use.
* @return The deserialized accumulator results.
- * @throws IOException
- * @throws ClassNotFoundException
*/
public static Map<String, OptionalFailure<Object>> deserializeAccumulators(
Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators,
@@ -203,4 +201,30 @@ public class AccumulatorHelper {
return accumulators;
}
+
+ /**
+ * Takes the serialized accumulator results and tries to deserialize them using the provided
+ * class loader, and then try to unwrap the value unchecked.
+ * @param serializedAccumulators The serialized accumulator results.
+ * @param loader The class loader to use.
+ * @return The deserialized and unwrapped accumulator results.
+ */
+ public static Map<String, Object> deserializeAndUnwrapAccumulators(
+ Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators,
+ ClassLoader loader) throws IOException, ClassNotFoundException {
+
+ Map<String, OptionalFailure<Object>> deserializedAccumulators = deserializeAccumulators(serializedAccumulators, loader);
+
+ if (deserializedAccumulators.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ Map<String, Object> accumulators = new HashMap<>(serializedAccumulators.size());
+
+ for (Map.Entry<String, OptionalFailure<Object>> entry : deserializedAccumulators.entrySet()) {
+ accumulators.put(entry.getKey(), entry.getValue().getUnchecked());
+ }
+
+ return accumulators;
+ }
}
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index d514bc4..5bec503 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
import javax.annotation.Nullable;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
@@ -65,6 +66,12 @@ public interface JobClient extends AutoCloseable {
CompletableFuture<String> triggerSavepoint(@Nullable String savepointDirectory);
/**
+ * Requests the accumulators of the associated job. Accumulators can be requested while it is running
+ * or after it has finished. The class loader is used to deserialize the incoming accumulator results.
+ */
+ CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader);
+
+ /**
* Returns the {@link JobExecutionResult result of the job execution} of the submitted job.
*
* @param userClassloader the classloader used to de-serialize the accumulators of the job.
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
index d1725a5..b5c9873 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.execution.JobClient;
import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
@@ -57,4 +58,9 @@ public class TestingJobClient implements JobClient {
return CompletableFuture.completedFuture("null");
}
+ @Override
+ public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
+ return CompletableFuture.completedFuture(Collections.emptyMap());
+ }
+
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
index ede0e19..bcc02ee 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.execution.JobClient;
import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
@@ -57,4 +58,9 @@ public class TestingJobClient implements JobClient {
return CompletableFuture.completedFuture("null");
}
+ @Override
+ public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
+ return CompletableFuture.completedFuture(Collections.emptyMap());
+ }
+
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 20195d5..32e41e7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -169,7 +169,7 @@ public class AccumulatorLiveITCase extends TestLogger {
deadline,
accumulators -> accumulators.size() == 1
&& accumulators.containsKey(ACCUMULATOR_NAME)
- && (int) accumulators.get(ACCUMULATOR_NAME).getUnchecked() == NUM_ITERATIONS,
+ && (int) accumulators.get(ACCUMULATOR_NAME) == NUM_ITERATIONS,
TestingUtils.defaultScheduledExecutor()
).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 3c2446f..2436cb8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -38,7 +38,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
-import org.apache.flink.util.OptionalFailure;
import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
@@ -58,8 +57,8 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import static junit.framework.Assert.fail;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
/**
* Test savepoint migration.
@@ -146,21 +145,17 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
boolean done = false;
while (deadLine.hasTimeLeft()) {
Thread.sleep(100);
- Map<String, OptionalFailure<Object>> accumulators = client.getAccumulators(jobSubmissionResult.getJobID()).get();
+ Map<String, Object> accumulators = client.getAccumulators(jobSubmissionResult.getJobID()).get();
boolean allDone = true;
for (Tuple2<String, Integer> acc : expectedAccumulators) {
- OptionalFailure<Object> accumOpt = accumulators.get(acc.f0);
+ Object accumOpt = accumulators.get(acc.f0);
if (accumOpt == null) {
allDone = false;
break;
}
- Integer numFinished = (Integer) accumOpt.get();
- if (numFinished == null) {
- allDone = false;
- break;
- }
+ Integer numFinished = (Integer) accumOpt;
if (!numFinished.equals(acc.f1)) {
allDone = false;
break;
@@ -226,16 +221,16 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
}
Thread.sleep(100);
- Map<String, OptionalFailure<Object>> accumulators = client.getAccumulators(jobId).get();
+ Map<String, Object> accumulators = client.getAccumulators(jobId).get();
boolean allDone = true;
for (Tuple2<String, Integer> acc : expectedAccumulators) {
- OptionalFailure<Object> numFinished = accumulators.get(acc.f0);
+ Object numFinished = accumulators.get(acc.f0);
if (numFinished == null) {
allDone = false;
break;
}
- if (!numFinished.get().equals(acc.f1)) {
+ if (!numFinished.equals(acc.f1)) {
allDone = false;
break;
}