You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink on the archive page and is not preserved in this text.
Posted to commits@flink.apache.org by ti...@apache.org on 2019/11/29 13:53:35 UTC

[flink] 07/09: [FLINK-14762][client] Implement JobClient#getAccumulators

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d8aa9b902eadc74bd3ce3d6f6ef5f0a73bc7331f
Author: tison <wa...@gmail.com>
AuthorDate: Fri Nov 29 10:47:31 2019 +0800

    [FLINK-14762][client] Implement JobClient#getAccumulators
---
 .../deployment/ClusterClientJobClientAdapter.java  |  6 +++++
 .../apache/flink/client/program/ClusterClient.java |  5 ++--
 .../flink/client/program/MiniClusterClient.java    | 18 +++++---------
 .../client/program/rest/RestClusterClient.java     | 24 ++++++++-----------
 .../flink/client/program/TestingClusterClient.java |  3 +--
 .../client/program/rest/RestClusterClientTest.java | 19 ++++++---------
 .../api/common/accumulators/AccumulatorHelper.java | 28 ++++++++++++++++++++--
 .../org/apache/flink/core/execution/JobClient.java |  7 ++++++
 .../apache/flink/api/java/TestingJobClient.java    |  6 +++++
 .../streaming/environment/TestingJobClient.java    |  6 +++++
 .../test/accumulators/AccumulatorLiveITCase.java   |  2 +-
 .../utils/SavepointMigrationTestBase.java          | 19 ++++++---------
 12 files changed, 85 insertions(+), 58 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
index f31b9b7..91a9c91 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
@@ -31,6 +31,7 @@ import org.apache.flink.util.function.FunctionUtils;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -74,6 +75,11 @@ public class ClusterClientJobClientAdapter<ClusterID> implements JobClient {
 	}
 
 	@Override
+	public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
+		return clusterClient.getAccumulators(jobID, classLoader);
+	}
+
+	@Override
 	public CompletableFuture<JobExecutionResult> getJobExecutionResult(final ClassLoader userClassloader) {
 		checkNotNull(userClassloader);
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 8208465..eb19109 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.OptionalFailure;
 
 import javax.annotation.Nullable;
 
@@ -114,7 +113,7 @@ public interface ClusterClient<T> extends AutoCloseable {
 	 * @param jobID The job identifier of a job.
 	 * @return A Map containing the accumulator's name and its value.
 	 */
-	default CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID) {
+	default CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID) {
 		return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
 	}
 
@@ -125,7 +124,7 @@ public interface ClusterClient<T> extends AutoCloseable {
 	 * @param loader The class loader for deserializing the accumulator results.
 	 * @return A Map containing the accumulator's name and its value.
 	 */
-	CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID, ClassLoader loader);
+	CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);
 
 	/**
 	 * Cancels a job identified by the job id.
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 1f32c0a..7f47552 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -29,8 +30,6 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.OptionalFailure;
-import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +38,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -106,20 +104,16 @@ public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniCl
 	}
 
 	@Override
-	public CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID, ClassLoader loader) {
+	public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) {
 		return miniCluster
 			.getExecutionGraph(jobID)
 			.thenApply(AccessExecutionGraph::getAccumulatorsSerialized)
 			.thenApply(accumulators -> {
-				Map<String, OptionalFailure<Object>> result = new HashMap<>(accumulators.size());
-				for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> acc : accumulators.entrySet()) {
-					try {
-						result.put(acc.getKey(), acc.getValue().deserializeValue(loader));
-					} catch (Exception e) {
-						throw new CompletionException("Cannot deserialize accumulators.", e);
-					}
+				try {
+					return AccumulatorHelper.deserializeAndUnwrapAccumulators(accumulators, loader);
+				} catch (Exception e) {
+					throw new CompletionException("Cannot deserialize and unwrap accumulators properly.", e);
 				}
-				return result;
 			});
 	}
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index bacbe15..607c320 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -88,7 +88,6 @@ import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
@@ -405,7 +404,7 @@ public class RestClusterClient<T> implements ClusterClient<T> {
 	}
 
 	@Override
-	public CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID, ClassLoader loader) {
+	public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) {
 		final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance();
 		final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters();
 		accMsgParams.jobPathParameter.resolve(jobID);
@@ -415,18 +414,15 @@ public class RestClusterClient<T> implements ClusterClient<T> {
 			accumulatorsHeaders,
 			accMsgParams);
 
-		return responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> {
-			try {
-				return AccumulatorHelper.deserializeAccumulators(
-					accumulatorsInfo.getSerializedUserAccumulators(),
-					loader);
-			} catch (Exception e) {
-				throw new CompletionException(
-					new FlinkException(
-						String.format("Deserialization of accumulators for job %s failed.", jobID),
-						e));
-			}
-		});
+		return responseFuture
+			.thenApply(JobAccumulatorsInfo::getSerializedUserAccumulators)
+			.thenApply(accumulators -> {
+				try {
+					return AccumulatorHelper.deserializeAndUnwrapAccumulators(accumulators, loader);
+				} catch (Exception e) {
+					throw new CompletionException("Cannot deserialize and unwrap accumulators properly.", e);
+				}
+			});
 	}
 
 	private CompletableFuture<SavepointInfo> pollSavepointAsync(
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
index a711c55..f3be219 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.function.TriFunction;
 
 import javax.annotation.Nonnull;
@@ -109,7 +108,7 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
 	}
 
 	@Override
-	public CompletableFuture<Map<String, OptionalFailure<Object>>> getAccumulators(JobID jobID, ClassLoader loader) {
+	public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) {
 		throw new UnsupportedOperationException();
 	}
 
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 16599c5..d3a381c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -131,6 +131,7 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -512,21 +513,15 @@ public class RestClusterClientTest extends TestLogger {
 		TestAccumulatorHandler accumulatorHandler = new TestAccumulatorHandler();
 
 		try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(accumulatorHandler)){
-			RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort());
 
-			try {
+			try (RestClusterClient<?> restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
 				JobID id = new JobID();
+				Map<String, Object> accumulators = restClusterClient.getAccumulators(id).get();
+				assertNotNull(accumulators);
+				assertEquals(1, accumulators.size());
 
-				{
-					Map<String, OptionalFailure<Object>> accumulators = restClusterClient.getAccumulators(id).get();
-					assertNotNull(accumulators);
-					assertEquals(1, accumulators.size());
-
-					assertEquals(true, accumulators.containsKey("testKey"));
-					assertEquals("testValue", accumulators.get("testKey").get().toString());
-				}
-			} finally {
-				restClusterClient.close();
+				assertTrue(accumulators.containsKey("testKey"));
+				assertEquals("testValue", accumulators.get("testKey").toString());
 			}
 		}
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index 9bc1299..b6d3971 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -178,8 +178,6 @@ public class AccumulatorHelper {
 	 * @param serializedAccumulators The serialized accumulator results.
 	 * @param loader The class loader to use.
 	 * @return The deserialized accumulator results.
-	 * @throws IOException
-	 * @throws ClassNotFoundException
 	 */
 	public static Map<String, OptionalFailure<Object>> deserializeAccumulators(
 			Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators,
@@ -203,4 +201,30 @@ public class AccumulatorHelper {
 
 		return accumulators;
 	}
+
+	/**
+	 * Takes the serialized accumulator results and tries to deserialize them using the provided
+	 * class loader, and then tries to unwrap each value unchecked.
+	 * @param serializedAccumulators The serialized accumulator results.
+	 * @param loader The class loader to use.
+	 * @return The deserialized and unwrapped accumulator results.
+	 */
+	public static Map<String, Object> deserializeAndUnwrapAccumulators(
+		Map<String, SerializedValue<OptionalFailure<Object>>> serializedAccumulators,
+		ClassLoader loader) throws IOException, ClassNotFoundException {
+
+		Map<String, OptionalFailure<Object>> deserializedAccumulators = deserializeAccumulators(serializedAccumulators, loader);
+
+		if (deserializedAccumulators.isEmpty()) {
+			return Collections.emptyMap();
+		}
+
+		Map<String, Object> accumulators = new HashMap<>(serializedAccumulators.size());
+
+		for (Map.Entry<String, OptionalFailure<Object>> entry : deserializedAccumulators.entrySet()) {
+			accumulators.put(entry.getKey(), entry.getValue().getUnchecked());
+		}
+
+		return accumulators;
+	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index d514bc4..5bec503 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -65,6 +66,12 @@ public interface JobClient extends AutoCloseable {
 	CompletableFuture<String> triggerSavepoint(@Nullable String savepointDirectory);
 
 	/**
+	 * Requests the accumulators of the associated job. Accumulators can be requested while it is running
+	 * or after it has finished. The class loader is used to deserialize the incoming accumulator results.
+	 */
+	CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader);
+
+	/**
 	 * Returns the {@link JobExecutionResult result of the job execution} of the submitted job.
 	 *
 	 * @param userClassloader the classloader used to de-serialize the accumulators of the job.
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
index d1725a5..b5c9873 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.execution.JobClient;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -57,4 +58,9 @@ public class TestingJobClient implements JobClient {
 		return CompletableFuture.completedFuture("null");
 	}
 
+	@Override
+	public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
+		return CompletableFuture.completedFuture(Collections.emptyMap());
+	}
+
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
index ede0e19..bcc02ee 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.execution.JobClient;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -57,4 +58,9 @@ public class TestingJobClient implements JobClient {
 		return CompletableFuture.completedFuture("null");
 	}
 
+	@Override
+	public CompletableFuture<Map<String, Object>> getAccumulators(ClassLoader classLoader) {
+		return CompletableFuture.completedFuture(Collections.emptyMap());
+	}
+
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 20195d5..32e41e7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -169,7 +169,7 @@ public class AccumulatorLiveITCase extends TestLogger {
 				deadline,
 				accumulators -> accumulators.size() == 1
 					&& accumulators.containsKey(ACCUMULATOR_NAME)
-					&& (int) accumulators.get(ACCUMULATOR_NAME).getUnchecked() == NUM_ITERATIONS,
+					&& (int) accumulators.get(ACCUMULATOR_NAME) == NUM_ITERATIONS,
 				TestingUtils.defaultScheduledExecutor()
 			).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
index 3c2446f..2436cb8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java
@@ -38,7 +38,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
-import org.apache.flink.util.OptionalFailure;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.BeforeClass;
@@ -58,8 +57,8 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import static junit.framework.Assert.fail;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Test savepoint migration.
@@ -146,21 +145,17 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 		boolean done = false;
 		while (deadLine.hasTimeLeft()) {
 			Thread.sleep(100);
-			Map<String, OptionalFailure<Object>> accumulators = client.getAccumulators(jobSubmissionResult.getJobID()).get();
+			Map<String, Object> accumulators = client.getAccumulators(jobSubmissionResult.getJobID()).get();
 
 			boolean allDone = true;
 			for (Tuple2<String, Integer> acc : expectedAccumulators) {
-				OptionalFailure<Object> accumOpt = accumulators.get(acc.f0);
+				Object accumOpt = accumulators.get(acc.f0);
 				if (accumOpt == null) {
 					allDone = false;
 					break;
 				}
 
-				Integer numFinished = (Integer) accumOpt.get();
-				if (numFinished == null) {
-					allDone = false;
-					break;
-				}
+				Integer numFinished = (Integer) accumOpt;
 				if (!numFinished.equals(acc.f1)) {
 					allDone = false;
 					break;
@@ -226,16 +221,16 @@ public abstract class SavepointMigrationTestBase extends TestBaseUtils {
 			}
 
 			Thread.sleep(100);
-			Map<String, OptionalFailure<Object>> accumulators = client.getAccumulators(jobId).get();
+			Map<String, Object> accumulators = client.getAccumulators(jobId).get();
 
 			boolean allDone = true;
 			for (Tuple2<String, Integer> acc : expectedAccumulators) {
-				OptionalFailure<Object> numFinished = accumulators.get(acc.f0);
+				Object numFinished = accumulators.get(acc.f0);
 				if (numFinished == null) {
 					allDone = false;
 					break;
 				}
-				if (!numFinished.get().equals(acc.f1)) {
+				if (!numFinished.equals(acc.f1)) {
 					allDone = false;
 					break;
 				}