You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/03 13:47:27 UTC
flink git commit: [FLINK-7012] remove user-JAR upload when disposing
a savepoint the old way
Repository: flink
Updated Branches:
refs/heads/master d414fe101 -> 236a34523
[FLINK-7012] remove user-JAR upload when disposing a savepoint the old way
This closes #4245.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/236a3452
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/236a3452
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/236a3452
Branch: refs/heads/master
Commit: 236a345236b97a185393ba49875e8691298f5e26
Parents: d414fe1
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jun 27 12:14:08 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jul 3 14:30:16 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 32 +-------------------
.../test/classloading/ClassLoaderITCase.java | 8 -----
2 files changed, 1 insertion(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/236a3452/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index a22cb37..5739cdd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -45,7 +45,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -54,8 +53,6 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -757,36 +754,9 @@ public class CliFrontend {
"Usage: bin/flink savepoint -d <savepoint-path>");
}
- String jarFile = options.getJarFilePath();
-
ActorGateway jobManager = getJobManagerGateway(options);
- List<BlobKey> blobKeys = null;
- if (jarFile != null) {
- logAndSysout("Disposing savepoint '" + savepointPath + "' with JAR " + jarFile + ".");
-
- List<File> libs = null;
- try {
- libs = PackagedProgram.extractContainedLibraries(new File(jarFile).toURI().toURL());
- if (!libs.isEmpty()) {
- List<Path> libPaths = new ArrayList<>(libs.size());
- for (File f : libs) {
- libPaths.add(new Path(f.toURI()));
- }
-
- logAndSysout("Uploading JAR files.");
- LOG.debug("JAR files: " + libPaths);
- blobKeys = BlobClient.uploadJarFiles(jobManager, clientTimeout, config, libPaths);
- LOG.debug("Blob keys: " + blobKeys.toString());
- }
- } finally {
- if (libs != null) {
- PackagedProgram.deleteExtractedLibraries(libs);
- }
- }
- } else {
- logAndSysout("Disposing savepoint '" + savepointPath + "'.");
- }
+ logAndSysout("Disposing savepoint '" + savepointPath + "'.");
Object msg = new DisposeSavepoint(savepointPath);
Future<Object> response = jobManager.ask(msg, clientTimeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/236a3452/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 75eb112..b1915bd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -25,8 +25,6 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -59,7 +57,6 @@ import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@@ -341,11 +338,6 @@ public class ClassLoaderITCase extends TestLogger {
assertNotNull("Failed to trigger savepoint", savepointPath);
- // Upload JAR
- LOG.info("Uploading JAR " + CUSTOM_KV_STATE_JAR_PATH + " for savepoint disposal.");
- List<BlobKey> blobKeys = BlobClient.uploadJarFiles(jm, deadline.timeLeft(), testCluster.userConfiguration(),
- Collections.singletonList(new Path(CUSTOM_KV_STATE_JAR_PATH)));
-
// Dispose savepoint
LOG.info("Disposing savepoint at " + savepointPath);
Future<Object> disposeFuture = jm.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());