You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/01/27 09:45:19 UTC
flink git commit: [FLINK-5666] [tests] Add blob server clean up tests
Repository: flink
Updated Branches:
refs/heads/release-1.2 07865aaf8 -> b1ab75f48
[FLINK-5666] [tests] Add blob server clean up tests
Previously, deleting in HA mode was only tested with a local file system.
This verifies that the delete still works on HDFS.
This closes #3222.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1ab75f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1ab75f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1ab75f4
Branch: refs/heads/release-1.2
Commit: b1ab75f482a8fa07e2a7f2a2ee31c0d808038cb0
Parents: 07865aa
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 26 20:29:58 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Fri Jan 27 10:45:10 2017 +0100
----------------------------------------------------------------------
.../org/apache/flink/hdfstests/HDFSTest.java | 19 ++++++++++
.../flink/runtime/blob/BlobRecoveryITCase.java | 40 ++++++++++++++++----
.../BlobLibraryCacheRecoveryITCase.java | 8 ++--
3 files changed, 55 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b1ab75f4/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 1df6390..49db0f8 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -24,10 +24,14 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.wordcount.WordCount;
+import org.apache.flink.runtime.blob.BlobRecoveryITCase;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -217,6 +221,21 @@ public class HDFSTest {
assertFalse(fs.exists(directory));
}
+ /**
+ * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
+ * participating BlobServer.
+ */
+ @Test
+ public void testBlobServerRecovery() throws Exception {
+ org.apache.flink.configuration.Configuration
+ config = new org.apache.flink.configuration.Configuration();
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(ConfigConstants.STATE_BACKEND, "ZOOKEEPER");
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);
+
+ BlobRecoveryITCase.testBlobServerRecovery(config);
+ }
+
// package visible
static abstract class DopOneTestEnvironment extends ExecutionEnvironment {
http://git-wip-us.apache.org/repos/asf/flink/blob/b1ab75f4/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 3fe207e..a8eb1d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -23,18 +23,24 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class BlobRecoveryITCase {
@@ -61,6 +67,16 @@ public class BlobRecoveryITCase {
*/
@Test
public void testBlobServerRecovery() throws Exception {
+ Configuration config = new Configuration();
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath());
+
+ testBlobServerRecovery(config);
+ }
+
+ public static void testBlobServerRecovery(final Configuration config) throws IOException {
+ String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
Random rand = new Random();
BlobServer[] server = new BlobServer[2];
@@ -68,10 +84,6 @@ public class BlobRecoveryITCase {
BlobClient client = null;
try {
- Configuration config = new Configuration();
- config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
- config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath());
for (int i = 0; i < server.length; i++) {
server[i] = new BlobServer(config);
@@ -96,6 +108,11 @@ public class BlobRecoveryITCase {
client.put(jobId[0], testKey[0], expected); // Request 3
client.put(jobId[1], testKey[1], expected, 32, 256); // Request 4
+ // check that the storage directory exists
+ final Path blobServerPath = new Path(storagePath, "blob");
+ FileSystem fs = blobServerPath.getFileSystem();
+ assertTrue("Unknown storage dir: " + blobServerPath, fs.exists(blobServerPath));
+
// Close the client and connect to the other server
client.close();
client = new BlobClient(serverAddress[1], config);
@@ -146,6 +163,17 @@ public class BlobRecoveryITCase {
client.delete(keys[1]);
client.delete(jobId[0], testKey[0]);
client.delete(jobId[1], testKey[1]);
+
+ // Verify everything is clean
+ if (fs.exists(blobServerPath)) {
+ final org.apache.flink.core.fs.FileStatus[] recoveryFiles =
+ fs.listStatus(blobServerPath);
+ ArrayList<String> filenames = new ArrayList<String>(recoveryFiles.length);
+ for (org.apache.flink.core.fs.FileStatus file: recoveryFiles) {
+ filenames.add(file.toString());
+ }
+ fail("Unclean state backend: " + filenames);
+ }
}
finally {
for (BlobServer s : server) {
@@ -158,9 +186,5 @@ public class BlobRecoveryITCase {
client.close();
}
}
-
- // Verify everything is clean
- File[] recoveryFiles = recoveryDir.listFiles();
- assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b1ab75f4/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 8fabdf6..a727d51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -143,6 +143,10 @@ public class BlobLibraryCacheRecoveryITCase {
client.delete(keys.get(0));
client.delete(keys.get(1));
}
+
+ // Verify everything is clean
+ File[] recoveryFiles = temporaryFolder.getRoot().listFiles();
+ assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
}
finally {
for (BlobServer s : server) {
@@ -159,9 +163,5 @@ public class BlobLibraryCacheRecoveryITCase {
libCache.shutdown();
}
}
-
- // Verify everything is clean
- File[] recoveryFiles = temporaryFolder.getRoot().listFiles();
- assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
}
}