You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/02 14:34:00 UTC
[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to
BlobStore and accessing them through DistributedCache
[ https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461101#comment-16461101 ]
ASF GitHub Bot commented on FLINK-8620:
---------------------------------------
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/5580#discussion_r185517079
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java ---
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.filecache;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.PermanentBlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that {@link FileCache} can read zipped directories from the BlobServer and properly cleans them up afterwards.
+ */
+public class FileCacheDirectoriesTest {
+
+ /** Sample text that the stub blob service writes into the zip entry and that the test compares against the extracted file. */
+ private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
+ + "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
+ + "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
+ + "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
+ + "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
+ + "hohen Werke Sind herrlich wie am ersten Tag.\n"
+ + "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
+ + "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
+ + "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
+ + "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
+ + "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
+ + "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
+ + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
+ + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
+
+ // JUnit rule providing scratch space; it holds both the generated zip archive and the cache's storage folder.
+ @Rule
+ public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ // Cache under test; assigned in setup() and stopped in shutdown().
+ private FileCache fileCache;
+
+ // The single blob key that the stub blobService answers for.
+ private final PermanentBlobKey permanentBlobKey = new PermanentBlobKey();
+
+ /**
+  * Stub blob service that serves exactly one blob: the one requested with {@code permanentBlobKey}.
+  * The blob is a newly written zip archive with a single entry, "cacheFile", whose content is
+  * {@code testFileContent} encoded as UTF-8. Any other key is rejected.
+  */
+ private final PermanentBlobService blobService = new PermanentBlobService() {
+     @Override
+     public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
+         // Reject unknown keys up front so the happy path stays unindented.
+         if (!key.equals(permanentBlobKey)) {
+             throw new IllegalArgumentException("This service contains only entry for " + permanentBlobKey);
+         }
+
+         final File archive = temporaryFolder.newFile("zipArchive");
+         try (ZipOutputStream zipOut = new ZipOutputStream(new FileOutputStream(archive))) {
+             zipOut.putNextEntry(new ZipEntry("cacheFile"));
+
+             final byte[] payload = testFileContent.getBytes(StandardCharsets.UTF_8);
+             // NOTE(review): the trailing 'false' presumably tells copyBytes not to close the streams;
+             // the zip stream is closed by the enclosing try-with-resources either way.
+             IOUtils.copyBytes(new ByteArrayInputStream(payload), zipOut, false);
+         }
+         return archive;
+     }
+
+     @Override
+     public void close() throws IOException {
+         // intentionally empty: the stub does nothing on close
+     }
+ };
+
+ /** Creates the {@link FileCache} under test, backed by a new temporary folder and the stub blob service. */
+ @Before
+ public void setup() throws Exception {
+     final File storageDir = temporaryFolder.newFolder();
+     final String[] storagePaths = {storageDir.getAbsolutePath()};
+     fileCache = new FileCache(storagePaths, blobService);
+ }
+
+ /** Shuts down whichever {@link FileCache} instance the field holds once a test has finished. */
+ @After
+ public void shutdown() {
+     this.fileCache.shutdown();
+ }
+
+ /**
+  * Requests a cache entry that is backed by the stub blob and checks that the returned path is a
+  * directory containing "cacheFile" with exactly the text that was zipped.
+  */
+ @Test
+ public void testDirectoryDownloadedFromBlob() throws Exception {
+     final JobID jobId = new JobID();
+     final ExecutionAttemptID executionId = new ExecutionAttemptID();
+     final String fileName = "test_file";
+
+     // copy / create the file
+     // NOTE(review): the trailing 'true' presumably marks the entry as zipped; confirm against DistributedCacheEntry.
+     final DistributedCache.DistributedCacheEntry entry =
+         new DistributedCache.DistributedCacheEntry(
+             fileName, false, InstantiationUtil.serializeObject(permanentBlobKey), true);
+     final Future<Path> pendingCopy = fileCache.createTmpFile(fileName, entry, jobId, executionId);
+
+     // The future resolves to the location the cache handed out for this entry.
+     final Path extractedDir = pendingCopy.get();
+     final FileSystem fileSystem = extractedDir.getFileSystem();
+     assertTrue(fileSystem.getFileStatus(extractedDir).isDir());
+
+     final Path extractedFile = new Path(extractedDir, "cacheFile");
+     assertTrue(fileSystem.exists(extractedFile));
+
+     final File localFile = new File(extractedFile.getPath());
+     assertEquals(testFileContent, FileUtils.readFileUtf8(localFile));
+ }
+
+ @Test
+ public void testDirectoryCleanUp() throws Exception {
+ fileCache.shutdown();
+ DeleteCapturingDirectScheduledExecutorService scheduledExecutorService =
+ new DeleteCapturingDirectScheduledExecutorService();
+
+ final int cleanupInterval = 1000;
+ fileCache = new FileCache(new String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService,
+ scheduledExecutorService, cleanupInterval);
+
+ JobID jobID = new JobID();
+ ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+ ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
+
+ final String fileName = "test_file";
+ // copy / create the file
+ final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
+ fileName,
+ false,
+ InstantiationUtil.serializeObject(permanentBlobKey),
+ true);
+ Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID, attemptID1);
+ fileCache.createTmpFile(fileName, entry, jobID, attemptID2);
+
+ final Path dstPath = copyResult.get();
+ final FileSystem fs = dstPath.getFileSystem();
+ final FileStatus fileStatus = fs.getFileStatus(dstPath);
+ final Path cacheFile = new Path(dstPath, "cacheFile");
+ assertTrue(fileStatus.isDir());
+ assertTrue(fs.exists(cacheFile));
+
+ fileCache.releaseJob(jobID, attemptID1);
+ // still should be available
+ assertTrue(fileStatus.isDir());
+ assertTrue(fs.exists(cacheFile));
+
+ fileCache.releaseJob(jobID, attemptID2);
--- End diff --
We are not, because the `DirectExecutorService` is synchronous for all non-scheduled tasks. The only scheduled task is the `DeleteProcess`, which we capture and run manually.
> Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
> Issue Type: New Feature
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
> Priority: Major
>
> We should be able to distribute custom files to TaskManagers. To do that, we can store those files in the BlobStore and later access them in the TaskManagers through the DistributedCache.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)