You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/11/23 22:07:52 UTC
[13/37] storm git commit: Formatting issues.
Formatting issues.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/31479552
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/31479552
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/31479552
Branch: refs/heads/master
Commit: 3147955254247b5008427002ca788d8c72c61f13
Parents: 2a782ce
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Mon Nov 9 16:50:13 2015 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Mon Nov 9 16:50:13 2015 -0600
----------------------------------------------------------------------
.../backtype/storm/messaging/netty/Context.java | 6 +++--
.../netty/KerberosSaslClientHandler.java | 17 ++++++------
.../netty/KerberosSaslNettyClient.java | 28 ++++++++++----------
.../netty/KerberosSaslNettyClientState.java | 10 +++----
.../netty/KerberosSaslNettyServer.java | 18 ++++++-------
.../netty/KerberosSaslNettyServerState.java | 2 +-
.../netty/KerberosSaslServerHandler.java | 19 +++++++------
.../netty/NettyUncaughtExceptionHandler.java | 18 ++++++-------
8 files changed, 59 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/31479552/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 10c5059..5d27a16 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -21,9 +21,10 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.Map;
@@ -54,7 +55,6 @@ public class Context implements IContext {
int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
- // TODO investigate impact of having one worker
if (maxWorkers > 0) {
clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
Executors.newCachedThreadPool(workerFactory), maxWorkers);
@@ -103,10 +103,12 @@ public class Context implements IContext {
for (IConnection conn : connections.values()) {
conn.close();
}
+
connections = null;
//we need to release resources associated with client channel factory
clientChannelFactory.releaseExternalResources();
+
}
private String key(String host, int port) {
http://git-wip-us.apache.org/repos/asf/storm/blob/31479552/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java
index 9ae34fe..ee0e41d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java
@@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory;
public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
private static final Logger LOG = LoggerFactory
- .getLogger(KerberosSaslClientHandler.class);
+ .getLogger(KerberosSaslClientHandler.class);
private ISaslClient client;
long start_time;
/** Used for client or server's token to send or receive from each other. */
@@ -47,7 +47,7 @@ public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
@Override
public void channelConnected(ChannelHandlerContext ctx,
- ChannelStateEvent event) {
+ ChannelStateEvent event) {
// register the newly established channel
Channel channel = ctx.getChannel();
client.channelConnected(channel);
@@ -57,14 +57,14 @@ public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
try {
KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
- .get(channel);
+ .get(channel);
if (saslNettyClient == null) {
LOG.debug("Creating saslNettyClient now for channel: {}",
channel);
saslNettyClient = new KerberosSaslNettyClient(storm_conf, jaas_section);
KerberosSaslNettyClientState.getKerberosSaslNettyClient.set(channel,
- saslNettyClient);
+ saslNettyClient);
}
LOG.debug("Going to initiate Kerberos negotiations.");
byte[] initialChallenge = saslNettyClient.saslResponse(new SaslMessageToken(new byte[0]));
@@ -80,15 +80,15 @@ public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
- throws Exception {
+ throws Exception {
LOG.debug("send/recv time (ms): {}",
- (System.currentTimeMillis() - start_time));
+ (System.currentTimeMillis() - start_time));
Channel channel = ctx.getChannel();
// Generate SASL response to server using Channel-local SASL client.
KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
- .get(channel);
+ .get(channel);
if (saslNettyClient == null) {
throw new Exception("saslNettyClient was unexpectedly null for channel:" + channel);
}
@@ -111,8 +111,7 @@ public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
// perform this request. The client's request will now proceed
// to the next pipeline component namely StormClientHandler.
Channels.fireMessageReceived(ctx, msg);
- }
- else {
+ } else {
LOG.warn("Unexpected control message: {}", msg);
}
return;
http://git-wip-us.apache.org/repos/asf/storm/blob/31479552/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java
index 32afab0..e540a4c 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java
@@ -46,7 +46,7 @@ import org.slf4j.LoggerFactory;
public class KerberosSaslNettyClient {
private static final Logger LOG = LoggerFactory
- .getLogger(KerberosSaslNettyClient.class);
+ .getLogger(KerberosSaslNettyClient.class);
/**
* Used to respond to server's counterpart, SaslServer with SASL tokens
@@ -55,16 +55,16 @@ public class KerberosSaslNettyClient {
private SaslClient saslClient;
private Subject subject;
private String jaas_section;
-
+
/**
* Create a KerberosSaslNettyClient for authentication with servers.
*/
public KerberosSaslNettyClient(Map storm_conf, String jaas_section) {
LOG.debug("KerberosSaslNettyClient: Creating SASL {} client to authenticate to server ",
SaslUtils.KERBEROS);
-
+
LOG.info("Creating Kerberos Client.");
-
+
Configuration login_conf;
try {
login_conf = AuthUtils.GetConfiguration(storm_conf);
@@ -74,14 +74,14 @@ public class KerberosSaslNettyClient {
throw t;
}
LOG.debug("KerberosSaslNettyClient: authmethod {}", SaslUtils.KERBEROS);
-
+
SaslClientCallbackHandler ch = new SaslClientCallbackHandler();
-
+
subject = null;
try {
LOG.debug("Setting Configuration to login_config: {}", login_conf);
//specify a configuration object to be used
- Configuration.setConfiguration(login_conf);
+ Configuration.setConfiguration(login_conf);
//now login
LOG.debug("Trying to login.");
Login login = new Login(jaas_section, ch);
@@ -91,9 +91,9 @@ public class KerberosSaslNettyClient {
LOG.error("Client failed to login in principal:" + ex, ex);
throw new RuntimeException(ex);
}
-
+
//check the credential of our principal
- if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
+ if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
LOG.error("Failed to verify user principal.");
throw new RuntimeException("Fail to verify user principal with section \"" +
jaas_section +
@@ -138,12 +138,12 @@ public class KerberosSaslNettyClient {
}
});
LOG.info("Got Client: {}", saslClient);
-
+
} catch (PrivilegedActionException e) {
LOG.error("KerberosSaslNettyClient: Could not create Sasl Netty Client.");
throw new RuntimeException(e);
}
-}
+ }
public boolean isComplete() {
return saslClient.isComplete();
@@ -151,7 +151,7 @@ public class KerberosSaslNettyClient {
/**
* Respond to server's SASL token.
- *
+ *
* @param saslTokenMessage
* contains server's SASL token
* @return client's response SASL token
@@ -188,7 +188,7 @@ public class KerberosSaslNettyClient {
/**
* Set private members using topology token.
- *
+ *
* @param topologyToken
*/
public SaslClientCallbackHandler() {
@@ -196,7 +196,7 @@ public class KerberosSaslNettyClient {
/**
* Implementation used to respond to SASL tokens from server.
- *
+ *
* @param callbacks
* objects that indicate what credential information the
* server's SaslServer requires from the client.
http://git-wip-us.apache.org/repos/asf/storm/blob/31479552/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClientState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClientState.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClientState.java
index 1283d9b..2546aa5 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClientState.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClientState.java
@@ -22,10 +22,10 @@ import org.jboss.netty.channel.ChannelLocal;
final class KerberosSaslNettyClientState {
- public static final ChannelLocal<KerberosSaslNettyClient> getKerberosSaslNettyClient = new ChannelLocal<KerberosSaslNettyClient>() {
- protected KerberosSaslNettyClient initialValue(Channel channel) {
- return null;
- }
- };
+ public static final ChannelLocal<KerberosSaslNettyClient> getKerberosSaslNettyClient = new ChannelLocal<KerberosSaslNettyClient>() {
+ protected KerberosSaslNettyClient initialValue(Channel channel) {
+ return null;
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/storm/blob/31479552/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java
index a0003c6..a935608 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java
@@ -55,7 +55,7 @@ class KerberosSaslNettyServer {
private Subject subject;
private String jaas_section;
private List<String> authorizedUsers;
-
+
KerberosSaslNettyServer(Map storm_conf, String jaas_section, List<String> authorizedUsers) {
this.authorizedUsers = authorizedUsers;
LOG.debug("Getting Configuration.");
@@ -67,17 +67,17 @@ class KerberosSaslNettyServer {
LOG.error("Failed to get login_conf: ", t);
throw t;
}
-
+
LOG.debug("KerberosSaslNettyServer: authmethod {}", SaslUtils.KERBEROS);
KerberosSaslCallbackHandler ch = new KerberosSaslNettyServer.KerberosSaslCallbackHandler(storm_conf, authorizedUsers);
-
+
//login our principal
subject = null;
try {
LOG.debug("Setting Configuration to login_config: {}", login_conf);
//specify a configuration object to be used
- Configuration.setConfiguration(login_conf);
+ Configuration.setConfiguration(login_conf);
//now login
LOG.debug("Trying to login.");
Login login = new Login(jaas_section, ch);
@@ -87,9 +87,9 @@ class KerberosSaslNettyServer {
LOG.error("Server failed to login in principal:", ex);
throw new RuntimeException(ex);
}
-
+
//check the credential of our principal
- if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
+ if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
LOG.error("Failed to verifyuser principal.");
throw new RuntimeException("Fail to verify user principal with section \""
+ jaas_section
@@ -97,7 +97,7 @@ class KerberosSaslNettyServer {
+ login_conf);
}
- try {
+ try {
LOG.info("Creating Kerberos Server.");
final CallbackHandler fch = ch;
Principal p = (Principal)subject.getPrincipals().toArray()[0];
@@ -123,7 +123,7 @@ class KerberosSaslNettyServer {
}
});
LOG.info("Got Server: {}", saslServer);
-
+
} catch (PrivilegedActionException e) {
LOG.error("KerberosSaslNettyServer: Could not create SaslServer: ", e);
throw new RuntimeException(e);
@@ -192,7 +192,7 @@ class KerberosSaslNettyServer {
/**
* Used by SaslTokenMessage::processToken() to respond to server SASL
* tokens.
- *
+ *
* @param token
* Server's SASL token
* @return token to send back to the server.
http://git-wip-us.apache.org/repos/asf/storm/blob/31479552/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServerState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServerState.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServerState.java
index 064dc91..e7a127e 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServerState.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServerState.java
@@ -26,5 +26,5 @@ final class KerberosSaslNettyServerState {
protected KerberosSaslNettyServer initialValue(Channel channel) {
return null;
}
- };
+ };
}
http://git-wip-us.apache.org/repos/asf/storm/blob/31479552/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java
index 3ed3fd7..e4a6e29 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java
@@ -36,9 +36,9 @@ public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
private Map storm_conf;
private String jaas_section;
private List<String> authorizedUsers;
-
+
private static final Logger LOG = LoggerFactory
- .getLogger(KerberosSaslServerHandler.class);
+ .getLogger(KerberosSaslServerHandler.class);
public KerberosSaslServerHandler(ISaslServer server, Map storm_conf, String jaas_section, List<String> authorizedUsers) throws IOException {
this.server = server;
@@ -49,14 +49,14 @@ public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
+ throws Exception {
Object msg = e.getMessage();
if (msg == null)
return;
Channel channel = ctx.getChannel();
-
+
if (msg instanceof SaslMessageToken) {
// initialize server-side SASL functionality, if we haven't yet
// (in which case we are looking at the first SASL message from the
@@ -71,11 +71,11 @@ public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
try {
saslNettyServer = new KerberosSaslNettyServer(storm_conf, jaas_section, authorizedUsers);
} catch (RuntimeException ioe) {
- LOG.error("Error occurred while creating saslNettyServer on server {} for client {}",
+ LOG.error("Error occurred while creating saslNettyServer on server {} for client {}",
channel.getLocalAddress(), channel.getRemoteAddress());
saslNettyServer = null;
}
-
+
KerberosSaslNettyServerState.getKerberosSaslNettyServer.set(channel,
saslNettyServer);
} else {
@@ -85,17 +85,16 @@ public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
byte[] responseBytes = saslNettyServer.response(((SaslMessageToken) msg)
.getSaslToken());
-
+
SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(responseBytes);
if(saslTokenMessageRequest.getSaslToken() == null) {
channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
- }
- else {
+ } else {
// Send response to client.
channel.write(saslTokenMessageRequest);
}
-
+
if (saslNettyServer.isComplete()) {
// If authentication of client is complete, we will also send a
// SASL-Complete message to the client.
http://git-wip-us.apache.org/repos/asf/storm/blob/31479552/storm-core/src/jvm/backtype/storm/messaging/netty/NettyUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/NettyUncaughtExceptionHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/NettyUncaughtExceptionHandler.java
index 3d31544..ad8b5d9 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/NettyUncaughtExceptionHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/NettyUncaughtExceptionHandler.java
@@ -22,14 +22,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NettyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
- private static final Logger LOG = LoggerFactory.getLogger(NettyUncaughtExceptionHandler.class);
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- try {
- Utils.handleUncaughtException(e);
- } catch (Error error) {
- LOG.info("Received error in netty thread.. terminating server...");
- Runtime.getRuntime().exit(1);
+ private static final Logger LOG = LoggerFactory.getLogger(NettyUncaughtExceptionHandler.class);
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ try {
+ Utils.handleUncaughtException(e);
+ } catch (Error error) {
+ LOG.info("Received error in netty thread.. terminating server...");
+ Runtime.getRuntime().exit(1);
+ }
}
- }
}