Posted to commits@flink.apache.org by dw...@apache.org on 2018/05/04 07:58:22 UTC
[2/2] flink git commit: [FLINK-8620] Enable shipping custom files to BlobStore and accessing them through DistributedCache
[FLINK-8620] Enable shipping custom files to BlobStore and accessing them through DistributedCache
This closes #5580
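
For context, a minimal usage sketch of the feature (a sketch only — the file path and job name are illustrative; the APIs are the registerCachedFile/getDistributedCache calls exercised by this change):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.io.File;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;

    public class CachedFileSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // with this change, the file is shipped to the BlobServer at
            // submission time instead of requiring a shared file system
            env.registerCachedFile("/path/to/words.txt", "words");

            env.fromElements(1)
                .map(new RichMapFunction<Integer, String>() {
                    @Override
                    public String map(Integer value) throws Exception {
                        // resolves to the local copy materialized by the FileCache
                        File words = getRuntimeContext().getDistributedCache().getFile("words");
                        return new String(Files.readAllBytes(words.toPath()), StandardCharsets.UTF_8);
                    }
                })
                .print();

            env.execute("Cached file sketch");
        }
    }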
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0146f8a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0146f8a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0146f8a
Branch: refs/heads/master
Commit: e0146f8ac525e5e3e9e49fb590d5c149773efc9f
Parents: 2cfd89c
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Wed Feb 7 16:21:42 2018 +0100
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Fri May 4 09:56:44 2018 +0200
----------------------------------------------------------------------
.../client/program/rest/RestClusterClient.java | 9 +-
.../api/common/cache/DistributedCache.java | 53 +++-
.../java/org/apache/flink/util/FileUtils.java | 38 +++
.../pom.xml | 85 ++++++
.../DistributedCacheViaBlobTestProgram.java | 77 +++++
flink-end-to-end-tests/pom.xml | 1 +
flink-end-to-end-tests/test-scripts/common.sh | 2 +
...test_streaming_distributed_cache_via_blob.sh | 40 +++
.../flink/python/api/PythonPlanBinder.java | 12 +-
.../python/api/PythonStreamBinder.java | 15 +-
.../environment/PythonEnvironmentFactory.java | 16 +-
.../PythonStreamExecutionEnvironment.java | 36 +--
.../plantranslate/JobGraphGenerator.java | 15 +-
.../webmonitor/handlers/JarRunHandler.java | 4 +-
.../apache/flink/runtime/blob/BlobClient.java | 236 +++++-----------
.../flink/runtime/blob/BlobOutputStream.java | 177 ++++++++++++
.../apache/flink/runtime/blob/BlobUtils.java | 23 ++
.../apache/flink/runtime/client/JobClient.java | 7 +
.../client/JobSubmissionClientActor.java | 10 +
.../flink/runtime/filecache/FileCache.java | 282 +++++++++----------
.../apache/flink/runtime/jobgraph/JobGraph.java | 55 +++-
.../flink/runtime/minicluster/MiniCluster.java | 16 +-
.../runtime/taskexecutor/TaskExecutor.java | 8 +-
.../taskexecutor/TaskManagerServices.java | 17 --
.../apache/flink/runtime/taskmanager/Task.java | 29 +-
.../flink/runtime/taskmanager/TaskManager.scala | 32 ++-
.../flink/runtime/blob/BlobCachePutTest.java | 2 +-
.../flink/runtime/blob/BlobClientTest.java | 52 +++-
.../flink/runtime/blob/BlobServerPutTest.java | 2 +-
.../FileCacheDeleteValidationTest.java | 145 ----------
.../filecache/FileCacheDirectoriesTest.java | 209 ++++++++++++++
.../filecache/FileCacheReadsFromBlobTest.java | 120 ++++++++
.../jobmanager/JobManagerCleanupITCase.java | 6 +-
.../flink/runtime/jobmanager/JobSubmitTest.java | 2 +-
.../TaskManagerServicesBuilder.java | 9 -
.../runtime/util/JvmExitOnFatalErrorTest.java | 6 +-
.../api/graph/StreamingJobGraphGenerator.java | 2 +-
.../tasks/InterruptSensitiveRestoreTest.java | 3 +-
.../tasks/TaskCheckpointingBehaviourTest.java | 3 +-
.../streaming/util/TestStreamEnvironment.java | 8 +-
40 files changed, 1224 insertions(+), 640 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 5c04be7..0921642 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -319,18 +319,19 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
getDispatcherAddress(),
(BlobServerPortResponseBody response, String dispatcherAddress) -> {
- log.info("Uploading jar files.");
final int blobServerPort = response.port;
final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
final List<PermanentBlobKey> keys;
try {
- keys = BlobClient.uploadJarFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
+ log.info("Uploading jar files.");
+ keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
+ jobGraph.uploadUserArtifacts(address, flinkConfig);
} catch (IOException ioe) {
- throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
+ throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
}
for (PermanentBlobKey key : keys) {
- jobGraph.addBlob(key);
+ jobGraph.addUserJarBlobKey(key);
}
return jobGraph;
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
index 35a82e8..9cceb83 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
@@ -19,6 +19,8 @@ package org.apache.flink.api.common.cache;
import java.io.File;
+import java.io.Serializable;
+import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -37,18 +39,31 @@ import org.apache.flink.core.fs.Path;
*/
@Public
public class DistributedCache {
-
- public static class DistributedCacheEntry {
-
+
+ public static class DistributedCacheEntry implements Serializable {
+
public String filePath;
public Boolean isExecutable;
-
- public DistributedCacheEntry(String filePath, Boolean isExecutable){
+ public boolean isZipped;
+
+ public byte[] blobKey;
+
+ public DistributedCacheEntry(String filePath, Boolean isExecutable, byte[] blobKey, boolean isZipped){
this.filePath=filePath;
this.isExecutable=isExecutable;
+ this.blobKey = blobKey;
+ this.isZipped = isZipped;
+ }
+
+ public DistributedCacheEntry(String filePath, Boolean isExecutable){
+ this(filePath, isExecutable, null);
+ }
+
+ public DistributedCacheEntry(String filePath, Boolean isExecutable, byte[] blobKey){
+ this(filePath, isExecutable, blobKey, false);
}
}
-
+
// ------------------------------------------------------------------------
private final Map<String, Future<Path>> cacheCopyTasks;
@@ -63,16 +78,17 @@ public class DistributedCache {
if (name == null) {
throw new NullPointerException("name must not be null");
}
-
+
Future<Path> future = cacheCopyTasks.get(name);
if (future == null) {
throw new IllegalArgumentException("File with name '" + name + "' is not available." +
" Did you forget to register the file?");
}
-
+
try {
- Path tmp = future.get();
- return new File(tmp.toString());
+ final Path path = future.get();
+ URI tmp = path.makeQualified(path.getFileSystem()).toUri();
+ return new File(tmp);
}
catch (ExecutionException e) {
throw new RuntimeException("An error occurred while copying the file.", e.getCause());
@@ -82,17 +98,19 @@ public class DistributedCache {
"' from the distributed cache", e);
}
}
-
+
// ------------------------------------------------------------------------
// Utilities to read/write cache files from/to the configuration
// ------------------------------------------------------------------------
-
+
public static void writeFileInfoToConfig(String name, DistributedCacheEntry e, Configuration conf) {
int num = conf.getInteger(CACHE_FILE_NUM,0) + 1;
conf.setInteger(CACHE_FILE_NUM, num);
conf.setString(CACHE_FILE_NAME + num, name);
conf.setString(CACHE_FILE_PATH + num, e.filePath);
conf.setBoolean(CACHE_FILE_EXE + num, e.isExecutable || new File(e.filePath).canExecute());
+ conf.setBoolean(CACHE_FILE_DIR + num, e.isZipped || new File(e.filePath).isDirectory());
+ conf.setBytes(CACHE_FILE_BLOB_KEY + num, e.blobKey);
}
public static Set<Entry<String, DistributedCacheEntry>> readFileInfoFromConfig(Configuration conf) {
@@ -105,8 +123,11 @@ public class DistributedCache {
for (int i = 1; i <= num; i++) {
String name = conf.getString(CACHE_FILE_NAME + i, null);
String filePath = conf.getString(CACHE_FILE_PATH + i, null);
- Boolean isExecutable = conf.getBoolean(CACHE_FILE_EXE + i, false);
- cacheFiles.put(name, new DistributedCacheEntry(filePath, isExecutable));
+ boolean isExecutable = conf.getBoolean(CACHE_FILE_EXE + i, false);
+ boolean isDirectory = conf.getBoolean(CACHE_FILE_DIR + i, false);
+
+ byte[] blobKey = conf.getBytes(CACHE_FILE_BLOB_KEY + i, null);
+ cacheFiles.put(name, new DistributedCacheEntry(filePath, isExecutable, blobKey, isDirectory));
}
return cacheFiles.entrySet();
}
@@ -118,4 +139,8 @@ public class DistributedCache {
private static final String CACHE_FILE_PATH = "DISTRIBUTED_CACHE_FILE_PATH_";
private static final String CACHE_FILE_EXE = "DISTRIBUTED_CACHE_FILE_EXE_";
+
+ private static final String CACHE_FILE_DIR = "DISTRIBUTED_CACHE_FILE_DIR_";
+
+ private static final String CACHE_FILE_BLOB_KEY = "DISTRIBUTED_CACHE_FILE_BLOB_KEY_";
}
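
To make the new entry layout above concrete, a round-trip sketch through the configuration (assumption: the blob key bytes are a placeholder; in a real job the key is produced by the BlobServer upload):

    import org.apache.flink.api.common.cache.DistributedCache;
    import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
    import org.apache.flink.configuration.Configuration;

    import java.util.Map;

    public class CacheEntryRoundTrip {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // placeholder key bytes; in practice the serialized PermanentBlobKey
            byte[] blobKey = new byte[] {1, 2, 3};
            DistributedCacheEntry entry =
                new DistributedCacheEntry("/tmp/data.txt", false, blobKey, false);
            DistributedCache.writeFileInfoToConfig("data", entry, conf);

            // the task side reads the same entry back, including the new
            // directory flag and blob key fields
            for (Map.Entry<String, DistributedCacheEntry> e :
                    DistributedCache.readFileInfoFromConfig(conf)) {
                System.out.println(e.getKey() + " -> " + e.getValue().filePath);
            }
        }
    }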
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index f47825c..46c5621 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -18,6 +18,8 @@
package org.apache.flink.util;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -315,6 +317,42 @@ public final class FileUtils {
}
}
+ /**
+ * Copies all files from source to target and sets the executable flag. Paths may be on different file systems.
+ * @param sourcePath source path to copy from
+ * @param targetPath target path to copy to
+ * @param executable if target file should be executable
+ * @throws IOException if the copy fails
+ */
+ public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException {
+ // we unwrap the file system to get raw streams without safety net
+ FileSystem sFS = FileSystem.getUnguardedFileSystem(sourcePath.toUri());
+ FileSystem tFS = FileSystem.getUnguardedFileSystem(targetPath.toUri());
+ if (!tFS.exists(targetPath)) {
+ if (sFS.getFileStatus(sourcePath).isDir()) {
+ tFS.mkdirs(targetPath);
+ FileStatus[] contents = sFS.listStatus(sourcePath);
+ for (FileStatus content : contents) {
+ String distPath = content.getPath().toString();
+ if (content.isDir()) {
+ if (distPath.endsWith("/")) {
+ distPath = distPath.substring(0, distPath.length() - 1);
+ }
+ }
+ String localPath = targetPath.toString() + distPath.substring(distPath.lastIndexOf("/"));
+ copy(content.getPath(), new Path(localPath), executable);
+ }
+ } else {
+ try (FSDataOutputStream lfsOutput = tFS.create(targetPath, FileSystem.WriteMode.NO_OVERWRITE); FSDataInputStream fsInput = sFS.open(sourcePath)) {
+ IOUtils.copyBytes(fsInput, lfsOutput);
+ //noinspection ResultOfMethodCallIgnored
+ new File(targetPath.toString()).setExecutable(executable);
+ } catch (IOException ignored) {
+ }
+ }
+ }
+ }
+
// ------------------------------------------------------------------------
/**
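
A brief usage sketch for the new recursive FileUtils.copy (the paths are illustrative; any Flink FileSystem scheme should work):

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.util.FileUtils;

    public class CopySketch {
        public static void main(String[] args) throws java.io.IOException {
            // copies a whole directory tree from the source file system to
            // the target one; copied files are marked non-executable
            FileUtils.copy(
                new Path("file:///tmp/source-dir"),
                new Path("file:///tmp/target-dir"),
                false);
        }
    }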
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml
new file mode 100644
index 0000000..046d58d
--- /dev/null
+++ b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.6-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-distributed-cache-via-blob-test_${scala.binary.version}</artifactId>
+ <name>flink-distributed-cache-via-blob</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+
+ <executions>
+ <!-- ClassLoaderTestProgram -->
+ <execution>
+ <id>DistributedCacheViaBlobTestProgram</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <finalName>DistributedCacheViaBlobTestProgram</finalName>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.tests.DistributedCacheViaBlobTestProgram</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram*</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
+
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
new file mode 100644
index 0000000..9ad4b98
--- /dev/null
+++ b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.Collectors;
+
+/**
+ * End-to-end test program for verifying that files are distributed via BlobServer and later accessible through
+ * DistributedCache. We verify this by uploading a file and later accessing it in a map function. To be sure we read
+ * the version from the cache, we delete the initial file.
+ */
+public class DistributedCacheViaBlobTestProgram {
+
+ public static void main(String[] args) throws Exception {
+
+ final ParameterTool params = ParameterTool.fromArgs(args);
+
+ final Path inputFile = Paths.get(params.getRequired("inputFile"));
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ env.registerCachedFile(inputFile.toString(), "test_data", false);
+
+ env.fromElements(1)
+ .map(new TestMapFunction(inputFile.toAbsolutePath().toString()))
+ .writeAsText(params.getRequired("output"), FileSystem.WriteMode.OVERWRITE);
+
+ env.execute("Distributed Cache Via Blob Test Program");
+ }
+
+ static class TestMapFunction extends RichMapFunction<Integer, String> {
+
+ private String initialPath;
+
+ public TestMapFunction(String initialPath) {
+ this.initialPath = initialPath;
+ }
+
+ @Override
+ public String map(Integer value) throws Exception {
+ final Path testFile = getRuntimeContext().getDistributedCache().getFile("test_data").toPath();
+
+ if (testFile.toAbsolutePath().toString().equals(initialPath)) {
+ throw new RuntimeException(String.format("Operator should access copy from cache rather than the " +
+ "initial file. Input file path: %s. Cache file path: %s", initialPath, testFile));
+ }
+
+ return Files.readAllLines(testFile)
+ .stream()
+ .collect(Collectors.joining("\n"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-end-to-end-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 34586ed..6a03998 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -40,6 +40,7 @@ under the License.
<module>flink-datastream-allround-test</module>
<module>flink-stream-sql-test</module>
<module>flink-bucketing-sink-test</module>
+ <module>flink-distributed-cache-via-blob-test</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 73885e1..d8e0de7 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -194,6 +194,8 @@ function stop_cluster {
| grep -v "Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration" \
| grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException" \
| grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration" \
+ | grep -v "java.lang.Exception: Execution was suspended" \
+ | grep -v "Caused by: java.lang.Exception: JobManager is shutting down" \
| grep -iq "exception"; then
echo "Found exception in log files:"
cat $FLINK_DIR/log/*
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-end-to-end-tests/test-scripts/test_streaming_distributed_cache_via_blob.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_distributed_cache_via_blob.sh b/flink-end-to-end-tests/test-scripts/test_streaming_distributed_cache_via_blob.sh
new file mode 100755
index 0000000..b85f610
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_distributed_cache_via_blob.sh
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-distributed-cache-via-blob-test/target/DistributedCacheViaBlobTestProgram.jar
+
+echo "Testing distributing files via DistributedCache & BlobServer"
+
+start_cluster
+
+mkdir -p $TEST_DATA_DIR
+
+$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR --inputFile $TEST_INFRA_DIR/test-data/words --tempDir $TEST_DATA_DIR/ --output $TEST_DATA_DIR/out/cl_out_pf
+
+OUTPUT=`cat $TEST_DATA_DIR/out/cl_out_pf`
+
+EXPECTED="Hello World how are you, my dear dear world"
+if [[ "$OUTPUT" != "$EXPECTED" ]]; then
+ echo "Output from Flink program does not match expected output."
+ echo -e "EXPECTED: $EXPECTED"
+ echo -e "ACTUAL: $OUTPUT"
+ PASS=""
+fi
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 106ac7a..a4f38fe 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -45,7 +45,7 @@ import org.apache.flink.python.api.functions.util.StringDeserializerMap;
import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap;
import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
import org.apache.flink.python.api.util.SetCache;
-import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
@@ -182,10 +182,7 @@ public class PythonPlanBinder {
receivePlan(env);
- // upload files to remote FS and register on Distributed Cache
- deleteIfExists(tmpDistributedDir);
- FileCache.copy(tmpPlanFilesPath, tmpDistributedDir, true);
- env.registerCachedFile(tmpDistributedDir.toUri().toString(), FLINK_PYTHON_DC_ID);
+ env.registerCachedFile(tmpPlanFilesPath.toUri().toString(), FLINK_PYTHON_DC_ID, true);
JobExecutionResult jer = env.execute();
long runtime = jer.getNetRuntime();
@@ -197,9 +194,6 @@ public class PythonPlanBinder {
} finally {
try {
// clean up created files
- FileSystem distributedFS = tmpDistributedDir.getFileSystem();
- distributedFS.delete(tmpDistributedDir, true);
-
FileSystem local = FileSystem.getLocalFileSystem();
local.delete(new Path(tmpPlanFilesDir), true);
} catch (IOException ioe) {
@@ -252,7 +246,7 @@ public class PythonPlanBinder {
private static void copyFile(Path source, Path targetDirectory, String name) throws IOException {
Path targetFilePath = new Path(targetDirectory, name);
deleteIfExists(targetFilePath);
- FileCache.copy(source, targetFilePath, true);
+ FileUtils.copy(source, targetFilePath, true);
}
//====Plan==========================================================================================================
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java
index 97b1030..0742eac 100644
--- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java
+++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/PythonStreamBinder.java
@@ -22,10 +22,10 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.streaming.python.PythonOptions;
import org.apache.flink.streaming.python.api.environment.PythonEnvironmentFactory;
import org.apache.flink.streaming.python.util.InterpreterUtils;
+import org.apache.flink.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,23 +110,19 @@ public class PythonStreamBinder {
targetDir.getFileSystem().mkdirs(targetDir);
// copy user files to temporary location
- Path tmpPlanFilesPath = new Path(localTmpPath);
- copyFile(planPath, tmpPlanFilesPath, planPath.getName());
+ copyFile(planPath, targetDir, planPath.getName());
for (String file : filesToCopy) {
Path source = new Path(file);
- copyFile(source, tmpPlanFilesPath, source.getName());
+ copyFile(source, targetDir, source.getName());
}
String planNameWithExtension = planPath.getName();
String planName = planNameWithExtension.substring(0, planNameWithExtension.indexOf(".py"));
- InterpreterUtils.initAndExecPythonScript(new PythonEnvironmentFactory(localTmpPath, tmpDistributedDir, planName), Paths.get(localTmpPath), planName, planArgumentsArray);
+ InterpreterUtils.initAndExecPythonScript(new PythonEnvironmentFactory(localTmpPath, planName), Paths.get(localTmpPath), planName, planArgumentsArray);
} finally {
try {
// clean up created files
- FileSystem distributedFS = tmpDistributedDir.getFileSystem();
- distributedFS.delete(tmpDistributedDir, true);
-
FileSystem local = FileSystem.getLocalFileSystem();
local.delete(new Path(localTmpPath), true);
} catch (IOException ioe) {
@@ -147,6 +143,7 @@ public class PythonStreamBinder {
private static void copyFile(Path source, Path targetDirectory, String name) throws IOException {
Path targetFilePath = new Path(targetDirectory, name);
deleteIfExists(targetFilePath);
- FileCache.copy(source, targetFilePath, true);
+ FileUtils.copy(source, targetFilePath, true);
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonEnvironmentFactory.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonEnvironmentFactory.java
index 8e437fe..b9b9f8c 100644
--- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonEnvironmentFactory.java
+++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonEnvironmentFactory.java
@@ -33,12 +33,10 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
*/
public class PythonEnvironmentFactory {
private final String localTmpPath;
- private final Path tmpDistributedDir;
private final String scriptName;
- public PythonEnvironmentFactory(String localTmpPath, Path tmpDistributedDir, String scriptName) {
+ public PythonEnvironmentFactory(String localTmpPath, String scriptName) {
this.localTmpPath = localTmpPath;
- this.tmpDistributedDir = tmpDistributedDir;
this.scriptName = scriptName;
}
@@ -50,7 +48,7 @@ public class PythonEnvironmentFactory {
* executed.
*/
public PythonStreamExecutionEnvironment get_execution_environment() {
- return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment(), new Path(localTmpPath), tmpDistributedDir, scriptName);
+ return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment(), new Path(localTmpPath), scriptName);
}
/**
@@ -64,7 +62,7 @@ public class PythonEnvironmentFactory {
* @return A local execution environment with the specified parallelism.
*/
public PythonStreamExecutionEnvironment create_local_execution_environment(Configuration config) {
- return new PythonStreamExecutionEnvironment(new LegacyLocalStreamEnvironment(config), new Path(localTmpPath), tmpDistributedDir, scriptName);
+ return new PythonStreamExecutionEnvironment(new LegacyLocalStreamEnvironment(config), new Path(localTmpPath), scriptName);
}
/**
@@ -76,7 +74,7 @@ public class PythonEnvironmentFactory {
*/
public PythonStreamExecutionEnvironment create_local_execution_environment(int parallelism, Configuration config) {
return new PythonStreamExecutionEnvironment(
- StreamExecutionEnvironment.createLocalEnvironment(parallelism, config), new Path(localTmpPath), tmpDistributedDir, scriptName);
+ StreamExecutionEnvironment.createLocalEnvironment(parallelism, config), new Path(localTmpPath), scriptName);
}
/**
@@ -95,7 +93,7 @@ public class PythonEnvironmentFactory {
public PythonStreamExecutionEnvironment create_remote_execution_environment(
String host, int port, String... jar_files) {
return new PythonStreamExecutionEnvironment(
- StreamExecutionEnvironment.createRemoteEnvironment(host, port, jar_files), new Path(localTmpPath), tmpDistributedDir, scriptName);
+ StreamExecutionEnvironment.createRemoteEnvironment(host, port, jar_files), new Path(localTmpPath), scriptName);
}
/**
@@ -116,7 +114,7 @@ public class PythonEnvironmentFactory {
public PythonStreamExecutionEnvironment create_remote_execution_environment(
String host, int port, Configuration config, String... jar_files) {
return new PythonStreamExecutionEnvironment(
- StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jar_files), new Path(localTmpPath), tmpDistributedDir, scriptName);
+ StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jar_files), new Path(localTmpPath), scriptName);
}
/**
@@ -137,6 +135,6 @@ public class PythonEnvironmentFactory {
public PythonStreamExecutionEnvironment create_remote_execution_environment(
String host, int port, int parallelism, String... jar_files) {
return new PythonStreamExecutionEnvironment(
- StreamExecutionEnvironment.createRemoteEnvironment(host, port, parallelism, jar_files), new Path(localTmpPath), tmpDistributedDir, scriptName);
+ StreamExecutionEnvironment.createRemoteEnvironment(host, port, parallelism, jar_files), new Path(localTmpPath), scriptName);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
index 0684e35..e49c617 100644
--- a/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
+++ b/flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java
@@ -21,17 +21,11 @@ package org.apache.flink.streaming.python.api.environment;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.python.PythonOptions;
import org.apache.flink.streaming.python.api.datastream.PythonDataStream;
import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction;
import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction;
@@ -61,7 +55,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.UUID;
/**
* A thin wrapper layer over {@link StreamExecutionEnvironment}.
@@ -78,12 +71,10 @@ public class PythonStreamExecutionEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class);
private final StreamExecutionEnvironment env;
private final Path pythonTmpCachePath;
- private final Path tmpDistributedDir;
- PythonStreamExecutionEnvironment(StreamExecutionEnvironment env, Path tmpLocalDir, Path tmpDistributedDir, String scriptName) {
+ PythonStreamExecutionEnvironment(StreamExecutionEnvironment env, Path tmpLocalDir, String scriptName) {
this.env = env;
this.pythonTmpCachePath = tmpLocalDir;
- this.tmpDistributedDir = tmpDistributedDir;
env.getConfig().setGlobalJobParameters(new PythonJobParameters(scriptName));
registerJythonSerializers(this.env);
}
@@ -252,7 +243,6 @@ public class PythonStreamExecutionEnvironment {
public JobExecutionResult execute() throws Exception {
distributeFiles();
JobExecutionResult result = this.env.execute();
- cleanupDistributedFiles();
return result;
}
@@ -265,33 +255,11 @@ public class PythonStreamExecutionEnvironment {
public JobExecutionResult execute(String job_name) throws Exception {
distributeFiles();
JobExecutionResult result = this.env.execute(job_name);
- cleanupDistributedFiles();
return result;
}
private void distributeFiles() throws IOException {
- Path remoteRootDir;
- if (this.env instanceof LocalStreamEnvironment) {
- remoteRootDir = new Path(PythonOptions.DC_TMP_DIR.defaultValue());
- } else {
- remoteRootDir = tmpDistributedDir;
- }
- Path remoteDir = new Path(remoteRootDir, "flink_cache_" + UUID.randomUUID());
-
- FileCache.copy(pythonTmpCachePath, remoteDir, true);
-
- this.env.registerCachedFile(remoteDir.toUri().toString(), PythonConstants.FLINK_PYTHON_DC_ID);
+ this.env.registerCachedFile(pythonTmpCachePath.getPath(), PythonConstants.FLINK_PYTHON_DC_ID);
}
- private void cleanupDistributedFiles() throws IOException {
- for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : this.env.getCachedFiles()) {
- Path fileUri = new Path(e.f1.filePath);
- FileSystem fs = fileUri.getFileSystem();
- LOG.debug(String.format("Cleaning up cached path: %s, uriPath: %s, fileSystem: %s",
- e.f1.filePath,
- fileUri.getPath(),
- fs.getClass().getName()));
- fs.delete(new Path(fileUri.getPath()), true);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 846e7f4..0c0c3f3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -18,19 +18,18 @@
package org.apache.flink.optimizer.plantranslate;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
-
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.aggregators.AggregatorRegistry;
import org.apache.flink.api.common.aggregators.AggregatorWithName;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.AlgorithmOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.dag.TempMode;
@@ -49,8 +48,6 @@ import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.plan.WorksetPlanNode;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.util.Utils;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -59,11 +56,11 @@ import org.apache.flink.runtime.iterative.task.IterationHeadTask;
import org.apache.flink.runtime.iterative.task.IterationIntermediateTask;
import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
import org.apache.flink.runtime.iterative.task.IterationTailTask;
-import org.apache.flink.runtime.jobgraph.JobEdge;
-import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
+import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.BatchTask;
@@ -84,6 +81,8 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.Visitor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -246,7 +245,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// add registered cache file into job configuration
for (Entry<String, DistributedCacheEntry> e : program.getOriginalPlan().getCachedFiles()) {
- DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration());
+ graph.addUserArtifact(e.getKey(), e.getValue());
}
// release all references again
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 2e928b0..0605bf1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -116,13 +116,13 @@ public class JarRunHandler extends
final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
final List<PermanentBlobKey> keys;
try {
- keys = BlobClient.uploadJarFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
+ keys = BlobClient.uploadFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
} catch (IOException ioe) {
throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
}
for (PermanentBlobKey key : keys) {
- jobGraph.addBlob(key);
+ jobGraph.addUserJarBlobKey(key);
}
return jobGraph;
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 8e6b328..1301740 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -22,10 +22,11 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.net.SSLUtils;
-import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,23 +46,20 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.security.MessageDigest;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CONTENT;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
-import static org.apache.flink.runtime.blob.BlobUtils.readFully;
-import static org.apache.flink.runtime.blob.BlobUtils.readLength;
-import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
+import static org.apache.flink.runtime.blob.BlobUtils.readExceptionFromStream;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -348,38 +346,11 @@ public final class BlobClient implements Closeable {
LOG.debug("PUT BLOB buffer (" + len + " bytes) to " + socket.getLocalSocketAddress() + ".");
}
- try {
- final OutputStream os = this.socket.getOutputStream();
- final MessageDigest md = BlobUtils.createMessageDigest();
-
- // Send the PUT header
- sendPutHeader(os, jobId, blobType);
-
- // Send the value in iterations of BUFFER_SIZE
- int remainingBytes = len;
-
- while (remainingBytes > 0) {
- // want a common code path for byte[] and InputStream at the BlobServer
- // -> since for InputStream we don't know a total size beforehand, send lengths iteratively
- final int bytesToSend = Math.min(BUFFER_SIZE, remainingBytes);
- writeLength(bytesToSend, os);
-
- os.write(value, offset, bytesToSend);
-
- // Update the message digest
- md.update(value, offset, bytesToSend);
-
- remainingBytes -= bytesToSend;
- offset += bytesToSend;
- }
- // send -1 as the stream end
- writeLength(-1, os);
-
+ try (BlobOutputStream os = new BlobOutputStream(jobId, blobType, socket)) {
+ os.write(value, offset, len);
// Receive blob key and compare
- final InputStream is = this.socket.getInputStream();
- return receiveAndCheckPutResponse(is, md, blobType);
- }
- catch (Throwable t) {
+ return os.finish();
+ } catch (Throwable t) {
BlobUtils.closeSilently(socket, LOG);
throw new IOException("PUT operation failed: " + t.getMessage(), t);
}
@@ -413,111 +384,16 @@ public final class BlobClient implements Closeable {
LOG.debug("PUT BLOB stream to {}.", socket.getLocalSocketAddress());
}
- try {
- final OutputStream os = this.socket.getOutputStream();
- final MessageDigest md = BlobUtils.createMessageDigest();
- final byte[] xferBuf = new byte[BUFFER_SIZE];
-
- // Send the PUT header
- sendPutHeader(os, jobId, blobType);
-
- while (true) {
- // since we don't know a total size here, send lengths iteratively
- final int read = inputStream.read(xferBuf);
- if (read < 0) {
- // we are done. send a -1 and be done
- writeLength(-1, os);
- break;
- }
- if (read > 0) {
- writeLength(read, os);
- os.write(xferBuf, 0, read);
- md.update(xferBuf, 0, read);
- }
- }
-
- // Receive blob key and compare
- final InputStream is = this.socket.getInputStream();
- return receiveAndCheckPutResponse(is, md, blobType);
- }
- catch (Throwable t) {
+ try (BlobOutputStream os = new BlobOutputStream(jobId, blobType, socket)) {
+ IOUtils.copyBytes(inputStream, os, BUFFER_SIZE, false);
+ return os.finish();
+ } catch (Throwable t) {
BlobUtils.closeSilently(socket, LOG);
throw new IOException("PUT operation failed: " + t.getMessage(), t);
}
}
/**
- * Constructs and writes the header data for a PUT request to the given output stream.
- *
- * @param outputStream
- * the output stream to write the PUT header data to
- * @param jobId
- * the ID of job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
- * @param blobType
- * whether the BLOB should become permanent or transient
- *
- * @throws IOException
- * thrown if an I/O error occurs while writing the header data to the output stream
- */
- private static void sendPutHeader(
- OutputStream outputStream, @Nullable JobID jobId, BlobKey.BlobType blobType)
- throws IOException {
- // Signal type of operation
- outputStream.write(PUT_OPERATION);
- if (jobId == null) {
- outputStream.write(JOB_UNRELATED_CONTENT);
- } else {
- outputStream.write(JOB_RELATED_CONTENT);
- outputStream.write(jobId.getBytes());
- }
- outputStream.write(blobType.ordinal());
- }
-
- /**
- * Reads the response from the input stream and throws in case of errors.
- *
- * @param is
- * stream to read from
- * @param md
- * message digest to check the response against
- * @param blobType
- * whether the BLOB should be permanent or transient
- *
- * @throws IOException
- * if the response is an error, the message digest does not match or reading the response
- * failed
- */
- private static BlobKey receiveAndCheckPutResponse(
- InputStream is, MessageDigest md, BlobKey.BlobType blobType)
- throws IOException {
- int response = is.read();
- if (response < 0) {
- throw new EOFException("Premature end of response");
- }
- else if (response == RETURN_OKAY) {
-
- BlobKey remoteKey = BlobKey.readFromInputStream(is);
- byte[] localHash = md.digest();
-
- if (blobType != remoteKey.getType()) {
- throw new IOException("Detected data corruption during transfer");
- }
- if (!Arrays.equals(localHash, remoteKey.getHash())) {
- throw new IOException("Detected data corruption during transfer");
- }
-
- return remoteKey;
- }
- else if (response == RETURN_ERROR) {
- Throwable cause = readExceptionFromStream(is);
- throw new IOException("Server side error: " + cause.getMessage(), cause);
- }
- else {
- throw new IOException("Unrecognized response: " + response + '.');
- }
- }
-
- /**
* Uploads the JAR files to the {@link PermanentBlobService} of the {@link BlobServer} at the
* given address with HA as configured.
*
@@ -527,37 +403,27 @@ public final class BlobClient implements Closeable {
* Any additional configuration for the blob client
* @param jobId
* ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
- * @param jars
- * List of JAR files to upload
+ * @param files
+ * List of files to upload
*
* @throws IOException
* if the upload fails
*/
- public static List<PermanentBlobKey> uploadJarFiles(
- InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> jars)
+ public static List<PermanentBlobKey> uploadFiles(
+ InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> files)
throws IOException {
checkNotNull(jobId);
- if (jars.isEmpty()) {
+ if (files.isEmpty()) {
return Collections.emptyList();
} else {
List<PermanentBlobKey> blobKeys = new ArrayList<>();
try (BlobClient blobClient = new BlobClient(serverAddress, clientConfig)) {
- for (final Path jar : jars) {
- final FileSystem fs = jar.getFileSystem();
- FSDataInputStream is = null;
- try {
- is = fs.open(jar);
- final PermanentBlobKey key =
- (PermanentBlobKey) blobClient.putInputStream(jobId, is, PERMANENT_BLOB);
- blobKeys.add(key);
- } finally {
- if (is != null) {
- is.close();
- }
- }
+ for (final Path file : files) {
+ final PermanentBlobKey key = blobClient.uploadFile(jobId, file);
+ blobKeys.add(key);
}
}
@@ -565,21 +431,55 @@ public final class BlobClient implements Closeable {
}
}
- // --------------------------------------------------------------------------------------------
- // Miscellaneous
- // --------------------------------------------------------------------------------------------
-
- private static Throwable readExceptionFromStream(InputStream in) throws IOException {
- int len = readLength(in);
- byte[] bytes = new byte[len];
- readFully(in, bytes, 0, len, "Error message");
+ /**
+ * Uploads a single file to the {@link PermanentBlobService} of the given {@link BlobServer}.
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+ * @param file
+ * file to upload
+ *
+ * @throws IOException
+ * if the upload fails
+ */
+ public PermanentBlobKey uploadFile(JobID jobId, Path file) throws IOException {
+ final FileSystem fs = file.getFileSystem();
+ if (fs.getFileStatus(file).isDir()) {
+ return uploadDirectory(jobId, file, fs);
+ } else {
+ try (InputStream is = fs.open(file)) {
+ return (PermanentBlobKey) putInputStream(jobId, is, PERMANENT_BLOB);
+ }
+ }
+ }
- try {
- return (Throwable) InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
+ private PermanentBlobKey uploadDirectory(JobID jobId, Path file, FileSystem fs) throws IOException {
+ try (BlobOutputStream blobOutputStream = new BlobOutputStream(jobId, PERMANENT_BLOB, socket)) {
+ try (ZipOutputStream zipStream = new ZipOutputStream(blobOutputStream)) {
+ compressDirectoryToZipfile(fs, fs.getFileStatus(file), fs.getFileStatus(file), zipStream);
+ zipStream.finish();
+ return (PermanentBlobKey) blobOutputStream.finish();
+ }
}
- catch (ClassNotFoundException e) {
- // should never occur
- throw new IOException("Could not transfer error message", e);
+ }
+
+ private static void compressDirectoryToZipfile(FileSystem fs, FileStatus rootDir, FileStatus sourceDir, ZipOutputStream out) throws IOException {
+ for (FileStatus file : fs.listStatus(sourceDir.getPath())) {
+ LOG.info("Zipping file: {}", file);
+ if (file.isDir()) {
+ compressDirectoryToZipfile(fs, rootDir, file, out);
+ } else {
+ String entryName = file.getPath().getPath().replace(rootDir.getPath().getPath(), "");
+ LOG.info("Zipping entry: {}, file: {}, rootDir: {}", entryName, file, rootDir);
+ ZipEntry entry = new ZipEntry(entryName);
+ out.putNextEntry(entry);
+
+ try (FSDataInputStream in = fs.open(file.getPath())) {
+ IOUtils.copyBytes(in, out, false);
+ }
+ out.closeEntry();
+ }
}
}
+
}
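
A client-side sketch of the renamed upload entry point (address, port, and path are illustrative; in the real clients they come from the BlobServerPortResponseBody, as in the RestClusterClient and JarRunHandler hunks above):

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.blob.BlobClient;
    import org.apache.flink.runtime.blob.PermanentBlobKey;

    import java.net.InetSocketAddress;
    import java.util.Collections;
    import java.util.List;

    public class UploadSketch {
        public static void main(String[] args) throws Exception {
            InetSocketAddress address = new InetSocketAddress("localhost", 6124);
            // uploadFiles now handles plain files and directories alike;
            // directories are zipped into a single blob by uploadDirectory
            List<PermanentBlobKey> keys = BlobClient.uploadFiles(
                address,
                new Configuration(),
                new JobID(),
                Collections.singletonList(new Path("file:///tmp/udf.jar")));
            System.out.println("Uploaded blob keys: " + keys);
        }
    }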
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobOutputStream.java
new file mode 100644
index 0000000..923b5c4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobOutputStream.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.security.MessageDigest;
+import java.util.Arrays;
+
+import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CONTENT;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
+import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
+
+/**
+ * The BLOB output stream is a special implementation of an {@link OutputStream} to send data via PUT to the BLOB server.
+ */
+final class BlobOutputStream extends OutputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BlobOutputStream.class);
+
+ private final BlobKey.BlobType blobType;
+ private final OutputStream socketStream;
+ private final Socket socket;
+ private final MessageDigest md;
+
+ BlobOutputStream(JobID jobID, BlobKey.BlobType blobType, Socket socket) throws IOException {
+ this.blobType = blobType;
+
+ if (socket.isClosed()) {
+ throw new IllegalStateException("BLOB Client is not connected. " +
+ "Client has been shut down or encountered an error before.");
+ }
+
+ this.socket = socket;
+ this.socketStream = socket.getOutputStream();
+ this.md = BlobUtils.createMessageDigest();
+ sendPutHeader(socketStream, jobID, blobType);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ writeLength(1, socketStream);
+ socketStream.write(b);
+ md.update((byte) b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ // Send the value in iterations of BUFFER_SIZE
+ int remainingBytes = len;
+
+ while (remainingBytes > 0) {
+ // want a common code path for byte[] and InputStream at the BlobServer
+ // -> since for InputStream we don't know a total size beforehand, send lengths iteratively
+ final int bytesToSend = Math.min(BUFFER_SIZE, remainingBytes);
+ writeLength(bytesToSend, socketStream);
+
+ socketStream.write(b, off, bytesToSend);
+
+ // Update the message digest
+ md.update(b, off, bytesToSend);
+
+ remainingBytes -= bytesToSend;
+ off += bytesToSend;
+ }
+ }
+
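+ /**
+ * Sends the end-of-data marker, receives the {@link BlobKey} computed by the server,
+ * and verifies it against the locally computed message digest.
+ *
+ * @return the blob key of the uploaded BLOB
+ */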
+ public BlobKey finish() throws IOException {
+ // send -1 as the stream end
+ writeLength(-1, socketStream);
+
+ // Receive blob key and compare
+ final InputStream is = this.socket.getInputStream();
+ return receiveAndCheckPutResponse(is, md, blobType);
+ }
+
+ /**
+ * Constructs and writes the header data for a PUT request to the given output stream.
+ *
+ * @param outputStream
+ * the output stream to write the PUT header data to
+ * @param jobId
+ * the ID of job the BLOB belongs to (or <tt>null</tt> if job-unrelated)
+ * @param blobType
+ * whether the BLOB should become permanent or transient
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while writing the header data to the output stream
+ */
+ private static void sendPutHeader(
+ OutputStream outputStream, @Nullable JobID jobId, BlobKey.BlobType blobType)
+ throws IOException {
+ // Signal type of operation
+ outputStream.write(PUT_OPERATION);
+ if (jobId == null) {
+ outputStream.write(JOB_UNRELATED_CONTENT);
+ } else {
+ outputStream.write(JOB_RELATED_CONTENT);
+ outputStream.write(jobId.getBytes());
+ }
+ outputStream.write(blobType.ordinal());
+ }
+
+ /**
+ * Reads the response from the input stream and throws in case of errors.
+ *
+ * @param is
+ * stream to read from
+ * @param md
+ * message digest to check the response against
+ * @param blobType
+ * whether the BLOB should be permanent or transient
+ *
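+ * @return the blob key received from the server
+ *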
+ * @throws IOException
+ * if the response is an error, the message digest does not match or reading the response
+ * failed
+ */
+ private static BlobKey receiveAndCheckPutResponse(
+ InputStream is, MessageDigest md, BlobKey.BlobType blobType)
+ throws IOException {
+ int response = is.read();
+ if (response < 0) {
+ throw new EOFException("Premature end of response");
+ }
+ else if (response == RETURN_OKAY) {
+
+ BlobKey remoteKey = BlobKey.readFromInputStream(is);
+ byte[] localHash = md.digest();
+
+ if (blobType != remoteKey.getType()) {
+ throw new IOException("Detected data corruption during transfer");
+ }
+ if (!Arrays.equals(localHash, remoteKey.getHash())) {
+ throw new IOException("Detected data corruption during transfer");
+ }
+
+ return remoteKey;
+ }
+ else if (response == RETURN_ERROR) {
+ Throwable cause = BlobUtils.readExceptionFromStream(is);
+ throw new IOException("Server side error: " + cause.getMessage(), cause);
+ }
+ else {
+ throw new IOException("Unrecognized response: " + response + '.');
+ }
+ }
+}
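
For illustration, the chunked framing implemented by the class above can be reproduced in a small self-contained sketch. The writeLength helper below is a hypothetical stand-in for BlobUtils.writeLength (little-endian byte order assumed), and the BUFFER_SIZE value is an assumption standing in for BlobServerProtocol.BUFFER_SIZE:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class PutFramingSketch {

    // assumption: stands in for BlobServerProtocol.BUFFER_SIZE
    private static final int BUFFER_SIZE = 65536;

    // hypothetical stand-in for BlobUtils.writeLength (little-endian assumed)
    static void writeLength(int length, OutputStream out) throws IOException {
        out.write(length & 0xff);
        out.write((length >>> 8) & 0xff);
        out.write((length >>> 16) & 0xff);
        out.write((length >>> 24) & 0xff);
    }

    // every chunk is preceded by its length; -1 marks the end of the stream,
    // mirroring write(byte[], int, int) and finish() above
    static void writeChunked(byte[] data, OutputStream out) throws IOException {
        int off = 0;
        int remaining = data.length;
        while (remaining > 0) {
            int chunk = Math.min(BUFFER_SIZE, remaining);
            writeLength(chunk, out);
            out.write(data, off, chunk);
            off += chunk;
            remaining -= chunk;
        }
        writeLength(-1, out);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        writeChunked("hello blob server".getBytes(), sink);
        System.out.println("framed bytes: " + sink.size());
    }
}

This streaming shape is what allows the server to share one read path for byte arrays and for input streams whose total size is not known up front.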
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index a21c7d6..a61d679 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
@@ -353,6 +354,28 @@ public class BlobUtils {
}
/**
+ * Reads an exception from the given {@link InputStream}.
+ *
+ * @param in the input stream to read from
+ * @return exception that was read
+ * @throws IOException thrown if an I/O error occurs while reading from the input
+ * stream
+ */
+ static Throwable readExceptionFromStream(InputStream in) throws IOException {
+ int len = readLength(in);
+ byte[] bytes = new byte[len];
+ readFully(in, bytes, 0, len, "Error message");
+
+ try {
+ return (Throwable) InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
+ }
+ catch (ClassNotFoundException e) {
+ // should never occur
+ throw new IOException("Could not transfer error message", e);
+ }
+ }
+
+ /**
* Auxiliary method to read a particular number of bytes from an input stream. This method blocks until the
* requested number of bytes have been read from the stream. If the stream cannot offer enough data, an
* {@link EOFException} is thrown.
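
The new helper is the client-side counterpart of the server's error path: the failure cause is Java-serialized and sent length-prefixed over the socket. A minimal round-trip using the same InstantiationUtil calls (the wrapping class is illustrative):

import org.apache.flink.util.InstantiationUtil;

import java.io.IOException;

public class ExceptionRoundTrip {
    public static void main(String[] args) throws Exception {
        Throwable original = new IOException("disk full");

        // serialize as the server does before sending the error response
        byte[] bytes = InstantiationUtil.serializeObject(original);

        // deserialize as readExceptionFromStream does on the client
        Throwable restored = (Throwable) InstantiationUtil.deserializeObject(
            bytes, ClassLoader.getSystemClassLoader());

        System.out.println(restored.getMessage()); // prints: disk full
    }
}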
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 4b84857..76d9bd6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -429,6 +429,13 @@ public class JobClient {
"Could not upload the program's JAR files to the JobManager.", e);
}
+ try {
+ jobGraph.uploadUserArtifacts(blobServerAddress, config);
+ } catch (IOException e) {
+ throw new JobSubmissionException(jobGraph.getJobID(),
+ "Could not upload custom user artifacts to the job manager.", e);
+ }
+
CompletableFuture<Acknowledge> submissionFuture = jobManagerGateway.submitJob(jobGraph, ListeningBehaviour.DETACHED, timeout);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index e9824ae..9b95633 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -162,6 +162,16 @@ public class JobSubmissionClientActor extends JobClientActor {
"Could not upload the jar files to the job manager.",
e));
}
+
+ try {
+ jobGraph.uploadUserArtifacts(blobServerAddress, clientConfig);
+ } catch (IOException e) {
+ throw new CompletionException(
+ new JobSubmissionException(
+ jobGraph.getJobID(),
+ "Could not upload custom user artifacts to the job manager.",
+ e));
+ }
},
getContext().dispatcher());
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index d9d021a..9021751 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -18,27 +18,33 @@
package org.apache.flink.runtime.filecache;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
@@ -46,6 +52,10 @@ import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* The FileCache is used to create local copies of the files registered in the distributed cache when a task is deployed.
@@ -54,12 +64,14 @@ import java.util.concurrent.TimeUnit;
*/
public class FileCache {
- static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
/** cache-wide lock to ensure consistency. copies are not done under this lock. */
private final Object lock = new Object();
- private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
+ private final Map<JobID, Map<String, Future<Path>>> entries;
+
+ private final Map<JobID, Set<ExecutionAttemptID>> jobRefHolders;
private final ScheduledExecutorService executorService;
@@ -69,11 +81,23 @@ public class FileCache {
private int nextDirectory;
+ private final PermanentBlobService blobService;
+
+ private final long cleanupInterval; // in milliseconds
+
// ------------------------------------------------------------------------
- public FileCache(String[] tempDirectories) throws IOException {
+ public FileCache(String[] tempDirectories, PermanentBlobService blobService) throws IOException {
+ this(tempDirectories, blobService, Executors.newScheduledThreadPool(10,
+ new ExecutorThreadFactory("flink-file-cache")), 5000);
+ }
+
+ @VisibleForTesting
+ FileCache(String[] tempDirectories, PermanentBlobService blobService,
+ ScheduledExecutorService executorService, long cleanupInterval) throws IOException {
Preconditions.checkNotNull(tempDirectories);
+ this.cleanupInterval = cleanupInterval;
storageDirectories = new File[tempDirectories.length];
@@ -99,9 +123,10 @@ public class FileCache {
this.shutdownHook = createShutdownHook(this, LOG);
- this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>();
- this.executorService = Executors.newScheduledThreadPool(10,
- new ExecutorThreadFactory("flink-file-cache"));
+ this.entries = new HashMap<>();
+ this.jobRefHolders = new HashMap<>();
+ this.executorService = executorService;
+ this.blobService = blobService;
}
/**
@@ -114,7 +139,7 @@ public class FileCache {
if (es != null) {
es.shutdown();
try {
- es.awaitTermination(5000L, TimeUnit.MILLISECONDS);
+ es.awaitTermination(cleanupInterval, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
// may happen
@@ -122,6 +147,7 @@ public class FileCache {
}
entries.clear();
+ jobRefHolders.clear();
// clean up all storage directories
for (File dir : storageDirectories) {
@@ -143,30 +169,24 @@ public class FileCache {
/**
* If the file doesn't exist locally yet, it is copied to the temp directory.
*
* @param name The name under which the file is registered.
* @param entry The cache entry descriptor (path, executable flag)
* @param jobID The ID of the job for which the file is copied.
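+ * @param executionId The ID of the execution attempt that requires the file.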
* @return The handle to the task that copies the file.
*/
- public Future<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
+ public Future<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID, ExecutionAttemptID executionId) throws Exception {
synchronized (lock) {
- Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
- if (jobEntries == null) {
- jobEntries = new HashMap<String, Tuple4<Integer, File, Path, Future<Path>>>();
- entries.put(jobID, jobEntries);
- }
+ Map<String, Future<Path>> jobEntries = entries.computeIfAbsent(jobID, k -> new HashMap<>());
+
+ // register reference holder
+ final Set<ExecutionAttemptID> refHolders = jobRefHolders.computeIfAbsent(jobID, id -> new HashSet<>());
+ refHolders.add(executionId);
- // tuple is (ref-count, parent-temp-dir, cached-file-path, copy-process)
- Tuple4<Integer, File, Path, Future<Path>> fileEntry = jobEntries.get(name);
+ Future<Path> fileEntry = jobEntries.get(name);
if (fileEntry != null) {
// file is already in the cache. return the existing
// future; the copy may still be in progress
- fileEntry.f0 = fileEntry.f0 + 1;
-
- // return the future. may be that the copy is still in progress
- return fileEntry.f3;
- }
- else {
+ return fileEntry;
+ } else {
// need to copy the file
// create the target path
@@ -184,75 +204,18 @@ public class FileCache {
Path target = new Path(tempDirToUse.getAbsolutePath() + "/" + sourceFile);
// kick off the copying
- CopyProcess cp = new CopyProcess(entry, target);
- FutureTask<Path> copyTask = new FutureTask<Path>(cp);
+ Callable<Path> cp = new CopyFromBlobProcess(entry, jobID, blobService, target);
+ FutureTask<Path> copyTask = new FutureTask<>(cp);
executorService.submit(copyTask);
// store our entry
- jobEntries.put(name, new Tuple4<Integer, File, Path, Future<Path>>(1, tempDirToUse, target, copyTask));
+ jobEntries.put(name, copyTask);
return copyTask;
}
}
}
- /**
- * Deletes the local file after a 5 second delay.
- *
- * @param name The name under which the file is registered.
- * @param jobID The ID of the job for which the file is copied.
- */
- public void deleteTmpFile(String name, JobID jobID) {
- DeleteProcess dp = new DeleteProcess(lock, entries, name, jobID);
- executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS);
- }
-
- boolean holdsStillReference(String name, JobID jobId) {
- Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobId);
- if (jobEntries != null) {
- Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
- return entry != null && entry.f0 > 0;
- }
- else {
- return false;
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException {
- // TODO rewrite this to make it participate in the closable registry and the lifecycle of a task.
- // we unwrap the file system to get raw streams without safety net
- FileSystem sFS = FileSystem.getUnguardedFileSystem(sourcePath.toUri());
- FileSystem tFS = FileSystem.getUnguardedFileSystem(targetPath.toUri());
- if (!tFS.exists(targetPath)) {
- if (sFS.getFileStatus(sourcePath).isDir()) {
- tFS.mkdirs(targetPath);
- FileStatus[] contents = sFS.listStatus(sourcePath);
- for (FileStatus content : contents) {
- String distPath = content.getPath().toString();
- if (content.isDir()) {
- if (distPath.endsWith("/")) {
- distPath = distPath.substring(0, distPath.length() - 1);
- }
- }
- String localPath = targetPath.toString() + distPath.substring(distPath.lastIndexOf("/"));
- copy(content.getPath(), new Path(localPath), executable);
- }
- } else {
- try (FSDataOutputStream lfsOutput = tFS.create(targetPath, FileSystem.WriteMode.NO_OVERWRITE); FSDataInputStream fsInput = sFS.open(sourcePath)) {
- IOUtils.copyBytes(fsInput, lfsOutput);
- //noinspection ResultOfMethodCallIgnored
- new File(targetPath.toString()).setExecutable(executable);
- } catch (IOException ioe) {
- LOG.error("could not copy file to local file cache.", ioe);
- }
- }
- }
- }
-
private static Thread createShutdownHook(final FileCache cache, final Logger logger) {
return ShutdownHookUtil.addShutdownHook(
@@ -262,50 +225,92 @@ public class FileCache {
);
}
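+ /**
+ * Releases the reference that the given execution attempt holds on the cached files
+ * of the given job. Once the last reference is released, deletion of the job's files
+ * is scheduled after the cleanup interval.
+ *
+ * @param jobId ID of the job whose cached files are released
+ * @param executionId execution attempt that no longer references the files
+ */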
+ public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
+ checkNotNull(jobId);
+
+ synchronized (lock) {
+ Set<ExecutionAttemptID> jobRefCounter = jobRefHolders.get(jobId);
+
+ if (jobRefCounter == null || jobRefCounter.isEmpty()) {
+ LOG.warn("improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId " + jobId);
+ return;
+ }
+
+ jobRefCounter.remove(executionId);
+ if (jobRefCounter.isEmpty()) {
+ executorService.schedule(new DeleteProcess(jobId), cleanupInterval, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
// ------------------------------------------------------------------------
// background processes
// ------------------------------------------------------------------------
/**
- * Asynchronous file copy process.
+ * Asynchronous process that copies a file from the BLOB server.
*/
- private static class CopyProcess implements Callable<Path> {
-
- private final Path filePath;
- private final Path cachedPath;
- private boolean executable;
+ private static class CopyFromBlobProcess implements Callable<Path> {
- public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
- this.filePath = new Path(e.filePath);
- this.executable = e.isExecutable;
- this.cachedPath = cachedPath;
+ private final PermanentBlobKey blobKey;
+ private final Path target;
+ private final boolean isDirectory;
+ private final boolean isExecutable;
+ private final JobID jobID;
+ private final PermanentBlobService blobService;
+
+ CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) throws Exception {
+ this.isExecutable = e.isExecutable;
+ this.isDirectory = e.isZipped;
+ this.jobID = jobID;
+ this.blobService = blobService;
+ this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader());
+ this.target = target;
}
@Override
public Path call() throws IOException {
- // let exceptions propagate. we can retrieve them later from
- // the future and report them upon access to the result
- copy(filePath, cachedPath, this.executable);
- return cachedPath;
+ final File file = blobService.getFile(jobID, blobKey);
+
+ if (isDirectory) {
+ try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
+ ZipEntry entry;
+ while ((entry = zis.getNextEntry()) != null) {
+ String fileName = entry.getName();
+ Path newFile = new Path(target, fileName);
+ if (entry.isDirectory()) {
+ target.getFileSystem().mkdirs(newFile);
+ } else {
+ try (FSDataOutputStream fsDataOutputStream = target.getFileSystem()
+ .create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
+ IOUtils.copyBytes(zis, fsDataOutputStream, false);
+ }
+ //noinspection ResultOfMethodCallIgnored
+ new File(newFile.getPath()).setExecutable(isExecutable);
+ }
+ zis.closeEntry();
+ }
+ }
+ Files.delete(file.toPath());
+ return target;
+ } else {
+ //noinspection ResultOfMethodCallIgnored
+ file.setExecutable(isExecutable);
+ return Path.fromLocalFile(file);
+ }
+
}
}
/**
* Removes a job's locally cached files once no task holds a reference to them anymore and the cleanup interval has passed.
*/
- private static class DeleteProcess implements Runnable {
-
- private final Object lock;
- private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
+ @VisibleForTesting
+ class DeleteProcess implements Runnable {
- private final String name;
private final JobID jobID;
- public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
- String name, JobID jobID) {
- this.lock = lock;
- this.entries = entries;
- this.name = name;
+ DeleteProcess(JobID jobID) {
this.jobID = jobID;
}
@@ -313,53 +318,26 @@ public class FileCache {
public void run() {
try {
synchronized (lock) {
- Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
- if (jobEntries != null) {
- Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
+ Set<ExecutionAttemptID> jobRefs = jobRefHolders.get(jobID);
+ if (jobRefs != null && jobRefs.isEmpty()) {
+ // abort the copy
+ for (Future<Path> fileFuture : entries.get(jobID).values()) {
+ fileFuture.cancel(true);
+ }
- if (entry != null) {
- int count = entry.f0;
- if (count > 1) {
- // multiple references still
- entry.f0 = count - 1;
- }
- else {
- // we remove the last reference
- jobEntries.remove(name);
- if (jobEntries.isEmpty()) {
- entries.remove(jobID);
- }
-
- // abort the copy
- entry.f3.cancel(true);
-
- // remove the file
- File file = new File(entry.f2.toString());
- if (file.exists()) {
- if (file.isDirectory()) {
- FileUtils.deleteDirectory(file);
- }
- else if (!file.delete()) {
- LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
- }
- }
-
- // remove the job wide temp directory, if it is now empty
- File parent = entry.f1;
- if (parent.isDirectory()) {
- String[] children = parent.list();
- if (children == null || children.length == 0) {
- //noinspection ResultOfMethodCallIgnored
- parent.delete();
- }
- }
- }
+ // remove job-specific entries from the maps
+ entries.remove(jobID);
+ jobRefHolders.remove(jobID);
+
+ // remove the job-wide temp directories
+ for (File storageDirectory : storageDirectories) {
+ File tempDir = new File(storageDirectory, jobID.toString());
+ FileUtils.deleteDirectory(tempDir);
}
}
}
- }
- catch (IOException e) {
+ } catch (IOException e) {
LOG.error("Could not delete file from local file cache.", e);
}
}
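
Taken together, the cache now works end to end as follows from the user API: a file registered on the execution environment is shipped to the BlobServer at submission time, and the task-side FileCache materializes it locally on first access. A minimal sketch along the lines of the new DistributedCacheViaBlobTestProgram (the file path is a placeholder):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

import java.io.File;

public class DistributedCacheExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // shipped to the BlobServer when the job is submitted
        env.registerCachedFile("/path/to/words.txt", "words");

        env.fromElements("a", "b")
            .map(new RichMapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    // materialized by the FileCache on the task manager
                    File words = getRuntimeContext()
                        .getDistributedCache()
                        .getFile("words");
                    return value + ": " + words.length() + " bytes";
                }
            })
            .print();
    }
}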
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 889f8ae..bc1a795 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -21,11 +21,13 @@ package org.apache.flink.runtime.jobgraph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedValue;
import java.io.IOException;
@@ -34,6 +36,7 @@ import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -98,6 +101,9 @@ public class JobGraph implements Serializable {
/** Set of JAR files required to run this job. */
private final List<Path> userJars = new ArrayList<Path>();
+ /** Custom files required to run this job, keyed by the name under which they are registered. */
+ private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = new HashMap<>();
+
/** Set of blob keys identifying the JAR files required to run this job. */
private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();
@@ -267,7 +273,7 @@ public class JobGraph implements Serializable {
* Sets the execution config. This method eagerly serializes the ExecutionConfig for future RPC
* transport. Further modification of the referenced ExecutionConfig object will not affect
* this serialized copy.
- *
+ *
* @param executionConfig The ExecutionConfig to be serialized.
* @throws IOException Thrown if the serialization of the ExecutionConfig fails
*/
@@ -489,12 +495,35 @@ public class JobGraph implements Serializable {
}
/**
+ * Adds the path of a custom file required to run the job on a task manager.
+ *
+ * @param name a name under which this artifact will be accessible through {@link DistributedCache}
+ * @param file path of a custom file required to run the job on a task manager
+ */
+ public void addUserArtifact(String name, DistributedCache.DistributedCacheEntry file) {
+ if (file == null) {
+ throw new IllegalArgumentException();
+ }
+
+ userArtifacts.putIfAbsent(name, file);
+ }
+
+ /**
+ * Gets the map of user artifacts registered for this job.
+ *
+ * @return The map of registered user artifacts, keyed by name
+ */
+ public Map<String, DistributedCache.DistributedCacheEntry> getUserArtifacts() {
+ return userArtifacts;
+ }
+
+ /**
* Adds the BLOB referenced by the key to the JobGraph's dependencies.
*
* @param key
* key of the BLOB that identifies a JAR file required to run the job on a task manager
*/
- public void addBlob(PermanentBlobKey key) {
+ public void addUserJarBlobKey(PermanentBlobKey key) {
if (key == null) {
throw new IllegalArgumentException();
}
@@ -535,7 +564,7 @@ public class JobGraph implements Serializable {
InetSocketAddress blobServerAddress,
Configuration blobClientConfig) throws IOException {
if (!userJars.isEmpty()) {
- List<PermanentBlobKey> blobKeys = BlobClient.uploadJarFiles(
+ List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
blobServerAddress, blobClientConfig, jobID, userJars);
for (PermanentBlobKey blobKey : blobKeys) {
@@ -550,4 +579,24 @@ public class JobGraph implements Serializable {
public String toString() {
return "JobGraph(jobId: " + jobID + ")";
}
+
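+ /**
+ * Uploads all registered user artifacts to the BLOB server and stores the returned
+ * blob keys in the job configuration, from where the task-side DistributedCache
+ * picks them up.
+ *
+ * @param blobServerAddress address of the BLOB server to upload to
+ * @param clientConfig configuration used to set up the BLOB client
+ * @throws IOException if an artifact could not be uploaded
+ */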
+ public void uploadUserArtifacts(InetSocketAddress blobServerAddress, Configuration clientConfig) throws IOException {
+ if (!userArtifacts.isEmpty()) {
+ try (BlobClient blobClient = new BlobClient(blobServerAddress, clientConfig)) {
+ for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
+
+ final PermanentBlobKey key = blobClient.uploadFile(jobID,
+ new Path(userArtifact.getValue().filePath));
+
+ DistributedCache.writeFileInfoToConfig(
+ userArtifact.getKey(),
+ new DistributedCache.DistributedCacheEntry(
+ userArtifact.getValue().filePath,
+ userArtifact.getValue().isExecutable,
+ InstantiationUtil.serializeObject(key)),
+ jobConfiguration);
+ }
+ }
+ }
+ }
}
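
At the JobGraph level, the new methods compose as shown in the sketch below. The two-argument DistributedCacheEntry constructor is assumed from the extended DistributedCache API, and the address and port are placeholders:

import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

import java.net.InetSocketAddress;

public class ShipArtifactSketch {
    public static void main(String[] args) throws Exception {
        JobGraph jobGraph = new JobGraph("example-job");

        // register the artifact under the name used for DistributedCache lookups
        jobGraph.addUserArtifact(
            "words",
            new DistributedCache.DistributedCacheEntry("/path/to/words.txt", false));

        // upload to the BlobServer; blob keys end up in the job configuration
        InetSocketAddress blobServerAddress =
            new InetSocketAddress("localhost", 6124); // placeholder address
        jobGraph.uploadUserArtifacts(blobServerAddress, new Configuration());
    }
}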
http://git-wip-us.apache.org/repos/asf/flink/blob/e0146f8a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 64d46c6..380ac0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -579,6 +579,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
public void runDetached(JobGraph job) throws JobExecutionException, InterruptedException {
checkNotNull(job, "job is null");
+ uploadUserArtifacts(job);
+
final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
try {
@@ -602,6 +604,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
checkNotNull(job, "job is null");
+ uploadUserArtifacts(job);
final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
@@ -624,6 +627,15 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
}
}
+ private void uploadUserArtifacts(JobGraph job) throws JobExecutionException {
+ try {
+ final InetSocketAddress blobAddress = new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort());
+ job.uploadUserArtifacts(blobAddress, miniClusterConfiguration.getConfiguration());
+ } catch (IOException e) {
+ throw new JobExecutionException(job.getJobID(), "Could not upload user artifacts", e);
+ }
+ }
+
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
final DispatcherGateway dispatcherGateway;
try {
@@ -675,7 +687,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
CompletableFuture<List<PermanentBlobKey>> jarUploadFuture = uploadJarFiles(currentDispatcherGateway, job.getJobID(), job.getUserJars());
return jarUploadFuture.thenAccept(blobKeys -> {
for (PermanentBlobKey blobKey : blobKeys) {
- job.addBlob(blobKey);
+ job.addUserJarBlobKey(blobKey);
}
});
} else {
@@ -690,7 +702,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
InetSocketAddress blobServerAddress = new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort);
try {
- return BlobClient.uploadJarFiles(blobServerAddress, miniClusterConfiguration.getConfiguration(), jobId, jars);
+ return BlobClient.uploadFiles(blobServerAddress, miniClusterConfiguration.getConfiguration(), jobId, jars);
} catch (IOException ioe) {
throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
}