Posted to commits@flink.apache.org by kk...@apache.org on 2020/08/04 13:25:58 UTC

[flink] branch master updated (ff5351b -> 27ecf02)

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from ff5351b  [FLINK-18793] fix code link typo
     new e3e9d69  [FLINK-17253][fs-connector] Support viewfs for hadoop version < 2.7
     new 27ecf02  [FLINK-17569][fs-connector] Delegate lease revoking to the correct filesystem for viewfs

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hdfs/HadoopRecoverableFsDataOutputStream.java  |  21 ++-
 .../runtime/fs/hdfs/HadoopRecoverableWriter.java   |   2 +-
 ...leWriterOldHadoopWithNoTruncateSupportTest.java |  10 ++
 .../fs/hdfs/HadoopViewFileSystemTruncateTest.java  | 172 +++++++++++++++++++++
 4 files changed, 202 insertions(+), 3 deletions(-)
 create mode 100644 flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopViewFileSystemTruncateTest.java


[flink] 01/02: [FLINK-17253][fs-connector] Support viewfs for hadoop version < 2.7

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e3e9d69eec2a86b45914ded0e76ae1ed53b427c0
Author: spurthi chaganti <c....@criteo.com>
AuthorDate: Mon Apr 20 00:12:26 2020 -0400

    [FLINK-17253][fs-connector] Support viewfs for hadoop version < 2.7
    
    This closes #11815.
---
 .../apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java  |  2 +-
 ...oopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java | 10 ++++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
index 91d76c6..ed233c5 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
@@ -57,7 +57,7 @@ public class HadoopRecoverableWriter implements RecoverableWriter {
 		this.fs = checkNotNull(fs);
 
 		// This writer is only supported on a subset of file systems
-		if (!"hdfs".equalsIgnoreCase(fs.getScheme())) {
+		if (!("hdfs".equalsIgnoreCase(fs.getScheme()) || "viewfs".equalsIgnoreCase(fs.getScheme()))) {
 			throw new UnsupportedOperationException(
 					"Recoverable writers on Hadoop are only supported for HDFS");
 		}
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java
index b12090f..03d978a 100644
--- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java
@@ -34,6 +34,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -44,6 +45,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static junit.framework.TestCase.assertEquals;
 import static junit.framework.TestCase.assertNotNull;
 import static junit.framework.TestCase.assertTrue;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link HadoopRecoverableWriter} with Hadoop versions pre Hadoop 2.7.
@@ -141,6 +143,14 @@ public class HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest {
 		}
 	}
 
+	@Test
+	public void testRecoverableWriterWithViewfsScheme() {
+		final org.apache.hadoop.fs.FileSystem mockViewfs = Mockito.mock(org.apache.hadoop.fs.FileSystem.class);
+		when(mockViewfs.getScheme()).thenReturn("viewfs");
+		// Creating the writer should not throw UnsupportedOperationException.
+		RecoverableWriter recoverableWriter = new HadoopRecoverableWriter(mockViewfs);
+	}
+
 	private RecoverableFsDataOutputStream getOpenStreamToFileWithContent(
 			final RecoverableWriter writerUnderTest,
 			final Path path,

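For context, here is what the relaxed scheme check enables at the user level. This is a minimal sketch, not part of the commit: the viewfs authority "cluster", the mount point "/mountOnNn1" and the target file name are hypothetical, and it assumes flink-hadoop-fs plus a Hadoop configuration with matching fs.viewfs.mounttable entries are on the classpath.

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
    import org.apache.flink.core.fs.RecoverableWriter;

    import java.net.URI;
    import java.nio.charset.StandardCharsets;

    public class ViewFsRecoverableWriterSketch {

        public static void main(String[] args) throws Exception {
            // Hypothetical viewfs path; "cluster" is the mount table name and
            // /mountOnNn1 a mount point defined in the Hadoop configuration.
            final Path target = new Path(URI.create("viewfs://cluster/mountOnNn1/part-0"));

            // Resolves to Flink's HadoopFileSystem wrapper around Hadoop's ViewFileSystem.
            final FileSystem fs = target.getFileSystem();

            // Before this change, createRecoverableWriter() threw UnsupportedOperationException
            // for the "viewfs" scheme; with the relaxed check it returns a HadoopRecoverableWriter.
            final RecoverableWriter writer = fs.createRecoverableWriter();

            final RecoverableFsDataOutputStream out = writer.open(target);
            out.write("hello viewfs".getBytes(StandardCharsets.UTF_8));
            out.closeForCommit().commit();
        }
    }
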

[flink] 02/02: [FLINK-17569][fs-connector] Delegate lease revoking to the correct filesystem for viewfs

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 27ecf02fb2a8e61b68e93190080f540e53aca171
Author: wxp <wx...@outlook.com>
AuthorDate: Fri May 8 17:50:37 2020 +0800

    [FLINK-17569][fs-connector] Delegate lease revoking to the correct filesystem for viewfs
    
    This closes #12035.
---
 .../hdfs/HadoopRecoverableFsDataOutputStream.java  |  21 ++-
 .../fs/hdfs/HadoopViewFileSystemTruncateTest.java  | 172 +++++++++++++++++++++
 2 files changed, 191 insertions(+), 2 deletions(-)

diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
index adf123f..4ebd41d 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.viewfs.ViewFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.util.VersionInfo;
 
@@ -157,7 +158,7 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
 
 		ensureTruncateInitialized();
 
-		waitUntilLeaseIsRevoked(fileSystem, path);
+		revokeLeaseByFileSystem(fileSystem, path);
 
 		// truncate back and append
 		boolean truncated;
@@ -170,7 +171,7 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
 		if (!truncated) {
 			// Truncate did not complete immediately, we must wait for
 			// the operation to complete and release the lease.
-			waitUntilLeaseIsRevoked(fileSystem, path);
+			revokeLeaseByFileSystem(fileSystem, path);
 		}
 	}
 
@@ -314,6 +315,22 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
 	}
 
 	/**
+	 * Resolves the real FileSystem and path if the given FileSystem is a {@link ViewFileSystem},
+	 * and revokes the lease of the file we are resuming through that resolved FileSystem.
+	 *
+	 * @param path The path to the file we want to resume writing to.
+	 */
+	private static boolean revokeLeaseByFileSystem(final FileSystem fs, final Path path) throws IOException {
+		if (fs instanceof ViewFileSystem) {
+			final ViewFileSystem vfs = (ViewFileSystem) fs;
+			final Path resolvePath = vfs.resolvePath(path);
+			final FileSystem resolveFs = resolvePath.getFileSystem(fs.getConf());
+			return waitUntilLeaseIsRevoked(resolveFs, resolvePath);
+		}
+		return waitUntilLeaseIsRevoked(fs, path);
+	}
+
+	/**
 	 * Called when resuming execution after a failure and waits until the lease
 	 * of the file we are resuming is free.
 	 *
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopViewFileSystemTruncateTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopViewFileSystemTruncateTest.java
new file mode 100644
index 0000000..7daf489
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopViewFileSystemTruncateTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.viewfs.ConfigUtil;
+import org.apache.hadoop.fs.viewfs.ViewFileSystemTestSetup;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link org.apache.hadoop.fs.viewfs.ViewFileSystem} support.
+ */
+public class HadoopViewFileSystemTruncateTest {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	private final FileSystemTestHelper fileSystemTestHelper = new FileSystemTestHelper("/tests");
+
+	private static MiniDFSCluster hdfsCluster;
+	private static FileSystem fHdfs;
+	private static org.apache.flink.core.fs.FileSystem fSystem;
+
+	private Configuration fsViewConf;
+	private FileSystem fsTarget;
+	private Path targetTestRoot;
+
+	@BeforeClass
+	public static void testHadoopVersion() {
+		Assume.assumeTrue(HadoopUtils.isMinHadoopVersion(2, 7));
+	}
+
+	@BeforeClass
+	public static void verifyOS() {
+		Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
+	}
+
+	@BeforeClass
+	public static void createHDFS() throws Exception {
+		final File baseDir = TEMP_FOLDER.newFolder();
+
+		final Configuration hdConf = new Configuration();
+		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+
+		final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf)
+				.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(1));
+		hdfsCluster = builder.build();
+		hdfsCluster.waitClusterUp();
+
+		fHdfs = hdfsCluster.getFileSystem(0);
+	}
+
+	@Before
+	public void setUp() throws Exception {
+		fsTarget = fHdfs;
+		targetTestRoot = fileSystemTestHelper.getAbsoluteTestRootPath(fsTarget);
+
+		fsTarget.delete(targetTestRoot, true);
+		fsTarget.mkdirs(targetTestRoot);
+
+		fsViewConf = ViewFileSystemTestSetup.createConfig();
+		setupMountPoints();
+		FileSystem fsView = FileSystem.get(FsConstants.VIEWFS_URI, fsViewConf);
+		fSystem = new HadoopFileSystem(fsView);
+	}
+
+	private void setupMountPoints() {
+		Path mountOnNn1 = new Path("/mountOnNn1");
+		ConfigUtil.addLink(fsViewConf, mountOnNn1.toString(), targetTestRoot.toUri());
+	}
+
+	@AfterClass
+	public static void shutdownCluster() {
+		hdfsCluster.shutdown();
+	}
+
+	@After
+	public void tearDown() throws Exception {
+		fsTarget.delete(fileSystemTestHelper.getTestRootPath(fsTarget), true);
+	}
+
+	@Test
+	public void testViewFileSystemRecoverWorks() throws IOException {
+
+		final org.apache.flink.core.fs.Path testPath = new org.apache.flink.core.fs.Path(
+				fSystem.getUri() + "mountOnNn1/test-1");
+		final String expectedContent = "test_line";
+
+		final RecoverableWriter writer = fSystem.createRecoverableWriter();
+		final RecoverableFsDataOutputStream streamUnderTest = getOpenStreamToFileWithContent(
+				writer, testPath, expectedContent);
+
+		final ResumeRecoverable resumeRecover = streamUnderTest.persist();
+
+		final RecoverableFsDataOutputStream recover = writer.recover(resumeRecover);
+
+		final RecoverableWriter.CommitRecoverable committable = recover.closeForCommit().getRecoverable();
+
+		final RecoverableWriter recoveredWriter = fSystem.createRecoverableWriter();
+		recoveredWriter.recoverForCommit(committable).commitAfterRecovery();
+
+		verifyFileContent(testPath, expectedContent);
+	}
+
+	private RecoverableFsDataOutputStream getOpenStreamToFileWithContent(
+			final RecoverableWriter writerUnderTest,
+			final org.apache.flink.core.fs.Path path,
+			final String expectedContent) throws IOException {
+		final byte[] content = expectedContent.getBytes(UTF_8);
+
+		final RecoverableFsDataOutputStream streamUnderTest = writerUnderTest.open(path);
+		streamUnderTest.write(content);
+		return streamUnderTest;
+	}
+
+	private static void verifyFileContent(
+			final org.apache.flink.core.fs.Path testPath,
+			final String expectedContent) throws IOException {
+		try (
+				FSDataInputStream in = fSystem.open(testPath);
+				InputStreamReader ir = new InputStreamReader(in, UTF_8);
+				BufferedReader reader = new BufferedReader(ir)
+		) {
+			final String line = reader.readLine();
+			assertEquals(expectedContent, line);
+		}
+	}
+}
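
For readers unfamiliar with the helper referenced in this commit: the new revokeLeaseByFileSystem(...) resolves a viewfs path to its backing file system and then delegates to waitUntilLeaseIsRevoked(...), whose body is not part of the diff context shown. The following is a hedged sketch of the kind of lease-recovery loop such a helper typically runs against the resolved DistributedFileSystem; the method name, timeout and polling interval are illustrative, not the actual Flink implementation.

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    import java.io.IOException;

    final class LeaseRecoverySketch {

        /** Asks HDFS to recover the lease on the given path and polls until the file is closed. */
        static boolean recoverLeaseAndWait(FileSystem fs, Path path) throws IOException {
            if (!(fs instanceof DistributedFileSystem)) {
                // A viewfs path must first be resolved to its backing HDFS, as
                // revokeLeaseByFileSystem(...) does via ViewFileSystem#resolvePath.
                throw new IllegalArgumentException("Expected a DistributedFileSystem, got " + fs.getClass());
            }
            final DistributedFileSystem dfs = (DistributedFileSystem) fs;

            boolean isClosed = dfs.recoverLease(path);
            final long deadline = System.currentTimeMillis() + 60_000L; // illustrative timeout
            while (!isClosed && System.currentTimeMillis() < deadline) {
                try {
                    Thread.sleep(500L); // illustrative polling interval
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting for lease recovery", e);
                }
                isClosed = dfs.isFileClosed(path);
            }
            return isClosed;
        }
    }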