You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/27 09:45:19 UTC

flink git commit: [FLINK-5666] [tests] Add blob server clean up tests

Repository: flink
Updated Branches:
  refs/heads/release-1.2 07865aaf8 -> b1ab75f48


[FLINK-5666] [tests] Add blob server clean up tests

Previously, deleting in HA mode was only tested with a local file system.
This verifies that the delete still works on HDFS.

This closes #3222.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1ab75f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1ab75f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1ab75f4

Branch: refs/heads/release-1.2
Commit: b1ab75f482a8fa07e2a7f2a2ee31c0d808038cb0
Parents: 07865aa
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 26 20:29:58 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Jan 27 10:45:10 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/hdfstests/HDFSTest.java    | 19 ++++++++++
 .../flink/runtime/blob/BlobRecoveryITCase.java  | 40 ++++++++++++++++----
 .../BlobLibraryCacheRecoveryITCase.java         |  8 ++--
 3 files changed, 55 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b1ab75f4/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 1df6390..49db0f8 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -24,10 +24,14 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.examples.java.wordcount.WordCount;
+import org.apache.flink.runtime.blob.BlobRecoveryITCase;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -217,6 +221,21 @@ public class HDFSTest {
 		assertFalse(fs.exists(directory));
 	}
 
+	/**
+	 * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
+	 * participating BlobServer.
+	 */
+	@Test
+	public void testBlobServerRecovery() throws Exception {
+		org.apache.flink.configuration.Configuration
+			config = new org.apache.flink.configuration.Configuration();
+		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+		config.setString(ConfigConstants.STATE_BACKEND, "ZOOKEEPER");
+		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
+
+		BlobRecoveryITCase.testBlobServerRecovery(config);
+	}
+
 	// package visible
 	static abstract class DopOneTestEnvironment extends ExecutionEnvironment {
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/b1ab75f4/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 3fe207e..a8eb1d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -23,18 +23,24 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class BlobRecoveryITCase {
 
@@ -61,6 +67,16 @@ public class BlobRecoveryITCase {
 	 */
 	@Test
 	public void testBlobServerRecovery() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath());
+
+		testBlobServerRecovery(config);
+	}
+
+	public static void testBlobServerRecovery(final Configuration config) throws IOException {
+		String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
 		Random rand = new Random();
 
 		BlobServer[] server = new BlobServer[2];
@@ -68,10 +84,6 @@ public class BlobRecoveryITCase {
 		BlobClient client = null;
 
 		try {
-			Configuration config = new Configuration();
-			config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath());
 
 			for (int i = 0; i < server.length; i++) {
 				server[i] = new BlobServer(config);
@@ -96,6 +108,11 @@ public class BlobRecoveryITCase {
 			client.put(jobId[0], testKey[0], expected); // Request 3
 			client.put(jobId[1], testKey[1], expected, 32, 256); // Request 4
 
+			// check that the storage directory exists
+			final Path blobServerPath = new Path(storagePath, "blob");
+			FileSystem fs = blobServerPath.getFileSystem();
+			assertTrue("Unknown storage dir: " + blobServerPath, fs.exists(blobServerPath));
+
 			// Close the client and connect to the other server
 			client.close();
 			client = new BlobClient(serverAddress[1], config);
@@ -146,6 +163,17 @@ public class BlobRecoveryITCase {
 			client.delete(keys[1]);
 			client.delete(jobId[0], testKey[0]);
 			client.delete(jobId[1], testKey[1]);
+
+			// Verify everything is clean
+			if (fs.exists(blobServerPath)) {
+				final org.apache.flink.core.fs.FileStatus[] recoveryFiles =
+					fs.listStatus(blobServerPath);
+				ArrayList<String> filenames = new ArrayList<String>(recoveryFiles.length);
+				for (org.apache.flink.core.fs.FileStatus file: recoveryFiles) {
+					filenames.add(file.toString());
+				}
+				fail("Unclean state backend: " + filenames);
+			}
 		}
 		finally {
 			for (BlobServer s : server) {
@@ -158,9 +186,5 @@ public class BlobRecoveryITCase {
 				client.close();
 			}
 		}
-
-		// Verify everything is clean
-		File[] recoveryFiles = recoveryDir.listFiles();
-		assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b1ab75f4/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 8fabdf6..a727d51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -143,6 +143,10 @@ public class BlobLibraryCacheRecoveryITCase {
 				client.delete(keys.get(0));
 				client.delete(keys.get(1));
 			}
+
+			// Verify everything is clean
+			File[] recoveryFiles = temporaryFolder.getRoot().listFiles();
+			assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
 		}
 		finally {
 			for (BlobServer s : server) {
@@ -159,9 +163,5 @@ public class BlobLibraryCacheRecoveryITCase {
 				libCache.shutdown();
 			}
 		}
-
-		// Verify everything is clean
-		File[] recoveryFiles = temporaryFolder.getRoot().listFiles();
-		assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
 	}
 }