You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/08 18:24:22 UTC

[flink] branch master updated (5bcaf2e -> fc98a1f)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 5bcaf2e  [FLINK-11073] [core] Rename CompositeSerializerSnapshot to NestedSerializersSnapshotDelegate
     new 1072bbf  [FLINK-10555] [test] Port AkkaSslITCase to new code base
     new fc98a1f  [FLINK-10555] Clean-up BlobServerSSLTest

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/blob/BlobServerSSLTest.java      |  93 ++++++++++++
 .../apache/flink/runtime/akka/AkkaSslITCase.scala  | 156 ---------------------
 2 files changed, 93 insertions(+), 156 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerSSLTest.java
 delete mode 100644 flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala


[flink] 01/02: [FLINK-10555] [test] Port AkkaSslITCase to new code base

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1072bbfe500999eb3e3710271f15ff53a534475d
Author: Tzu-Li Chen <wa...@gmail.com>
AuthorDate: Tue Oct 16 01:29:56 2018 +0800

    [FLINK-10555] [test] Port AkkaSslITCase to new code base
    
    [FLINK-10555] tests are all of BlobServer initialization, also remove already covered tests
    
    This closes #6849.
---
 .../flink/runtime/blob/BlobServerSSLTest.java      | 110 +++++++++++++++
 .../apache/flink/runtime/akka/AkkaSslITCase.scala  | 156 ---------------------
 2 files changed, 110 insertions(+), 156 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerSSLTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerSSLTest.java
new file mode 100644
index 0000000..49957c2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerSSLTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+import static org.junit.Assert.fail;
+
+/**
+ * Testing a {@link BlobServer} would fail with improper SSL config.
+ */
+public class BlobServerSSLTest extends TestLogger {
+
+	@Test
+	public void testFailedToInitWithTwoProtocolsSet() {
+		final Configuration config = new Configuration();
+
+		config.setString(JobManagerOptions.ADDRESS, "127.0.0.1");
+		config.setString(TaskManagerOptions.HOST, "127.0.0.1");
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+
+		config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
+		config.setString(SecurityOptions.SSL_KEYSTORE,
+			getClass().getResource("/local127.keystore").getPath());
+		config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
+		config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
+		config.setString(SecurityOptions.SSL_TRUSTSTORE,
+			getClass().getResource("/local127.truststore").getPath());
+
+		config.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
+		config.setString(SecurityOptions.SSL_ALGORITHMS, "TLSv1,TLSv1.1");
+
+		try (final BlobServer blobServer = new BlobServer(config, new VoidBlobStore())) {
+			fail();
+		} catch (Exception e) {
+			findThrowable(e, IOException.class);
+			findThrowableWithMessage(e, "Unable to open BLOB Server in specified port range: 0");
+		}
+	}
+
+	@Test
+	public void testFailedToInitWithInvalidSslKeystoreConfigured() {
+		final Configuration config = new Configuration();
+
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+		config.setString(AkkaOptions.ASK_TIMEOUT, "2 s");
+
+		config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
+		config.setString(SecurityOptions.SSL_KEYSTORE, "invalid.keystore");
+		config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
+		config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
+		config.setString(SecurityOptions.SSL_TRUSTSTORE, "invalid.keystore");
+		config.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
+
+		try (final BlobServer blobServer = new BlobServer(config, new VoidBlobStore())) {
+			fail();
+		} catch (Exception e) {
+			findThrowable(e, IOException.class);
+			findThrowableWithMessage(e, "Failed to initialize SSL for the blob server");
+		}
+	}
+
+	@Test
+	public void testFailedToInitWithMissingMandatorySslConfiguration() {
+		final Configuration config = new Configuration();
+
+		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+		config.setString(AkkaOptions.ASK_TIMEOUT, "2 s");
+
+		config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
+
+		try (final BlobServer blobServer = new BlobServer(config, new VoidBlobStore())) {
+			fail();
+		} catch (Exception e) {
+			findThrowable(e, IOException.class);
+			findThrowableWithMessage(e, "Failed to initialize SSL for the blob server");
+		}
+	}
+}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
deleted file mode 100644
index a6b6028..0000000
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaSslITCase.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.akka
-
-import akka.actor.ActorSystem
-import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.configuration._
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
-import org.junit.runner.RunWith
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-import org.scalatest.junit.JUnitRunner
-
-/**
-  * Testing the flink cluster using SSL transport for akka remoting
-  */
-@RunWith(classOf[JUnitRunner])
-class AkkaSslITCase(_system: ActorSystem)
-  extends TestKit(_system)
-    with ImplicitSender
-    with WordSpecLike
-    with Matchers
-    with BeforeAndAfterAll
-    with ScalaTestingUtils {
-
-  def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
-
-  override def afterAll(): Unit = {
-    TestKit.shutdownActorSystem(system)
-  }
-
-  "The flink Cluster" must {
-
-    "start with akka ssl enabled" in {
-
-      val config = new Configuration()
-      config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
-      config.setString(TaskManagerOptions.HOST, "127.0.0.1")
-      config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
-      config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
-
-      config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true)
-      config.setString(SecurityOptions.SSL_KEYSTORE,
-        getClass.getResource("/local127.keystore").getPath)
-      config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password")
-      config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password")
-      config.setString(SecurityOptions.SSL_TRUSTSTORE,
-        getClass.getResource("/local127.truststore").getPath)
-
-      config.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password")
-
-      val cluster = new TestingCluster(config, false)
-
-      cluster.start(true)
-
-      assert(cluster.running)
-    }
-
-    "Failed to start ssl enabled akka with two protocols set" in {
-
-      an[Exception] should be thrownBy {
-
-        val config = new Configuration()
-        config.setString(JobManagerOptions.ADDRESS, "127.0.0.1")
-        config.setString(TaskManagerOptions.HOST, "127.0.0.1")
-        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
-        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
-
-        config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true)
-        config.setString(SecurityOptions.SSL_KEYSTORE,
-          getClass.getResource("/local127.keystore").getPath)
-        config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password")
-        config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password")
-        config.setString(SecurityOptions.SSL_TRUSTSTORE,
-          getClass.getResource("/local127.truststore").getPath)
-
-        config.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password")
-        config.setString(SecurityOptions.SSL_ALGORITHMS, "TLSv1,TLSv1.1")
-
-        val cluster = new TestingCluster(config, false)
-
-        cluster.start(true)
-      }
-    }
-
-    "start with akka ssl disabled" in {
-
-      val config = new Configuration()
-      config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
-      config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
-      config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, false)
-
-      val cluster = new TestingCluster(config, false)
-
-      cluster.start(true)
-
-      assert(cluster.running)
-    }
-
-    "fail to start with invalid ssl keystore configured" in {
-
-      an[Exception] should be thrownBy {
-
-        val config = new Configuration()
-        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
-        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
-        config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
-
-        config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true)
-        config.setString(SecurityOptions.SSL_KEYSTORE, "invalid.keystore")
-        config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password")
-        config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password")
-        config.setString(SecurityOptions.SSL_TRUSTSTORE, "invalid.keystore")
-        config.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password")
-
-        val cluster = new TestingCluster(config, false)
-
-        cluster.start(true)
-      }
-    }
-
-    "fail to start with missing mandatory ssl configuration" in {
-
-      an[Exception] should be thrownBy {
-
-        val config = new Configuration()
-        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1)
-        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
-        config.setString(AkkaOptions.ASK_TIMEOUT, "2 s")
-
-        config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true)
-
-        val cluster = new TestingCluster(config, false)
-
-        cluster.start(true)
-      }
-    }
-
-  }
-
-}


[flink] 02/02: [FLINK-10555] Clean-up BlobServerSSLTest

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc98a1f96cce951355c08a6be4193bbdef02b10c
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Jan 8 19:04:18 2019 +0100

    [FLINK-10555] Clean-up BlobServerSSLTest
---
 .../flink/runtime/blob/BlobServerSSLTest.java      | 23 +++-------------------
 1 file changed, 3 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerSSLTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerSSLTest.java
index 49957c2..6f84930 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerSSLTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerSSLTest.java
@@ -18,12 +18,8 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -43,11 +39,6 @@ public class BlobServerSSLTest extends TestLogger {
 	public void testFailedToInitWithTwoProtocolsSet() {
 		final Configuration config = new Configuration();
 
-		config.setString(JobManagerOptions.ADDRESS, "127.0.0.1");
-		config.setString(TaskManagerOptions.HOST, "127.0.0.1");
-		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-
 		config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
 		config.setString(SecurityOptions.SSL_KEYSTORE,
 			getClass().getResource("/local127.keystore").getPath());
@@ -59,7 +50,7 @@ public class BlobServerSSLTest extends TestLogger {
 		config.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
 		config.setString(SecurityOptions.SSL_ALGORITHMS, "TLSv1,TLSv1.1");
 
-		try (final BlobServer blobServer = new BlobServer(config, new VoidBlobStore())) {
+		try (final BlobServer ignored = new BlobServer(config, new VoidBlobStore())) {
 			fail();
 		} catch (Exception e) {
 			findThrowable(e, IOException.class);
@@ -71,10 +62,6 @@ public class BlobServerSSLTest extends TestLogger {
 	public void testFailedToInitWithInvalidSslKeystoreConfigured() {
 		final Configuration config = new Configuration();
 
-		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-		config.setString(AkkaOptions.ASK_TIMEOUT, "2 s");
-
 		config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
 		config.setString(SecurityOptions.SSL_KEYSTORE, "invalid.keystore");
 		config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password");
@@ -82,7 +69,7 @@ public class BlobServerSSLTest extends TestLogger {
 		config.setString(SecurityOptions.SSL_TRUSTSTORE, "invalid.keystore");
 		config.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
 
-		try (final BlobServer blobServer = new BlobServer(config, new VoidBlobStore())) {
+		try (final BlobServer ignored = new BlobServer(config, new VoidBlobStore())) {
 			fail();
 		} catch (Exception e) {
 			findThrowable(e, IOException.class);
@@ -94,13 +81,9 @@ public class BlobServerSSLTest extends TestLogger {
 	public void testFailedToInitWithMissingMandatorySslConfiguration() {
 		final Configuration config = new Configuration();
 
-		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-		config.setString(AkkaOptions.ASK_TIMEOUT, "2 s");
-
 		config.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
 
-		try (final BlobServer blobServer = new BlobServer(config, new VoidBlobStore())) {
+		try (final BlobServer ignored = new BlobServer(config, new VoidBlobStore())) {
 			fail();
 		} catch (Exception e) {
 			findThrowable(e, IOException.class);