You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/05/08 08:04:35 UTC
[1/3] storm git commit: STORM-2479: Fix port assignment race condition in storm-webapp tests
Repository: storm
Updated Branches:
refs/heads/master 0639244f7 -> 9755ff547
STORM-2479: Fix port assignment race condition in storm-webapp tests
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1044473b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1044473b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1044473b
Branch: refs/heads/master
Commit: 1044473bb191c15dabfa434b8126f2fa523f59c9
Parents: c38d795
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Sun Apr 16 21:20:15 2017 +0200
Committer: Stig Rohde Døssing <st...@gmail.com>
Committed: Mon Apr 17 10:31:45 2017 +0200
----------------------------------------------------------------------
.../jvm/org/apache/storm/nimbus/NimbusInfo.java | 2 +-
.../storm/security/auth/ITransportPlugin.java | 5 +
.../security/auth/SaslTransportPlugin.java | 13 +-
.../security/auth/SimpleTransportPlugin.java | 11 +-
.../storm/security/auth/ThriftServer.java | 45 +++-
.../storm/security/auth/ThriftServerTest.java | 38 ---
.../apache/storm/security/auth/auth_test.clj | 244 +++++++++----------
.../apache/storm/daemon/drpc/DRPCServer.java | 41 +++-
.../storm/daemon/drpc/DRPCServerTest.java | 55 ++---
9 files changed, 229 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-client/src/jvm/org/apache/storm/nimbus/NimbusInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/nimbus/NimbusInfo.java b/storm-client/src/jvm/org/apache/storm/nimbus/NimbusInfo.java
index 2b5033e..d8f8adf 100644
--- a/storm-client/src/jvm/org/apache/storm/nimbus/NimbusInfo.java
+++ b/storm-client/src/jvm/org/apache/storm/nimbus/NimbusInfo.java
@@ -39,7 +39,7 @@ public class NimbusInfo implements Serializable {
public NimbusInfo(String host, int port, boolean isLeader) {
if (host == null) throw new NullPointerException("Host cannot be null");
- if (port <= 0) throw new IllegalArgumentException("Port must be positive");
+ if (port < 0) throw new IllegalArgumentException("Port cannot be negative");
this.host = host;
this.port = port;
this.isLeader = isLeader;
http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
index c0ab525..c60b2f2 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java
@@ -54,4 +54,9 @@ public interface ITransportPlugin {
* Only applicable when using secure storm cluster. A null/blank value here will just indicate to use the logged in user.
*/
public TTransport connect(TTransport transport, String serverHost, String asUser) throws IOException, TTransportException;
+
+ /**
+ * @return The port this transport is using. This is not known until {@link #getServer(org.apache.thrift.TProcessor)} has been called.
+ */
+ public int getPort();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-client/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
index cad2b30..d93573b 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/SaslTransportPlugin.java
@@ -53,6 +53,7 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
protected ThriftConnectionType type;
protected Map storm_conf;
protected Configuration login_conf;
+ private int port;
@Override
public void prepare(ThriftConnectionType type, Map storm_conf, Configuration login_conf) {
@@ -63,15 +64,16 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
@Override
public TServer getServer(TProcessor processor) throws IOException, TTransportException {
- int port = type.getPort(storm_conf);
+ int configuredPort = type.getPort(storm_conf);
Integer socketTimeout = type.getSocketTimeOut(storm_conf);
TTransportFactory serverTransportFactory = getServerTransportFactory();
TServerSocket serverTransport = null;
if (socketTimeout != null) {
- serverTransport = new TServerSocket(port, socketTimeout);
+ serverTransport = new TServerSocket(configuredPort, socketTimeout);
} else {
- serverTransport = new TServerSocket(port);
+ serverTransport = new TServerSocket(configuredPort);
}
+ this.port = serverTransport.getServerSocket().getLocalPort();
int numWorkerThreads = type.getNumThreads(storm_conf);
Integer queueSize = type.getQueueSize(storm_conf);
@@ -100,6 +102,11 @@ public abstract class SaslTransportPlugin implements ITransportPlugin {
* @throws IOException
*/
protected abstract TTransportFactory getServerTransportFactory() throws IOException;
+
+ @Override
+ public int getPort() {
+ return this.port;
+ }
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
index 26cd4a1..b41af75 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/SimpleTransportPlugin.java
@@ -55,6 +55,7 @@ public class SimpleTransportPlugin implements ITransportPlugin {
protected Map storm_conf;
protected Configuration login_conf;
private static final Logger LOG = LoggerFactory.getLogger(SimpleTransportPlugin.class);
+ private int port;
@Override
public void prepare(ThriftConnectionType type, Map storm_conf, Configuration login_conf) {
@@ -65,8 +66,9 @@ public class SimpleTransportPlugin implements ITransportPlugin {
@Override
public TServer getServer(TProcessor processor) throws IOException, TTransportException {
- int port = type.getPort(storm_conf);
- TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
+ int configuredPort = type.getPort(storm_conf);
+ TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(configuredPort);
+ this.port = serverTransport.getPort();
int numWorkerThreads = type.getNumThreads(storm_conf);
int maxBufferSize = type.getMaxBufferSize(storm_conf);
Integer queueSize = type.getQueueSize(storm_conf);
@@ -113,6 +115,11 @@ public class SimpleTransportPlugin implements ITransportPlugin {
return null;
}
+ @Override
+ public int getPort() {
+ return port;
+ }
+
/**
* Processor that populate simple transport info into ReqContext, and then invoke a service handler
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
index f97dceb..059b0d6 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftServer.java
@@ -17,12 +17,14 @@
*/
package org.apache.storm.security.auth;
+import java.io.IOException;
import java.util.Map;
import javax.security.auth.login.Configuration;
import org.apache.thrift.TProcessor;
import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,8 +33,9 @@ public class ThriftServer {
private Map _storm_conf; //storm configuration
protected TProcessor _processor = null;
private final ThriftConnectionType _type;
- private TServer _server = null;
+ private TServer _server;
private Configuration _login_conf;
+ private int _port;
public ThriftServer(Map storm_conf, TProcessor processor, ThriftConnectionType type) {
_storm_conf = storm_conf;
@@ -45,34 +48,50 @@ public class ThriftServer {
} catch (Exception x) {
LOG.error(x.getMessage(), x);
}
+ try {
+ //locate our thrift transport plugin
+ ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(_type, _storm_conf, _login_conf);
+ //server
+ _server = transportPlugin.getServer(_processor);
+ _port = transportPlugin.getPort();
+ } catch (IOException | TTransportException ex) {
+ handleServerException(ex);
+ }
+
}
public void stop() {
- if (_server != null)
- _server.stop();
+ _server.stop();
}
/**
* @return true if ThriftServer is listening to requests?
*/
public boolean isServing() {
- return _server != null && _server.isServing();
+ return _server.isServing();
}
public void serve() {
try {
- //locate our thrift transport plugin
- ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(_type, _storm_conf, _login_conf);
-
- //server
- _server = transportPlugin.getServer(_processor);
-
//start accepting requests
_server.serve();
} catch (Exception ex) {
- LOG.error("ThriftServer is being stopped due to: " + ex, ex);
- if (_server != null) _server.stop();
- Runtime.getRuntime().halt(1); //shutdown server process since we could not handle Thrift requests any more
+ handleServerException(ex);
+ }
+ }
+
+ private void handleServerException(Exception ex) {
+ LOG.error("ThriftServer is being stopped due to: " + ex, ex);
+ if (_server != null) {
+ _server.stop();
}
+ Runtime.getRuntime().halt(1); //shutdown server process since we could not handle Thrift requests any more
+ }
+
+ /**
+ * @return The port this server is/will be listening on
+ */
+ public int getPort() {
+ return _port;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-client/test/jvm/org/apache/storm/security/auth/ThriftServerTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/security/auth/ThriftServerTest.java b/storm-client/test/jvm/org/apache/storm/security/auth/ThriftServerTest.java
deleted file mode 100644
index cad1f1d..0000000
--- a/storm-client/test/jvm/org/apache/storm/security/auth/ThriftServerTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.security.auth;
-
-import org.apache.storm.utils.Utils;
-import org.junit.Assert;
-
-import org.junit.Test;
-
-public class ThriftServerTest {
-
- @Test
- public void testStopChecksForNull() {
- ThriftServer server = new ThriftServer(Utils.readDefaultConfig(), null, ThriftConnectionType.DRPC);
- server.stop();
- }
-
- @Test
- public void testIsServingChecksForNull() {
- ThriftServer server = new ThriftServer(Utils.readDefaultConfig(), null, ThriftConnectionType.DRPC);
- Assert.assertFalse(server.isServing());
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
index fc95097..6eec5db 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
@@ -119,10 +119,10 @@
(dummy-service-handler conf inimbus nil)))
-(defn launch-server [server-port login-cfg aznClass transportPluginClass serverConf]
+(defn launch-server [login-cfg aznClass transportPluginClass serverConf]
(let [conf1 (merge (clojurify-structure (ConfigUtils/readStormConfig))
{NIMBUS-AUTHORIZER aznClass
- NIMBUS-THRIFT-PORT server-port
+ NIMBUS-THRIFT-PORT 0
STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass})
conf2 (if login-cfg (merge conf1 {"java.security.auth.login.config" login-cfg}) conf1)
conf (if serverConf (merge conf2 serverConf) conf2)
@@ -137,10 +137,10 @@
(Testing/whileTimeout (reify Testing$Condition (exec [this] (not (.isServing server)))) (fn [] (Time/sleep 100)))
server ))
-(defmacro with-server [args & body]
- `(let [server# (launch-server ~@args)]
+(defmacro with-server [[server-sym & args] & body]
+ `(let [~server-sym (launch-server ~@args)]
~@body
- (.stop server#)
+ (.stop ~server-sym)
))
(deftest kerb-to-local-test
@@ -151,49 +151,46 @@
(is (= "someone" (.toLocal kptol (mk-principal "someone/host@realm"))))))
(deftest Simple-authentication-test
- (let [a-port (Utils/getAvailablePort)]
- (with-server [a-port nil nil "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})
- client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
- nimbus_client (.getClient client)]
- (.activate nimbus_client "security_auth_test_topology")
- (.close client))
-
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
- STORM-NIMBUS-RETRY-TIMES 0})]
- (testing "(Negative authentication) Server: Simple vs. Client: Digest"
- (is (thrown-cause? org.apache.thrift.transport.TTransportException
- (NimbusClient. storm-conf "localhost" a-port nimbus-timeout))))))))
+ (with-server [server nil nil "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
+ (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+ {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})
+ client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (.activate nimbus_client "security_auth_test_topology")
+ (.close client))
+
+ (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+ {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
+ STORM-NIMBUS-RETRY-TIMES 0})]
+ (testing "(Negative authentication) Server: Simple vs. Client: Digest"
+ (is (thrown-cause? org.apache.thrift.transport.TTransportException
+ (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))))
(deftest negative-whitelist-authorization-test
- (let [a-port (Utils/getAvailablePort)]
- (with-server [a-port nil
- "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
- "org.apache.storm.testing.SingleUserSimpleTransport" nil]
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.testing.SingleUserSimpleTransport"})
- client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Negative authorization) Authorization plugin should reject client request"
- (is (thrown-cause? AuthorizationException
- (.activate nimbus_client "security_auth_test_topology"))))
- (.close client)))))
+ (with-server [server nil
+ "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
+ "org.apache.storm.testing.SingleUserSimpleTransport" nil]
+ (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+ {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.testing.SingleUserSimpleTransport"})
+ client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Negative authorization) Authorization plugin should reject client request"
+ (is (thrown-cause? AuthorizationException
+ (.activate nimbus_client "security_auth_test_topology"))))
+ (.close client))))
(deftest positive-whitelist-authorization-test
- (let [a-port (Utils/getAvailablePort)]
- (with-server [a-port nil
- "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
- "org.apache.storm.testing.SingleUserSimpleTransport" {SimpleWhitelistAuthorizer/WHITELIST_USERS_CONF ["user"]}]
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.testing.SingleUserSimpleTransport"})
- client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Positive authorization) Authorization plugin should accept client request"
- (.activate nimbus_client "security_auth_test_topology"))
- (.close client)))))
+ (with-server [server nil
+ "org.apache.storm.security.auth.authorizer.SimpleWhitelistAuthorizer"
+ "org.apache.storm.testing.SingleUserSimpleTransport" {SimpleWhitelistAuthorizer/WHITELIST_USERS_CONF ["user"]}]
+ (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+ {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.testing.SingleUserSimpleTransport"})
+ client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Positive authorization) Authorization plugin should accept client request"
+ (.activate nimbus_client "security_auth_test_topology"))
+ (.close client))))
(deftest simple-acl-user-auth-test
(let [cluster-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -326,91 +323,88 @@
(deftest positive-authorization-test
- (let [a-port (Utils/getAvailablePort)]
- (with-server [a-port nil
- "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
- "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})
- client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Positive authorization) Authorization plugin should accept client request"
- (.activate nimbus_client "security_auth_test_topology"))
- (.close client)))))
+ (with-server [server nil
+ "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
+ "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
+ (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+ {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})
+ client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Positive authorization) Authorization plugin should accept client request"
+ (.activate nimbus_client "security_auth_test_topology"))
+ (.close client))))
(deftest deny-authorization-test
- (let [a-port (Utils/getAvailablePort)]
- (with-server [a-port nil
- "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
- "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
- Config/NIMBUS_THRIFT_PORT a-port
- DaemonConfig/NIMBUS_TASK_TIMEOUT_SECS nimbus-timeout})
- client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Negative authorization) Authorization plugin should reject client request"
- (is (thrown-cause? AuthorizationException
- (.activate nimbus_client "security_auth_test_topology"))))
- (.close client)))))
+ (with-server [server nil
+ "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
+ "org.apache.storm.security.auth.SimpleTransportPlugin" nil]
+ (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+ {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
+ Config/NIMBUS_THRIFT_PORT (.getPort server)
+ DaemonConfig/NIMBUS_TASK_TIMEOUT_SECS nimbus-timeout})
+ client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Negative authorization) Authorization plugin should reject client request"
+ (is (thrown-cause? AuthorizationException
+ (.activate nimbus_client "security_auth_test_topology"))))
+ (.close client))))
(deftest digest-authentication-test
- (let [a-port (Utils/getAvailablePort)]
- (with-server [a-port
- "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
- nil
- "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin" nil]
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
- STORM-NIMBUS-RETRY-TIMES 0})
- client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Positive authentication) valid digest authentication"
- (.activate nimbus_client "security_auth_test_topology"))
- (.close client))
-
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
- STORM-NIMBUS-RETRY-TIMES 0})
- client (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)
- nimbus_client (.getClient client)]
- (testing "(Negative authentication) Server: Digest vs. Client: Simple"
- (is (thrown-cause? org.apache.thrift.transport.TTransportException
- (.activate nimbus_client "security_auth_test_topology"))))
- (.close client))
-
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf"
- STORM-NIMBUS-RETRY-TIMES 0})]
- (testing "(Negative authentication) Invalid password"
- (is (thrown-cause? TTransportException
- (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
-
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf"
- STORM-NIMBUS-RETRY-TIMES 0})]
- (testing "(Negative authentication) Unknown user"
- (is (thrown-cause? TTransportException
- (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
-
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/nonexistent.conf"
- STORM-NIMBUS-RETRY-TIMES 0})]
- (testing "(Negative authentication) nonexistent configuration file"
- (is (thrown-cause? RuntimeException
- (NimbusClient. storm-conf "localhost" a-port nimbus-timeout)))))
-
- (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
- "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf"
- STORM-NIMBUS-RETRY-TIMES 0})]
- (testing "(Negative authentication) Missing client"
- (is (thrown-cause? java.io.IOException
- (NimbusClient. storm-conf "localhost" a-port nimbus-timeout))))))))
+ (with-server [server
+ "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
+ nil
+ "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin" nil]
+ (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+ {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
+ STORM-NIMBUS-RETRY-TIMES 0})
+ client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Positive authentication) valid digest authentication"
+ (.activate nimbus_client "security_auth_test_topology"))
+ (.close client))
+
+ (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+ {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
+ STORM-NIMBUS-RETRY-TIMES 0})
+ client (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)
+ nimbus_client (.getClient client)]
+ (testing "(Negative authentication) Server: Digest vs. Client: Simple"
+ (is (thrown-cause? org.apache.thrift.transport.TTransportException
+ (.activate nimbus_client "security_auth_test_topology"))))
+ (.close client))
+
+ (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+ {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_bad_password.conf"
+ STORM-NIMBUS-RETRY-TIMES 0})]
+ (testing "(Negative authentication) Invalid password"
+ (is (thrown-cause? TTransportException
+ (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))
+
+ (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+ {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_unknown_user.conf"
+ STORM-NIMBUS-RETRY-TIMES 0})]
+ (testing "(Negative authentication) Unknown user"
+ (is (thrown-cause? TTransportException
+ (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))
+
+ (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+ {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/nonexistent.conf"
+ STORM-NIMBUS-RETRY-TIMES 0})]
+ (testing "(Negative authentication) nonexistent configuration file"
+ (is (thrown-cause? RuntimeException
+ (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))
+
+ (let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
+ {STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
+ "java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest_missing_client.conf"
+ STORM-NIMBUS-RETRY-TIMES 0})]
+ (testing "(Negative authentication) Missing client"
+ (is (thrown-cause? java.io.IOException
+ (NimbusClient. storm-conf "localhost" (.getPort server) nimbus-timeout)))))))
(deftest test-GetTransportPlugin-throws-RuntimeException
(let [conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
index 0b15cfa..3e37ad7 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java
@@ -61,7 +61,7 @@ public class DRPCServer implements AutoCloseable {
private static ThriftServer mkHandlerServer(final DistributedRPC.Iface service, Integer port, Map<String, Object> conf) {
ThriftServer ret = null;
- if (port != null && port > 0) {
+ if (port != null && port >= 0) {
ret = new ThriftServer(conf, new DistributedRPC.Processor<>(service),
ThriftConnectionType.DRPC);
}
@@ -76,7 +76,7 @@ public class DRPCServer implements AutoCloseable {
private static Server mkHttpServer(Map<String, Object> conf, DRPC drpc) {
Integer drpcHttpPort = (Integer) conf.get(DaemonConfig.DRPC_HTTP_PORT);
Server ret = null;
- if (drpcHttpPort != null && drpcHttpPort > 0) {
+ if (drpcHttpPort != null && drpcHttpPort >= 0) {
LOG.info("Starting RPC HTTP servers...");
String filterClass = (String) (conf.get(DaemonConfig.DRPC_HTTP_FILTER));
@SuppressWarnings("unchecked")
@@ -119,6 +119,7 @@ public class DRPCServer implements AutoCloseable {
private final ThriftServer _handlerServer;
private final ThriftServer _invokeServer;
private final Server _httpServer;
+ private Thread _handlerServerThread;
private boolean _closed = false;
public DRPCServer(Map<String, Object> conf) {
@@ -133,13 +134,21 @@ public class DRPCServer implements AutoCloseable {
void start() throws Exception {
LOG.info("Starting Distributed RPC servers...");
new Thread(() -> _invokeServer.serve()).start();
-
+
if (_httpServer != null) {
_httpServer.start();
}
if (_handlerServer != null) {
- _handlerServer.serve();
+ _handlerServerThread = new Thread(_handlerServer::serve);
+ _handlerServerThread.start();
+ }
+ }
+
+ @VisibleForTesting
+ void awaitTermination() throws InterruptedException {
+ if(_handlerServerThread != null) {
+ _handlerServerThread.join();
} else {
_httpServer.join();
}
@@ -169,6 +178,29 @@ public class DRPCServer implements AutoCloseable {
}
}
+ /**
+ * @return The port the DRPC handler server is listening on
+ */
+ public int getDRPCPort() {
+ return _handlerServer.getPort();
+ }
+
+ /**
+ * @return The port the DRPC invoke server is listening on
+ */
+ public int getDRPCInvokePort() {
+ return _invokeServer.getPort();
+ }
+
+ /**
+ * @return The port the HTTP server is listening on. Not available until {@link #start() } has run.
+ */
+ public int getHttpServerPort() {
+ assert _httpServer.getConnectors().length == 1;
+
+ return _httpServer.getConnectors()[0].getLocalPort();
+ }
+
public static void main(String [] args) throws Exception {
Utils.setupDefaultUncaughtExceptionHandler();
Map<String, Object> conf = Utils.readStormConfig();
@@ -176,6 +208,7 @@ public class DRPCServer implements AutoCloseable {
Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close());
StormMetricsRegistry.startMetricsReporters(conf);
server.start();
+ server.awaitTermination();
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1044473b/storm-webapp/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java
----------------------------------------------------------------------
diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java
index 594e9c3..76652e2 100644
--- a/storm-webapp/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java
+++ b/storm-webapp/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java
@@ -38,7 +38,6 @@ import org.apache.storm.generated.DRPCExecutionException;
import org.apache.storm.generated.DRPCRequest;
import org.apache.storm.security.auth.SimpleTransportPlugin;
import org.apache.storm.utils.DRPCClient;
-import org.apache.storm.utils.Utils;
import org.junit.AfterClass;
import org.junit.Test;
import org.slf4j.Logger;
@@ -87,16 +86,11 @@ public class DRPCServerTest {
@Test
public void testGoodThrift() throws Exception {
- int drpcPort = Utils.getAvailablePort();
- int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
- Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
+ Map<String, Object> conf = getConf(0, 0, null);
try (DRPCServer server = new DRPCServer(conf)) {
- exec.submit(() -> {
- server.start();
- return null;
- });
- try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
- DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+ server.start();
+ try (DRPCClient client = new DRPCClient(conf, "localhost", server.getDRPCPort());
+ DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", server.getDRPCInvokePort())) {
Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
DRPCRequest request = getNextAvailableRequest(invoke, "testing");
assertNotNull(request);
@@ -111,16 +105,11 @@ public class DRPCServerTest {
@Test
public void testFailedThrift() throws Exception {
- int drpcPort = Utils.getAvailablePort();
- int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
- Map<String, Object> conf = getConf(drpcPort, invocationsPort, null);
+ Map<String, Object> conf = getConf(0, 0, null);
try (DRPCServer server = new DRPCServer(conf)) {
- exec.submit(() -> {
- server.start();
- return null;
- });
- try (DRPCClient client = new DRPCClient(conf, "localhost", drpcPort);
- DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
+ server.start();
+ try (DRPCClient client = new DRPCClient(conf, "localhost", server.getDRPCPort());
+ DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", server.getDRPCInvokePort())) {
Future<String> found = exec.submit(() -> client.getClient().execute("testing", "test"));
DRPCRequest request = getNextAvailableRequest(invoke, "testing");
assertNotNull(request);
@@ -155,19 +144,13 @@ public class DRPCServerTest {
@Test
public void testGoodHttpGet() throws Exception {
LOG.info("STARTING HTTP GET TEST...");
- int drpcPort = Utils.getAvailablePort();
- int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
- int httpPort = Utils.getAvailablePort(invocationsPort + 1);
- Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
+ Map<String, Object> conf = getConf(0, 0, 0);
try (DRPCServer server = new DRPCServer(conf)) {
- exec.submit(() -> {
- server.start();
- return null;
- });
+ server.start();
//TODO need a better way to do this
Thread.sleep(2000);
- try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
- Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
+ try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", server.getDRPCInvokePort())) {
+ Future<String> found = exec.submit(() -> GET(server.getHttpServerPort(), "testing", "test"));
DRPCRequest request = getNextAvailableRequest(invoke, "testing");
assertNotNull(request);
assertEquals("test", request.get_func_args());
@@ -182,19 +165,13 @@ public class DRPCServerTest {
@Test
public void testFailedHttpGet() throws Exception {
LOG.info("STARTING HTTP GET (FAIL) TEST...");
- int drpcPort = Utils.getAvailablePort();
- int invocationsPort = Utils.getAvailablePort(drpcPort + 1);
- int httpPort = Utils.getAvailablePort(invocationsPort + 1);
- Map<String, Object> conf = getConf(drpcPort, invocationsPort, httpPort);
+ Map<String, Object> conf = getConf(0, 0, 0);
try (DRPCServer server = new DRPCServer(conf)) {
- exec.submit(() -> {
- server.start();
- return null;
- });
+ server.start();
//TODO need a better way to do this
Thread.sleep(2000);
- try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", invocationsPort)) {
- Future<String> found = exec.submit(() -> GET(httpPort, "testing", "test"));
+ try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", server.getDRPCInvokePort())) {
+ Future<String> found = exec.submit(() -> GET(server.getHttpServerPort(), "testing", "test"));
DRPCRequest request = getNextAvailableRequest(invoke, "testing");
assertNotNull(request);
assertEquals("test", request.get_func_args());
[3/3] storm git commit: STORM-2479: CHANGELOG
Posted by ka...@apache.org.
STORM-2479: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9755ff54
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9755ff54
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9755ff54
Branch: refs/heads/master
Commit: 9755ff547de3247fe4aa1b60a778983145f43f76
Parents: 09b475a
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon May 8 17:04:20 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon May 8 17:04:20 2017 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9755ff54/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4343c28..955c17b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-2479: Fix port assignment race condition in storm-webapp tests
* STORM-2191: shorten classpaths by using wildcards
* STORM-2495: Integrate checkstyle check during build
* STORM-2486: Prevent cd from printing target directory to avoid breaking classpath
[2/3] storm git commit: Merge branch 'STORM-2479' of
https://github.com/srdo/storm into STORM-2479-merge
Posted by ka...@apache.org.
Merge branch 'STORM-2479' of https://github.com/srdo/storm into STORM-2479-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/09b475af
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/09b475af
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/09b475af
Branch: refs/heads/master
Commit: 09b475af32b7b5176261de18ffcd2d4facd1b30d
Parents: 0639244 1044473
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon May 8 17:04:00 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon May 8 17:04:00 2017 +0900
----------------------------------------------------------------------
.../jvm/org/apache/storm/nimbus/NimbusInfo.java | 2 +-
.../storm/security/auth/ITransportPlugin.java | 5 +
.../security/auth/SaslTransportPlugin.java | 13 +-
.../security/auth/SimpleTransportPlugin.java | 11 +-
.../storm/security/auth/ThriftServer.java | 45 +++-
.../storm/security/auth/ThriftServerTest.java | 38 ---
.../apache/storm/security/auth/auth_test.clj | 244 +++++++++----------
.../apache/storm/daemon/drpc/DRPCServer.java | 41 +++-
.../storm/daemon/drpc/DRPCServerTest.java | 55 ++---
9 files changed, 229 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/09b475af/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------