You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/17 06:18:01 UTC
[3/5] flink git commit: [FLINK-6519] Integrate BlobStore in lifecycle management of HighAvailabilityServices
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 5054107..c106b3f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -42,7 +43,7 @@ import org.junit.Test;
/**
* This class contains unit tests for the {@link BlobClient} with ssl enabled.
*/
-public class BlobClientSslTest {
+public class BlobClientSslTest extends TestLogger {
/** The buffer size used during the tests in bytes. */
private static final int TEST_BUFFER_SIZE = 17 * 1000;
@@ -63,19 +64,14 @@ public class BlobClientSslTest {
* Starts the SSL enabled BLOB server.
*/
@BeforeClass
- public static void startSSLServer() {
- try {
- Configuration config = new Configuration();
- config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
- config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
- config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
- config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
- BLOB_SSL_SERVER = new BlobServer(config);
- }
- catch (IOException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ public static void startSSLServer() throws IOException {
+ Configuration config = new Configuration();
+ config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+ config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+ config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+ config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+ BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore());
+
sslClientConfig = new Configuration();
sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
@@ -87,20 +83,14 @@ public class BlobClientSslTest {
* Starts the SSL disabled BLOB server.
*/
@BeforeClass
- public static void startNonSSLServer() {
- try {
- Configuration config = new Configuration();
- config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
- config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
- config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
- config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
- config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
- BLOB_SERVER = new BlobServer(config);
- }
- catch (IOException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ public static void startNonSSLServer() throws IOException {
+ Configuration config = new Configuration();
+ config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+ config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
+ config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+ config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+ config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+ BLOB_SERVER = new BlobServer(config, new VoidBlobStore());
clientConfig = new Configuration();
clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
@@ -113,13 +103,13 @@ public class BlobClientSslTest {
* Shuts the BLOB server down.
*/
@AfterClass
- public static void stopServers() {
+ public static void stopServers() throws IOException {
if (BLOB_SSL_SERVER != null) {
- BLOB_SSL_SERVER.shutdown();
+ BLOB_SSL_SERVER.close();
}
if (BLOB_SERVER != null) {
- BLOB_SERVER.shutdown();
+ BLOB_SERVER.close();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 8f8f8c5..fda4ee9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -57,24 +57,18 @@ public class BlobClientTest {
* Starts the BLOB server.
*/
@BeforeClass
- public static void startServer() {
- try {
- blobServiceConfig = new Configuration();
- BLOB_SERVER = new BlobServer(blobServiceConfig);
- }
- catch (IOException e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ public static void startServer() throws IOException {
+ blobServiceConfig = new Configuration();
+ BLOB_SERVER = new BlobServer(blobServiceConfig, new VoidBlobStore());
}
/**
* Shuts the BLOB server down.
*/
@AfterClass
- public static void stopServer() {
+ public static void stopServer() throws IOException {
if (BLOB_SERVER != null) {
- BLOB_SERVER.shutdown();
+ BLOB_SERVER.close();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index f8d50d5..4f12ddb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -30,16 +30,13 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Random;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -59,10 +56,20 @@ public class BlobRecoveryITCase extends TestLogger {
config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath());
- testBlobServerRecovery(config);
+ BlobStoreService blobStoreService = null;
+
+ try {
+ blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
+ testBlobServerRecovery(config, blobStoreService);
+ } finally {
+ if (blobStoreService != null) {
+ blobStoreService.closeAndCleanupAllData();
+ }
+ }
}
- public static void testBlobServerRecovery(final Configuration config) throws IOException {
+ public static void testBlobServerRecovery(final Configuration config, final BlobStore blobStore) throws IOException {
final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId;
Random rand = new Random();
@@ -73,7 +80,7 @@ public class BlobRecoveryITCase extends TestLogger {
try {
for (int i = 0; i < server.length; i++) {
- server[i] = new BlobServer(config);
+ server[i] = new BlobServer(config, blobStore);
serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
}
@@ -166,7 +173,7 @@ public class BlobRecoveryITCase extends TestLogger {
finally {
for (BlobServer s : server) {
if (s != null) {
- s.shutdown();
+ s.close();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 025a2ff..e8e28a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -44,10 +44,11 @@ public class BlobServerDeleteTest {
public void testDeleteSingle() {
BlobServer server = null;
BlobClient client = null;
+ BlobStore blobStore = new VoidBlobStore();
try {
Configuration config = new Configuration();
- server = new BlobServer(config);
+ server = new BlobServer(config, blobStore);
InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
client = new BlobClient(serverAddress, config);
@@ -93,10 +94,11 @@ public class BlobServerDeleteTest {
public void testDeleteAll() {
BlobServer server = null;
BlobClient client = null;
+ BlobStore blobStore = new VoidBlobStore();
try {
Configuration config = new Configuration();
- server = new BlobServer(config);
+ server = new BlobServer(config, blobStore);
InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
client = new BlobClient(serverAddress, config);
@@ -156,10 +158,11 @@ public class BlobServerDeleteTest {
public void testDeleteAlreadyDeletedByBlobKey() {
BlobServer server = null;
BlobClient client = null;
+ BlobStore blobStore = new VoidBlobStore();
try {
Configuration config = new Configuration();
- server = new BlobServer(config);
+ server = new BlobServer(config, blobStore);
InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
client = new BlobClient(serverAddress, config);
@@ -195,10 +198,11 @@ public class BlobServerDeleteTest {
public void testDeleteAlreadyDeletedByName() {
BlobServer server = null;
BlobClient client = null;
+ BlobStore blobStore = new VoidBlobStore();
try {
Configuration config = new Configuration();
- server = new BlobServer(config);
+ server = new BlobServer(config, blobStore);
InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
client = new BlobClient(serverAddress, config);
@@ -237,10 +241,11 @@ public class BlobServerDeleteTest {
BlobServer server = null;
BlobClient client = null;
+ BlobStore blobStore = new VoidBlobStore();
try {
Configuration config = new Configuration();
- server = new BlobServer(config);
+ server = new BlobServer(config, blobStore);
InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
client = new BlobClient(serverAddress, config);
@@ -289,7 +294,11 @@ public class BlobServerDeleteTest {
}
}
if (server != null) {
- server.shutdown();
+ try {
+ server.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 59a62e1..6d1dba8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -40,13 +40,13 @@ public class BlobServerGetTest {
private final Random rnd = new Random();
@Test
- public void testGetFailsDuringLookup() {
+ public void testGetFailsDuringLookup() throws IOException {
BlobServer server = null;
BlobClient client = null;
try {
Configuration config = new Configuration();
- server = new BlobServer(config);
+ server = new BlobServer(config, new VoidBlobStore());
InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
client = new BlobClient(serverAddress, config);
@@ -66,37 +66,27 @@ public class BlobServerGetTest {
try {
client.get(key);
fail("This should not succeed.");
- }
- catch (IOException e) {
+ } catch (IOException e) {
// expected
}
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
+ } finally {
if (client != null) {
- try {
- client.close();
- } catch (Throwable t) {
- t.printStackTrace();
- }
+ client.close();
}
if (server != null) {
- server.shutdown();
+ server.close();
}
}
}
@Test
- public void testGetFailsDuringStreaming() {
+ public void testGetFailsDuringStreaming() throws IOException {
BlobServer server = null;
BlobClient client = null;
try {
Configuration config = new Configuration();
- server = new BlobServer(config);
+ server = new BlobServer(config, new VoidBlobStore());
InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
client = new BlobClient(serverAddress, config);
@@ -129,21 +119,12 @@ public class BlobServerGetTest {
catch (IOException e) {
// expected
}
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
+ } finally {
if (client != null) {
- try {
- client.close();
- } catch (Throwable t) {
- t.printStackTrace();
- }
+ client.close();
}
if (server != null) {
- server.shutdown();
+ server.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index c4d6d1c..441ca7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -42,13 +42,13 @@ public class BlobServerPutTest {
private final Random rnd = new Random();
@Test
- public void testPutBufferSuccessful() {
+ public void testPutBufferSuccessful() throws IOException {
BlobServer server = null;
BlobClient client = null;
try {
Configuration config = new Configuration();
- server = new BlobServer(config);
+ server = new BlobServer(config, new VoidBlobStore());
InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
client = new BlobClient(serverAddress, config);
@@ -95,34 +95,25 @@ public class BlobServerPutTest {
BlobUtils.readFully(is3, result3, 0, result3.length, null);
is3.close();
assertArrayEquals(data, result3);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
+ } finally {
if (client != null) {
- try {
- client.close();
- } catch (Throwable t) {
- t.printStackTrace();
- }
+ client.close();
}
if (server != null) {
- server.shutdown();
+ server.close();
}
}
}
@Test
- public void testPutStreamSuccessful() {
+ public void testPutStreamSuccessful() throws IOException {
BlobServer server = null;
BlobClient client = null;
try {
Configuration config = new Configuration();
- server = new BlobServer(config);
+ server = new BlobServer(config, new VoidBlobStore());
InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
client = new BlobClient(serverAddress, config);
@@ -143,12 +134,7 @@ public class BlobServerPutTest {
String stringKey = "my test key";
client.put(jid, stringKey, new ByteArrayInputStream(data));
}
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
+ } finally {
if (client != null) {
try {
client.close();
@@ -157,19 +143,19 @@ public class BlobServerPutTest {
}
}
if (server != null) {
- server.shutdown();
+ server.close();
}
}
}
@Test
- public void testPutChunkedStreamSuccessful() {
+ public void testPutChunkedStreamSuccessful() throws IOException {
BlobServer server = null;
BlobClient client = null;
try {
Configuration config = new Configuration();
- server = new BlobServer(config);
+ server = new BlobServer(config, new VoidBlobStore());
InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
client = new BlobClient(serverAddress, config);
@@ -190,27 +176,18 @@ public class BlobServerPutTest {
String stringKey = "my test key";
client.put(jid, stringKey, new ChunkedInputStream(data, 17));
}
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
+ } finally {
if (client != null) {
- try {
- client.close();
- } catch (Throwable t) {
- t.printStackTrace();
- }
+ client.close();
}
if (server != null) {
- server.shutdown();
+ server.close();
}
}
}
@Test
- public void testPutBufferFails() {
+ public void testPutBufferFails() throws IOException {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
BlobServer server = null;
@@ -219,7 +196,7 @@ public class BlobServerPutTest {
File tempFileDir = null;
try {
Configuration config = new Configuration();
- server = new BlobServer(config);
+ server = new BlobServer(config, new VoidBlobStore());
// make sure the blob server cannot create any files in its storage dir
tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile();
@@ -250,31 +227,22 @@ public class BlobServerPutTest {
// expected
}
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
+ } finally {
// set writable again to make sure we can remove the directory
if (tempFileDir != null) {
tempFileDir.setWritable(true, false);
}
if (client != null) {
- try {
- client.close();
- } catch (Throwable t) {
- t.printStackTrace();
- }
+ client.close();
}
if (server != null) {
- server.shutdown();
+ server.close();
}
}
}
@Test
- public void testPutNamedBufferFails() {
+ public void testPutNamedBufferFails() throws IOException {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
BlobServer server = null;
@@ -283,7 +251,7 @@ public class BlobServerPutTest {
File tempFileDir = null;
try {
Configuration config = new Configuration();
- server = new BlobServer(config);
+ server = new BlobServer(config, new VoidBlobStore());
// make sure the blob server cannot create any files in its storage dir
tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile();
@@ -317,25 +285,16 @@ public class BlobServerPutTest {
catch (IllegalStateException e) {
// expected
}
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
+ } finally {
// set writable again to make sure we can remove the directory
if (tempFileDir != null) {
tempFileDir.setWritable(true, false);
}
if (client != null) {
- try {
- client.close();
- } catch (Throwable t) {
- t.printStackTrace();
- }
+ client.close();
}
if (server != null) {
- server.shutdown();
+ server.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
index ea0eb94..fbcd4a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
@@ -39,8 +39,8 @@ public class BlobServerRangeTest extends TestLogger {
public void testOnEphemeralPort() throws IOException {
Configuration conf = new Configuration();
conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0");
- BlobServer srv = new BlobServer(conf);
- srv.shutdown();
+ BlobServer srv = new BlobServer(conf, new VoidBlobStore());
+ srv.close();
}
/**
@@ -63,7 +63,7 @@ public class BlobServerRangeTest extends TestLogger {
// this thing is going to throw an exception
try {
- BlobServer srv = new BlobServer(conf);
+ BlobServer srv = new BlobServer(conf, new VoidBlobStore());
} finally {
socket.close();
}
@@ -92,9 +92,9 @@ public class BlobServerRangeTest extends TestLogger {
// this thing is going to throw an exception
try {
- BlobServer srv = new BlobServer(conf);
+ BlobServer srv = new BlobServer(conf, new VoidBlobStore());
Assert.assertEquals(availablePort, srv.getPort());
- srv.shutdown();
+ srv.close();
} finally {
sockets[0].close();
sockets[1].close();
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
index 93f9b73..91e119b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
@@ -28,8 +28,8 @@ public class TestingFailingBlobServer extends BlobServer {
private int numFailures;
- public TestingFailingBlobServer(Configuration config, int numFailures) throws IOException {
- super(config);
+ public TestingFailingBlobServer(Configuration config, BlobStore blobStore, int numFailures) throws IOException {
+ super(config, blobStore);
this.numFailures = numFailures;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 5d9ade3..98e6b3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -18,13 +18,12 @@
package org.apache.flink.runtime.execution.librarycache;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.util.OperatingSystem;
@@ -45,7 +44,7 @@ import java.util.List;
public class BlobLibraryCacheManagerTest {
@Test
- public void testLibraryCacheManagerCleanup() {
+ public void testLibraryCacheManagerCleanup() throws IOException, InterruptedException {
JobID jid = new JobID();
List<BlobKey> keys = new ArrayList<BlobKey>();
@@ -56,7 +55,7 @@ public class BlobLibraryCacheManagerTest {
try {
Configuration config = new Configuration();
- server = new BlobServer(config);
+ server = new BlobServer(config, new VoidBlobStore());
InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort());
BlobClient bc = new BlobClient(blobSocketAddress, config);
@@ -108,14 +107,9 @@ public class BlobLibraryCacheManagerTest {
assertEquals(2, caughtExceptions);
bc.close();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
+ } finally {
if (server != null) {
- server.shutdown();
+ server.close();
}
if (libraryCacheManager != null) {
@@ -130,7 +124,7 @@ public class BlobLibraryCacheManagerTest {
}
@Test
- public void testRegisterAndDownload() {
+ public void testRegisterAndDownload() throws IOException {
assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
BlobServer server = null;
@@ -139,9 +133,9 @@ public class BlobLibraryCacheManagerTest {
try {
// create the blob transfer services
Configuration config = new Configuration();
- server = new BlobServer(config);
+ server = new BlobServer(config, new VoidBlobStore());
InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
- cache = new BlobCache(serverAddress, config);
+ cache = new BlobCache(serverAddress, config, new VoidBlobStore());
// upload some meaningless data to the server
BlobClient uploader = new BlobClient(serverAddress, config);
@@ -210,22 +204,17 @@ public class BlobLibraryCacheManagerTest {
catch (IOException e) {
// splendid!
}
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
+ } finally {
if (cacheDir != null) {
if (!cacheDir.setWritable(true, false)) {
System.err.println("Could not re-add write permissions to cache directory.");
}
}
if (cache != null) {
- cache.shutdown();
+ cache.close();
}
if (server != null) {
- server.shutdown();
+ server.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 54e1a9b..16e3a05 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.TestLogger;
@@ -63,6 +65,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2];
BlobCache cache = null;
BlobLibraryCacheManager libCache = null;
+ BlobStoreService blobStoreService = null;
Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
@@ -70,8 +73,10 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());
try {
+ blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
for (int i = 0; i < server.length; i++) {
- server[i] = new BlobServer(config);
+ server[i] = new BlobServer(config, blobStoreService);
serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
libServer[i] = new BlobLibraryCacheManager(server[i], 3600 * 1000);
}
@@ -89,7 +94,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
}
// The cache
- cache = new BlobCache(serverAddress[0], config);
+ cache = new BlobCache(serverAddress[0], config, blobStoreService);
libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
// Register uploaded libraries
@@ -110,10 +115,10 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
}
// Shutdown cache and start with other server
- cache.shutdown();
+ cache.close();
libCache.shutdown();
- cache = new BlobCache(serverAddress[1], config);
+ cache = new BlobCache(serverAddress[1], config, blobStoreService);
libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);
// Verify key 1
@@ -156,17 +161,21 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger {
finally {
for (BlobServer s : server) {
if (s != null) {
- s.shutdown();
+ s.close();
}
}
if (cache != null) {
- cache.shutdown();
+ cache.close();
}
if (libCache != null) {
libCache.shutdown();
}
+
+ if (blobStoreService != null) {
+ blobStoreService.closeAndCleanupAllData();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
index 06ffe3c..d89093d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
@@ -22,11 +22,11 @@ import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
-import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;
@@ -62,7 +62,10 @@ public class ZooKeeperRegistryTest extends TestLogger {
configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
final HighAvailabilityServices zkHaService = new ZooKeeperHaServices(
- ZooKeeperUtils.startCuratorFramework(configuration), Executors.directExecutor(), configuration);
+ ZooKeeperUtils.startCuratorFramework(configuration),
+ Executors.directExecutor(),
+ configuration,
+ new VoidBlobStore());
final RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry();
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index b8b5984..a63b02d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -43,7 +43,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -71,7 +70,6 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -102,7 +100,6 @@ import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
-import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -190,7 +187,11 @@ public class JobManagerHARecoveryTest extends TestLogger {
TestingUtils.defaultExecutor(),
instanceManager,
scheduler,
- new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000),
+ new BlobLibraryCacheManager(
+ new BlobServer(
+ flinkConfiguration,
+ testingHighAvailabilityServices.createBlobStore()),
+ 3600000L),
archive,
new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
timeout,
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index d6257ba..70800e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -31,6 +31,7 @@ import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -184,7 +185,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
TestingUtils.defaultExecutor(),
new InstanceManager(),
new Scheduler(TestingUtils.defaultExecutionContext()),
- new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
+ new BlobLibraryCacheManager(new BlobServer(configuration, new VoidBlobStore()), 10L),
ActorRef.noSender(),
new NoRestartStrategy.NoRestartStrategyFactory(),
AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 0ea47f2..0282a4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -18,17 +18,21 @@
package org.apache.flink.runtime.leaderelection;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
@@ -64,10 +68,13 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());
- highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
- config,
+ CuratorFramework client = ZooKeeperUtils.startCuratorFramework(config);
+
+ highAvailabilityServices = new ZooKeeperHaServices(
+ client,
TestingUtils.defaultExecutor(),
- HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+ config,
+ new VoidBlobStore());
}
@After
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index 58f2231..d6fc48c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -97,7 +97,7 @@ public class TaskManagerMetricsTest extends TestLogger {
taskManagerServices.getMemoryManager(),
taskManagerServices.getIOManager(),
taskManagerServices.getNetworkEnvironment(),
- highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+ highAvailabilityServices,
tmRegistry);
final ActorRef taskManager = actorSystem.actorOf(tmProps);
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 2a4c036..9dcfc70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -149,9 +149,6 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
network.start();
- LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
- HighAvailabilityServices.DEFAULT_JOB_ID);
-
MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config);
// create the task manager
@@ -164,7 +161,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
ioManager,
network,
numberOfSlots,
- leaderRetrievalService,
+ highAvailabilityServices,
new MetricRegistry(metricRegistryConfiguration));
taskManager = actorSystem.actorOf(tmProps);
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 92de31a..0844aad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -26,6 +26,7 @@ import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
import org.apache.flink.runtime.concurrent.Executors;
@@ -57,6 +58,7 @@ import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
+import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -601,7 +603,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
}
@Test
- public void testCheckForValidRegistrationSessionIDs() {
+ public void testCheckForValidRegistrationSessionIDs() throws IOException {
new JavaTestKit(actorSystem) {{
ActorGateway taskManagerGateway = null;
@@ -612,6 +614,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
HighAvailabilityServices mockedHighAvailabilityServices = mock(HighAvailabilityServices.class);
when(mockedHighAvailabilityServices.getJobManagerLeaderRetriever(Matchers.eq(HighAvailabilityServices.DEFAULT_JOB_ID)))
.thenReturn(new StandaloneLeaderRetrievalService(getTestActor().path().toString(), trueLeaderSessionID));
+ when(mockedHighAvailabilityServices.createBlobStore()).thenReturn(new VoidBlobStore());
try {
// we make the test actor (the test kit) the JobManager to intercept
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties
index 7ba1633..98f136a 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=OFF, console
+log4j.rootLogger=INFO, console
# -----------------------------------------------------------------------------
# Console (use 'console')
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 4be3299..1b9ee48 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -249,7 +249,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor
val components = JobManager.createJobManagerComponents(
config,
executor,
- executor)
+ executor,
+ highAvailabilityServices.createBlobStore())
// Start the JobManager without a MetricRegistry so that we don't start the MetricQueryService.
// The problem of the MetricQueryService is that it starts an actor with a fixed name. Thus,
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 09dc5ed..1db0a85 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -19,9 +19,9 @@
package org.apache.flink.runtime.testingUtils
import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
import org.apache.flink.runtime.io.disk.iomanager.IOManager
import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.metrics.MetricRegistry
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
@@ -32,15 +32,15 @@ import scala.language.postfixOps
/** Subclass of the [[TaskManager]] to support testing messages
*/
class TestingTaskManager(
- config: TaskManagerConfiguration,
- resourceID: ResourceID,
- connectionInfo: TaskManagerLocation,
- memoryManager: MemoryManager,
- ioManager: IOManager,
- network: NetworkEnvironment,
- numberOfSlots: Int,
- leaderRetrievalService: LeaderRetrievalService,
- metricRegistry : MetricRegistry)
+ config: TaskManagerConfiguration,
+ resourceID: ResourceID,
+ connectionInfo: TaskManagerLocation,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ network: NetworkEnvironment,
+ numberOfSlots: Int,
+ highAvailabilityServices: HighAvailabilityServices,
+ metricRegistry : MetricRegistry)
extends TaskManager(
config,
resourceID,
@@ -49,19 +49,19 @@ class TestingTaskManager(
ioManager,
network,
numberOfSlots,
- leaderRetrievalService,
+ highAvailabilityServices,
metricRegistry)
with TestingTaskManagerLike {
def this(
- config: TaskManagerConfiguration,
- connectionInfo: TaskManagerLocation,
- memoryManager: MemoryManager,
- ioManager: IOManager,
- network: NetworkEnvironment,
- numberOfSlots: Int,
- leaderRetrievalService: LeaderRetrievalService,
- metricRegistry : MetricRegistry) {
+ config: TaskManagerConfiguration,
+ connectionInfo: TaskManagerLocation,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ network: NetworkEnvironment,
+ numberOfSlots: Int,
+ highAvailabilityServices: HighAvailabilityServices,
+ metricRegistry : MetricRegistry) {
this(
config,
ResourceID.generate(),
@@ -70,7 +70,7 @@ class TestingTaskManager(
ioManager,
network,
numberOfSlots,
- leaderRetrievalService,
+ highAvailabilityServices,
metricRegistry)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 5f9d178..2983d66 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -155,6 +155,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum);
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, FileStateBackendBasePath.getAbsolutePath());
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
"leader", 1, config);
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
index 0f82faa..1df4b8d 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -19,6 +19,7 @@
package org.apache.flink.yarn
import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
import org.apache.flink.runtime.io.disk.iomanager.IOManager
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
@@ -40,19 +41,19 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
* @param ioManager IOManager responsible for I/O
* @param network NetworkEnvironment for this actor
* @param numberOfSlots Number of slots for this TaskManager
- * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the current leading
- * JobManager
+ * @param highAvailabilityServices [[HighAvailabilityServices]] to create a leader retrieval
+ * service for retrieving the leading JobManager
*/
class TestingYarnTaskManager(
- config: TaskManagerConfiguration,
- resourceID: ResourceID,
- connectionInfo: TaskManagerLocation,
- memoryManager: MemoryManager,
- ioManager: IOManager,
- network: NetworkEnvironment,
- numberOfSlots: Int,
- leaderRetrievalService: LeaderRetrievalService,
- metricRegistry : MetricRegistry)
+ config: TaskManagerConfiguration,
+ resourceID: ResourceID,
+ connectionInfo: TaskManagerLocation,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ network: NetworkEnvironment,
+ numberOfSlots: Int,
+ highAvailabilityServices: HighAvailabilityServices,
+ metricRegistry : MetricRegistry)
extends YarnTaskManager(
config,
resourceID,
@@ -61,7 +62,7 @@ class TestingYarnTaskManager(
ioManager,
network,
numberOfSlots,
- leaderRetrievalService,
+ highAvailabilityServices,
metricRegistry)
with TestingTaskManagerLike {
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
index e9c3904..f81d040 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.FileSystemBlobStore;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -91,6 +92,9 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
* HA services clean up */
protected final Path haDataDirectory;
+ /** Blob store service to be used for the BlobServer and BlobCache */
+ protected final BlobStoreService blobStoreService;
+
/** Flag marking this instance as shut down */
private volatile boolean closed;
@@ -153,6 +157,8 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
}
LOG.info("Flink YARN application will store recovery data at {}", haDataDirectory);
+
+ blobStoreService = new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString());
}
// ------------------------------------------------------------------------
@@ -163,7 +169,7 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
public BlobStore createBlobStore() throws IOException {
enter();
try {
- return new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString());
+ return blobStoreService;
} finally {
exit();
}
@@ -192,11 +198,23 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
}
closed = true;
+ Throwable exception = null;
+
+ try {
+ blobStoreService.close();
+ } catch (Throwable t) {
+ exception = t;
+ }
+
// we do not propagate exceptions here, but only log them
try {
hadoopFileSystem.close();
} catch (Throwable t) {
- LOG.warn("Error closing Hadoop FileSystem", t);
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+
+ if (exception != null) {
+ ExceptionUtils.rethrowException(exception, "Could not properly close the YarnHighAvailabilityServices.");
}
}
finally {
@@ -213,12 +231,18 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe
// we remember exceptions only, then continue cleanup, and re-throw at the end
Throwable exception = null;
+ try {
+ blobStoreService.closeAndCleanupAllData();
+ } catch (Throwable t) {
+ exception = t;
+ }
+
// first, we delete all data in Flink's data directory
try {
flinkFileSystem.delete(haDataDirectory, true);
}
catch (Throwable t) {
- exception = t;
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
// now we actually close the services
http://git-wip-us.apache.org/repos/asf/flink/blob/e3ea89a9/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index be31085..b7f4c9a 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -19,9 +19,9 @@
package org.apache.flink.yarn
import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
import org.apache.flink.runtime.io.disk.iomanager.IOManager
import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
import org.apache.flink.runtime.metrics.MetricRegistry
@@ -38,7 +38,7 @@ class YarnTaskManager(
ioManager: IOManager,
network: NetworkEnvironment,
numberOfSlots: Int,
- leaderRetrievalService: LeaderRetrievalService,
+ highAvailabilityServices: HighAvailabilityServices,
metricRegistry : MetricRegistry)
extends TaskManager(
config,
@@ -48,7 +48,7 @@ class YarnTaskManager(
ioManager,
network,
numberOfSlots,
- leaderRetrievalService,
+ highAvailabilityServices,
metricRegistry) {
override def handleMessage: Receive = {