You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/02 14:34:00 UTC

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

    [ https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461101#comment-16461101 ] 

ASF GitHub Bot commented on FLINK-8620:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r185517079
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDirectoriesTest.java ---
    @@ -0,0 +1,209 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.filecache;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.cache.DistributedCache;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.blob.PermanentBlobKey;
    +import org.apache.flink.runtime.blob.PermanentBlobService;
    +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
    +import org.apache.flink.util.FileUtils;
    +import org.apache.flink.util.IOUtils;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.concurrent.Future;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +import java.util.zip.ZipEntry;
    +import java.util.zip.ZipOutputStream;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests that {@link FileCache} can read zipped directories from BlobServer and properly cleans them after.
    + */
    +public class FileCacheDirectoriesTest {
    +
    +	private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" + "Prolog im Himmel.\n"
    +	+ "Der Herr. Die himmlischen Heerscharen. Nachher Mephistopheles. Die drei\n" + "Erzengel treten vor.\n"
    +	+ "RAPHAEL: Die Sonne toent, nach alter Weise, In Brudersphaeren Wettgesang,\n"
    +	+ "Und ihre vorgeschriebne Reise Vollendet sie mit Donnergang. Ihr Anblick\n"
    +	+ "gibt den Engeln Staerke, Wenn keiner Sie ergruenden mag; die unbegreiflich\n"
    +	+ "hohen Werke Sind herrlich wie am ersten Tag.\n"
    +	+ "GABRIEL: Und schnell und unbegreiflich schnelle Dreht sich umher der Erde\n"
    +	+ "Pracht; Es wechselt Paradieseshelle Mit tiefer, schauervoller Nacht. Es\n"
    +	+ "schaeumt das Meer in breiten Fluessen Am tiefen Grund der Felsen auf, Und\n"
    +	+ "Fels und Meer wird fortgerissen Im ewig schnellem Sphaerenlauf.\n"
    +	+ "MICHAEL: Und Stuerme brausen um die Wette Vom Meer aufs Land, vom Land\n"
    +	+ "aufs Meer, und bilden wuetend eine Kette Der tiefsten Wirkung rings umher.\n"
    +	+ "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n"
    +	+ "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags.";
    +
    +	@Rule
    +	public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private FileCache fileCache;
    +
    +	private final PermanentBlobKey permanentBlobKey = new PermanentBlobKey();
    +
    +	private final PermanentBlobService blobService = new PermanentBlobService() {
    +		@Override
    +		public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
    +			if (key.equals(permanentBlobKey)) {
    +				final File zipArchive = temporaryFolder.newFile("zipArchive");
    +				try (ZipOutputStream zis = new ZipOutputStream(new FileOutputStream(zipArchive))) {
    +
    +					final ZipEntry zipEntry = new ZipEntry("cacheFile");
    +					zis.putNextEntry(zipEntry);
    +
    +					IOUtils.copyBytes(new ByteArrayInputStream(testFileContent.getBytes(StandardCharsets.UTF_8)), zis, false);
    +				}
    +				return zipArchive;
    +			} else {
    +				throw new IllegalArgumentException("This service contains only entry for " + permanentBlobKey);
    +			}
    +		}
    +
    +		@Override
    +		public void close() throws IOException {
    +
    +		}
    +	};
    +
    +	@Before
    +	public void setup() throws Exception {
    +		fileCache = new FileCache(new String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService);
    +	}
    +
    +	@After
    +	public void shutdown() {
    +		fileCache.shutdown();
    +	}
    +
    +	@Test
    +	public void testDirectoryDownloadedFromBlob() throws Exception {
    +		JobID jobID = new JobID();
    +		ExecutionAttemptID attemptID = new ExecutionAttemptID();
    +
    +		final String fileName = "test_file";
    +		// copy / create the file
    +		final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
    +			fileName,
    +			false,
    +			InstantiationUtil.serializeObject(permanentBlobKey),
    +			true);
    +		Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID, attemptID);
    +
    +		final Path dstPath = copyResult.get();
    +		final FileSystem fs = dstPath.getFileSystem();
    +		final FileStatus fileStatus = fs.getFileStatus(dstPath);
    +		assertTrue(fileStatus.isDir());
    +
    +		final Path cacheFile = new Path(dstPath, "cacheFile");
    +		assertTrue(fs.exists(cacheFile));
    +		final String actualContent = FileUtils.readFileUtf8(new File(cacheFile.getPath()));
    +		assertEquals(testFileContent, actualContent);
    +	}
    +
    +	@Test
    +	public void testDirectoryCleanUp() throws Exception {
    +		fileCache.shutdown();
    +		DeleteCapturingDirectScheduledExecutorService scheduledExecutorService =
    +			new DeleteCapturingDirectScheduledExecutorService();
    +
    +		final int cleanupInterval = 1000;
    +		fileCache = new FileCache(new String[]{temporaryFolder.newFolder().getAbsolutePath()}, blobService,
    +			scheduledExecutorService, cleanupInterval);
    +
    +		JobID jobID = new JobID();
    +		ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
    +		ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
    +
    +		final String fileName = "test_file";
    +		// copy / create the file
    +		final DistributedCache.DistributedCacheEntry entry = new DistributedCache.DistributedCacheEntry(
    +			fileName,
    +			false,
    +			InstantiationUtil.serializeObject(permanentBlobKey),
    +			true);
    +		Future<Path> copyResult = fileCache.createTmpFile(fileName, entry, jobID, attemptID1);
    +		fileCache.createTmpFile(fileName, entry, jobID, attemptID2);
    +
    +		final Path dstPath = copyResult.get();
    +		final FileSystem fs = dstPath.getFileSystem();
    +		final FileStatus fileStatus = fs.getFileStatus(dstPath);
    +		final Path cacheFile = new Path(dstPath, "cacheFile");
    +		assertTrue(fileStatus.isDir());
    +		assertTrue(fs.exists(cacheFile));
    +
    +		fileCache.releaseJob(jobID, attemptID1);
    +		// still should be available
    +		assertTrue(fileStatus.isDir());
    +		assertTrue(fs.exists(cacheFile));
    +
    +		fileCache.releaseJob(jobID, attemptID2);
    --- End diff --
    
    We are not, because the `DirectExecutorService` is synchronous for all non-scheduled tasks. The only scheduled task is the `DeleteProcess` though, which we capture and run manually.


> Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-8620
>                 URL: https://issues.apache.org/jira/browse/FLINK-8620
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we can store those files in BlobStore and later on access them in TaskManagers through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)